From 5eeed09bfc016c9e75f23295b7cb929cacd4a016 Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Wed, 13 Dec 2023 09:43:40 +0100 Subject: [PATCH] Store CVSS severity in combined deb information --- src/glvd/cli/combine_deb.py | 55 +++++++++++++++++++++----- src/glvd/data/cvss.py | 29 ++++++++++++++ src/glvd/database/__init__.py | 9 ++++- src/glvd/database/types.py | 23 ++++++++++- tests/cli/test_combine_deb.py | 74 ++++++++++++++++++++++++++++++++--- tests/data/test_cvss.py | 24 ++++++++++++ tests/web/test_v1_cves.py | 2 + 7 files changed, 198 insertions(+), 18 deletions(-) create mode 100644 src/glvd/data/cvss.py create mode 100644 tests/data/test_cvss.py diff --git a/src/glvd/cli/combine_deb.py b/src/glvd/cli/combine_deb.py index 33e0315..1530eea 100644 --- a/src/glvd/cli/combine_deb.py +++ b/src/glvd/cli/combine_deb.py @@ -4,6 +4,7 @@ import logging from typing import ( + Any, AsyncGenerator, ) @@ -22,6 +23,7 @@ from ..database import Base, DistCpe, DebCve from ..data.cpe import Cpe, CpeOtherDebian +from ..data.cvss import CvssSeverity logger = logging.getLogger(__name__) @@ -31,17 +33,20 @@ class CombineDeb: stmt_combine_new = ( text(''' SELECT - cve.cve_id - , src.deb_source - , src.deb_version - , cve.deb_version_fixed - , COALESCE(src.deb_version < cve.deb_version_fixed, TRUE) AS debsec_vulnerable + debsec_cve.cve_id + , nvd_cve.data + , debsrc.deb_source + , debsrc.deb_version + , debsec_cve.deb_version_fixed + , COALESCE(debsrc.deb_version < debsec_cve.deb_version_fixed, TRUE) AS debsec_vulnerable + , debsec_note FROM - debsrc as src - LEFT OUTER JOIN debsec_cve AS cve ON src.deb_source = cve.deb_source + debsrc + LEFT OUTER JOIN debsec_cve ON debsec_cve.deb_source = debsrc.deb_source + INNER JOIN nvd_cve ON nvd_cve.cve_id = debsec_cve.cve_id WHERE - src.dist_id = :dist_id - AND cve.dist_id = ANY(:dists_fallback_id) + debsrc.dist_id = :dist_id + AND debsec_cve.dist_id = ANY(:dists_fallback_id) ''') .bindparams( bindparam('dist_id'), @@ -49,6 +54,21 @@ class CombineDeb: ) ) + def extract_cvss_severity( + self, + entry: Any, + ) -> CvssSeverity | None: + if metrics := entry.get('metrics'): + for i in ('cvssMetricV31', 'cvssMetricV30'): + if metrics_single := metrics.get(i): + metrics_primary = [i for i in metrics_single if i.get('type', None) == 'Primary'] + if metrics_primary and (severity := metrics_primary[0].get('cvssData', {}).get('baseSeverity')): + try: + return CvssSeverity[severity] + except KeyError: + return None + return None + async def combine_dists( self, session: AsyncSession, @@ -120,7 +140,19 @@ async def combine( 'dist_id': dist.id, 'dists_fallback_id': [i.id for i in dists_fallback], }): - cve_id, deb_source, deb_version, deb_version_fixed, debsec_vulnerable = r + ( + cve_id, + nvd_data, + deb_source, + deb_version, + deb_version_fixed, + debsec_vulnerable, + debsec_note + ) = r + + cvss_severity = self.extract_cvss_severity(nvd_data) + if debsec_note == 'unimportant': + cvss_severity = CvssSeverity.UNIMPORTANT cpe = Cpe( part=Cpe.PART.OS, @@ -138,12 +170,15 @@ async def combine( 'vulnerable': debsec_vulnerable, } + if cvss_severity: + cpe_match['deb']['cvssSeverity'] = cvss_severity.name if deb_version_fixed: cpe_match['deb']['versionEndExcluding'] = deb_version_fixed new_entries[(cve_id, deb_source)] = DebCve( dist=dist, cve_id=cve_id, + cvss_severity=cvss_severity, deb_source=deb_source, deb_version=deb_version, deb_version_fixed=deb_version_fixed, diff --git a/src/glvd/data/cvss.py b/src/glvd/data/cvss.py new file mode 100644 index 0000000..ed5b2ab --- /dev/null +++ b/src/glvd/data/cvss.py @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import enum + + +@enum.verify(enum.UNIQUE) +class CvssSeverity(enum.Enum): + NONE = 0 + UNIMPORTANT = 1 + LOW = 2 + MEDIUM = 3 + HIGH = 4 + CRITICAL = 5 + + @classmethod + def from_score(cls, score: int) -> CvssSeverity: + if score < 0 or score > 10: + raise ValueError + if score >= 9: + return cls.CRITICAL + if score >= 7: + return cls.HIGH + if score >= 4: + return cls.MEDIUM + if score > 0: + return cls.LOW + return cls.NONE diff --git a/src/glvd/database/__init__.py b/src/glvd/database/__init__.py index f69fd36..0bd8b1a 100644 --- a/src/glvd/database/__init__.py +++ b/src/glvd/database/__init__.py @@ -27,7 +27,11 @@ Text, ) -from .types import DebVersion +from ..data.cvss import CvssSeverity +from .types import ( + CvssSeverityType, + DebVersion, +) class Base(MappedAsDataclass, DeclarativeBase): @@ -35,6 +39,7 @@ class Base(MappedAsDataclass, DeclarativeBase): str: Text, datetime: DateTime(timezone=True), Any: JSON, + CvssSeverity: CvssSeverityType, } @@ -95,6 +100,7 @@ class DebCve(Base): dist_id = mapped_column(ForeignKey(DistCpe.id), primary_key=True) cve_id: Mapped[str] = mapped_column(primary_key=True) last_mod: Mapped[datetime] = mapped_column(init=False, server_default=func.now(), onupdate=func.now()) + cvss_severity: Mapped[Optional[CvssSeverity]] = mapped_column() deb_source: Mapped[str] = mapped_column(primary_key=True) deb_version: Mapped[str] = mapped_column(DebVersion) deb_version_fixed: Mapped[Optional[str]] = mapped_column(DebVersion) @@ -114,6 +120,7 @@ class DebCve(Base): ) def merge(self, other: Self) -> None: + self.cvss_severity = other.cvss_severity self.deb_version = other.deb_version self.deb_version_fixed = other.deb_version_fixed self.debsec_vulnerable = other.debsec_vulnerable diff --git a/src/glvd/database/types.py b/src/glvd/database/types.py index 1497fe5..98827dd 100644 --- a/src/glvd/database/types.py +++ b/src/glvd/database/types.py @@ -2,7 +2,28 @@ from __future__ import annotations -from sqlalchemy.types import UserDefinedType +from sqlalchemy.types import ( + INT, + TypeDecorator, + UserDefinedType, +) + +from ..data.cvss import CvssSeverity + + +class CvssSeverityType(TypeDecorator): + cache_ok = True + impl = INT + + def process_bind_param(self, value: CvssSeverity | None, dialect) -> int | None: + if value is not None: + return value.value + return None + + def process_result_value(self, value: int | None, dialect) -> CvssSeverity | None: + if value is not None: + return CvssSeverity(value) + return None class DebVersion(UserDefinedType): diff --git a/tests/cli/test_combine_deb.py b/tests/cli/test_combine_deb.py index b130124..1c1a715 100644 --- a/tests/cli/test_combine_deb.py +++ b/tests/cli/test_combine_deb.py @@ -1,20 +1,24 @@ # SPDX-License-Identifier: MIT +from datetime import datetime + from sqlalchemy import select from glvd.cli.combine_deb import CombineDeb +from glvd.data.cvss import CvssSeverity from glvd.data.dist_cpe import DistCpeMapper -from glvd.database import DistCpe, DebCve, DebsecCve, Debsrc +from glvd.database import DistCpe, DebCve, DebsecCve, Debsrc, NvdCve class TestIngestDebsrc: dist_mapper = DistCpeMapper.new('debian') - def debsec_cve(self, *, deb_version_fixed: str, dist: DistCpe) -> DebsecCve: + def debsec_cve(self, *, deb_version_fixed: str, debsec_note: str | None, dist: DistCpe) -> DebsecCve: return DebsecCve( cve_id='TEST-1', deb_source='test', deb_version_fixed=deb_version_fixed, + debsec_note=debsec_note, dist=dist, ) @@ -25,12 +29,29 @@ def debsrc(self, *, dist: DistCpe) -> Debsrc: dist=dist, ) + def nvd_cve(self) -> NvdCve: + return NvdCve( + cve_id='TEST-1', + last_mod=datetime.now(), + data={ + 'metrics': { + 'cvssMetricV31': [{ + 'type': 'Primary', + 'cvssData': { + 'baseSeverity': 'MEDIUM', + }, + }], + }, + }, + ) + async def test_combine_base(self, db_session): dist_test = self.dist_mapper('trixie') db_session.add(dist_test) - db_session.add(self.debsec_cve(deb_version_fixed='1', dist=dist_test)) + db_session.add(self.debsec_cve(deb_version_fixed='1', debsec_note=None, dist=dist_test)) db_session.add(self.debsrc(dist=dist_test)) + db_session.add(self.nvd_cve()) await db_session.flush() combine = CombineDeb() @@ -40,12 +61,14 @@ async def test_combine_base(self, db_session): assert len(r) == 1 t = r.pop(0)[0] assert t.dist == dist_test + assert t.cvss_severity is CvssSeverity.MEDIUM assert t.deb_source == 'test' assert t.deb_version == '1' assert t.debsec_vulnerable is False assert t.data_cpe_match == { 'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test', 'deb': { + 'cvssSeverity': 'MEDIUM', 'versionEndExcluding': '1', 'versionLatest': '1', }, @@ -56,8 +79,9 @@ async def test_combine_base_vulnerable(self, db_session): dist_test = self.dist_mapper('trixie') db_session.add(dist_test) - db_session.add(self.debsec_cve(deb_version_fixed='2', dist=dist_test)) + db_session.add(self.debsec_cve(deb_version_fixed='2', debsec_note=None, dist=dist_test)) db_session.add(self.debsrc(dist=dist_test)) + db_session.add(self.nvd_cve()) await db_session.flush() combine = CombineDeb() @@ -67,12 +91,14 @@ async def test_combine_base_vulnerable(self, db_session): assert len(r) == 1 t = r.pop(0)[0] assert t.dist == dist_test + assert t.cvss_severity is CvssSeverity.MEDIUM assert t.deb_source == 'test' assert t.deb_version == '1' assert t.debsec_vulnerable is True assert t.data_cpe_match == { 'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test', 'deb': { + 'cvssSeverity': 'MEDIUM', 'versionEndExcluding': '2', 'versionLatest': '1', }, @@ -84,8 +110,9 @@ async def test_combine_fallback(self, db_session): dist_test = self.dist_mapper('trixie') db_session.add(dist_test) - db_session.add(self.debsec_cve(deb_version_fixed='1', dist=dist_fallback)) + db_session.add(self.debsec_cve(deb_version_fixed='1', debsec_note=None, dist=dist_fallback)) db_session.add(self.debsrc(dist=dist_test)) + db_session.add(self.nvd_cve()) await db_session.flush() combine = CombineDeb() @@ -95,12 +122,14 @@ async def test_combine_fallback(self, db_session): assert len(r) == 1 t = r.pop(0)[0] assert t.dist == dist_test + assert t.cvss_severity is CvssSeverity.MEDIUM assert t.deb_source == 'test' assert t.deb_version == '1' assert t.debsec_vulnerable is False assert t.data_cpe_match == { 'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test', 'deb': { + 'cvssSeverity': 'MEDIUM', 'versionEndExcluding': '1', 'versionLatest': '1', }, @@ -112,8 +141,9 @@ async def test_combine_fallback_vulnerable(self, db_session): dist_test = self.dist_mapper('trixie') db_session.add(dist_test) - db_session.add(self.debsec_cve(deb_version_fixed='2', dist=dist_fallback)) + db_session.add(self.debsec_cve(deb_version_fixed='2', debsec_note=None, dist=dist_fallback)) db_session.add(self.debsrc(dist=dist_test)) + db_session.add(self.nvd_cve()) await db_session.flush() combine = CombineDeb() @@ -123,14 +153,46 @@ async def test_combine_fallback_vulnerable(self, db_session): assert len(r) == 1 t = r.pop(0)[0] assert t.dist == dist_test + assert t.cvss_severity is CvssSeverity.MEDIUM assert t.deb_source == 'test' assert t.deb_version == '1' assert t.debsec_vulnerable is True assert t.data_cpe_match == { 'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test', 'deb': { + 'cvssSeverity': 'MEDIUM', 'versionEndExcluding': '2', 'versionLatest': '1', }, 'vulnerable': True, } + + async def test_combine_unimportant(self, db_session): + dist_test = self.dist_mapper('trixie') + + db_session.add(dist_test) + db_session.add(self.debsec_cve(deb_version_fixed='1', debsec_note='unimportant', dist=dist_test)) + db_session.add(self.debsrc(dist=dist_test)) + db_session.add(self.nvd_cve()) + await db_session.flush() + + combine = CombineDeb() + await combine.combine(db_session) + + r = (await db_session.execute(select(DebCve).order_by(DebCve.deb_source))).all() + assert len(r) == 1 + t = r.pop(0)[0] + assert t.dist == dist_test + assert t.cvss_severity is CvssSeverity.UNIMPORTANT + assert t.deb_source == 'test' + assert t.deb_version == '1' + assert t.debsec_vulnerable is False + assert t.data_cpe_match == { + 'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test', + 'deb': { + 'cvssSeverity': 'UNIMPORTANT', + 'versionEndExcluding': '1', + 'versionLatest': '1', + }, + 'vulnerable': False, + } diff --git a/tests/data/test_cvss.py b/tests/data/test_cvss.py new file mode 100644 index 0000000..41a8f66 --- /dev/null +++ b/tests/data/test_cvss.py @@ -0,0 +1,24 @@ +# SPDX-License-Identifier: MIT + +import pytest + +from glvd.data.cvss import CvssSeverity + + +class TestCvssSeverity: + @pytest.mark.parametrize('score,value', [ + (0, CvssSeverity.NONE), + (1, CvssSeverity.LOW), + (4, CvssSeverity.MEDIUM), + (7, CvssSeverity.HIGH), + (9, CvssSeverity.CRITICAL), + (10, CvssSeverity.CRITICAL), + ]) + def test_from_score(self, score, value): + a = CvssSeverity.from_score(score) + assert a is value + + @pytest.mark.parametrize('score', [-0.1, 10.1]) + def test_from_score_fail(self, score): + with pytest.raises(ValueError): + CvssSeverity.from_score(score) diff --git a/tests/web/test_v1_cves.py b/tests/web/test_v1_cves.py index 6fedf9b..b739463 100644 --- a/tests/web/test_v1_cves.py +++ b/tests/web/test_v1_cves.py @@ -55,6 +55,7 @@ async def setup_example(self, db_session_class): db_session_class.add(self.dist) db_session_class.add(DebCve( cve_id='TEST-fixed', + cvss_severity=None, deb_source='test', deb_version='1', deb_version_fixed='1', @@ -64,6 +65,7 @@ async def setup_example(self, db_session_class): )) db_session_class.add(DebCve( cve_id='TEST-vuln', + cvss_severity=None, deb_source='test', deb_version='1', deb_version_fixed='2',