Skip to content

Commit

Permalink
Merge pull request #14 from gardenlinux/support-delete
Browse files Browse the repository at this point in the history
Support deletion in some ingest cases
  • Loading branch information
waldiTM authored Nov 29, 2023
2 parents 12fefa3 + 2469b32 commit 10e9b58
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 18 deletions.
3 changes: 2 additions & 1 deletion src/glvd/cli/ingest_debsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ async def import_cve_update(

new_entries = file_cve.get(entry.dist.deb_codename)
if not new_entries:
await session.delete(entry)
continue
new_entry = new_entries.pop((entry.cve_id, entry.deb_source), None)
if not new_entry:
# XXX: Delete entry
await session.delete(entry)
continue

# Update object in place. Only real changes will be commited
Expand Down
2 changes: 1 addition & 1 deletion src/glvd/cli/ingest_debsrc.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def import_update(

new_entry = file.pop(entry.deb_source, None)
if not new_entry:
# XXX: Delete entry
await session.delete(entry)
continue

# Update object in place. Only real changes will be commited
Expand Down
116 changes: 106 additions & 10 deletions tests/cli/test_ingest_debsec.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,43 @@
# SPDX-License-Identifier: MIT

import pytest

from sqlalchemy import select

from glvd.cli.ingest_debsec import IngestDebsec
from glvd.database import DebsecCve
from glvd.data.debsec_cve import DebsecCveFile


@pytest.mark.incremental
class TestIngestDebsec:
async def test_import_cve(self, db_session):
async def test_import_cve_insert(self, db_session_class):
f = DebsecCveFile()
f[''] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='2',
)
),
('TEST-2', 'hello'): DebsecCve(
cve_id='TEST-2',
deb_source='hello',
deb_version_fixed='2',
),
}
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
)
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_insert(db_session, f)
await ingest.import_cve_insert(db_session_class, f)

r = (await db_session.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 2
r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 3
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
Expand All @@ -42,20 +50,108 @@ async def test_import_cve(self, db_session):
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '2'

t = r.pop(0)[0]
assert t.cve_id == 'TEST-2'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '2'

async def test_import_cve_update(self, db_session_class):
f = DebsecCveFile()
f[''] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='3',
)
),
('TEST-2', 'hello'): DebsecCve(
cve_id='TEST-2',
deb_source='hello',
deb_version_fixed='3',
),
}
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_update(db_session_class, f)

await ingest.import_cve_update(db_session, f)
r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 3
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '1'

r = (await db_session.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '3'

t = r.pop(0)[0]
assert t.cve_id == 'TEST-2'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '3'

async def test_import_cve_delete_entry(self, db_session_class):
f = DebsecCveFile()
f[''] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='3',
),
}
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_update(db_session_class, f)

r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 2
t = r.pop(1)[0]
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '1'

t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == ''
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '3'

async def test_import_cve_delete_dist(self, db_session_class):
f = DebsecCveFile()
f['bookworm'] = {
('TEST-1', 'hello'): DebsecCve(
cve_id='TEST-1',
deb_source='hello',
deb_version_fixed='1',
),
}

ingest = IngestDebsec('debian', None)
await ingest.import_cve_update(db_session_class, f)

r = (await db_session_class.execute(select(DebsecCve).order_by(DebsecCve.cve_id, DebsecCve.deb_version_fixed))).all()
assert len(r) == 1
t = r.pop(0)[0]
assert t.cve_id == 'TEST-1'
assert t.dist.cpe_version == '12'
assert t.deb_source == 'hello'
assert t.deb_version_fixed == '1'
44 changes: 38 additions & 6 deletions tests/cli/test_ingest_debsrc.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# SPDX-License-Identifier: MIT

import pytest

from sqlalchemy import select

from glvd.cli.ingest_debsrc import IngestDebsrc
from glvd.database import Debsrc
from glvd.data.debsrc import DebsrcFile


@pytest.mark.incremental
class TestIngestDebsrc:
async def test_import(self, db_session):
async def test_import_insert(self, db_session_class):
f = DebsrcFile()
f['test1'] = Debsrc(
deb_source='test1',
Expand All @@ -20,9 +23,9 @@ async def test_import(self, db_session):
)

ingest = IngestDebsrc('debian', 'bookworm', None)
await ingest.import_insert(db_session, f)
await ingest.import_insert(db_session_class, f)

r = (await db_session.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
assert len(r) == 2
t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
Expand All @@ -34,16 +37,45 @@ async def test_import(self, db_session):
assert t.deb_source == 'test2'
assert t.deb_version == '2'

async def test_import_update(self, db_session_class):
f = DebsrcFile()
f['test1'] = Debsrc(
deb_source='test1',
deb_version='1',
)
f['test2'] = Debsrc(
deb_source='test2',
deb_version='3',
)

await ingest.import_update(db_session, f)
ingest = IngestDebsrc('debian', 'bookworm', None)
await ingest.import_update(db_session_class, f)

r = (await db_session.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
assert len(r) == 2
t = r.pop(1)[0]
t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
assert t.deb_source == 'test1'
assert t.deb_version == '1'

t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
assert t.deb_source == 'test2'
assert t.deb_version == '3'

async def test_import_delete(self, db_session_class):
f = DebsrcFile()
f['test1'] = Debsrc(
deb_source='test1',
deb_version='1',
)

ingest = IngestDebsrc('debian', 'bookworm', None)
await ingest.import_update(db_session_class, f)

r = (await db_session_class.execute(select(Debsrc).order_by(Debsrc.deb_source))).all()
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist.cpe_version == '12'
assert t.deb_source == 'test1'
assert t.deb_version == '1'
66 changes: 66 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,69 @@ async def db_session(db_engine):
async with async_sessionmaker(db_engine)() as session:
yield session
await session.rollback()


@pytest.fixture(scope='class')
async def db_session_class(db_engine):
'''
Provides an asynchronous SQLAlchemy database session.
Similar to db_session, but scoped on class. So it can be used in
combination with @pytest.mark.incremental.
This should never be commited, or it will affect other tests.
'''
async with async_sessionmaker(db_engine)() as session:
yield session
await session.rollback()


# Based on
# https://docs.pytest.org/en/7.4.x/example/simple.html#incremental-testing-test-steps
# https://docs.pytest.org/en/7.4.x/license.html
class _PytestIncremental:
test_failed: dict[type, dict[tuple[int, ...], str]]

def __init__(self) -> None:
self.test_failed = {}

# retrieve the index of the test (if parametrize is used in combination with incremental)
def _index(self, item: pytest.Function) -> tuple[int, ...]:
if callspec := getattr(item, 'callspec', None):
return tuple(callspec.indices.values())
else:
return ()

def runtest_makereport(self, item: pytest.Function, call: pytest.CallInfo) -> None:
if call.excinfo is not None:
self.test_failed.setdefault(item.cls, {}).setdefault(
self._index(item),
item.originalname or item.name,
)

def runtest_setup(self, item: pytest.Function) -> None:
if test_failed := self.test_failed.get(item.cls, None):
if (test_name := test_failed.get(self._index(item), None)):
pytest.xfail('previous test failed ({})'.format(test_name))


dispatch_keywords = {
'incremental': _PytestIncremental(),
}


def pytest_configure(config: pytest.Config) -> None:
for k, v in dispatch_keywords.items():
config.addinivalue_line('markers', k)


def pytest_runtest_makereport(item: pytest.Function, call: pytest.CallInfo) -> None:
for k, v in dispatch_keywords.items():
if k in item.keywords:
v.runtest_makereport(item, call)


def pytest_runtest_setup(item: pytest.Function):
for k, v in dispatch_keywords.items():
if k in item.keywords:
v.runtest_setup(item)

0 comments on commit 10e9b58

Please sign in to comment.