Skip to content

Commit

Permalink
Store CVSS severity in combined deb information
Browse files Browse the repository at this point in the history
  • Loading branch information
credbbl committed Dec 14, 2023
1 parent 6271fe9 commit 5eeed09
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 18 deletions.
55 changes: 45 additions & 10 deletions src/glvd/cli/combine_deb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
from typing import (
Any,
AsyncGenerator,
)

Expand All @@ -22,6 +23,7 @@

from ..database import Base, DistCpe, DebCve
from ..data.cpe import Cpe, CpeOtherDebian
from ..data.cvss import CvssSeverity


logger = logging.getLogger(__name__)
Expand All @@ -31,24 +33,42 @@ class CombineDeb:
stmt_combine_new = (
text('''
SELECT
cve.cve_id
, src.deb_source
, src.deb_version
, cve.deb_version_fixed
, COALESCE(src.deb_version < cve.deb_version_fixed, TRUE) AS debsec_vulnerable
debsec_cve.cve_id
, nvd_cve.data
, debsrc.deb_source
, debsrc.deb_version
, debsec_cve.deb_version_fixed
, COALESCE(debsrc.deb_version < debsec_cve.deb_version_fixed, TRUE) AS debsec_vulnerable
, debsec_note
FROM
debsrc as src
LEFT OUTER JOIN debsec_cve AS cve ON src.deb_source = cve.deb_source
debsrc
LEFT OUTER JOIN debsec_cve ON debsec_cve.deb_source = debsrc.deb_source
INNER JOIN nvd_cve ON nvd_cve.cve_id = debsec_cve.cve_id
WHERE
src.dist_id = :dist_id
AND cve.dist_id = ANY(:dists_fallback_id)
debsrc.dist_id = :dist_id
AND debsec_cve.dist_id = ANY(:dists_fallback_id)
''')
.bindparams(
bindparam('dist_id'),
bindparam('dists_fallback_id'),
)
)

def extract_cvss_severity(
self,
entry: Any,
) -> CvssSeverity | None:
if metrics := entry.get('metrics'):
for i in ('cvssMetricV31', 'cvssMetricV30'):
if metrics_single := metrics.get(i):
metrics_primary = [i for i in metrics_single if i.get('type', None) == 'Primary']
if metrics_primary and (severity := metrics_primary[0].get('cvssData', {}).get('baseSeverity')):
try:
return CvssSeverity[severity]
except KeyError:
return None
return None

async def combine_dists(
self,
session: AsyncSession,
Expand Down Expand Up @@ -120,7 +140,19 @@ async def combine(
'dist_id': dist.id,
'dists_fallback_id': [i.id for i in dists_fallback],
}):
cve_id, deb_source, deb_version, deb_version_fixed, debsec_vulnerable = r
(
cve_id,
nvd_data,
deb_source,
deb_version,
deb_version_fixed,
debsec_vulnerable,
debsec_note
) = r

cvss_severity = self.extract_cvss_severity(nvd_data)
if debsec_note == 'unimportant':
cvss_severity = CvssSeverity.UNIMPORTANT

cpe = Cpe(
part=Cpe.PART.OS,
Expand All @@ -138,12 +170,15 @@ async def combine(
'vulnerable': debsec_vulnerable,
}

if cvss_severity:
cpe_match['deb']['cvssSeverity'] = cvss_severity.name
if deb_version_fixed:
cpe_match['deb']['versionEndExcluding'] = deb_version_fixed

new_entries[(cve_id, deb_source)] = DebCve(
dist=dist,
cve_id=cve_id,
cvss_severity=cvss_severity,
deb_source=deb_source,
deb_version=deb_version,
deb_version_fixed=deb_version_fixed,
Expand Down
29 changes: 29 additions & 0 deletions src/glvd/data/cvss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-License-Identifier: MIT

from __future__ import annotations

import enum


@enum.verify(enum.UNIQUE)
class CvssSeverity(enum.Enum):
NONE = 0
UNIMPORTANT = 1
LOW = 2
MEDIUM = 3
HIGH = 4
CRITICAL = 5

@classmethod
def from_score(cls, score: int) -> CvssSeverity:
if score < 0 or score > 10:
raise ValueError
if score >= 9:
return cls.CRITICAL
if score >= 7:
return cls.HIGH
if score >= 4:
return cls.MEDIUM
if score > 0:
return cls.LOW
return cls.NONE
9 changes: 8 additions & 1 deletion src/glvd/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
Text,
)

from .types import DebVersion
from ..data.cvss import CvssSeverity
from .types import (
CvssSeverityType,
DebVersion,
)


class Base(MappedAsDataclass, DeclarativeBase):
type_annotation_map = {
str: Text,
datetime: DateTime(timezone=True),
Any: JSON,
CvssSeverity: CvssSeverityType,
}


Expand Down Expand Up @@ -95,6 +100,7 @@ class DebCve(Base):
dist_id = mapped_column(ForeignKey(DistCpe.id), primary_key=True)
cve_id: Mapped[str] = mapped_column(primary_key=True)
last_mod: Mapped[datetime] = mapped_column(init=False, server_default=func.now(), onupdate=func.now())
cvss_severity: Mapped[Optional[CvssSeverity]] = mapped_column()
deb_source: Mapped[str] = mapped_column(primary_key=True)
deb_version: Mapped[str] = mapped_column(DebVersion)
deb_version_fixed: Mapped[Optional[str]] = mapped_column(DebVersion)
Expand All @@ -114,6 +120,7 @@ class DebCve(Base):
)

def merge(self, other: Self) -> None:
self.cvss_severity = other.cvss_severity
self.deb_version = other.deb_version
self.deb_version_fixed = other.deb_version_fixed
self.debsec_vulnerable = other.debsec_vulnerable
Expand Down
23 changes: 22 additions & 1 deletion src/glvd/database/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,28 @@

from __future__ import annotations

from sqlalchemy.types import UserDefinedType
from sqlalchemy.types import (
INT,
TypeDecorator,
UserDefinedType,
)

from ..data.cvss import CvssSeverity


class CvssSeverityType(TypeDecorator):
cache_ok = True
impl = INT

def process_bind_param(self, value: CvssSeverity | None, dialect) -> int | None:
if value is not None:
return value.value
return None

def process_result_value(self, value: int | None, dialect) -> CvssSeverity | None:
if value is not None:
return CvssSeverity(value)
return None


class DebVersion(UserDefinedType):
Expand Down
74 changes: 68 additions & 6 deletions tests/cli/test_combine_deb.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
# SPDX-License-Identifier: MIT

from datetime import datetime

from sqlalchemy import select

from glvd.cli.combine_deb import CombineDeb
from glvd.data.cvss import CvssSeverity
from glvd.data.dist_cpe import DistCpeMapper
from glvd.database import DistCpe, DebCve, DebsecCve, Debsrc
from glvd.database import DistCpe, DebCve, DebsecCve, Debsrc, NvdCve


class TestIngestDebsrc:
dist_mapper = DistCpeMapper.new('debian')

def debsec_cve(self, *, deb_version_fixed: str, dist: DistCpe) -> DebsecCve:
def debsec_cve(self, *, deb_version_fixed: str, debsec_note: str | None, dist: DistCpe) -> DebsecCve:
return DebsecCve(
cve_id='TEST-1',
deb_source='test',
deb_version_fixed=deb_version_fixed,
debsec_note=debsec_note,
dist=dist,
)

Expand All @@ -25,12 +29,29 @@ def debsrc(self, *, dist: DistCpe) -> Debsrc:
dist=dist,
)

def nvd_cve(self) -> NvdCve:
return NvdCve(
cve_id='TEST-1',
last_mod=datetime.now(),
data={
'metrics': {
'cvssMetricV31': [{
'type': 'Primary',
'cvssData': {
'baseSeverity': 'MEDIUM',
},
}],
},
},
)

async def test_combine_base(self, db_session):
dist_test = self.dist_mapper('trixie')

db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='1', dist=dist_test))
db_session.add(self.debsec_cve(deb_version_fixed='1', debsec_note=None, dist=dist_test))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -40,12 +61,14 @@ async def test_combine_base(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is False
assert t.data_cpe_match == {
'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test',
'deb': {
'cvssSeverity': 'MEDIUM',
'versionEndExcluding': '1',
'versionLatest': '1',
},
Expand All @@ -56,8 +79,9 @@ async def test_combine_base_vulnerable(self, db_session):
dist_test = self.dist_mapper('trixie')

db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='2', dist=dist_test))
db_session.add(self.debsec_cve(deb_version_fixed='2', debsec_note=None, dist=dist_test))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -67,12 +91,14 @@ async def test_combine_base_vulnerable(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is True
assert t.data_cpe_match == {
'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test',
'deb': {
'cvssSeverity': 'MEDIUM',
'versionEndExcluding': '2',
'versionLatest': '1',
},
Expand All @@ -84,8 +110,9 @@ async def test_combine_fallback(self, db_session):
dist_test = self.dist_mapper('trixie')

db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='1', dist=dist_fallback))
db_session.add(self.debsec_cve(deb_version_fixed='1', debsec_note=None, dist=dist_fallback))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -95,12 +122,14 @@ async def test_combine_fallback(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is False
assert t.data_cpe_match == {
'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test',
'deb': {
'cvssSeverity': 'MEDIUM',
'versionEndExcluding': '1',
'versionLatest': '1',
},
Expand All @@ -112,8 +141,9 @@ async def test_combine_fallback_vulnerable(self, db_session):
dist_test = self.dist_mapper('trixie')

db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='2', dist=dist_fallback))
db_session.add(self.debsec_cve(deb_version_fixed='2', debsec_note=None, dist=dist_fallback))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
Expand All @@ -123,14 +153,46 @@ async def test_combine_fallback_vulnerable(self, db_session):
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.MEDIUM
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is True
assert t.data_cpe_match == {
'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test',
'deb': {
'cvssSeverity': 'MEDIUM',
'versionEndExcluding': '2',
'versionLatest': '1',
},
'vulnerable': True,
}

async def test_combine_unimportant(self, db_session):
dist_test = self.dist_mapper('trixie')

db_session.add(dist_test)
db_session.add(self.debsec_cve(deb_version_fixed='1', debsec_note='unimportant', dist=dist_test))
db_session.add(self.debsrc(dist=dist_test))
db_session.add(self.nvd_cve())
await db_session.flush()

combine = CombineDeb()
await combine.combine(db_session)

r = (await db_session.execute(select(DebCve).order_by(DebCve.deb_source))).all()
assert len(r) == 1
t = r.pop(0)[0]
assert t.dist == dist_test
assert t.cvss_severity is CvssSeverity.UNIMPORTANT
assert t.deb_source == 'test'
assert t.deb_version == '1'
assert t.debsec_vulnerable is False
assert t.data_cpe_match == {
'criteria': r'cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test',
'deb': {
'cvssSeverity': 'UNIMPORTANT',
'versionEndExcluding': '1',
'versionLatest': '1',
},
'vulnerable': False,
}
Loading

0 comments on commit 5eeed09

Please sign in to comment.