Skip to content

Commit 376a4d5

Browse files
authored
Merge pull request #2111 from ziadhany/alpine-migrate
Migrate Alpine importer to advisory V2
2 parents b1ba74c + 421c089 commit 376a4d5

File tree

5 files changed

+911
-0
lines changed

5 files changed

+911
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from vulnerabilities.pipelines import nvd_importer
4242
from vulnerabilities.pipelines import pypa_importer
4343
from vulnerabilities.pipelines import pysec_importer
44+
from vulnerabilities.pipelines.v2_importers import alpine_linux_importer as alpine_linux_importer_v2
4445
from vulnerabilities.pipelines.v2_importers import aosp_importer as aosp_importer_v2
4546
from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2
4647
from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
@@ -111,6 +112,7 @@
111112
apache_tomcat_v2.ApacheTomcatImporterPipeline,
112113
retiredotnet_importer_v2.RetireDotnetImporterPipeline,
113114
ubuntu_osv_importer_v2.UbuntuOSVImporterPipeline,
115+
alpine_linux_importer_v2.AlpineLinuxImporterPipeline,
114116
nvd_importer.NVDImporterPipeline,
115117
github_importer.GitHubAPIImporterPipeline,
116118
gitlab_importer.GitLabImporterPipeline,
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
from pathlib import Path
12+
from typing import Any
13+
from typing import Iterable
14+
from typing import List
15+
from typing import Mapping
16+
17+
from fetchcode.vcs import fetch_via_vcs
18+
from packageurl import PackageURL
19+
from univers.version_range import AlpineLinuxVersionRange
20+
from univers.versions import InvalidVersion
21+
22+
from vulnerabilities.importer import AdvisoryDataV2
23+
from vulnerabilities.importer import AffectedPackageV2
24+
from vulnerabilities.importer import ReferenceV2
25+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
26+
from vulnerabilities.references import WireSharkReferenceV2
27+
from vulnerabilities.references import XsaReferenceV2
28+
from vulnerabilities.references import ZbxReferenceV2
29+
from vulnerabilities.utils import get_advisory_url
30+
from vulnerabilities.utils import load_json
31+
32+
33+
class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
34+
"""Collect Alpine Linux advisories."""
35+
36+
pipeline_id = "alpine_linux_importer_v2"
37+
spdx_license_expression = "CC-BY-SA-4.0"
38+
license_url = "https://secdb.alpinelinux.org/license.txt"
39+
repo_url = "git+https://github.com/aboutcode-org/aboutcode-mirror-alpine-secdb/"
40+
41+
@classmethod
42+
def steps(cls):
43+
return (
44+
cls.clone,
45+
cls.collect_and_store_advisories,
46+
)
47+
48+
def advisories_count(self) -> int:
49+
base_path = Path(self.vcs_response.dest_dir) / "secdb"
50+
return sum(
51+
len(pkg.get("advisories", []))
52+
for data in (load_json(p) for p in base_path.rglob("*.json"))
53+
for pkg in data.get("packages", [])
54+
)
55+
56+
def clone(self):
57+
self.log(f"Cloning `{self.repo_url}`")
58+
self.vcs_response = fetch_via_vcs(self.repo_url)
59+
60+
def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
61+
base_path = Path(self.vcs_response.dest_dir) / "secdb"
62+
for file_path in base_path.glob("**/*.json"):
63+
advisory_url = get_advisory_url(
64+
file=file_path,
65+
base_path=base_path,
66+
url="https://secdb.alpinelinux.org/",
67+
)
68+
69+
record = load_json(file_path)
70+
if not record or not record["packages"]:
71+
self.log(
72+
f'"packages" not found in {advisory_url!r}',
73+
level=logging.DEBUG,
74+
)
75+
continue
76+
yield from process_record(record=record, url=advisory_url, logger=self.log)
77+
78+
def clean_downloads(self):
79+
"""Cleanup any temporary repository data."""
80+
if self.vcs_response:
81+
self.log(f"Removing cloned repository")
82+
self.vcs_response.delete()
83+
84+
def on_failure(self):
85+
"""Ensure cleanup is always performed on failure."""
86+
self.clean_downloads()
87+
88+
89+
def check_for_attributes(record, logger) -> bool:
90+
attributes = ["distroversion", "reponame", "archs"]
91+
for attribute in attributes:
92+
if attribute not in record:
93+
logger(
94+
f'"{attribute!r}" not found in {record!r}',
95+
level=logging.DEBUG,
96+
)
97+
return False
98+
return True
99+
100+
101+
def process_record(record: dict, url: str, logger: callable) -> Iterable[AdvisoryDataV2]:
102+
"""
103+
Return a list of AdvisoryData objects by processing data
104+
present in that `record`
105+
"""
106+
if not record.get("packages"):
107+
logger(
108+
f'"packages" not found in this record {record!r}',
109+
level=logging.DEBUG,
110+
)
111+
return []
112+
113+
for package in record["packages"]:
114+
if not package["pkg"]:
115+
logger(
116+
f'"pkg" not found in this package {package!r}',
117+
level=logging.DEBUG,
118+
)
119+
continue
120+
if not check_for_attributes(record, logger):
121+
continue
122+
yield from load_advisories(
123+
pkg_infos=package["pkg"],
124+
distroversion=record["distroversion"],
125+
reponame=record["reponame"],
126+
archs=record["archs"],
127+
url=url,
128+
logger=logger,
129+
)
130+
131+
132+
def load_advisories(
133+
pkg_infos: Mapping[str, Any],
134+
distroversion: str,
135+
reponame: str,
136+
archs: List[str],
137+
url: str,
138+
logger: callable,
139+
) -> Iterable[AdvisoryDataV2]:
140+
"""
141+
Yield AdvisoryData by mapping data from `pkg_infos`
142+
and form PURL for AffectedPackages by using
143+
`distroversion`, `reponame`, `archs`
144+
"""
145+
if not pkg_infos.get("name"):
146+
logger(
147+
f'"name" is not available in package {pkg_infos!r}',
148+
level=logging.DEBUG,
149+
)
150+
return []
151+
152+
for version, fixed_vulns in pkg_infos["secfixes"].items():
153+
if not fixed_vulns:
154+
logger(
155+
f"No fixed vulnerabilities in version {version!r}",
156+
level=logging.DEBUG,
157+
)
158+
continue
159+
160+
# fixed_vulns is a list of strings and each string is a space-separated
161+
# list of aliases and CVES
162+
for vuln_ids in fixed_vulns:
163+
aliases = vuln_ids.strip().split(" ")
164+
vuln_id = aliases[0]
165+
166+
references = []
167+
if vuln_id.startswith("XSA"):
168+
references.append(XsaReferenceV2.from_id(xsa_id=vuln_id))
169+
170+
elif vuln_id.startswith("ZBX"):
171+
references.append(ZbxReferenceV2.from_id(zbx_id=vuln_id))
172+
173+
elif vuln_id.startswith("wnpa-sec"):
174+
references.append(WireSharkReferenceV2.from_id(wnpa_sec_id=vuln_id))
175+
176+
elif vuln_id.startswith("CVE"):
177+
references.append(
178+
ReferenceV2(
179+
reference_id=vuln_id,
180+
url=f"https://nvd.nist.gov/vuln/detail/{vuln_id}",
181+
)
182+
)
183+
184+
qualifiers = {
185+
"distroversion": distroversion,
186+
"reponame": reponame,
187+
}
188+
189+
affected_packages = []
190+
191+
fixed_version_range = None
192+
try:
193+
fixed_version_range = AlpineLinuxVersionRange.from_versions([version])
194+
except InvalidVersion as e:
195+
logger(
196+
f"{version!r} is not a valid AlpineVersion {e!r}",
197+
level=logging.DEBUG,
198+
)
199+
200+
if not isinstance(archs, List):
201+
logger(
202+
f"{archs!r} is not of `List` instance",
203+
level=logging.DEBUG,
204+
)
205+
continue
206+
207+
if archs and fixed_version_range:
208+
for arch in archs:
209+
qualifiers["arch"] = arch
210+
purl = PackageURL(
211+
type="apk",
212+
namespace="alpine",
213+
name=pkg_infos["name"],
214+
qualifiers=qualifiers,
215+
)
216+
affected_packages.append(
217+
AffectedPackageV2(
218+
package=purl,
219+
fixed_version_range=fixed_version_range,
220+
)
221+
)
222+
223+
if not archs and fixed_version_range:
224+
# there is no arch, this is not an arch-specific package
225+
purl = PackageURL(
226+
type="apk",
227+
namespace="alpine",
228+
name=pkg_infos["name"],
229+
qualifiers=qualifiers,
230+
)
231+
affected_packages.append(
232+
AffectedPackageV2(
233+
package=purl,
234+
fixed_version_range=fixed_version_range,
235+
)
236+
)
237+
238+
advisory_id = f"{pkg_infos['name']}/{qualifiers['distroversion']}/{version}/{vuln_id}"
239+
yield AdvisoryDataV2(
240+
advisory_id=advisory_id,
241+
aliases=aliases,
242+
references=references,
243+
affected_packages=affected_packages,
244+
url=url,
245+
)

vulnerabilities/references.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#
99

1010
from vulnerabilities.importer import Reference
11+
from vulnerabilities.importer import ReferenceV2
1112

1213

1314
class XsaReference(Reference):
@@ -75,3 +76,70 @@ def from_id(cls, wnpa_sec_id):
7576
reference_id=wnpa_sec_id,
7677
url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html",
7778
)
79+
80+
81+
class XsaReferenceV2:
82+
"""
83+
A Xen advisory reference. See https://xenbits.xen.org/xsa
84+
"""
85+
86+
@classmethod
87+
def from_id(cls, xsa_id):
88+
"""
89+
Return a new XsaReference from an XSA-XXXX id.
90+
"""
91+
if not xsa_id or not xsa_id.lower().startswith("xsa"):
92+
return ValueError(f"Not a Xen reference. Does not start with XSA: {xsa_id!r}")
93+
_, numid = xsa_id.rsplit("-")
94+
return ReferenceV2(
95+
reference_id=xsa_id,
96+
url=f"https://xenbits.xen.org/xsa/advisory-{numid}.html",
97+
)
98+
99+
@classmethod
100+
def from_number(cls, number):
101+
"""
102+
Return a new XsaReference from an XSA number.
103+
"""
104+
return ReferenceV2(
105+
reference_id=f"XSA-{number}",
106+
url=f"https://xenbits.xen.org/xsa/advisory-{number}.html",
107+
)
108+
109+
110+
class ZbxReferenceV2:
111+
"""
112+
A Zabbix advisory reference. See https://support.zabbix.com
113+
"""
114+
115+
@classmethod
116+
def from_id(cls, zbx_id):
117+
"""
118+
Return a new ZbxReference from an ZBX-XXXX id.
119+
"""
120+
if not zbx_id or not zbx_id.lower().startswith("zbx"):
121+
return ValueError(f"Not a Zabbix reference. Does not start with ZBX: {zbx_id!r}")
122+
return ReferenceV2(
123+
reference_id=zbx_id,
124+
url=f"https://support.zabbix.com/browse/{zbx_id}",
125+
)
126+
127+
128+
class WireSharkReferenceV2:
129+
"""
130+
A Wireshark advisory reference. See https://www.wireshark.org/security
131+
"""
132+
133+
@classmethod
134+
def from_id(cls, wnpa_sec_id):
135+
"""
136+
Return a new WireSharkReference from an wnpa-sec-XXXX id.
137+
"""
138+
if not wnpa_sec_id or not wnpa_sec_id.lower().startswith("wnpa-sec"):
139+
return ValueError(
140+
f"Not a WireShark reference. Does not start with wnpa-sec: {wnpa_sec_id!r}"
141+
)
142+
return ReferenceV2(
143+
reference_id=wnpa_sec_id,
144+
url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html",
145+
)

0 commit comments

Comments
 (0)