Skip to content

Commit

Permalink
Merge pull request #57 from gardenlinux/client-cve-apt
Browse files Browse the repository at this point in the history
Add client tool to get CVE for installed packages
  • Loading branch information
waldiTM authored Dec 19, 2023
2 parents 36ae5bf + f52b296 commit 52a8363
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 15 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ asyncio_mode = "auto"

[[tool.mypy.overrides]]
module = [
"apt",
"requests",
"requests.adapters",
"urllib3"
Expand Down
1 change: 1 addition & 0 deletions src/glvd/cli/client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# Import to register all the commands
from . import ( # noqa: F401
cve,
cve_apt,
)


Expand Down
91 changes: 91 additions & 0 deletions src/glvd/cli/client/cve_apt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# SPDX-License-Identifier: MIT

from __future__ import annotations

import argparse
import json
import logging
import sys
import urllib.parse

from glvd.data.cvss import CvssSeverity
from glvd.util import requests
from . import cli


logger = logging.getLogger(__name__)


class ClientCveApt:
server: str
cvss3_severity_min: CvssSeverity

@staticmethod
@cli.register(
'cve-apt',
arguments=[
cli.prepare_argument(
'--cvss3-severity-min',
choices=[i.name for i in CvssSeverity if i != CvssSeverity.NONE],
default='LOW',
help='only return CVE with at least this CVSS severity',
),
],
)
def run(*, argparser: argparse.ArgumentParser, cvss3_severity_min: str, server: str, debug: bool) -> None:
logging.basicConfig(level=debug and logging.DEBUG or logging.INFO)

# Python-Apt is no PYPI reachable dependency, it only exists on Debian systems as package.
try:
import apt # noqa: F401
except ImportError:
argparser.error('please install python3-apt')

ClientCveApt(server, CvssSeverity[cvss3_severity_min])()

def __init__(self, server: str, cvss3_severity_min: CvssSeverity) -> None:
self.server = server
self.cvss3_severity_min = cvss3_severity_min

def get_sources(self) -> set[tuple[str, str, str, str]]:
import apt

ret = set()

for pkg in apt.Cache():
if inst := pkg.installed:
for origin in inst.origins:
if origin.origin:
ret.add((origin.origin, origin.codename, inst.source_name, inst.source_version))

return ret

def request_data(self) -> dict[str, list[str]]:
return {
'source[]': [
'_'.join(i)
for i in sorted(self.get_sources())
],
}

def request_params(self) -> dict[str, str]:
return {
'cvssV3SeverityMin': self.cvss3_severity_min.name,
}

def __call__(self) -> None:
with requests.RetrySession() as rsession:
resp = rsession.post(
urllib.parse.urljoin(self.server, 'v1/cves/findBySources'),
params=self.request_params(),
data=self.request_data(),
)
if resp.status_code == 200:
data = resp.json()
json.dump(data, sys.stdout, indent=2)
else:
resp.raise_for_status()


if __name__ == '__main__':
ClientCveApt.run()
35 changes: 20 additions & 15 deletions src/glvd/web/v1_cves.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,27 @@ async def get_sources() -> tuple[Any, int, dict[str, str]]:
# can remove the enormous time spent on compiling queries
stmts_source = []
for i, j in source_by_dist.items():
dist = DistCpeMapper.new(i[0])(i[1])
dist_id = (await conn.execute(
sa.select(DistCpe.id)
.where(DistCpe.cpe_vendor == dist.cpe_vendor)
.where(DistCpe.cpe_product == dist.cpe_product)
.where(DistCpe.cpe_version == dist.cpe_version)
)).one()[0]

for source, version in j:
stmts_source.append(
sa.select(
sa.literal(dist_id).label('dist_id'),
sa.literal(source).label('deb_source'),
sa.cast(sa.literal(version), DebVersion).label('deb_version'),
try:
dist = DistCpeMapper.new(i[0].lower())(i[1].lower())
except KeyError:
# XXX: How to handle unknown distributions?
pass
else:
dist_id = (await conn.execute(
sa.select(DistCpe.id)
.where(DistCpe.cpe_vendor == dist.cpe_vendor)
.where(DistCpe.cpe_product == dist.cpe_product)
.where(DistCpe.cpe_version == dist.cpe_version)
)).one()[0]

for source, version in j:
stmts_source.append(
sa.select(
sa.literal(dist_id).label('dist_id'),
sa.literal(source).label('deb_source'),
sa.cast(sa.literal(version), DebVersion).label('deb_version'),
)
)
)

# If we found no source at all
if not stmts_source:
Expand Down

0 comments on commit 52a8363

Please sign in to comment.