diff --git a/dje/tests/testfiles/test_dataset_pp_only.json b/dje/tests/testfiles/test_dataset_pp_only.json
index 988add2d..03cb35a5 100644
--- a/dje/tests/testfiles/test_dataset_pp_only.json
+++ b/dje/tests/testfiles/test_dataset_pp_only.json
@@ -74,6 +74,7 @@
"uuid": "565737ed-ab23-46eb-bbcf-185da2da50dc",
"created_date": "2011-08-24T09:20:01Z",
"last_modified_date": "2011-08-24T09:20:01Z",
+ "risk_score": null,
"keywords": [
"Framework"
],
diff --git a/product_portfolio/migrations/0015_product_risk_score_productaffectedbyvulnerability_and_more.py b/product_portfolio/migrations/0015_product_risk_score_productaffectedbyvulnerability_and_more.py
new file mode 100644
index 00000000..e3930eac
--- /dev/null
+++ b/product_portfolio/migrations/0015_product_risk_score_productaffectedbyvulnerability_and_more.py
@@ -0,0 +1,42 @@
+# Generated by Django 5.2.8 on 2025-12-17 12:00
+
+import django.db.models.deletion
+import dje.models
+import uuid
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('dje', '0012_alter_dataspaceconfiguration_sourcehut_token'),
+ ('product_portfolio', '0014_scancodeproject_infer_download_urls'),
+ ('vulnerabilities', '0005_vulnerabilityanalysis_is_reachable_and_more'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='product',
+ name='risk_score',
+ field=models.DecimalField(blank=True, decimal_places=1, help_text='Risk score between 0.0 and 10.0, where higher values indicate greater vulnerability risk for the package.', max_digits=3, null=True),
+ ),
+ migrations.CreateModel(
+ name='ProductAffectedByVulnerability',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, verbose_name='UUID')),
+ ('dataspace', models.ForeignKey(editable=False, help_text='A Dataspace is an independent, exclusive set of DejaCode data, which can be either nexB master reference data or installation-specific data.', on_delete=django.db.models.deletion.PROTECT, to='dje.dataspace')),
+ ('product', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='product_portfolio.product')),
+ ('vulnerability', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='vulnerabilities.vulnerability')),
+ ],
+ options={
+ 'unique_together': {('dataspace', 'uuid'), ('product', 'vulnerability')},
+ },
+ bases=(dje.models.DataspaceForeignKeyValidationMixin, models.Model),
+ ),
+ migrations.AddField(
+ model_name='product',
+ name='affected_by_vulnerabilities',
+ field=models.ManyToManyField(help_text='Vulnerabilities directly affecting this product.', related_name='affected_%(class)ss', through='product_portfolio.ProductAffectedByVulnerability', to='vulnerabilities.vulnerability'),
+ ),
+ ]
diff --git a/product_portfolio/models.py b/product_portfolio/models.py
index c82db0b4..9cb6bdca 100644
--- a/product_portfolio/models.py
+++ b/product_portfolio/models.py
@@ -50,6 +50,8 @@
from dje.validators import validate_url_segment
from dje.validators import validate_version
from vulnerabilities.fetch import fetch_for_packages
+from vulnerabilities.models import AffectedByVulnerabilityMixin
+from vulnerabilities.models import AffectedByVulnerabilityRelationship
RELATION_LICENSE_EXPRESSION_HELP_TEXT = _(
"The License Expression assigned to a DejaCode Product Package or Product "
@@ -204,7 +206,13 @@ def get_related_secured_queryset(self, user):
BaseProductMixin = component_mixin_factory("product")
-class Product(BaseProductMixin, FieldChangesMixin, KeywordsMixin, DataspacedModel):
+class Product(
+ BaseProductMixin,
+ FieldChangesMixin,
+ KeywordsMixin,
+ AffectedByVulnerabilityMixin,
+ DataspacedModel,
+):
license_expression = models.CharField(
max_length=1024,
blank=True,
@@ -278,6 +286,13 @@ class Product(BaseProductMixin, FieldChangesMixin, KeywordsMixin, DataspacedMode
through="ProductPackage",
)
+ affected_by_vulnerabilities = models.ManyToManyField(
+ to="vulnerabilities.Vulnerability",
+ through="ProductAffectedByVulnerability",
+ related_name="affected_%(class)ss",
+ help_text=_("Vulnerabilities directly affecting this product."),
+ )
+
objects = ProductSecuredManager()
# WARNING: Bypass the security system implemented in ProductSecuredManager.
@@ -634,6 +649,16 @@ def get_vulnerability_qs(self, prefetch_related_packages=False, risk_threshold=N
return vulnerability_qs
+class ProductAffectedByVulnerability(AffectedByVulnerabilityRelationship):
+ product = models.ForeignKey(
+ to="product_portfolio.Product",
+ on_delete=models.CASCADE,
+ )
+
+ class Meta:
+ unique_together = (("product", "vulnerability"), ("dataspace", "uuid"))
+
+
class ProductRelationStatus(BaseStatusMixin, DataspacedModel):
class Meta(BaseStatusMixin.Meta):
verbose_name_plural = _("product relation status")
@@ -731,7 +756,7 @@ def update_license_unknown(self):
product_package.update_license_unknown()
def annotate_weighted_risk_score(self):
- """Annotate the Queeryset with the weighted_risk_score computed value."""
+ """Annotate the Queryset with the weighted_risk_score computed value."""
purpose = ProductItemPurpose.objects.filter(productpackage=OuterRef("pk"))
package = Package.objects.filter(productpackages=OuterRef("pk"))
diff --git a/product_portfolio/templates/product_portfolio/tables/product_list_table.html b/product_portfolio/templates/product_portfolio/tables/product_list_table.html
index 1d8b1295..b69e966b 100644
--- a/product_portfolio/templates/product_portfolio/tables/product_list_table.html
+++ b/product_portfolio/templates/product_portfolio/tables/product_list_table.html
@@ -29,7 +29,7 @@
R
{% endif %}
- {% if product.is_vulnerable %}
+ {% if product.has_vulnerable_packages %}
{% include 'component_catalog/includes/vulnerability_icon_link.html' with url=product.get_absolute_url only %}
diff --git a/product_portfolio/tests/test_models.py b/product_portfolio/tests/test_models.py
index f5f032bc..f5abe9d2 100644
--- a/product_portfolio/tests/test_models.py
+++ b/product_portfolio/tests/test_models.py
@@ -155,8 +155,11 @@ def test_product_model_all_packages(self):
def test_product_model_get_vulnerable_packages(self):
self.assertEqual(0, self.product1.get_vulnerable_packages().count())
- package1 = make_package(self.dataspace, is_vulnerable=True, risk_score=5.0)
+ package1 = make_package(self.dataspace)
+ vulnerability1 = make_vulnerability(self.dataspace, risk_score=5.0)
+ package1.add_affected_by(vulnerability1)
make_product_package(self.product1, package1)
+
self.assertEqual(1, self.product1.get_vulnerable_packages().count())
self.assertEqual(0, self.product1.get_vulnerable_packages(risk_threshold=6.0).count())
self.assertEqual(1, self.product1.get_vulnerable_packages(risk_threshold=4.0).count())
@@ -600,6 +603,27 @@ def test_product_model_improve_packages_from_purldb(self, mock_update_from_purld
pp1.refresh_from_db()
self.assertEqual("apache-2.0", pp1.license_expression)
+ def test_product_model_affected_by_vulnerabilities(self):
+ vulnerability1 = make_vulnerability(self.dataspace, risk_score=1.0)
+ vulnerability2 = make_vulnerability(self.dataspace, risk_score=10.0)
+ vulnerability3 = make_vulnerability(self.dataspace, risk_score=5.0)
+
+ vulnerability1.add_affected(self.product1)
+ affected_by = self.product1.affected_by_vulnerabilities.all()
+ self.assertQuerySetEqual([vulnerability1], affected_by)
+ self.product1.refresh_from_db()
+ self.assertEqual(1.0, self.product1.risk_score)
+
+ vulnerability2.add_affected(self.product1)
+ affected_by = self.product1.affected_by_vulnerabilities.order_by("id")
+ self.assertQuerySetEqual([vulnerability1, vulnerability2], affected_by)
+ self.product1.refresh_from_db()
+ self.assertEqual(10.0, self.product1.risk_score)
+
+ vulnerability3.add_affected(self.product1)
+ self.product1.refresh_from_db()
+ self.assertEqual(10.0, self.product1.risk_score)
+
def test_product_model_get_vulnerability_qs(self):
package1 = make_package(self.dataspace)
package2 = make_package(self.dataspace)
diff --git a/product_portfolio/views.py b/product_portfolio/views.py
index 9b55fa63..f20b8b56 100644
--- a/product_portfolio/views.py
+++ b/product_portfolio/views.py
@@ -208,7 +208,7 @@ def get_queryset(self):
)
.annotate(
productinventoryitem_count=Count("productinventoryitem", distinct=True),
- is_vulnerable=Exists(vulnerable_productpackage_qs),
+ has_vulnerable_packages=Exists(vulnerable_productpackage_qs),
)
.order_by(
"name",
diff --git a/vulnerabilities/fetch.py b/vulnerabilities/fetch.py
index 961bffa3..b922127c 100644
--- a/vulnerabilities/fetch.py
+++ b/vulnerabilities/fetch.py
@@ -135,7 +135,7 @@ def create_or_update_vulnerability(
if updated_fields:
results["updated"] += 1
- vulnerability.add_affected_packages(affected_packages)
+ vulnerability.add_affected(affected_packages)
return vulnerability
diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py
index c50d1ea1..456c9b74 100644
--- a/vulnerabilities/models.py
+++ b/vulnerabilities/models.py
@@ -60,7 +60,7 @@ class Vulnerability(HistoryDateFieldsMixin, DataspacedModel):
A software vulnerability with a unique identifier and alternate aliases.
Adapted from the VulnerableCode models at
- https://github.com/nexB/vulnerablecode/blob/main/vulnerabilities/models.py#L164
+ https://github.com/nexB/vulnerablecode/blob/main/vulnerabilities/models.py
Note that this model implements the HistoryDateFieldsMixin but not the
HistoryUserFieldsMixin as the Vulnerability records are usually created
@@ -172,31 +172,12 @@ def cve(self):
return alias
def add_affected(self, instances):
- """
- Assign the ``instances`` (Package or Component) as affected to this
- vulnerability.
- """
- from component_catalog.models import Component
- from component_catalog.models import Package
-
- if not isinstance(instances, list):
+ """Assign the ``instances`` (Package or Product) as affected by this vulnerability."""
+ if not isinstance(instances, (list, tuple, models.QuerySet)):
instances = [instances]
for instance in instances:
- if isinstance(instance, Package):
- self.add_affected_packages([instance])
- if isinstance(instance, Component):
- self.add_affected_components([instance])
-
- def add_affected_packages(self, packages):
- """Assign the ``packages`` as affected to this vulnerability."""
- through_defaults = {"dataspace_id": self.dataspace_id}
- self.affected_packages.add(*packages, through_defaults=through_defaults)
-
- def add_affected_components(self, components):
- """Assign the ``components`` as affected to this vulnerability."""
- through_defaults = {"dataspace_id": self.dataspace_id}
- self.affected_components.add(*components, through_defaults=through_defaults)
+ instance.add_affected_by(vulnerability=self)
@classmethod
def create_from_data(cls, dataspace, data, validate=False, affecting=None):
@@ -420,6 +401,21 @@ class Meta:
def is_vulnerable(self):
return self.affected_by_vulnerabilities.exists()
+ def update_risk_score(self):
+ """Calculate and save the maximum risk score from all affected vulnerabilities."""
+ qs = self.affected_by_vulnerabilities.aggregate(models.Max("risk_score"))
+ max_score = qs["risk_score__max"]
+
+ self.risk_score = max_score
+ self.save(update_fields=["risk_score"])
+ return self.risk_score
+
+ def add_affected_by(self, vulnerability):
+ """Add ``vulnerability`` as affecting this instance."""
+ through_defaults = {"dataspace_id": self.dataspace_id}
+ self.affected_by_vulnerabilities.add(vulnerability, through_defaults=through_defaults)
+ self.update_risk_score()
+
def get_entry_for_package(self, vulnerablecode):
if not self.package_url:
return
@@ -487,7 +483,7 @@ def create_vulnerabilities(self, vulnerabilities_data):
through_defaults = {"dataspace_id": self.dataspace_id}
self.affected_by_vulnerabilities.add(*vulnerabilities, through_defaults=through_defaults)
- self.update(risk_score=vulnerability_data["risk_score"])
+ self.update_risk_score()
if isinstance(self, Package):
self.productpackages.update_weighted_risk_score()
diff --git a/vulnerabilities/tests/test_models.py b/vulnerabilities/tests/test_models.py
index 87e528b9..bbb6053c 100644
--- a/vulnerabilities/tests/test_models.py
+++ b/vulnerabilities/tests/test_models.py
@@ -82,19 +82,74 @@ def test_vulnerability_mixin_create_vulnerabilities(self):
response_file = self.data / "vulnerabilities" / "idna_3.6_response.json"
response_json = json.loads(response_file.read_text())
vulnerabilities_data = response_json["results"][0]["affected_by_vulnerabilities"]
+ vulnerabilities_data.append({"vulnerability_id": "VCID-0002", "risk_score": 5.0})
package1 = make_package(self.dataspace, package_url="pkg:pypi/idna@3.6")
product1 = make_product(self.dataspace, inventory=[package1])
package1.create_vulnerabilities(vulnerabilities_data)
- self.assertEqual(1, Vulnerability.objects.scope(self.dataspace).count())
- self.assertEqual(1, package1.affected_by_vulnerabilities.count())
- vulnerability = package1.affected_by_vulnerabilities.get()
- self.assertEqual("VCID-j3au-usaz-aaag", vulnerability.vulnerability_id)
-
- self.assertEqual(8.4, package1.risk_score)
+ self.assertEqual(2, Vulnerability.objects.scope(self.dataspace).count())
+ self.assertEqual("8.4", str(package1.risk_score))
self.assertEqual("8.4", str(product1.productpackages.get().weighted_risk_score))
+ def test_vulnerability_mixin_update_risk_score(self):
+ package1 = make_package(self.dataspace)
+
+ # Test with no vulnerabilities
+ package1.update_risk_score()
+ self.assertIsNone(package1.risk_score)
+
+ # Test with one vulnerability with risk score
+ vulnerability1 = make_vulnerability(dataspace=self.dataspace, risk_score=7.5)
+ vulnerability1.add_affected(package1)
+ package1.update_risk_score()
+ self.assertEqual("7.5", str(package1.risk_score))
+
+ # Test with multiple vulnerabilities, should use max
+ vulnerability2 = make_vulnerability(dataspace=self.dataspace, risk_score=9.2)
+ vulnerability2.add_affected(package1)
+ package1.update_risk_score()
+ self.assertEqual("9.2", str(package1.risk_score))
+
+ # Test with vulnerability with lower risk score, should keep max
+ vulnerability3 = make_vulnerability(dataspace=self.dataspace, risk_score=3.1)
+ vulnerability3.add_affected(package1)
+ package1.update_risk_score()
+ self.assertEqual("9.2", str(package1.risk_score))
+
+ # Test with all vulnerabilities having NULL risk scores
+ package2 = make_package(self.dataspace)
+ vulnerability4 = make_vulnerability(dataspace=self.dataspace, risk_score=None)
+ vulnerability5 = make_vulnerability(dataspace=self.dataspace, risk_score=None)
+ vulnerability4.add_affected(package2)
+ vulnerability5.add_affected(package2)
+ package2.update_risk_score()
+ self.assertIsNone(package2.risk_score)
+
+ def test_vulnerability_mixin_add_affected_by(self):
+ package1 = make_package(self.dataspace)
+
+ vulnerability1 = make_vulnerability(self.dataspace, risk_score=1.0)
+ vulnerability2 = make_vulnerability(self.dataspace, risk_score=10.0)
+ vulnerability3 = make_vulnerability(self.dataspace, risk_score=5.0)
+
+ package1.add_affected_by(vulnerability1)
+ package1.refresh_from_db()
+ self.assertEqual("1.0", str(package1.risk_score))
+
+ package1.add_affected_by(vulnerability2)
+ package1.refresh_from_db()
+ self.assertEqual("10.0", str(package1.risk_score))
+
+ package1.add_affected_by(vulnerability3)
+ package1.refresh_from_db()
+ self.assertEqual("10.0", str(package1.risk_score))
+
+ self.assertEqual(package1, vulnerability1.affected_packages.get())
+ self.assertEqual(package1, vulnerability2.affected_packages.get())
+ self.assertEqual(package1, vulnerability3.affected_packages.get())
+ self.assertEqual(3, package1.affected_by_vulnerabilities.count())
+
def test_vulnerability_model_affected_packages_m2m(self):
package1 = make_package(self.dataspace)
vulnerability1 = make_vulnerability(dataspace=self.dataspace, affecting=package1)