Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion socketsecurity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__author__ = 'socket.dev'
__version__ = '2.0.9'
__version__ = '2.0.10'
254 changes: 125 additions & 129 deletions socketsecurity/core/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import re
import json
import logging
logging.basicConfig(level=logging.DEBUG)

from pathlib import Path
from mdutils import MdUtils
Expand All @@ -16,7 +18,7 @@ class Messages:
def map_severity_to_sarif(severity: str) -> str:
"""
Map Socket severity levels to SARIF levels (GitHub code scanning).

'low' -> 'note'
'medium' or 'middle' -> 'warning'
'high' or 'critical' -> 'error'
Expand All @@ -39,115 +41,89 @@ def find_line_in_file(packagename: str, packageversion: str, manifest_file: str)
Supports:
1) JSON-based manifest files (package-lock.json, Pipfile.lock, composer.lock)
- Locates a dictionary entry with the matching package & version
- Does a rough line-based search to find the actual line in the raw text
2) Text-based (requirements.txt, package.json, yarn.lock, etc.)
- Uses compiled regex patterns to detect a match line by line
- Searches the raw text for the key
2) Text-based (requirements.txt, package.json, yarn.lock, pnpm-lock.yaml, etc.)
- Uses regex patterns to detect a match line by line
"""
# Extract just the file name to detect manifest type
file_type = Path(manifest_file).name
logging.debug("Processing file for line lookup: %s", manifest_file)

# ----------------------------------------------------
# 1) JSON-based manifest files
# ----------------------------------------------------
if file_type in ["package-lock.json", "Pipfile.lock", "composer.lock"]:
try:
# Read entire file so we can parse JSON and also do raw line checks
with open(manifest_file, "r", encoding="utf-8") as f:
raw_text = f.read()

# Attempt JSON parse
logging.debug("Read %d characters from %s", len(raw_text), manifest_file)
data = json.loads(raw_text)

# In practice, you may need to check data["dependencies"], data["default"], etc.
# This is an example approach.
packages_dict = (
data.get("packages")
or data.get("default")
or data.get("dependencies")
or {}
)

logging.debug("Found package keys in %s: %s", manifest_file, list(packages_dict.keys()))
found_key = None
found_info = None
# Locate a dictionary entry whose 'version' matches
for key, value in packages_dict.items():
# For NPM package-lock, keys might look like "node_modules/axios"
if key.endswith(packagename) and "version" in value:
if value["version"] == packageversion:
found_key = key
found_info = value
break

if found_key and found_info:
# Search lines to approximate the correct line number
needle_key = f'"{found_key}":' # e.g. "node_modules/axios":
needle_version = f'"version": "{packageversion}"'
needle_key = f'"{found_key}":'
lines = raw_text.splitlines()
best_line = 1
snippet = None

logging.debug("Total lines in %s: %d", manifest_file, len(lines))
for i, line in enumerate(lines, start=1):
if (needle_key in line) or (needle_version in line):
best_line = i
snippet = line.strip()
break # On first match, stop

# If we found an approximate line, return it; else fallback to line 1
if best_line > 0 and snippet:
return best_line, snippet
else:
return 1, f'"{found_key}": {found_info}'
if needle_key in line:
logging.debug("Found match at line %d in %s: %s", i, manifest_file, line.strip())
return i, line.strip()
return 1, f'"{found_key}": {found_info}'
else:
return 1, f"{packagename} {packageversion} (not found in {manifest_file})"

except (FileNotFoundError, json.JSONDecodeError):
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error("Error reading %s: %s", manifest_file, e)
return 1, f"Error reading {manifest_file}"

# ----------------------------------------------------
# 2) Text-based / line-based manifests
# ----------------------------------------------------
# Define a dictionary of patterns for common manifest types
search_patterns = {
"package.json": rf'"{packagename}":\s*"{packageversion}"',
"yarn.lock": rf'{packagename}@{packageversion}',
"pnpm-lock.yaml": rf'"{re.escape(packagename)}"\s*:\s*\{{[^}}]*"version":\s*"{re.escape(packageversion)}"',
"requirements.txt": rf'^{re.escape(packagename)}\s*(?:==|===|!=|>=|<=|~=|\s+)?\s*{re.escape(packageversion)}(?:\s*;.*)?$',
"pyproject.toml": rf'{packagename}\s*=\s*"{packageversion}"',
"Pipfile": rf'"{packagename}"\s*=\s*"{packageversion}"',
"go.mod": rf'require\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
"go.sum": rf'{re.escape(packagename)}\s+{re.escape(packageversion)}',
"pom.xml": rf'<artifactId>{re.escape(packagename)}</artifactId>\s*<version>{re.escape(packageversion)}</version>',
"build.gradle": rf'implementation\s+"{re.escape(packagename)}:{re.escape(packageversion)}"',
"Gemfile": rf'gem\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
"Gemfile.lock": rf'\s+{re.escape(packagename)}\s+\({re.escape(packageversion)}\)',
".csproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
".fsproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
"paket.dependencies": rf'nuget\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
"Cargo.toml": rf'{re.escape(packagename)}\s*=\s*"{re.escape(packageversion)}"',
"build.sbt": rf'"{re.escape(packagename)}"\s*%\s*"{re.escape(packageversion)}"',
"Podfile": rf'pod\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
"Package.swift": rf'\.package\(name:\s*"{re.escape(packagename)}",\s*url:\s*".*?",\s*version:\s*"{re.escape(packageversion)}"\)',
"mix.exs": rf'\{{:{re.escape(packagename)},\s*"{re.escape(packageversion)}"\}}',
"composer.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
"conanfile.txt": rf'{re.escape(packagename)}/{re.escape(packageversion)}',
"vcpkg.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
}

# If no specific pattern is found for this file name, fallback to a naive approach
searchstring = search_patterns.get(file_type, rf'{re.escape(packagename)}.*{re.escape(packageversion)}')
# For pnpm-lock.yaml, use a special regex pattern.
if file_type.lower() == "pnpm-lock.yaml":
searchstring = rf'^\s*/{re.escape(packagename)}/{re.escape(packageversion)}:'
else:
search_patterns = {
"package.json": rf'"{packagename}":\s*"[\^~]?{re.escape(packageversion)}"',
"yarn.lock": rf'{packagename}@{packageversion}',
"requirements.txt": rf'^{re.escape(packagename)}\s*(?:==|===|!=|>=|<=|~=|\s+)?\s*{re.escape(packageversion)}(?:\s*;.*)?$',
"pyproject.toml": rf'{packagename}\s*=\s*"{re.escape(packageversion)}"',
"Pipfile": rf'"{packagename}"\s*=\s*"{re.escape(packageversion)}"',
"go.mod": rf'require\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
"go.sum": rf'{re.escape(packagename)}\s+{re.escape(packageversion)}',
"pom.xml": rf'<artifactId>{re.escape(packagename)}</artifactId>\s*<version>{re.escape(packageversion)}</version>',
"build.gradle": rf'implementation\s+"{re.escape(packagename)}:{re.escape(packageversion)}"',
"Gemfile": rf'gem\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
"Gemfile.lock": rf'\s+{re.escape(packagename)}\s+\({re.escape(packageversion)}\)',
".csproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
".fsproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
"paket.dependencies": rf'nuget\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
"Cargo.toml": rf'{re.escape(packagename)}\s*=\s*"{re.escape(packageversion)}"',
"build.sbt": rf'"{re.escape(packagename)}"\s*%\s*"{re.escape(packageversion)}"',
"Podfile": rf'pod\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
"Package.swift": rf'\.package\(name:\s*"{re.escape(packagename)}",\s*url:\s*".*?",\s*version:\s*"{re.escape(packageversion)}"\)',
"mix.exs": rf'\{{:{re.escape(packagename)},\s*"{re.escape(packageversion)}"\}}',
"composer.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
"conanfile.txt": rf'{re.escape(packagename)}/{re.escape(packageversion)}',
"vcpkg.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
}
searchstring = search_patterns.get(file_type, rf'{re.escape(packagename)}.*{re.escape(packageversion)}')

logging.debug("Using search pattern for %s: %s", file_type, searchstring)
try:
# Read file lines and search for a match
with open(manifest_file, 'r', encoding="utf-8") as file:
lines = [line.rstrip("\n") for line in file]
logging.debug("Total lines in %s: %d", manifest_file, len(lines))
for line_number, line_content in enumerate(lines, start=1):
# For Python conditional dependencies, ignore everything after first ';'
line_main = line_content.split(";", 1)[0].strip()

# Use a case-insensitive regex search
if re.search(searchstring, line_main, re.IGNORECASE):
logging.debug("Match found at line %d in %s: %s", line_number, manifest_file, line_content.strip())
return line_number, line_content.strip()

except FileNotFoundError:
return 1, f"{manifest_file} not found"
except Exception as e:
Expand Down Expand Up @@ -181,7 +157,6 @@ def get_manifest_type_url(manifest_file: str, pkg_name: str, pkg_version: str) -
"composer.json": "composer",
"vcpkg.json": "vcpkg",
}

file_type = Path(manifest_file).name
url_prefix = manifest_to_url_prefix.get(file_type, "unknown")
return f"https://socket.dev/{url_prefix}/package/{pkg_name}/alerts/{pkg_version}"
Expand All @@ -191,29 +166,33 @@ def create_security_comment_sarif(diff) -> dict:
"""
Create SARIF-compliant output from the diff report, including dynamic URL generation
based on manifest type and improved <br/> formatting for GitHub SARIF display.

This function now:
- Processes every alert in diff.new_alerts.
- For alerts with multiple manifest files, generates an individual SARIF result for each file.
- Appends the manifest file name to the rule ID and name to make each result unique.
- Does NOT fall back to 'requirements.txt' if no manifest file is provided.
- Adds detailed logging to validate our assumptions.

"""
scan_failed = False
if len(diff.new_alerts) == 0:
for alert in diff.new_alerts:
alert: Issue
if alert.error:
scan_failed = True
break

sarif_data = {
"$schema": "https://json.schemastore.org/sarif-2.1.0.json",
"version": "2.1.0",
"runs": [
{
"tool": {
"driver": {
"name": "Socket Security",
"informationUri": "https://socket.dev",
"rules": []
}
},
"results": []
}
]
"runs": [{
"tool": {
"driver": {
"name": "Socket Security",
"informationUri": "https://socket.dev",
"rules": []
}
},
"results": []
}]
}

rules_map = {}
Expand All @@ -222,60 +201,77 @@ def create_security_comment_sarif(diff) -> dict:
for alert in diff.new_alerts:
pkg_name = alert.pkg_name
pkg_version = alert.pkg_version
rule_id = f"{pkg_name}=={pkg_version}"
base_rule_id = f"{pkg_name}=={pkg_version}"
severity = alert.severity

# Generate the correct URL for the alert based on manifest type
introduced_list = alert.introduced_by
manifest_file = introduced_list[0][1] if introduced_list and isinstance(introduced_list[0], list) else alert.manifests or "requirements.txt"
socket_url = Messages.get_manifest_type_url(manifest_file, pkg_name, pkg_version)

# Prepare descriptions with <br/> replacements
short_desc = f"{alert.props.get('note', '')}<br/><br/>Suggested Action:<br/>{alert.suggestion}<br/><a href=\"{socket_url}\">{socket_url}</a>"
full_desc = "{} - {}".format(alert.title, alert.description.replace('\r\n', '<br/>'))

# Identify the line and snippet in the manifest file
line_number, line_content = Messages.find_line_in_file(pkg_name, pkg_version, manifest_file)
if line_number < 1:
line_number = 1 # Ensure SARIF compliance

# Create the rule if not already defined
if rule_id not in rules_map:
rules_map[rule_id] = {
"id": rule_id,
"name": f"{pkg_name}=={pkg_version}",
"shortDescription": {"text": f"Alert generated for {rule_id} by Socket Security"},
"fullDescription": {"text": full_desc},
"helpUri": socket_url,
"defaultConfiguration": {
"level": Messages.map_severity_to_sarif(severity)
},
}
logging.debug("Alert %s - introduced_by: %s, manifests: %s", base_rule_id, alert.introduced_by, getattr(alert, 'manifests', None))
manifest_files = []
if alert.introduced_by and isinstance(alert.introduced_by, list):
for entry in alert.introduced_by:
if isinstance(entry, (list, tuple)) and len(entry) >= 2:
files = [f.strip() for f in entry[1].split(";") if f.strip()]
manifest_files.extend(files)
elif isinstance(entry, str):
manifest_files.extend([m.strip() for m in entry.split(";") if m.strip()])
elif hasattr(alert, 'manifests') and alert.manifests:
manifest_files = [mf.strip() for mf in alert.manifests.split(";") if mf.strip()]

logging.debug("Alert %s - extracted manifest_files: %s", base_rule_id, manifest_files)
if not manifest_files:
logging.error("Alert %s: No manifest file found; cannot determine file location.", base_rule_id)
continue

logging.debug("Alert %s - using manifest_files for processing: %s", base_rule_id, manifest_files)

# Create an individual SARIF result for each manifest file.
for mf in manifest_files:
logging.debug("Alert %s - Processing manifest file: %s", base_rule_id, mf)
socket_url = Messages.get_manifest_type_url(mf, pkg_name, pkg_version)
line_number, line_content = Messages.find_line_in_file(pkg_name, pkg_version, mf)
if line_number < 1:
line_number = 1
logging.debug("Alert %s: Manifest %s, line %d: %s", base_rule_id, mf, line_number, line_content)

# Create a unique rule id and name by appending the manifest file.
unique_rule_id = f"{base_rule_id} ({mf})"
rule_name = f"Alert {base_rule_id} ({mf})"

short_desc = (f"{alert.props.get('note', '')}<br/><br/>Suggested Action:<br/>{alert.suggestion}"
f"<br/><a href=\"{socket_url}\">{socket_url}</a>")
full_desc = "{} - {}".format(alert.title, alert.description.replace('\r\n', '<br/>'))

if unique_rule_id not in rules_map:
rules_map[unique_rule_id] = {
"id": unique_rule_id,
"name": rule_name,
"shortDescription": {"text": rule_name},
"fullDescription": {"text": full_desc},
"helpUri": socket_url,
"defaultConfiguration": {
"level": Messages.map_severity_to_sarif(severity)
},
}

# Add the SARIF result
result_obj = {
"ruleId": rule_id,
"message": {"text": short_desc},
"locations": [
{
result_obj = {
"ruleId": unique_rule_id,
"message": {"text": short_desc},
"locations": [{
"physicalLocation": {
"artifactLocation": {"uri": manifest_file},
"artifactLocation": {"uri": mf},
"region": {
"startLine": line_number,
"snippet": {"text": line_content},
},
}
}
],
}
results_list.append(result_obj)
}]
}
results_list.append(result_obj)

# Attach rules and results
sarif_data["runs"][0]["tool"]["driver"]["rules"] = list(rules_map.values())
sarif_data["runs"][0]["results"] = results_list

return sarif_data

@staticmethod
def create_security_comment_json(diff: Diff) -> dict:
scan_failed = False
Expand Down
Loading