From 320594607726c153140b75ae08ddb2621a2d71f2 Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Thu, 14 Dec 2023 16:19:36 +0100 Subject: [PATCH] Search by CVSS severity --- openapi-v1.yaml | 8 ++++---- setup.cfg | 1 + src/glvd/web/v1_cves.py | 20 +++++++++++++++++++- tests/web/test_v1_cves.py | 28 ++++++++++++++++++++++++++-- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/openapi-v1.yaml b/openapi-v1.yaml index 42ce6a4..8944ea9 100644 --- a/openapi-v1.yaml +++ b/openapi-v1.yaml @@ -24,12 +24,12 @@ paths: required: true schema: type: string - - name: cvssV3Severity + - name: cvssV3SeverityMin in: query - description: Not implemented schema: type: string enum: + - UNIMPORTANT - LOW - MEDIUM - HIGH @@ -52,12 +52,12 @@ paths: - cve summary: Finds CVE by source packages parameters: - - name: cvssV3Severity + - name: cvssV3SeverityMin in: query - description: Not implemented schema: type: string enum: + - UNIMPORTANT - LOW - MEDIUM - HIGH diff --git a/setup.cfg b/setup.cfg index dfe6c60..4682f97 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,4 @@ [flake8] ignore = E501 + W503 diff --git a/src/glvd/web/v1_cves.py b/src/glvd/web/v1_cves.py index d0a37cf..9c73d98 100644 --- a/src/glvd/web/v1_cves.py +++ b/src/glvd/web/v1_cves.py @@ -13,8 +13,9 @@ ) from ..database import AllCve, DebCve, DistCpe -from ..database.types import DebVersion +from ..database.types import CvssSeverityType, DebVersion from ..data.cpe import Cpe +from ..data.cvss import CvssSeverity from ..data.dist_cpe import DistCpeMapper bp = Blueprint('nvd', __name__, url_prefix='/v1/cves') @@ -49,6 +50,7 @@ dist_cpe.cpe_vendor = :cpe_vendor AND dist_cpe.cpe_product = :cpe_product AND dist_cpe.cpe_version LIKE :cpe_version AND + COALESCE(deb_cve.cvss_severity, 0) >= :cvss_severity_min AND deb_cve.deb_source = :deb_source AND ( deb_cve.deb_version_fixed > :deb_version OR @@ -64,6 +66,7 @@ bindparam('cpe_vendor'), bindparam('cpe_product'), bindparam('cpe_version'), + bindparam('cvss_severity_min', type_=CvssSeverityType), bindparam('deb_source'), bindparam('deb_version'), ) @@ -82,6 +85,7 @@ dist_cpe.cpe_vendor = :cpe_vendor AND dist_cpe.cpe_product = :cpe_product AND dist_cpe.cpe_version LIKE :cpe_version AND + COALESCE(deb_cve.cvss_severity, 0) >= :cvss_severity_min AND deb_cve.deb_source LIKE :deb_source AND deb_cve.debsec_vulnerable = TRUE ORDER BY @@ -94,6 +98,7 @@ bindparam('cpe_vendor'), bindparam('cpe_product'), bindparam('cpe_version'), + bindparam('cvss_severity_min', type_=CvssSeverityType), bindparam('deb_source'), ) ) @@ -126,11 +131,17 @@ async def get_cpe_name() -> tuple[Any, int]: if not cpe.is_debian: return 'Not Debian related CPE', 400 + cvss_severity_min = ( + request.args.get('cvssV3SeverityMin', type=CvssSeverity.__getitem__) + or CvssSeverity.NONE + ) + if cpe.other_debian.deb_source and deb_version: stmt = stmt_cpe_version.bindparams( cpe_vendor=cpe.vendor, cpe_product=cpe.product, cpe_version=cpe.version or '%', + cvss_severity_min=cvss_severity_min, deb_source=cpe.other_debian.deb_source, deb_version=deb_version, ) @@ -139,6 +150,7 @@ async def get_cpe_name() -> tuple[Any, int]: cpe_vendor=cpe.vendor, cpe_product=cpe.product, cpe_version=cpe.version or '%', + cvss_severity_min=cvss_severity_min, deb_source=cpe.other_debian.deb_source or '%', ) @@ -155,6 +167,11 @@ async def get_sources() -> tuple[Any, int, dict[str, str]]: if request.method == 'OPTIONS': return ('', 204, headers_cors) + if cvss_severity_min := request.args.get('cvssV3SeverityMin', type=CvssSeverity.__getitem__): + stmt_cvss_severity_min = DebCve.cvss_severity >= cvss_severity_min + else: + stmt_cvss_severity_min = sa.true() + async with getattr(current_app, 'db_begin')() as conn: # Aggregate by product/codename source_by_dist: dict[tuple[str, str], set[tuple[str, str]]] = {} @@ -206,6 +223,7 @@ async def get_sources() -> tuple[Any, int, dict[str, str]]: DebCve.deb_version_fixed > subquery_source.c.deb_version, DebCve.deb_version_fixed.is_(None), ), + stmt_cvss_severity_min, ) ) ).cte() diff --git a/tests/web/test_v1_cves.py b/tests/web/test_v1_cves.py index b739463..67ef436 100644 --- a/tests/web/test_v1_cves.py +++ b/tests/web/test_v1_cves.py @@ -5,6 +5,7 @@ import json from glvd.database import AllCve, DebCve, DistCpe +from glvd.data.cvss import CvssSeverity class TestCveId: @@ -39,7 +40,7 @@ async def test_nonexist(self, client): class TestCpeName: @pytest.fixture(autouse=True, scope='class') async def setup_example(self, db_session_class): - for i in ('TEST-fixed', 'TEST-vuln'): + for i in ('TEST-fixed', 'TEST-vuln', 'TEST-vuln-unimportant'): db_session_class.add(AllCve( cve_id=i, data={ @@ -65,7 +66,17 @@ async def setup_example(self, db_session_class): )) db_session_class.add(DebCve( cve_id='TEST-vuln', - cvss_severity=None, + cvss_severity=CvssSeverity.HIGH, + deb_source='test', + deb_version='1', + deb_version_fixed='2', + debsec_vulnerable=True, + dist=self.dist, + data_cpe_match={}, + )) + db_session_class.add(DebCve( + cve_id='TEST-vuln-unimportant', + cvss_severity=CvssSeverity.UNIMPORTANT, deb_source='test', deb_version='1', deb_version_fixed='2', @@ -83,6 +94,7 @@ async def test_simple(self, client): assert resp.status_code == 200 assert {i['id'] for i in json.loads((await resp.data))} == { 'TEST-vuln', + 'TEST-vuln-unimportant', } async def test_source(self, client): @@ -93,6 +105,7 @@ async def test_source(self, client): assert resp.status_code == 200 assert {i['id'] for i in json.loads((await resp.data))} == { 'TEST-vuln', + 'TEST-vuln-unimportant', } async def test_version(self, client): @@ -104,6 +117,7 @@ async def test_version(self, client): assert {i['id'] for i in json.loads((await resp.data))} == { 'TEST-fixed', 'TEST-vuln', + 'TEST-vuln-unimportant', } async def test_nonexist(self, client): @@ -113,3 +127,13 @@ async def test_nonexist(self, client): assert resp.status_code == 200 assert json.loads((await resp.data)) == [] + + async def test_severity(self, client): + resp = await client.get( + r'/v1/cves/findByCpe?cpeName=cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:*&cvssV3SeverityMin=MEDIUM', + ) + + assert resp.status_code == 200 + assert {i['id'] for i in json.loads((await resp.data))} == { + 'TEST-vuln', + }