Skip to content

Commit

Permalink
Store CVSS severity in combined deb information
Browse files Browse the repository at this point in the history
  • Loading branch information
credbbl committed Dec 13, 2023
1 parent 6271fe9 commit 869377f
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 13 deletions.
52 changes: 42 additions & 10 deletions src/glvd/cli/combine_deb.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from ..database import Base, DistCpe, DebCve
from ..data.cpe import Cpe, CpeOtherDebian
from ..data.cvss import CvssSeverity


logger = logging.getLogger(__name__)
Expand All @@ -31,24 +32,42 @@ class CombineDeb:
stmt_combine_new = (
text('''
SELECT
cve.cve_id
, src.deb_source
, src.deb_version
, cve.deb_version_fixed
, COALESCE(src.deb_version < cve.deb_version_fixed, TRUE) AS debsec_vulnerable
debsec_cve.cve_id
, nvd_cve.data
, debsrc.deb_source
, debsrc.deb_version
, debsec_cve.deb_version_fixed
, COALESCE(debsrc.deb_version < debsec_cve.deb_version_fixed, TRUE) AS debsec_vulnerable
, debsec_note
FROM
debsrc as src
LEFT OUTER JOIN debsec_cve AS cve ON src.deb_source = cve.deb_source
debsrc
LEFT OUTER JOIN debsec_cve ON debsec_cve.deb_source = debsrc.deb_source
INNER JOIN nvd_cve ON nvd_cve.cve_id = debsec_cve.cve_id
WHERE
src.dist_id = :dist_id
AND cve.dist_id = ANY(:dists_fallback_id)
debsrc.dist_id = :dist_id
AND debsec_cve.dist_id = ANY(:dists_fallback_id)
''')
.bindparams(
bindparam('dist_id'),
bindparam('dists_fallback_id'),
)
)

def extract_cvss_severity(
self,
entry: Any,
) -> CvssSeverity | None:
if metrics := entry.get('metrics'):
for i in ('cvssMetricV31', 'cvssMetricV30'):
if metric := metrics.get(i):
l = [i for i in metric if i.get('type', None) == 'Primary']
if l and (severity := l[0].get('cvssData', {}).get('baseSeverity')):
try:
return CvssSeverity[severity]
except KeyError:
return None
return None

async def combine_dists(
self,
session: AsyncSession,
Expand Down Expand Up @@ -120,7 +139,19 @@ async def combine(
'dist_id': dist.id,
'dists_fallback_id': [i.id for i in dists_fallback],
}):
cve_id, deb_source, deb_version, deb_version_fixed, debsec_vulnerable = r
(
cve_id,
nvd_data,
deb_source,
deb_version,
deb_version_fixed,
debsec_vulnerable,
debsec_note
) = r

cvss_severity = self.extract_cvss_severity(nvd_data)
if debsec_note == 'unimportant':
cvss_severity = CvssSeverity.LOW

cpe = Cpe(
part=Cpe.PART.OS,
Expand All @@ -144,6 +175,7 @@ async def combine(
new_entries[(cve_id, deb_source)] = DebCve(
dist=dist,
cve_id=cve_id,
cvss_severity=cvss_severity,
deb_source=deb_source,
deb_version=deb_version,
deb_version_fixed=deb_version_fixed,
Expand Down
28 changes: 28 additions & 0 deletions src/glvd/data/cvss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# SPDX-License-Identifier: MIT

from __future__ import annotations

import enum


@enum.verify(enum.UNIQUE)
class CvssSeverity(enum.Enum):
NONE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4

@classmethod
def from_score(cls, score: int) -> Self:
if score < 0 or score > 10:
raise ValueError
if score >= 9:
return cls.CRITICAL
if score >= 7:
return cls.HIGH
if score >= 4:
return cls.MEDIUM
if score > 0:
return cls.LOW
return cls.NONE
9 changes: 8 additions & 1 deletion src/glvd/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
Text,
)

from .types import DebVersion
from ..data.cvss import CvssSeverity
from .types import (
CvssSeverityType,
DebVersion,
)


class Base(MappedAsDataclass, DeclarativeBase):
type_annotation_map = {
str: Text,
datetime: DateTime(timezone=True),
Any: JSON,
CvssSeverity: CvssSeverityType,
}


Expand Down Expand Up @@ -95,6 +100,7 @@ class DebCve(Base):
dist_id = mapped_column(ForeignKey(DistCpe.id), primary_key=True)
cve_id: Mapped[str] = mapped_column(primary_key=True)
last_mod: Mapped[datetime] = mapped_column(init=False, server_default=func.now(), onupdate=func.now())
cvss_severity: Mapped[Optional[CvssSeverity]] = mapped_column()
deb_source: Mapped[str] = mapped_column(primary_key=True)
deb_version: Mapped[str] = mapped_column(DebVersion)
deb_version_fixed: Mapped[Optional[str]] = mapped_column(DebVersion)
Expand All @@ -114,6 +120,7 @@ class DebCve(Base):
)

def merge(self, other: Self) -> None:
self.cvss_severity = other.cvss_severity
self.deb_version = other.deb_version
self.deb_version_fixed = other.deb_version_fixed
self.debsec_vulnerable = other.debsec_vulnerable
Expand Down
23 changes: 22 additions & 1 deletion src/glvd/database/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,28 @@

from __future__ import annotations

from sqlalchemy.types import UserDefinedType
from sqlalchemy.types import (
INT,
TypeDecorator,
UserDefinedType,
)

from ..data.cvss import CvssSeverity


class CvssSeverityType(TypeDecorator):
cache_ok = True
impl = INT

def process_bind_param(self, value: CvssSeverity | None, dialect) -> int:
if value is not None:
return value.value
return None

def process_result_value(self, value: int | None, dialect) -> CvssSeverity:
if value is not None:
return CvssSeverity(value)
return None


class DebVersion(UserDefinedType):
Expand Down
29 changes: 28 additions & 1 deletion tests/cli/test_combine_deb.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# SPDX-License-Identifier: MIT

from datetime import datetime

from sqlalchemy import select

from glvd.cli.combine_deb import CombineDeb
from glvd.data.cvss import CvssSeverity
from glvd.data.dist_cpe import DistCpeMapper
from glvd.database import DistCpe, DebCve, DebsecCve, Debsrc
from glvd.database import DistCpe, DebCve, DebsecCve, Debsrc, NvdCve


class TestIngestDebsrc:
Expand All @@ -25,12 +28,29 @@ def debsrc(self, *, dist: DistCpe) -> Debsrc:
dist=dist,
)

def nvd_cve(self) -> NvdCve:
return NvdCve(
cve_id='TEST-1',
last_mod=datetime.now(),
data={
'metrics': {
'cvssMetricV31': [{
'type': 'Primary',
'cvssData': {
'baseSeverity': 'MEDIUM',
},
}],
},
},
)

async def test_combine_base(self, db_session):
dist_test = self.dist_mapper('trixie')

db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='1', dist=dist_test))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -40,6 +60,7 @@ async def test_combine_base(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is False
Expand All @@ -58,6 +79,7 @@ async def test_combine_base_vulnerable(self, db_session):
db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='2', dist=dist_test))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -67,6 +89,7 @@ async def test_combine_base_vulnerable(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is True
Expand All @@ -86,6 +109,7 @@ async def test_combine_fallback(self, db_session):
db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='1', dist=dist_fallback))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -95,6 +119,7 @@ async def test_combine_fallback(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is False
Expand All @@ -114,6 +139,7 @@ async def test_combine_fallback_vulnerable(self, db_session):
db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='2', dist=dist_fallback))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -123,6 +149,7 @@ async def test_combine_fallback_vulnerable(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is True
Expand Down
24 changes: 24 additions & 0 deletions tests/data/test_cvss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: MIT

import pytest

from glvd.data.cvss import CvssSeverity


class TestCvssSeverity:
@pytest.mark.parametrize('score,value', [
(0, CvssSeverity.NONE),
(1, CvssSeverity.LOW),
(4, CvssSeverity.MEDIUM),
(7, CvssSeverity.HIGH),
(9, CvssSeverity.CRITICAL),
(10, CvssSeverity.CRITICAL),
])
def test_from_score(self, score, value):
a = CvssSeverity.from_score(score)
assert a is value

@pytest.mark.parametrize('score', [-0.1,10.1])
def test_from_score_fail(self, score):
with pytest.raises(ValueError):
CvssSeverity.from_score(score)
2 changes: 2 additions & 0 deletions tests/web/test_v1_cves.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async def setup_example(self, db_session_class):
db_session_class.add(self.dist)
db_session_class.add(DebCve(
cve_id='TEST-fixed',
cvss_severity=None,
deb_source='test',
deb_version='1',
deb_version_fixed='1',
Expand All @@ -64,6 +65,7 @@ async def setup_example(self, db_session_class):
))
db_session_class.add(DebCve(
cve_id='TEST-vuln',
cvss_severity=None,
deb_source='test',
deb_version='1',
deb_version_fixed='2',
Expand Down

0 comments on commit 869377f

Please sign in to comment.