From 052c1260d7d78681eb23f3b200b270a035a39160 Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Thu, 30 Nov 2023 09:44:52 +0100 Subject: [PATCH 1/3] Provide info if CPE is Debian related --- src/glvd/data/cpe.py | 32 +++++++++++++++++++++----------- tests/data/test_cpe.py | 12 ++++++++++++ 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/src/glvd/data/cpe.py b/src/glvd/data/cpe.py index c46c6fc..b1d271d 100644 --- a/src/glvd/data/cpe.py +++ b/src/glvd/data/cpe.py @@ -25,28 +25,29 @@ class CpeOtherDebian: deb_version: str | None = dataclasses.field(default=None) @classmethod - def parse(cls, cpe: Cpe) -> str | CpeOtherDebian | CpeAny | None: - if not (cpe.part, cpe.vendor, cpe.product) in [ - (CpePart.OS, 'debian', 'debian_linux'), - (CpePart.OS, 'sap', 'gardenlinux'), - ] or not isinstance(cpe.other, str): - return cpe.other - + def parse(cls, i: str) -> str | CpeOtherDebian | CpeAny | None: try: kw: dict[str, str | None] = {} - for f in cpe.other.split(','): + for f in i.split(','): k, v = f.split('=', 1) kw[k] = v return cls(**kw) except ValueError: - return cpe.other + return cls() def __str__(self) -> str: m: list[str] = [] + allany: bool = True + for field in dataclasses.fields(self): if j := getattr(self, field.name): m.append(f'{field.name}={j}') - return ','.join(m) + allany = False + + if allany: + return '*' + else: + return ','.join(m) @dataclasses.dataclass @@ -64,6 +65,7 @@ class Cpe: target_sw: str | CpeAny | None = dataclasses.field(default=ANY) target_hw: str | CpeAny | None = dataclasses.field(default=ANY) other: str | CpeOtherDebian | CpeAny | None = dataclasses.field(default=ANY) + is_debian: bool = dataclasses.field(init=False, default=False) __re = re.compile(r''' ^cpe:2.3: @@ -95,7 +97,15 @@ class Cpe: __re_unquote = re.compile(r'''\\([!"#$%&'()+,/:;<=>@[\]^`{|}~])''') def __post_init__(self) -> None: - self.other = CpeOtherDebian.parse(self) + if (self.part, self.vendor, self.product) in [ + (CpePart.OS, 'debian', 'debian_linux'), + (CpePart.OS, 'sap', 'gardenlinux'), + ]: + self.is_debian = True + if isinstance(self.other, str): + self.other = CpeOtherDebian.parse(self.other) + elif self.other is self.ANY: + self.other = CpeOtherDebian() @classmethod def _parse_one(cls, field: dataclasses.Field, v: str, /) -> Any: diff --git a/tests/data/test_cpe.py b/tests/data/test_cpe.py index 6f6cebb..241610f 100644 --- a/tests/data/test_cpe.py +++ b/tests/data/test_cpe.py @@ -17,6 +17,7 @@ def test_init(self): assert c.target_sw is Cpe.ANY assert c.target_hw is Cpe.ANY assert c.other is Cpe.ANY + assert c.is_debian is False def test_parse(self): s = r'cpe:2.3:h:a:b:c\:\%\*\;c:d:*:-:-:-:*:*' @@ -32,6 +33,7 @@ def test_parse(self): assert c.target_sw is None assert c.target_hw is Cpe.ANY assert c.other is Cpe.ANY + assert c.is_debian is False assert str(c) == s def test_debian(self): @@ -49,4 +51,14 @@ def test_debian(self): assert c.target_hw is Cpe.ANY assert c.other.deb_source == 'hello' assert c.other.deb_version == '1' + assert c.is_debian is True + assert str(c) == s + + def test_debian_any(self): + s = r'cpe:2.3:o:debian:debian_linux:12:d:*:*:*:*:*:*' + c = Cpe.parse(s) + + assert c.other.deb_source is None + assert c.other.deb_version is None + assert c.is_debian is True assert str(c) == s From 4cba7d2140087f0fc1641be5f73dddba123501e4 Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Thu, 30 Nov 2023 11:18:02 +0100 Subject: [PATCH 2/3] Search CVE by Debian dist and source --- src/glvd/web/nvd.py | 60 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/src/glvd/web/nvd.py b/src/glvd/web/nvd.py index 57bc230..34c1045 100644 --- a/src/glvd/web/nvd.py +++ b/src/glvd/web/nvd.py @@ -6,20 +6,60 @@ text, ) +from ..data.cpe import Cpe + bp = Blueprint('nvd', __name__) # XXX: Can we replace that with a view, which combines data and data_configurations in the database? +stmt_cve_deb_cpe = ( + text(''' + SELECT + nvd_cve.data, + array_to_json( + array_remove( + array_agg(deb_cve.data_cpe_match), + NULL + ) + ) AS data_cpe_matches + FROM + nvd_cve + LEFT OUTER JOIN deb_cve USING (cve_id) + INNER JOIN dist_cpe ON (deb_cve.dist_id = dist_cpe.id) + WHERE + dist_cpe.cpe_vendor = :cpe_vendor AND + dist_cpe.cpe_product = :cpe_product AND + dist_cpe.cpe_version = :cpe_version AND + deb_cve.deb_source LIKE :deb_source AND + deb_cve.debsec_vulnerable = TRUE + GROUP BY + nvd_cve.cve_id + ''') + .bindparams( + bindparam('cpe_vendor'), + bindparam('cpe_product'), + bindparam('cpe_version'), + bindparam('deb_source'), + ) +) + stmt_cve_deb_cve_id = ( text(''' SELECT - nvd_cve.data - , array_to_json(array_remove(array_agg(deb_cve.data_cpe_match), NULL)) AS data_cpe_matchess + nvd_cve.data, + array_to_json( + array_remove( + array_agg(deb_cve.data_cpe_match), + NULL + ) + ) AS data_cpe_matches FROM nvd_cve LEFT OUTER JOIN deb_cve USING (cve_id) - WHERE cve_id = :cve_id - GROUP BY (nvd_cve.data) + WHERE + cve_id = :cve_id + GROUP BY + nvd_cve.cve_id ''') .bindparams( bindparam('cve_id'), @@ -29,7 +69,17 @@ @bp.route('/rest/json/cves/2.0+deb') async def nvd_cve_deb(): - if cve_id := request.args.get('cveId', type=str): + if cpe_name := request.args.get('cpeName', type=str): + cpe = Cpe.parse(cpe_name) + if not cpe.is_debian: + return 'Not Debian related CPE', 400 + stmt = stmt_cve_deb_cpe.bindparams( + cpe_vendor=cpe.vendor, + cpe_product=cpe.product, + cpe_version=cpe.version, + deb_source=cpe.other.deb_source or '%', + ) + elif cve_id := request.args.get('cveId', type=str): stmt = stmt_cve_deb_cve_id.bindparams(cve_id=cve_id) async with current_app.db_begin() as conn: From 477624404f0b038d288a095139f56ff69aa0c196 Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Thu, 30 Nov 2023 13:59:37 +0100 Subject: [PATCH 3/3] Make search actually fast enough --- src/glvd/database/__init__.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/glvd/database/__init__.py b/src/glvd/database/__init__.py index c111017..6514cfa 100644 --- a/src/glvd/database/__init__.py +++ b/src/glvd/database/__init__.py @@ -9,7 +9,10 @@ Self, ) -from sqlalchemy import ForeignKey +from sqlalchemy import ( + ForeignKey, + Index, +) from sqlalchemy.orm import ( DeclarativeBase, Mapped, @@ -94,11 +97,21 @@ class DebCve(Base): last_mod: Mapped[datetime] = mapped_column(init=False, server_default=func.now(), onupdate=func.now()) deb_source: Mapped[str] = mapped_column(primary_key=True) deb_version: Mapped[str] = mapped_column(DebVersion) - debsec_vulnerable: Mapped[bool] + debsec_vulnerable: Mapped[bool] = mapped_column() data_cpe_match: Mapped[Any] dist: Mapped[Optional[DistCpe]] = relationship(lazy='selectin', default=None) + __table_args__ = ( + Index( + 'deb_cve_search', + dist_id, + debsec_vulnerable, + deb_source, + deb_version, + ), + ) + def merge(self, other: Self) -> None: self.deb_version = other.deb_version self.debsec_vulnerable = other.debsec_vulnerable