diff --git a/src/glvd/cli/ingest_debsec.py b/src/glvd/cli/ingest_debsec.py index 33efbbd..5841e5d 100644 --- a/src/glvd/cli/ingest_debsec.py +++ b/src/glvd/cli/ingest_debsec.py @@ -52,10 +52,11 @@ async def import_cve_update( new_entries = file_cve.get(entry.dist.deb_codename) if not new_entries: + await session.delete(entry) continue new_entry = new_entries.pop((entry.cve_id, entry.deb_source), None) if not new_entry: - # XXX: Delete entry + await session.delete(entry) continue # Update object in place. Only real changes will be commited diff --git a/src/glvd/cli/ingest_debsrc.py b/src/glvd/cli/ingest_debsrc.py index 6cb2eb5..0989cdc 100644 --- a/src/glvd/cli/ingest_debsrc.py +++ b/src/glvd/cli/ingest_debsrc.py @@ -53,7 +53,7 @@ async def import_update( new_entry = file.pop(entry.deb_source, None) if not new_entry: - # XXX: Delete entry + await session.delete(entry) continue # Update object in place. Only real changes will be commited diff --git a/tests/cli/test_ingest_debsec.py b/tests/cli/test_ingest_debsec.py index 496719b..e65abb4 100644 --- a/tests/cli/test_ingest_debsec.py +++ b/tests/cli/test_ingest_debsec.py @@ -1,5 +1,7 @@ # SPDX-License-Identifier: MIT +import pytest + from sqlalchemy import select from glvd.cli.ingest_debsec import IngestDebsec @@ -7,29 +9,35 @@ from glvd.data.debsec_cve import DebsecCveFile +@pytest.mark.incremental class TestIngestDebsec: - async def test_import_cve(self, db_session): + async def test_import_cve_insert(self, db_session_class): f = DebsecCveFile() f[''] = { ('TEST-1', 'hello'): DebsecCve( cve_id='TEST-1', deb_source='hello', deb_version_fixed='2', - ) + ), + ('TEST-2', 'hello'): DebsecCve( + cve_id='TEST-2', + deb_source='hello', + deb_version_fixed='2', + ), } f['bookworm'] = { ('TEST-1', 'hello'): DebsecCve( cve_id='TEST-1', deb_source='hello', deb_version_fixed='1', - ) + ), } ingest = IngestDebsec('debian', None) - await ingest.import_cve_insert(db_session, f) + await ingest.import_cve_insert(db_session_class, f) - r = (await db_session.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all() - assert len(r) == 2 + r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all() + assert len(r) == 3 t = r.pop(0)[0] assert t.cve_id == 'TEST-1' assert t.dist.cpe_version == '12' @@ -42,20 +50,108 @@ async def test_import_cve(self, db_session): assert t.deb_source == 'hello' assert t.deb_version_fixed == '2' + t = r.pop(0)[0] + assert t.cve_id == 'TEST-2' + assert t.dist.cpe_version == '' + assert t.deb_source == 'hello' + assert t.deb_version_fixed == '2' + + async def test_import_cve_update(self, db_session_class): + f = DebsecCveFile() f[''] = { ('TEST-1', 'hello'): DebsecCve( cve_id='TEST-1', deb_source='hello', deb_version_fixed='3', - ) + ), + ('TEST-2', 'hello'): DebsecCve( + cve_id='TEST-2', + deb_source='hello', + deb_version_fixed='3', + ), } + f['bookworm'] = { + ('TEST-1', 'hello'): DebsecCve( + cve_id='TEST-1', + deb_source='hello', + deb_version_fixed='1', + ), + } + + ingest = IngestDebsec('debian', None) + await ingest.import_cve_update(db_session_class, f) - await ingest.import_cve_update(db_session, f) + r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all() + assert len(r) == 3 + t = r.pop(0)[0] + assert t.cve_id == 'TEST-1' + assert t.dist.cpe_version == '12' + assert t.deb_source == 'hello' + assert t.deb_version_fixed == '1' - r = (await db_session.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all() + t = r.pop(0)[0] + assert t.cve_id == 'TEST-1' + assert t.dist.cpe_version == '' + assert t.deb_source == 'hello' + assert t.deb_version_fixed == '3' + + t = r.pop(0)[0] + assert t.cve_id == 'TEST-2' + assert t.dist.cpe_version == '' + assert t.deb_source == 'hello' + assert t.deb_version_fixed == '3' + + async def test_import_cve_delete_entry(self, db_session_class): + f = DebsecCveFile() + f[''] = { + ('TEST-1', 'hello'): DebsecCve( + cve_id='TEST-1', + deb_source='hello', + deb_version_fixed='3', + ), + } + f['bookworm'] = { + ('TEST-1', 'hello'): DebsecCve( + cve_id='TEST-1', + deb_source='hello', + deb_version_fixed='1', + ), + } + + ingest = IngestDebsec('debian', None) + await ingest.import_cve_update(db_session_class, f) + + r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all() assert len(r) == 2 - t = r.pop(1)[0] + t = r.pop(0)[0] + assert t.cve_id == 'TEST-1' + assert t.dist.cpe_version == '12' + assert t.deb_source == 'hello' + assert t.deb_version_fixed == '1' + + t = r.pop(0)[0] assert t.cve_id == 'TEST-1' assert t.dist.cpe_version == '' assert t.deb_source == 'hello' assert t.deb_version_fixed == '3' + + async def test_import_cve_delete_dist(self, db_session_class): + f = DebsecCveFile() + f['bookworm'] = { + ('TEST-1', 'hello'): DebsecCve( + cve_id='TEST-1', + deb_source='hello', + deb_version_fixed='1', + ), + } + + ingest = IngestDebsec('debian', None) + await ingest.import_cve_update(db_session_class, f) + + r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all() + assert len(r) == 1 + t = r.pop(0)[0] + assert t.cve_id == 'TEST-1' + assert t.dist.cpe_version == '12' + assert t.deb_source == 'hello' + assert t.deb_version_fixed == '1' diff --git a/tests/cli/test_ingest_debsrc.py b/tests/cli/test_ingest_debsrc.py index 3ad9f5f..1044c2c 100644 --- a/tests/cli/test_ingest_debsrc.py +++ b/tests/cli/test_ingest_debsrc.py @@ -1,5 +1,7 @@ # SPDX-License-Identifier: MIT +import pytest + from sqlalchemy import select from glvd.cli.ingest_debsrc import IngestDebsrc @@ -7,8 +9,9 @@ from glvd.data.debsrc import DebsrcFile +@pytest.mark.incremental class TestIngestDebsrc: - async def test_import(self, db_session): + async def test_import_insert(self, db_session_class): f = DebsrcFile() f['test1'] = Debsrc( deb_source='test1', @@ -20,9 +23,9 @@ async def test_import(self, db_session): ) ingest = IngestDebsrc('debian', 'bookworm', None) - await ingest.import_insert(db_session, f) + await ingest.import_insert(db_session_class, f) - r = (await db_session.execute(select(Debsrc).order_by(Debsrc.deb_source))).all() + r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all() assert len(r) == 2 t = r.pop(0)[0] assert t.dist.cpe_version == '12' @@ -34,16 +37,45 @@ async def test_import(self, db_session): assert t.deb_source == 'test2' assert t.deb_version == '2' + async def test_import_update(self, db_session_class): + f = DebsrcFile() + f['test1'] = Debsrc( + deb_source='test1', + deb_version='1', + ) f['test2'] = Debsrc( deb_source='test2', deb_version='3', ) - await ingest.import_update(db_session, f) + ingest = IngestDebsrc('debian', 'bookworm', None) + await ingest.import_update(db_session_class, f) - r = (await db_session.execute(select(Debsrc).order_by(Debsrc.deb_source))).all() + r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all() assert len(r) == 2 - t = r.pop(1)[0] + t = r.pop(0)[0] + assert t.dist.cpe_version == '12' + assert t.deb_source == 'test1' + assert t.deb_version == '1' + + t = r.pop(0)[0] assert t.dist.cpe_version == '12' assert t.deb_source == 'test2' assert t.deb_version == '3' + + async def test_import_delete(self, db_session_class): + f = DebsrcFile() + f['test1'] = Debsrc( + deb_source='test1', + deb_version='1', + ) + + ingest = IngestDebsrc('debian', 'bookworm', None) + await ingest.import_update(db_session_class, f) + + r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all() + assert len(r) == 1 + t = r.pop(0)[0] + assert t.dist.cpe_version == '12' + assert t.deb_source == 'test1' + assert t.deb_version == '1' diff --git a/tests/conftest.py b/tests/conftest.py index 49f9b14..136ac98 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -84,3 +84,69 @@ async def db_session(db_engine): async with async_sessionmaker(db_engine)() as session: yield session await session.rollback() + + +@pytest.fixture(scope='class') +async def db_session_class(db_engine): + ''' + Provides an asynchronous SQLAlchemy database session. + + Similar to db_session, but scoped on class. So it can be used in + combination with @pytest.mark.incremental. + + This should never be commited, or it will affect other tests. + ''' + async with async_sessionmaker(db_engine)() as session: + yield session + await session.rollback() + + +# Based on +# https://docs.pytest.org/en/7.4.x/example/simple.html#incremental-testing-test-steps +# https://docs.pytest.org/en/7.4.x/license.html +class _PytestIncremental: + test_failed: dict[type, dict[tuple[int, ...], str]] + + def __init__(self) -> None: + self.test_failed = {} + + # retrieve the index of the test (if parametrize is used in combination with incremental) + def _index(self, item: pytest.Function) -> tuple[int, ...]: + if callspec := getattr(item, 'callspec', None): + return tuple(callspec.indices.values()) + else: + return () + + def runtest_makereport(self, item: pytest.Function, call: pytest.CallInfo) -> None: + if call.excinfo is not None: + self.test_failed.setdefault(item.cls, {}).setdefault( + self._index(item), + item.originalname or item.name, + ) + + def runtest_setup(self, item: pytest.Function) -> None: + if test_failed := self.test_failed.get(item.cls, None): + if (test_name := test_failed.get(self._index(item), None)): + pytest.xfail('previous test failed ({})'.format(test_name)) + + +dispatch_keywords = { + 'incremental': _PytestIncremental(), +} + + +def pytest_configure(config: pytest.Config) -> None: + for k, v in dispatch_keywords.items(): + config.addinivalue_line('markers', k) + + +def pytest_runtest_makereport(item: pytest.Function, call: pytest.CallInfo) -> None: + for k, v in dispatch_keywords.items(): + if k in item.keywords: + v.runtest_makereport(item, call) + + +def pytest_runtest_setup(item: pytest.Function): + for k, v in dispatch_keywords.items(): + if k in item.keywords: + v.runtest_setup(item)