diff --git a/dje/tests/testfiles/test_dataset_pp_only.json b/dje/tests/testfiles/test_dataset_pp_only.json index 988add2d..03cb35a5 100644 --- a/dje/tests/testfiles/test_dataset_pp_only.json +++ b/dje/tests/testfiles/test_dataset_pp_only.json @@ -74,6 +74,7 @@ "uuid": "565737ed-ab23-46eb-bbcf-185da2da50dc", "created_date": "2011-08-24T09:20:01Z", "last_modified_date": "2011-08-24T09:20:01Z", + "risk_score": null, "keywords": [ "Framework" ], diff --git a/product_portfolio/migrations/0015_product_risk_score_productaffectedbyvulnerability_and_more.py b/product_portfolio/migrations/0015_product_risk_score_productaffectedbyvulnerability_and_more.py new file mode 100644 index 00000000..e3930eac --- /dev/null +++ b/product_portfolio/migrations/0015_product_risk_score_productaffectedbyvulnerability_and_more.py @@ -0,0 +1,42 @@ +# Generated by Django 5.2.8 on 2025-12-17 12:00 + +import django.db.models.deletion +import dje.models +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dje', '0012_alter_dataspaceconfiguration_sourcehut_token'), + ('product_portfolio', '0014_scancodeproject_infer_download_urls'), + ('vulnerabilities', '0005_vulnerabilityanalysis_is_reachable_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='product', + name='risk_score', + field=models.DecimalField(blank=True, decimal_places=1, help_text='Risk score between 0.0 and 10.0, where higher values indicate greater vulnerability risk for the package.', max_digits=3, null=True), + ), + migrations.CreateModel( + name='ProductAffectedByVulnerability', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, verbose_name='UUID')), + ('dataspace', models.ForeignKey(editable=False, help_text='A Dataspace is an independent, exclusive set of DejaCode data, which can be either nexB master reference data or installation-specific data.', on_delete=django.db.models.deletion.PROTECT, to='dje.dataspace')), + ('product', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='product_portfolio.product')), + ('vulnerability', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='vulnerabilities.vulnerability')), + ], + options={ + 'unique_together': {('dataspace', 'uuid'), ('product', 'vulnerability')}, + }, + bases=(dje.models.DataspaceForeignKeyValidationMixin, models.Model), + ), + migrations.AddField( + model_name='product', + name='affected_by_vulnerabilities', + field=models.ManyToManyField(help_text='Vulnerabilities directly affecting this product.', related_name='affected_%(class)ss', through='product_portfolio.ProductAffectedByVulnerability', to='vulnerabilities.vulnerability'), + ), + ] diff --git a/product_portfolio/models.py b/product_portfolio/models.py index c82db0b4..9cb6bdca 100644 --- a/product_portfolio/models.py +++ b/product_portfolio/models.py @@ -50,6 +50,8 @@ from dje.validators import validate_url_segment from dje.validators import validate_version from vulnerabilities.fetch import fetch_for_packages +from vulnerabilities.models import AffectedByVulnerabilityMixin +from vulnerabilities.models import AffectedByVulnerabilityRelationship RELATION_LICENSE_EXPRESSION_HELP_TEXT = _( "The License Expression assigned to a DejaCode Product Package or Product " @@ -204,7 +206,13 @@ def get_related_secured_queryset(self, user): BaseProductMixin = component_mixin_factory("product") -class Product(BaseProductMixin, FieldChangesMixin, KeywordsMixin, DataspacedModel): +class Product( + BaseProductMixin, + FieldChangesMixin, + KeywordsMixin, + AffectedByVulnerabilityMixin, + DataspacedModel, +): license_expression = models.CharField( max_length=1024, blank=True, @@ -278,6 +286,13 @@ class Product(BaseProductMixin, FieldChangesMixin, KeywordsMixin, DataspacedMode through="ProductPackage", ) + affected_by_vulnerabilities = models.ManyToManyField( + to="vulnerabilities.Vulnerability", + through="ProductAffectedByVulnerability", + related_name="affected_%(class)ss", + help_text=_("Vulnerabilities directly affecting this product."), + ) + objects = ProductSecuredManager() # WARNING: Bypass the security system implemented in ProductSecuredManager. @@ -634,6 +649,16 @@ def get_vulnerability_qs(self, prefetch_related_packages=False, risk_threshold=N return vulnerability_qs +class ProductAffectedByVulnerability(AffectedByVulnerabilityRelationship): + product = models.ForeignKey( + to="product_portfolio.Product", + on_delete=models.CASCADE, + ) + + class Meta: + unique_together = (("product", "vulnerability"), ("dataspace", "uuid")) + + class ProductRelationStatus(BaseStatusMixin, DataspacedModel): class Meta(BaseStatusMixin.Meta): verbose_name_plural = _("product relation status") @@ -731,7 +756,7 @@ def update_license_unknown(self): product_package.update_license_unknown() def annotate_weighted_risk_score(self): - """Annotate the Queeryset with the weighted_risk_score computed value.""" + """Annotate the Queryset with the weighted_risk_score computed value.""" purpose = ProductItemPurpose.objects.filter(productpackage=OuterRef("pk")) package = Package.objects.filter(productpackages=OuterRef("pk")) diff --git a/product_portfolio/templates/product_portfolio/tables/product_list_table.html b/product_portfolio/templates/product_portfolio/tables/product_list_table.html index 1d8b1295..b69e966b 100644 --- a/product_portfolio/templates/product_portfolio/tables/product_list_table.html +++ b/product_portfolio/templates/product_portfolio/tables/product_list_table.html @@ -29,7 +29,7 @@ R {% endif %} - {% if product.is_vulnerable %} + {% if product.has_vulnerable_packages %}
  • {% include 'component_catalog/includes/vulnerability_icon_link.html' with url=product.get_absolute_url only %}
  • diff --git a/product_portfolio/tests/test_models.py b/product_portfolio/tests/test_models.py index f5f032bc..f5abe9d2 100644 --- a/product_portfolio/tests/test_models.py +++ b/product_portfolio/tests/test_models.py @@ -155,8 +155,11 @@ def test_product_model_all_packages(self): def test_product_model_get_vulnerable_packages(self): self.assertEqual(0, self.product1.get_vulnerable_packages().count()) - package1 = make_package(self.dataspace, is_vulnerable=True, risk_score=5.0) + package1 = make_package(self.dataspace) + vulnerability1 = make_vulnerability(self.dataspace, risk_score=5.0) + package1.add_affected_by(vulnerability1) make_product_package(self.product1, package1) + self.assertEqual(1, self.product1.get_vulnerable_packages().count()) self.assertEqual(0, self.product1.get_vulnerable_packages(risk_threshold=6.0).count()) self.assertEqual(1, self.product1.get_vulnerable_packages(risk_threshold=4.0).count()) @@ -600,6 +603,27 @@ def test_product_model_improve_packages_from_purldb(self, mock_update_from_purld pp1.refresh_from_db() self.assertEqual("apache-2.0", pp1.license_expression) + def test_product_model_affected_by_vulnerabilities(self): + vulnerability1 = make_vulnerability(self.dataspace, risk_score=1.0) + vulnerability2 = make_vulnerability(self.dataspace, risk_score=10.0) + vulnerability3 = make_vulnerability(self.dataspace, risk_score=5.0) + + vulnerability1.add_affected(self.product1) + affected_by = self.product1.affected_by_vulnerabilities.all() + self.assertQuerySetEqual([vulnerability1], affected_by) + self.product1.refresh_from_db() + self.assertEqual(1.0, self.product1.risk_score) + + vulnerability2.add_affected(self.product1) + affected_by = self.product1.affected_by_vulnerabilities.order_by("id") + self.assertQuerySetEqual([vulnerability1, vulnerability2], affected_by) + self.product1.refresh_from_db() + self.assertEqual(10.0, self.product1.risk_score) + + vulnerability3.add_affected(self.product1) + self.product1.refresh_from_db() + self.assertEqual(10.0, self.product1.risk_score) + def test_product_model_get_vulnerability_qs(self): package1 = make_package(self.dataspace) package2 = make_package(self.dataspace) diff --git a/product_portfolio/views.py b/product_portfolio/views.py index 9b55fa63..f20b8b56 100644 --- a/product_portfolio/views.py +++ b/product_portfolio/views.py @@ -208,7 +208,7 @@ def get_queryset(self): ) .annotate( productinventoryitem_count=Count("productinventoryitem", distinct=True), - is_vulnerable=Exists(vulnerable_productpackage_qs), + has_vulnerable_packages=Exists(vulnerable_productpackage_qs), ) .order_by( "name", diff --git a/vulnerabilities/fetch.py b/vulnerabilities/fetch.py index 961bffa3..b922127c 100644 --- a/vulnerabilities/fetch.py +++ b/vulnerabilities/fetch.py @@ -135,7 +135,7 @@ def create_or_update_vulnerability( if updated_fields: results["updated"] += 1 - vulnerability.add_affected_packages(affected_packages) + vulnerability.add_affected(affected_packages) return vulnerability diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index c50d1ea1..456c9b74 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -60,7 +60,7 @@ class Vulnerability(HistoryDateFieldsMixin, DataspacedModel): A software vulnerability with a unique identifier and alternate aliases. Adapted from the VulnerableCode models at - https://github.com/nexB/vulnerablecode/blob/main/vulnerabilities/models.py#L164 + https://github.com/nexB/vulnerablecode/blob/main/vulnerabilities/models.py Note that this model implements the HistoryDateFieldsMixin but not the HistoryUserFieldsMixin as the Vulnerability records are usually created @@ -172,31 +172,12 @@ def cve(self): return alias def add_affected(self, instances): - """ - Assign the ``instances`` (Package or Component) as affected to this - vulnerability. - """ - from component_catalog.models import Component - from component_catalog.models import Package - - if not isinstance(instances, list): + """Assign the ``instances`` (Package or Product) as affected by this vulnerability.""" + if not isinstance(instances, (list, tuple, models.QuerySet)): instances = [instances] for instance in instances: - if isinstance(instance, Package): - self.add_affected_packages([instance]) - if isinstance(instance, Component): - self.add_affected_components([instance]) - - def add_affected_packages(self, packages): - """Assign the ``packages`` as affected to this vulnerability.""" - through_defaults = {"dataspace_id": self.dataspace_id} - self.affected_packages.add(*packages, through_defaults=through_defaults) - - def add_affected_components(self, components): - """Assign the ``components`` as affected to this vulnerability.""" - through_defaults = {"dataspace_id": self.dataspace_id} - self.affected_components.add(*components, through_defaults=through_defaults) + instance.add_affected_by(vulnerability=self) @classmethod def create_from_data(cls, dataspace, data, validate=False, affecting=None): @@ -420,6 +401,21 @@ class Meta: def is_vulnerable(self): return self.affected_by_vulnerabilities.exists() + def update_risk_score(self): + """Calculate and save the maximum risk score from all affected vulnerabilities.""" + qs = self.affected_by_vulnerabilities.aggregate(models.Max("risk_score")) + max_score = qs["risk_score__max"] + + self.risk_score = max_score + self.save(update_fields=["risk_score"]) + return self.risk_score + + def add_affected_by(self, vulnerability): + """Add ``vulnerability`` as affecting this instance.""" + through_defaults = {"dataspace_id": self.dataspace_id} + self.affected_by_vulnerabilities.add(vulnerability, through_defaults=through_defaults) + self.update_risk_score() + def get_entry_for_package(self, vulnerablecode): if not self.package_url: return @@ -487,7 +483,7 @@ def create_vulnerabilities(self, vulnerabilities_data): through_defaults = {"dataspace_id": self.dataspace_id} self.affected_by_vulnerabilities.add(*vulnerabilities, through_defaults=through_defaults) - self.update(risk_score=vulnerability_data["risk_score"]) + self.update_risk_score() if isinstance(self, Package): self.productpackages.update_weighted_risk_score() diff --git a/vulnerabilities/tests/test_models.py b/vulnerabilities/tests/test_models.py index 87e528b9..bbb6053c 100644 --- a/vulnerabilities/tests/test_models.py +++ b/vulnerabilities/tests/test_models.py @@ -82,19 +82,74 @@ def test_vulnerability_mixin_create_vulnerabilities(self): response_file = self.data / "vulnerabilities" / "idna_3.6_response.json" response_json = json.loads(response_file.read_text()) vulnerabilities_data = response_json["results"][0]["affected_by_vulnerabilities"] + vulnerabilities_data.append({"vulnerability_id": "VCID-0002", "risk_score": 5.0}) package1 = make_package(self.dataspace, package_url="pkg:pypi/idna@3.6") product1 = make_product(self.dataspace, inventory=[package1]) package1.create_vulnerabilities(vulnerabilities_data) - self.assertEqual(1, Vulnerability.objects.scope(self.dataspace).count()) - self.assertEqual(1, package1.affected_by_vulnerabilities.count()) - vulnerability = package1.affected_by_vulnerabilities.get() - self.assertEqual("VCID-j3au-usaz-aaag", vulnerability.vulnerability_id) - - self.assertEqual(8.4, package1.risk_score) + self.assertEqual(2, Vulnerability.objects.scope(self.dataspace).count()) + self.assertEqual("8.4", str(package1.risk_score)) self.assertEqual("8.4", str(product1.productpackages.get().weighted_risk_score)) + def test_vulnerability_mixin_update_risk_score(self): + package1 = make_package(self.dataspace) + + # Test with no vulnerabilities + package1.update_risk_score() + self.assertIsNone(package1.risk_score) + + # Test with one vulnerability with risk score + vulnerability1 = make_vulnerability(dataspace=self.dataspace, risk_score=7.5) + vulnerability1.add_affected(package1) + package1.update_risk_score() + self.assertEqual("7.5", str(package1.risk_score)) + + # Test with multiple vulnerabilities, should use max + vulnerability2 = make_vulnerability(dataspace=self.dataspace, risk_score=9.2) + vulnerability2.add_affected(package1) + package1.update_risk_score() + self.assertEqual("9.2", str(package1.risk_score)) + + # Test with vulnerability with lower risk score, should keep max + vulnerability3 = make_vulnerability(dataspace=self.dataspace, risk_score=3.1) + vulnerability3.add_affected(package1) + package1.update_risk_score() + self.assertEqual("9.2", str(package1.risk_score)) + + # Test with all vulnerabilities having NULL risk scores + package2 = make_package(self.dataspace) + vulnerability4 = make_vulnerability(dataspace=self.dataspace, risk_score=None) + vulnerability5 = make_vulnerability(dataspace=self.dataspace, risk_score=None) + vulnerability4.add_affected(package2) + vulnerability5.add_affected(package2) + package2.update_risk_score() + self.assertIsNone(package2.risk_score) + + def test_vulnerability_mixin_add_affected_by(self): + package1 = make_package(self.dataspace) + + vulnerability1 = make_vulnerability(self.dataspace, risk_score=1.0) + vulnerability2 = make_vulnerability(self.dataspace, risk_score=10.0) + vulnerability3 = make_vulnerability(self.dataspace, risk_score=5.0) + + package1.add_affected_by(vulnerability1) + package1.refresh_from_db() + self.assertEqual("1.0", str(package1.risk_score)) + + package1.add_affected_by(vulnerability2) + package1.refresh_from_db() + self.assertEqual("10.0", str(package1.risk_score)) + + package1.add_affected_by(vulnerability3) + package1.refresh_from_db() + self.assertEqual("10.0", str(package1.risk_score)) + + self.assertEqual(package1, vulnerability1.affected_packages.get()) + self.assertEqual(package1, vulnerability2.affected_packages.get()) + self.assertEqual(package1, vulnerability3.affected_packages.get()) + self.assertEqual(3, package1.affected_by_vulnerabilities.count()) + def test_vulnerability_model_affected_packages_m2m(self): package1 = make_package(self.dataspace) vulnerability1 = make_vulnerability(dataspace=self.dataspace, affecting=package1)