From 38ba4ff3152a4f0e697858a9327bec76e5f7d1ac Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Tue, 19 Dec 2023 10:28:56 +0100 Subject: [PATCH 1/2] Add client tool to get CVE for installed packages --- pyproject.toml | 1 + src/glvd/cli/client/__main__.py | 1 + src/glvd/cli/client/cve_apt.py | 91 +++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) create mode 100644 src/glvd/cli/client/cve_apt.py diff --git a/pyproject.toml b/pyproject.toml index 2978eeb..bb0d190 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ asyncio_mode = "auto" [[tool.mypy.overrides]] module = [ + "apt", "requests", "requests.adapters", "urllib3" diff --git a/src/glvd/cli/client/__main__.py b/src/glvd/cli/client/__main__.py index 1529d4b..cdd59bd 100644 --- a/src/glvd/cli/client/__main__.py +++ b/src/glvd/cli/client/__main__.py @@ -7,6 +7,7 @@ # Import to register all the commands from . import ( # noqa: F401 cve, + cve_apt, ) diff --git a/src/glvd/cli/client/cve_apt.py b/src/glvd/cli/client/cve_apt.py new file mode 100644 index 0000000..ac1e995 --- /dev/null +++ b/src/glvd/cli/client/cve_apt.py @@ -0,0 +1,91 @@ +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import argparse +import json +import logging +import sys +import urllib.parse + +from glvd.data.cvss import CvssSeverity +from glvd.util import requests +from . import cli + + +logger = logging.getLogger(__name__) + + +class ClientCveApt: + server: str + cvss3_severity_min: CvssSeverity + + @staticmethod + @cli.register( + 'cve-apt', + arguments=[ + cli.prepare_argument( + '--cvss3-severity-min', + choices=[i.name for i in CvssSeverity if i != CvssSeverity.NONE], + default='LOW', + help='only return CVE with at least this CVSS severity', + ), + ], + ) + def run(*, argparser: argparse.ArgumentParser, cvss3_severity_min: str, server: str, debug: bool) -> None: + logging.basicConfig(level=debug and logging.DEBUG or logging.INFO) + + # Python-Apt is no PYPI reachable dependency, it only exists on Debian systems as package. + try: + import apt # noqa: F401 + except ImportError: + argparser.error('please install python3-apt') + + ClientCveApt(server, CvssSeverity[cvss3_severity_min])() + + def __init__(self, server: str, cvss3_severity_min: CvssSeverity) -> None: + self.server = server + self.cvss3_severity_min = cvss3_severity_min + + def get_sources(self) -> set[tuple[str, str, str, str]]: + import apt + + ret = set() + + for pkg in apt.Cache(): + if inst := pkg.installed: + for origin in inst.origins: + if origin.origin: + ret.add((origin.origin, origin.codename, inst.source_name, inst.source_version)) + + return ret + + def request_data(self) -> dict[str, list[str]]: + return { + 'source[]': [ + '_'.join(i) + for i in sorted(self.get_sources()) + ], + } + + def request_params(self) -> dict[str, str]: + return { + 'cvssV3SeverityMin': self.cvss3_severity_min.name, + } + + def __call__(self) -> None: + with requests.RetrySession() as rsession: + resp = rsession.post( + urllib.parse.urljoin(self.server, 'v1/cves/findBySources'), + params=self.request_params(), + data=self.request_data(), + ) + if resp.status_code == 200: + data = resp.json() + json.dump(data, sys.stdout, indent=2) + else: + resp.raise_for_status() + + +if __name__ == '__main__': + ClientCveApt.run() From f52b296b656472559b46759fef66708e8d895fdf Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Tue, 19 Dec 2023 10:29:06 +0100 Subject: [PATCH 2/2] Ignore unknown distributions on source search --- src/glvd/web/v1_cves.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/glvd/web/v1_cves.py b/src/glvd/web/v1_cves.py index 9c73d98..6e9dfb9 100644 --- a/src/glvd/web/v1_cves.py +++ b/src/glvd/web/v1_cves.py @@ -184,22 +184,27 @@ async def get_sources() -> tuple[Any, int, dict[str, str]]: # can remove the enormous time spent on compiling queries stmts_source = [] for i, j in source_by_dist.items(): - dist = DistCpeMapper.new(i[0])(i[1]) - dist_id = (await conn.execute( - sa.select(DistCpe.id) - .where(DistCpe.cpe_vendor == dist.cpe_vendor) - .where(DistCpe.cpe_product == dist.cpe_product) - .where(DistCpe.cpe_version == dist.cpe_version) - )).one()[0] - - for source, version in j: - stmts_source.append( - sa.select( - sa.literal(dist_id).label('dist_id'), - sa.literal(source).label('deb_source'), - sa.cast(sa.literal(version), DebVersion).label('deb_version'), + try: + dist = DistCpeMapper.new(i[0].lower())(i[1].lower()) + except KeyError: + # XXX: How to handle unknown distributions? + pass + else: + dist_id = (await conn.execute( + sa.select(DistCpe.id) + .where(DistCpe.cpe_vendor == dist.cpe_vendor) + .where(DistCpe.cpe_product == dist.cpe_product) + .where(DistCpe.cpe_version == dist.cpe_version) + )).one()[0] + + for source, version in j: + stmts_source.append( + sa.select( + sa.literal(dist_id).label('dist_id'), + sa.literal(source).label('deb_source'), + sa.cast(sa.literal(version), DebVersion).label('deb_version'), + ) ) - ) # If we found no source at all if not stmts_source: