Skip to content

Commit

Permalink
Merge pull request #16 from gardenlinux/web-search-cpe
Browse files Browse the repository at this point in the history
Search CVE by Debian dist and source
  • Loading branch information
waldiTM authored Nov 30, 2023
2 parents 6500ef9 + 4776244 commit 8ed58d6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 18 deletions.
32 changes: 21 additions & 11 deletions src/glvd/data/cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,29 @@ class CpeOtherDebian:
deb_version: str | None = dataclasses.field(default=None)

@classmethod
def parse(cls, cpe: Cpe) -> str | CpeOtherDebian | CpeAny | None:
if not (cpe.part, cpe.vendor, cpe.product) in [
(CpePart.OS, 'debian', 'debian_linux'),
(CpePart.OS, 'sap', 'gardenlinux'),
] or not isinstance(cpe.other, str):
return cpe.other

def parse(cls, i: str) -> str | CpeOtherDebian | CpeAny | None:
try:
kw: dict[str, str | None] = {}
for f in cpe.other.split(','):
for f in i.split(','):
k, v = f.split('=', 1)
kw[k] = v
return cls(**kw)
except ValueError:
return cpe.other
return cls()

def __str__(self) -> str:
m: list[str] = []
allany: bool = True

for field in dataclasses.fields(self):
if j := getattr(self, field.name):
m.append(f'{field.name}={j}')
return ','.join(m)
allany = False

if allany:
return '*'
else:
return ','.join(m)


@dataclasses.dataclass
Expand All @@ -64,6 +65,7 @@ class Cpe:
target_sw: str | CpeAny | None = dataclasses.field(default=ANY)
target_hw: str | CpeAny | None = dataclasses.field(default=ANY)
other: str | CpeOtherDebian | CpeAny | None = dataclasses.field(default=ANY)
is_debian: bool = dataclasses.field(init=False, default=False)

__re = re.compile(r'''
^cpe:2.3:
Expand Down Expand Up @@ -95,7 +97,15 @@ class Cpe:
__re_unquote = re.compile(r'''\\([!"#$%&'()+,/:;<=>@[\]^`{|}~])''')

def __post_init__(self) -> None:
self.other = CpeOtherDebian.parse(self)
if (self.part, self.vendor, self.product) in [
(CpePart.OS, 'debian', 'debian_linux'),
(CpePart.OS, 'sap', 'gardenlinux'),
]:
self.is_debian = True
if isinstance(self.other, str):
self.other = CpeOtherDebian.parse(self.other)
elif self.other is self.ANY:
self.other = CpeOtherDebian()

@classmethod
def _parse_one(cls, field: dataclasses.Field, v: str, /) -> Any:
Expand Down
17 changes: 15 additions & 2 deletions src/glvd/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
Self,
)

from sqlalchemy import ForeignKey
from sqlalchemy import (
ForeignKey,
Index,
)
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
Expand Down Expand Up @@ -94,11 +97,21 @@ class DebCve(Base):
last_mod: Mapped[datetime] = mapped_column(init=False, server_default=func.now(), onupdate=func.now())
deb_source: Mapped[str] = mapped_column(primary_key=True)
deb_version: Mapped[str] = mapped_column(DebVersion)
debsec_vulnerable: Mapped[bool]
debsec_vulnerable: Mapped[bool] = mapped_column()
data_cpe_match: Mapped[Any]

dist: Mapped[Optional[DistCpe]] = relationship(lazy='selectin', default=None)

__table_args__ = (
Index(
'deb_cve_search',
dist_id,
debsec_vulnerable,
deb_source,
deb_version,
),
)

def merge(self, other: Self) -> None:
self.deb_version = other.deb_version
self.debsec_vulnerable = other.debsec_vulnerable
Expand Down
60 changes: 55 additions & 5 deletions src/glvd/web/nvd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,60 @@
text,
)

from ..data.cpe import Cpe

bp = Blueprint('nvd', __name__)


# XXX: Can we replace that with a view, which combines data and data_configurations in the database?
stmt_cve_deb_cpe = (
text('''
SELECT
nvd_cve.data,
array_to_json(
array_remove(
array_agg(deb_cve.data_cpe_match),
NULL
)
) AS data_cpe_matches
FROM
nvd_cve
LEFT OUTER JOIN deb_cve USING (cve_id)
INNER JOIN dist_cpe ON (deb_cve.dist_id = dist_cpe.id)
WHERE
dist_cpe.cpe_vendor = :cpe_vendor AND
dist_cpe.cpe_product = :cpe_product AND
dist_cpe.cpe_version = :cpe_version AND
deb_cve.deb_source LIKE :deb_source AND
deb_cve.debsec_vulnerable = TRUE
GROUP BY
nvd_cve.cve_id
''')
.bindparams(
bindparam('cpe_vendor'),
bindparam('cpe_product'),
bindparam('cpe_version'),
bindparam('deb_source'),
)
)

stmt_cve_deb_cve_id = (
text('''
SELECT
nvd_cve.data
, array_to_json(array_remove(array_agg(deb_cve.data_cpe_match), NULL)) AS data_cpe_matchess
nvd_cve.data,
array_to_json(
array_remove(
array_agg(deb_cve.data_cpe_match),
NULL
)
) AS data_cpe_matches
FROM
nvd_cve
LEFT OUTER JOIN deb_cve USING (cve_id)
WHERE cve_id = :cve_id
GROUP BY (nvd_cve.data)
WHERE
cve_id = :cve_id
GROUP BY
nvd_cve.cve_id
''')
.bindparams(
bindparam('cve_id'),
Expand All @@ -29,7 +69,17 @@

@bp.route('/rest/json/cves/2.0+deb')
async def nvd_cve_deb():
if cve_id := request.args.get('cveId', type=str):
if cpe_name := request.args.get('cpeName', type=str):
cpe = Cpe.parse(cpe_name)
if not cpe.is_debian:
return 'Not Debian related CPE', 400
stmt = stmt_cve_deb_cpe.bindparams(
cpe_vendor=cpe.vendor,
cpe_product=cpe.product,
cpe_version=cpe.version,
deb_source=cpe.other.deb_source or '%',
)
elif cve_id := request.args.get('cveId', type=str):
stmt = stmt_cve_deb_cve_id.bindparams(cve_id=cve_id)

async with current_app.db_begin() as conn:
Expand Down
12 changes: 12 additions & 0 deletions tests/data/test_cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_init(self):
assert c.target_sw is Cpe.ANY
assert c.target_hw is Cpe.ANY
assert c.other is Cpe.ANY
assert c.is_debian is False

def test_parse(self):
s = r'cpe:2.3:h:a:b:c\:\%\*\;c:d:*:-:-:-:*:*'
Expand All @@ -32,6 +33,7 @@ def test_parse(self):
assert c.target_sw is None
assert c.target_hw is Cpe.ANY
assert c.other is Cpe.ANY
assert c.is_debian is False
assert str(c) == s

def test_debian(self):
Expand All @@ -49,4 +51,14 @@ def test_debian(self):
assert c.target_hw is Cpe.ANY
assert c.other.deb_source == 'hello'
assert c.other.deb_version == '1'
assert c.is_debian is True
assert str(c) == s

def test_debian_any(self):
s = r'cpe:2.3:o:debian:debian_linux:12:d:*:*:*:*:*:*'
c = Cpe.parse(s)

assert c.other.deb_source is None
assert c.other.deb_version is None
assert c.is_debian is True
assert str(c) == s

0 comments on commit 8ed58d6

Please sign in to comment.