Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 187 additions & 53 deletions tool/extract_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
Support npm, yarn classic, yarn berry, pnpm
"""

import re
import os
import subprocess
import json
import logging
import sys
import shutil
from collections import defaultdict
import json
import hashlib
from pathlib import Path
import yaml

from tool.tool_config import PNPM_LIST_COMMAND, get_cache_manager, YarnLockParser
from tool.tool_config import PNPM_LIST_COMMAND, get_cache_manager, YarnLockParser, get_package_url, get_registry_url

Check failure on line 18 in tool/extract_deps.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

tool/extract_deps.py:5:1: I001 Import block is un-sorted or un-formatted

cache_manager = get_cache_manager()

Expand All @@ -27,6 +27,52 @@
RESOLVE_PLUGINS_LOG = "/tmp/plugins.log"


def build_tree_structure_with_links(paths, package_manager):
    """Build a nested dict of markdown-linked package labels from dependency paths.

    The last element of each path (the target package itself) is dropped;
    every remaining ancestor becomes a nested dict key formatted as
    ``[name@version](package-url)``.
    """
    root = {}
    for dep_path in paths:
        cursor = root
        # Walk only the ancestors of the target package (path[:-1]).
        for ancestor in dep_path[:-1]:
            key = f"[{ancestor}]({get_package_url(ancestor, package_manager)})"
            cursor = cursor.setdefault(key, {})
    return root


def format_tree_as_text(tree, target_package, package_manager, indent="", is_last_child=True):
    """Render the nested link-label tree as box-drawing text lines.

    Returns a single string when ``tree`` is empty, otherwise a list of
    lines — callers must handle both shapes (the sole in-file caller does).
    ``is_last_child`` is kept for signature compatibility but is unused.
    """
    if not tree:
        return f"{indent}└── [{target_package}]({get_package_url(target_package, package_manager)})"

    rendered = []
    entries = list(tree.items())
    last_index = len(entries) - 1
    for position, (label, children) in enumerate(entries):
        at_end = position == last_index
        rendered.append(f"{indent}{'└──' if at_end else '├──'} {label}")
        next_indent = indent + (" " if at_end else "│ ")

        if children:
            nested = format_tree_as_text(children, target_package, package_manager, next_indent, at_end)
            rendered.extend(nested if isinstance(nested, list) else [nested])
        else:
            # Leaf of the ancestor chain: attach the target package link.
            rendered.append(f"{next_indent}└── [{target_package}]({get_package_url(target_package, package_manager)})")
    return rendered


def format_paths_for_markdown(paths, target_package, package_manager):
    """Render dependency paths to *target_package* as a collapsible markdown tree.

    Args:
        paths: List of dependency paths, each a list of "name@version"
            strings ending at the target package.
        target_package: The "name@version" string the paths lead to.
        package_manager: Ecosystem key understood by get_package_url.

    Returns:
        A ``<details>`` snippet with the path count and a ``<pre>`` tree,
        or "" when there are no paths.
    """
    if not paths:
        return ""

    # Compute the summary once so both branches agree on the count.
    summary_text = f"{len(paths)} path{'s' if len(paths) != 1 else ''}"

    tree = build_tree_structure_with_links(paths, package_manager)
    if not tree:
        # All paths are direct (single-node), so there is no ancestor tree
        # to draw. Bug fix: this used to hardcode "1 path" even when
        # several direct paths were present.
        return f"<details><summary>{summary_text}</summary><pre>{target_package}</pre></details>"

    tree_lines = format_tree_as_text(tree, target_package, package_manager)
    tree_text = "<br>".join(tree_lines)
    return f"<details><summary>{summary_text}</summary><pre>{tree_text}</pre></details>"


def get_lockfile_hash(lockfile_content):
    """Return a SHA-256 hex digest of the lockfile content, used to detect changes."""
    digest = hashlib.sha256()
    digest.update(str(lockfile_content).encode())
    return digest.hexdigest()
Expand Down Expand Up @@ -116,79 +162,145 @@

def extract_deps_from_npm(repo_path, npm_lock_file):
"""
Extract dependencies from a "package-lock.json" file.
Extract dependencies from an npm project using npm list command.

Args:
repo_path (str): The project's source code repository.
npm_lock_file (dict): The content of the npm lock file.
repo_path (str): The project's source code repository path.
npm_lock_file (str): The npm lock file path.

Returns:
dict: A dictionary containing the extracted dependencies and patches.
"""
lock_file_json = json.loads(npm_lock_file)
lockfile_hash = get_lockfile_hash(lock_file_json)
# Generate cache key based on repo path and project info
lockfile_hash = get_lockfile_hash(npm_lock_file)
if not lockfile_hash:
logging.error("No lockfile found in %s", repo_path)
return {"resolutions": [], "patches": []}

cached_deps = cache_manager.extracted_deps_cache.get_dependencies(repo_path, lockfile_hash)
if cached_deps:
logging.info(f"Using cached dependencies for {repo_path}")
return cached_deps

try:
# If we reach here, we need to resolve dependencies
current_dir = os.getcwd()
os.chdir(repo_path)
# Run npm list to get dependency tree
logging.info("Running npm list to extract dependencies...")
result = subprocess.run(
["npm", "list", "--json", "--all", "--long", "--package-lock-only"],
cwd=repo_path,
capture_output=True,
text=True,
check=False, # Don't fail on warnings/missing peer deps
)
os.chdir(current_dir)

if result.returncode != 0 and result.returncode != 1:
# Return code 1 is common for missing peer deps, which is OK
logging.error(f"npm list failed with return code {result.returncode}")
logging.error(f"stderr: {result.stderr}")
return {"resolutions": [], "patches": [], "aliased_packages": {}}

npm_data = json.loads(result.stdout)
# Parse project name and version from npm list output
project_name = npm_data.get("name")
project_version = npm_data.get("version")

patches = []
pkg_name_with_resolution = set()
aliased_packages = {}
deps_list_data = {}
parent_packages = {} # Maps package -> set of immediate parents
dependency_tree = {} # Maps package -> complete dependency info

parent_packages = {}
if lock_file_json.get("packages") and isinstance(lock_file_json["packages"], dict):
for package_path, package_info in lock_file_json["packages"].items():
if package_path.startswith("node_modules/"):
package_name = package_path.split("/", 1)[1]
if "node_modules" in package_name:
package_name = package_name.split("node_modules/")[-1]

resolution = package_name
if package_info.get("version"):
version = package_info["version"]
# Handle npm aliases
original_name = package_info.get("name")
if original_name:
logging.warning(f"Found npm alias for {original_name}@{version}")
aliased_packages[f"{original_name}@{version}"] = package_name
package_name = original_name

resolution = f"{package_name}@{version}"
pkg_name_with_resolution.add(resolution)

if package_info.get("dependencies"):
for dep_name, version in package_info["dependencies"].items():
parent_packages.setdefault(f"{dep_name}@{version}", set()).add(resolution)

deps_list_data = {
"resolutions": list(
{
"info": info,
"parent": list(parent_packages.get(info, set())),
}
for info in sorted(pkg_name_with_resolution)
),
"patches": patches,
"aliased_packages": aliased_packages,
}
# Add root package
root_name = npm_data.get("name", project_name)
root_version = npm_data.get("version", project_version)
root_resolution = f"{root_name}@{root_version}"
pkg_name_with_resolution.add(root_resolution)

cache_manager.extracted_deps_cache.cache_dependencies(repo_path, lockfile_hash, deps_list_data)
def process_dependencies(deps_dict, parent_resolution, current_path=None):
"""Recursively process dependencies from npm list output"""
if not deps_dict:
return

return deps_list_data
if current_path is None:
current_path = [parent_resolution]

except (IOError, ValueError, KeyError) as e:
logging.error(
"An error occurred while extracting dependencies from package-lock.json: %s",
str(e),
)
for dep_name, dep_info in deps_dict.items():
if not isinstance(dep_info, dict):
continue

dep_version = dep_info.get("version")
if not dep_version:
continue

# Handle npm aliases (like "my-lodash": "npm:lodash@4.17.21")
original_name = dep_name
if dep_name != dep_info.get("name", dep_name):
real_name = dep_info.get("name", dep_name)
logging.info(f"Found npm alias: {dep_name} -> {real_name}@{dep_version}")
aliased_packages[f"{real_name}@{dep_version}"] = f"{dep_name}@{dep_version}"
original_name = real_name

dep_resolution = f"{original_name}@{dep_version}"
pkg_name_with_resolution.add(dep_resolution)

# Map this dependency to its immediate parent
parent_packages.setdefault(dep_resolution, set()).add(parent_resolution)

# Build the full path to this dependency
full_path = current_path + [dep_resolution]

# Store all paths to this dependency
if dep_resolution not in dependency_tree:
dependency_tree[dep_resolution] = {"paths": [], "immediate_parents": set()}

dependency_tree[dep_resolution]["paths"].append(full_path[:])
dependency_tree[dep_resolution]["immediate_parents"].add(parent_resolution)

return {"resolutions": [], "patches": [], "aliased_packages": []}
# Check for patches (if using patch-package or similar)
if dep_info.get("patched"):
patches.append({"info": dep_resolution})

# Recursively process nested dependencies
if dep_info.get("dependencies"):
process_dependencies(dep_info["dependencies"], dep_resolution, full_path)

# Process all dependencies starting from root
if npm_data.get("dependencies"):
process_dependencies(npm_data["dependencies"], root_resolution)

deps_list_data = {
"resolutions": list(
{
"info": info,
"parent": format_paths_for_markdown(dependency_tree.get(info, {}).get("paths", []), info, "npm"),
}
for info in sorted(pkg_name_with_resolution)
),
"patches": patches,
"aliased_packages": aliased_packages,
}

cache_manager.extracted_deps_cache.cache_dependencies(repo_path, lockfile_hash, deps_list_data)

logging.info(f"Extracted {len(pkg_name_with_resolution)} dependencies from npm list")
return deps_list_data

except subprocess.CalledProcessError as e:
os.chdir(current_dir)
logging.error(f"Error running npm list: {e}")
logging.error(f"stderr: {e.stderr}")
return {"resolutions": [], "patches": [], "aliased_packages": {}}
except json.JSONDecodeError as e:
os.chdir(current_dir)
logging.error(f"Error parsing npm list JSON output: {e}")
return {"resolutions": [], "patches": [], "aliased_packages": {}}
except Exception as e:
os.chdir(current_dir)
logging.error(f"Unexpected error in extract_deps_from_npm: {e}")
return {"resolutions": [], "patches": [], "aliased_packages": {}}


def extract_deps_from_yarn_berry(repo_path, yarn_lock_file):
Expand Down Expand Up @@ -698,9 +810,31 @@
for plugin in retrieved_plugins
]

dependency_tree = defaultdict(lambda: {"paths": [], "immediate_parents": set()})
pkg_name_with_resolution = set()

for dep in parsed_deps + parsed_plugins:
child = dep["info"]
parent = dep["parent"]
pkg_name_with_resolution.add(child)

if parent:
dependency_tree[child]["paths"].append([parent, child])
dependency_tree[child]["immediate_parents"].add(parent)
else:
# Root dependency
dependency_tree[child]["paths"].append([child])

# Create the result
deps_list_data = {
"resolutions": list({item["info"]: item for item in parsed_plugins + parsed_deps}.values()),
"resolutions": list(
{
"info": info,
"parent": format_paths_for_markdown(dependency_tree.get(info, {}).get("paths", []), info, "maven"),
"command": command,
}
for info, command in {item["info"]: item["command"] for item in parsed_deps + parsed_plugins}.items()
),
"patches": [],
}

Expand Down
5 changes: 3 additions & 2 deletions tool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ def get_lockfile(project_repo_name, release_version, package_manager):
try:
lockfile_name = LOOKING_FOR[package_manager]
logging.info(f"Getting {lockfile_name} for {project_repo_name}@{release_version}")
logging.info(f"Package manager: {package_manager}")
except KeyError:
logging.error("Invalid package manager or lack of lockfile: %s", package_manager)
raise ValueError("Invalid package manager or lack of lockfile.")
Expand Down Expand Up @@ -230,6 +229,7 @@ def get_deps(folder_path, project_repo_name, release_version, package_manager):
deps_list_all = None

logging.info("Getting dependencies for %s@%s...", project_repo_name, release_version)
logging.info(f"Package manager: {package_manager}")

# if it is a pnpm monorepo
if package_manager == "pnpm":
Expand All @@ -252,8 +252,9 @@ def get_deps(folder_path, project_repo_name, release_version, package_manager):
patches_info = extract_deps.get_patches_info(project_repo_name, yarn_file)

elif package_manager == "npm":
repo_path = tool_config.clone_repo(project_repo_name, release_version)
npm_file, _, _ = get_lockfile(project_repo_name, release_version, package_manager)
deps_list_all = extract_deps.extract_deps_from_npm(project_repo_name, npm_file)
deps_list_all = extract_deps.extract_deps_from_npm(repo_path, npm_file)

elif package_manager == "maven":
# Maven is more complex, because of child packages in the repo/pom; this requires to clone the whole repo
Expand Down
6 changes: 3 additions & 3 deletions tool/report_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import subprocess
from datetime import datetime
import pandas as pd
from tool.tool_config import DEFAULT_ENABLED_CHECKS
from tool.tool_config import DEFAULT_ENABLED_CHECKS, get_package_url, get_registry_url
import logging
import re

Expand Down Expand Up @@ -145,7 +145,7 @@ def create_dataframe(data, deps_list, package_manager, enabled_checks, config):
"all_deprecated": package_data.get("package_info", {}).get("all_deprecated", None),
"signature_present": package_data.get("code_signature", {}).get("signature_present"),
"signature_valid": package_data.get("code_signature", {}).get("signature_valid"),
"parent": f"`{package_data.get("parent", "-")}`",
"parent": package_data.get("parent", "-"),
"command": f"`{package_data.get("command", "-")}`",
"is_github": source_code_data.get("is_github", False),
"github_url": source_code_data.get("github_url", "Could not find repo from package registry"),
Expand Down Expand Up @@ -858,7 +858,7 @@ def write_summary(
md_file.write("\n")
break

md_file.write("#### Ignored Smells\n\n")
md_file.write("\n#### Ignored Smells\n\n")
md_file.write("\nThe following smells were configured to be ignored in this project:\n\n")
for report in ignored_reports:
if ignored_reports[report]["enabled"]:
Expand Down
22 changes: 22 additions & 0 deletions tool/tool_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1191,3 +1191,25 @@ def get_last_page_info(
logging.error(f"Failed after {max_retries} attempts: {e}")
return None
time.sleep(retry_delay * (attempt + 1))


def get_package_url(package_name, package_manager):
    """Return the human-facing registry page URL for a ``name@version`` string.

    Raises:
        ValueError: If the package manager is not supported.
    """
    if package_manager == "maven":
        coordinates, version = package_name.split("@")
        group_id, artifact_id = coordinates.split(":")
        return f"https://central.sonatype.com/artifact/{group_id}/{artifact_id}/{version}"
    if package_manager in ("npm", "yarn-berry", "yarn-classic", "pnpm"):
        # Split on the final "@" only, so scoped names like "@babel/core" survive.
        segments = package_name.rsplit("@", 1)
        return "https://npmjs.com/package/" + "/v/".join(segments)
    raise ValueError("Package Manager not supported for acquiring package URL.")


def get_registry_url(package_name, package_manager):
    """Return the machine-facing registry API URL for a ``name@version`` string.

    Raises:
        ValueError: If the package manager is not supported.
    """
    if package_manager == "maven":
        ga, v = package_name.split("@")
        g, a = ga.split(":")
        return f"https://central.sonatype.com/artifact/{g}/{a}/{v}"
    elif package_manager in ["npm", "yarn-berry", "yarn-classic", "pnpm"]:
        name_in_url = "/".join(package_name.rsplit("@", 1))  # replaces only the last "@" with "/" (scoped names like @scope/pkg keep theirs)
        return f"https://registry.npmjs.com/{name_in_url}"
    raise ValueError("Package Manager not supported for acquiring registry URL.")
Loading