From b71895298e476d50a4b9bf9d51378fd59609525f Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Mon, 27 Nov 2023 12:20:54 +0100 Subject: [PATCH] Factor out the cpe matcher --- src/glvd/cli/ingest_debsec.py | 60 ++--------------------------- src/glvd/cli/ingest_debsrc.py | 60 ++--------------------------- src/glvd/data/dist_cpe.py | 71 +++++++++++++++++++++++++++++++++++ tests/data/test_dist_cpe.py | 58 ++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 112 deletions(-) create mode 100644 src/glvd/data/dist_cpe.py create mode 100644 tests/data/test_dist_cpe.py diff --git a/src/glvd/cli/ingest_debsec.py b/src/glvd/cli/ingest_debsec.py index 68f54a6..33efbbd 100644 --- a/src/glvd/cli/ingest_debsec.py +++ b/src/glvd/cli/ingest_debsec.py @@ -16,69 +16,17 @@ from ..database import Base, DistCpe, DebsecCve from ..data.debsec_cve import DebsecCveFile +from ..data.dist_cpe import DistCpeMapper logger = logging.getLogger(__name__) class IngestDebsec: - class DistCpeMapper: - cpe_vendor: str - cpe_product: str - - def __call__(self, codename: str) -> DistCpe: - raise NotImplementedError - - # XXX: Move mapper somewhere else - class DistCpeMapperDebian(DistCpeMapper): - cpe_vendor = 'debian' - cpe_product = 'debian_linux' - - def __call__(self, codename: str) -> DistCpe: - version: str = { - 'woody': '3.0', - 'sarge': '3.1', - 'etch': '4.0', - 'lenny': '5.0', - 'squeeze': '6.0', - 'wheezy': '7', - 'jessie': '8', - 'stretch': '9', - 'buster': '10', - 'bullseye': '11', - 'bookworm': '12', - 'trixie': '13', - 'forky': '14', - '': '', - }[codename] - return DistCpe( - cpe_vendor='debian', - cpe_product='debian_linux', - cpe_version=version, - deb_codename=codename, - ) - - class DistCpeMapperGardenlinux(DistCpeMapper): - cpe_vendor = 'sap' - cpe_product = 'gardenlinux' - - def __call__(self, codename: str) -> DistCpe: - return DistCpe( - cpe_vendor='sap', - cpe_product='gardenlinux', - cpe_version=codename, - deb_codename=codename, - ) - - dist_cpe_mapper: dict[str, DistCpeMapper] = { - 'debian': DistCpeMapperDebian(), - 'gardenlinux': DistCpeMapperGardenlinux(), - } - def __init__(self, cpe_product: str, path: Path) -> None: self.path = path - self.dist_base = self.dist_cpe_mapper[cpe_product] + self.dist_base = DistCpeMapper.new(cpe_product) def read_cve(self) -> DebsecCveFile: r = DebsecCveFile() @@ -172,8 +120,8 @@ async def __call__( parser = argparse.ArgumentParser() parser.add_argument( 'cpe_product', - choices=sorted(IngestDebsec.dist_cpe_mapper.keys()), - help=f'CPE product used for data, supported: {" ".join(sorted(IngestDebsec.dist_cpe_mapper.keys()))}', + choices=sorted(DistCpeMapper.keys()), + help=f'CPE product used for data, supported: {" ".join(sorted(DistCpeMapper.keys()))}', metavar='CPE_PRODUCT', ) parser.add_argument( diff --git a/src/glvd/cli/ingest_debsrc.py b/src/glvd/cli/ingest_debsrc.py index 89fa642..6cb2eb5 100644 --- a/src/glvd/cli/ingest_debsrc.py +++ b/src/glvd/cli/ingest_debsrc.py @@ -16,69 +16,17 @@ from ..database import Base, DistCpe, Debsrc from ..data.debsrc import DebsrcFile +from ..data.dist_cpe import DistCpeMapper logger = logging.getLogger(__name__) class IngestDebsrc: - class DistCpeMapper: - cpe_vendor: str - cpe_product: str - - def __call__(self, codename: str) -> DistCpe: - raise NotImplementedError - - # XXX: Move mapper somewhere else - class DistCpeMapperDebian(DistCpeMapper): - cpe_vendor = 'debian' - cpe_product = 'debian_linux' - - def __call__(self, codename: str) -> DistCpe: - version: str = { - 'woody': '3.0', - 'sarge': '3.1', - 'etch': '4.0', - 'lenny': '5.0', - 'squeeze': '6.0', - 'wheezy': '7', - 'jessie': '8', - 'stretch': '9', - 'buster': '10', - 'bullseye': '11', - 'bookworm': '12', - 'trixie': '13', - 'forky': '14', - '': '', - }[codename] - return DistCpe( - cpe_vendor='debian', - cpe_product='debian_linux', - cpe_version=version, - deb_codename=codename, - ) - - class DistCpeMapperGardenlinux(DistCpeMapper): - cpe_vendor = 'sap' - cpe_product = 'gardenlinux' - - def __call__(self, codename: str) -> DistCpe: - return DistCpe( - cpe_vendor='sap', - cpe_product='gardenlinux', - cpe_version=codename, - deb_codename=codename, - ) - - dist_cpe_mapper: dict[str, DistCpeMapper] = { - 'debian': DistCpeMapperDebian(), - 'gardenlinux': DistCpeMapperGardenlinux(), - } - def __init__(self, cpe_product: str, deb_codename: str, file: Path) -> None: self.file = file - self.dist = self.dist_cpe_mapper[cpe_product](deb_codename) + self.dist = DistCpeMapper.new(cpe_product)(deb_codename) def read(self) -> DebsrcFile: r = DebsrcFile() @@ -160,8 +108,8 @@ async def __call__( parser = argparse.ArgumentParser() parser.add_argument( 'cpe_product', - choices=sorted(IngestDebsrc.dist_cpe_mapper.keys()), - help=f'CPE product used for data, supported: {" ".join(sorted(IngestDebsrc.dist_cpe_mapper.keys()))}', + choices=sorted(DistCpeMapper.keys()), + help=f'CPE product used for data, supported: {" ".join(sorted(DistCpeMapper.keys()))}', metavar='CPE_PRODUCT', ) parser.add_argument( diff --git a/src/glvd/data/dist_cpe.py b/src/glvd/data/dist_cpe.py new file mode 100644 index 0000000..0ce69aa --- /dev/null +++ b/src/glvd/data/dist_cpe.py @@ -0,0 +1,71 @@ +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +from collections.abc import Collection + +from ..database import DistCpe + + +class DistCpeMapper: + cpe_vendor: str + cpe_product: str + + def __call__(self, codename: str) -> DistCpe: + raise NotImplementedError + + @staticmethod + def keys() -> Collection[str]: + return { + 'debian', + 'gardenlinux', + } + + @staticmethod + def new(match: str) -> DistCpeMapper: + return { + 'debian': DistCpeMapperDebian, + 'gardenlinux': DistCpeMapperGardenlinux, + }[match]() + + +class DistCpeMapperDebian(DistCpeMapper): + cpe_vendor = 'debian' + cpe_product = 'debian_linux' + + def __call__(self, codename: str) -> DistCpe: + version: str = { + 'woody': '3.0', + 'sarge': '3.1', + 'etch': '4.0', + 'lenny': '5.0', + 'squeeze': '6.0', + 'wheezy': '7', + 'jessie': '8', + 'stretch': '9', + 'buster': '10', + 'bullseye': '11', + 'bookworm': '12', + 'trixie': '13', + 'forky': '14', + '': '', + }[codename] + return DistCpe( + cpe_vendor='debian', + cpe_product='debian_linux', + cpe_version=version, + deb_codename=codename, + ) + + +class DistCpeMapperGardenlinux(DistCpeMapper): + cpe_vendor = 'sap' + cpe_product = 'gardenlinux' + + def __call__(self, codename: str) -> DistCpe: + return DistCpe( + cpe_vendor='sap', + cpe_product='gardenlinux', + cpe_version=codename, + deb_codename=codename, + ) diff --git a/tests/data/test_dist_cpe.py b/tests/data/test_dist_cpe.py new file mode 100644 index 0000000..54b343e --- /dev/null +++ b/tests/data/test_dist_cpe.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: MIT + +import pytest + +from glvd.data.dist_cpe import DistCpeMapper + + +class TestDistCpeMapper: + def test(self) -> None: + with pytest.raises(NotImplementedError): + DistCpeMapper()('') + + +class TestDistCpeMapperDebian: + @pytest.mark.parametrize( + 'codename,version', + [ + ('woody', '3.0'), + ('sarge', '3.1'), + ('etch', '4.0'), + ('lenny', '5.0'), + ('squeeze', '6.0'), + ('wheezy', '7'), + ('jessie', '8'), + ('stretch', '9'), + ('buster', '10'), + ('bullseye', '11'), + ('bookworm', '12'), + ('trixie', '13'), + ('forky', '14'), + ('', ''), + ], + ) + def test_valid(self, codename: str, version: str) -> None: + m = DistCpeMapper.new('debian') + c = m(codename) + + assert c.cpe_vendor == 'debian' + assert c.cpe_product == 'debian_linux' + assert c.cpe_version == version + assert c.deb_codename == codename + + def test_invalid(self) -> None: + m = DistCpeMapper.new('debian') + + with pytest.raises(KeyError): + m('invalid') + + +class TestDistCpeMapperGardenlinux: + def test_valid(self) -> None: + m = DistCpeMapper.new('gardenlinux') + c = m('999.9') + + assert c.cpe_vendor == 'sap' + assert c.cpe_product == 'gardenlinux' + assert c.cpe_version == '999.9' + assert c.deb_codename == '999.9'