Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support deletion in some ingest cases #14

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/glvd/cli/ingest_debsec.py
Original file line number Diff line number Diff line change
@@ -52,10 +52,11 @@ async def import_cve_update(

new_entries = file_cve.get(entry.dist.deb_codename)
if not new_entries:
await session.delete(entry)
continue
new_entry = new_entries.pop((entry.cve_id, entry.deb_source), None)
if not new_entry:
# XXX: Delete entry
await session.delete(entry)
continue

# Update object in place. Only real changes will be commited
2 changes: 1 addition & 1 deletion src/glvd/cli/ingest_debsrc.py
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ async def import_update(

new_entry = file.pop(entry.deb_source, None)
if not new_entry:
# XXX: Delete entry
await session.delete(entry)
continue

# Update object in place. Only real changes will be commited
116 changes: 106 additions & 10 deletions tests/cli/test_ingest_debsec.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,43 @@
# SPDX-License-Identifier: MIT

import pytest

from sqlalchemy import select

from glvd.cli.ingest_debsec import IngestDebsec
from glvd.database import DebsecCve
from glvd.data.debsec_cve import DebsecCveFile


@pytest.mark.incremental
class TestIngestDebsec:
async def test_import_cve(self, db_session):
async def test_import_cve_insert(self, db_session_class):
f = DebsecCveFile()
f[''] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='2',
)
),
('TEST-2', 'hello'): DebsecCve(
cve_id='TEST-2',
deb_source='hello',
deb_version_fixed='2',
),
}
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
)
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_insert(db_session, f)
await ingest.import_cve_insert(db_session_class, f)

r = (await db_session.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 2
r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 3
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
@@ -42,20 +50,108 @@ async def test_import_cve(self, db_session):
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '2'

t = r.pop(0)[0]
assert t.cve_id == 'TEST-2'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '2'

async def test_import_cve_update(self, db_session_class):
f = DebsecCveFile()
f[''] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='3',
)
),
('TEST-2', 'hello'): DebsecCve(
cve_id='TEST-2',
deb_source='hello',
deb_version_fixed='3',
),
}
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_update(db_session_class, f)

await ingest.import_cve_update(db_session, f)
r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 3
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '1'

r = (await db_session.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '3'

t = r.pop(0)[0]
assert t.cve_id == 'TEST-2'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '3'

async def test_import_cve_delete_entry(self, db_session_class):
f = DebsecCveFile()
f[''] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='3',
),
}
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_update(db_session_class, f)

r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 2
t = r.pop(1)[0]
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '1'

t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '3'

async def test_import_cve_delete_dist(self, db_session_class):
f = DebsecCveFile()
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_update(db_session_class, f)

r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 1
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '1'
44 changes: 38 additions & 6 deletions tests/cli/test_ingest_debsrc.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# SPDX-License-Identifier: MIT

import pytest

from sqlalchemy import select

from glvd.cli.ingest_debsrc import IngestDebsrc
from glvd.database import Debsrc
from glvd.data.debsrc import DebsrcFile


@pytest.mark.incremental
class TestIngestDebsrc:
async def test_import(self, db_session):
async def test_import_insert(self, db_session_class):
f = DebsrcFile()
f['test1'] = Debsrc(
deb_source='test1',
@@ -20,9 +23,9 @@ async def test_import(self, db_session):
)

ingest = IngestDebsrc('debian', 'bookworm', None)
await ingest.import_insert(db_session, f)
await ingest.import_insert(db_session_class, f)

r = (await db_session.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
assert len(r) == 2
t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
@@ -34,16 +37,45 @@ async def test_import(self, db_session):
assert t.deb_source == 'test2'
assert t.deb_version == '2'

async def test_import_update(self, db_session_class):
f = DebsrcFile()
f['test1'] = Debsrc(
deb_source='test1',
deb_version='1',
)
f['test2'] = Debsrc(
deb_source='test2',
deb_version='3',
)

await ingest.import_update(db_session, f)
ingest = IngestDebsrc('debian', 'bookworm', None)
await ingest.import_update(db_session_class, f)

r = (await db_session.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
assert len(r) == 2
t = r.pop(1)[0]
t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
assert t.deb_source == 'test1'
assert t.deb_version == '1'

t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
assert t.deb_source == 'test2'
assert t.deb_version == '3'

async def test_import_delete(self, db_session_class):
f = DebsrcFile()
f['test1'] = Debsrc(
deb_source='test1',
deb_version='1',
)

ingest = IngestDebsrc('debian', 'bookworm', None)
await ingest.import_update(db_session_class, f)

r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
assert t.deb_source == 'test1'
assert t.deb_version == '1'
66 changes: 66 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -84,3 +84,69 @@ async def db_session(db_engine):
async with async_sessionmaker(db_engine)() as session:
yield session
await session.rollback()


@pytest.fixture(scope='class')
async def db_session_class(db_engine):
'''
Provides an asynchronous SQLAlchemy database session.

Similar to db_session, but scoped on class. So it can be used in
combination with @pytest.mark.incremental.

This should never be commited, or it will affect other tests.
'''
async with async_sessionmaker(db_engine)() as session:
yield session
await session.rollback()


# Based on
# https://docs.pytest.org/en/7.4.x/example/simple.html#incremental-testing-test-steps
# https://docs.pytest.org/en/7.4.x/license.html
class _PytestIncremental:
test_failed: dict[type, dict[tuple[int, ...], str]]

def __init__(self) -> None:
self.test_failed = {}

# retrieve the index of the test (if parametrize is used in combination with incremental)
def _index(self, item: pytest.Function) -> tuple[int, ...]:
if callspec := getattr(item, 'callspec', None):
return tuple(callspec.indices.values())
else:
return ()

def runtest_makereport(self, item: pytest.Function, call: pytest.CallInfo) -> None:
if call.excinfo is not None:
self.test_failed.setdefault(item.cls, {}).setdefault(
self._index(item),
item.originalname or item.name,
)

def runtest_setup(self, item: pytest.Function) -> None:
if test_failed := self.test_failed.get(item.cls, None):
if (test_name := test_failed.get(self._index(item), None)):
pytest.xfail('previous test failed ({})'.format(test_name))


dispatch_keywords = {
'incremental': _PytestIncremental(),
}


def pytest_configure(config: pytest.Config) -> None:
for k, v in dispatch_keywords.items():
config.addinivalue_line('markers', k)


def pytest_runtest_makereport(item: pytest.Function, call: pytest.CallInfo) -> None:
for k, v in dispatch_keywords.items():
if k in item.keywords:
v.runtest_makereport(item, call)


def pytest_runtest_setup(item: pytest.Function):
for k, v in dispatch_keywords.items():
if k in item.keywords:
v.runtest_setup(item)