diff --git a/openapi-v1.yaml b/openapi-v1.yaml new file mode 100644 index 0000000..c083b31 --- /dev/null +++ b/openapi-v1.yaml @@ -0,0 +1,67 @@ +openapi: 3.0.3 +info: + title: Garden Linux Vulnerability Database + contact: + email: gardenlinux@gardenlinux.io + license: + name: MIT + version: 0.1.0 +servers: +- url: http://localhost:5000/v1 +tags: +- name: cve + description: Everything about CVE +paths: + /cves/findByCpe: + get: + tags: + - cve + summary: Finds CVE by CPE + parameters: + - name: cpeName + in: query + description: CPE name to search for, only Debian/Garden Linux related CPE can be used + required: true + schema: + type: string + - name: cvssV3Severity + in: query + description: Not implemented + schema: + type: string + enum: + - LOW + - MEDIUM + - HIGH + - CRITICAL + - name: debVersionEnd + in: query + schema: + type: string + responses: + '200': + description: successful operation + content: + application/json: + schema: {} + '400': + description: parameter is invalid + /cves/{cveId}: + get: + tags: + - cve + summary: Find CVE by ID + parameters: + - name: cveId + in: path + description: ID of CVE + required: true + schema: + type: string + responses: + '200': + description: successful operation + content: + application/json: {} + '404': + description: CVE not found diff --git a/src/glvd/web/__init__.py b/src/glvd/web/__init__.py index 5bc17e0..8d8c034 100644 --- a/src/glvd/web/__init__.py +++ b/src/glvd/web/__init__.py @@ -41,7 +41,7 @@ def create_app(): QuartDb(app) - from .nvd import bp as bp_nvd - app.register_blueprint(bp_nvd) + from .v1_cves import bp as bp_v1_cves + app.register_blueprint(bp_v1_cves) return app diff --git a/src/glvd/web/nvd.py b/src/glvd/web/v1_cves.py similarity index 51% rename from src/glvd/web/nvd.py rename to src/glvd/web/v1_cves.py index 0efcffb..0b83c04 100644 --- a/src/glvd/web/nvd.py +++ b/src/glvd/web/v1_cves.py @@ -8,10 +8,26 @@ from ..data.cpe import Cpe -bp = Blueprint('nvd', __name__) +bp = Blueprint('nvd', __name__, url_prefix='/v1/cves') -stmt_cve_deb_cpe_version = ( +stmt_cve_id = ( + text(''' + SELECT + all_cve.data AS cve + FROM + all_cve + WHERE + cve_id = :cve_id + GROUP BY + all_cve.cve_id + ''') + .bindparams( + bindparam('cve_id'), + ) +) + +stmt_cpe_version = ( text(''' WITH data AS ( SELECT @@ -33,12 +49,7 @@ all_cve.cve_id ) SELECT - json_build_object( - 'format', 'NVD_CVE', - 'version', '2.0+deb', - 'vulnerabilities', coalesce(json_agg(data), '[]'::json) - )::text - FROM data + coalesce(json_agg(data.cve), '[]'::json)::text FROM data ''') .bindparams( bindparam('cpe_vendor'), @@ -49,7 +60,7 @@ ) ) -stmt_cve_deb_cpe_vulnerable = ( +stmt_cpe_vulnerable = ( text(''' WITH data AS ( SELECT @@ -68,12 +79,7 @@ all_cve.cve_id ) SELECT - json_build_object( - 'format', 'NVD_CVE', - 'version', '2.0+deb', - 'vulnerabilities', coalesce(json_agg(data), '[]'::json) - )::text - FROM data + coalesce(json_agg(data.cve), '[]'::json)::text FROM data ''') .bindparams( bindparam('cpe_vendor'), @@ -83,55 +89,43 @@ ) ) -stmt_cve_deb_cve_id = ( - text(''' - WITH data AS ( - SELECT - all_cve.data AS cve - FROM - all_cve - WHERE - cve_id = :cve_id - GROUP BY - all_cve.cve_id - ) - SELECT - json_build_object( - 'format', 'NVD_CVE', - 'version', '2.0+deb', - 'vulnerabilities', coalesce(json_agg(data), '[]'::json) - )::text - FROM data - ''') - .bindparams( - bindparam('cve_id'), - ) -) +@bp.route('/') +async def get_cve_id(cve_id): + stmt = stmt_cve_id.bindparams(cve_id=cve_id) -@bp.route('/rest/json/cves/2.0+deb') -async def nvd_cve_deb(): - if cpe_name := request.args.get('virtualMatchString', type=str): - cpe = Cpe.parse(cpe_name) - if not cpe.is_debian: - return 'Not Debian related CPE', 400 - if cpe.other.deb_source and (deb_version := request.args.get('debVersionEnd', type=str)): - stmt = stmt_cve_deb_cpe_version.bindparams( - cpe_vendor=cpe.vendor, - cpe_product=cpe.product, - cpe_version=cpe.version or '%', - deb_source=cpe.other.deb_source, - deb_version=deb_version, - ) + async with current_app.db_begin() as conn: + data = (await conn.execute(stmt)).one_or_none() + + if data: + return data[0], 200 else: - stmt = stmt_cve_deb_cpe_vulnerable.bindparams( - cpe_vendor=cpe.vendor, - cpe_product=cpe.product, - cpe_version=cpe.version or '%', - deb_source=cpe.other.deb_source or '%', - ) - elif cve_id := request.args.get('cveId', type=str): - stmt = stmt_cve_deb_cve_id.bindparams(cve_id=cve_id) + return 'Not found', 404 + + +@bp.route('/findByCpe') +async def get_cpe_name(): + cpe = Cpe.parse(request.args.get('cpeName', type=str)) + deb_version = request.args.get('debVersionEnd', type=str) + + if not cpe.is_debian: + return 'Not Debian related CPE', 400 + + if cpe.other.deb_source and deb_version: + stmt = stmt_cpe_version.bindparams( + cpe_vendor=cpe.vendor, + cpe_product=cpe.product, + cpe_version=cpe.version or '%', + deb_source=cpe.other.deb_source, + deb_version=deb_version, + ) + else: + stmt = stmt_cpe_vulnerable.bindparams( + cpe_vendor=cpe.vendor, + cpe_product=cpe.product, + cpe_version=cpe.version or '%', + deb_source=cpe.other.deb_source or '%', + ) async with current_app.db_begin() as conn: return ( diff --git a/tests/web/test_nvd_cve.py b/tests/web/test_nvd_cve.py deleted file mode 100644 index f0fcee2..0000000 --- a/tests/web/test_nvd_cve.py +++ /dev/null @@ -1,56 +0,0 @@ -# SPDX-License-Identifier: MIT - -import pytest - -import json - -from glvd.database import AllCve - - -class TestNvdCve: - @pytest.fixture(autouse=True, scope='class') - async def setup_example(self, db_session_class): - for i in range(2): - db_session_class.add(AllCve( - cve_id=f'TEST-{i}', - data={ - 'id': f'TEST-{i}', - }, - )) - await db_session_class.flush() - - async def test_deb_cveid_simple(self, client): - resp = await client.get( - '/rest/json/cves/2.0+deb', - query_string={ - 'cveId': 'TEST-0', - }, - ) - - assert resp.status_code == 200 - assert json.loads((await resp.data)) == { - 'format': 'NVD_CVE', - 'version': '2.0+deb', - 'vulnerabilities': [ - { - 'cve': { - 'id': 'TEST-0', - }, - }, - ], - } - - async def test_deb_cveid_nonexist(self, client): - resp = await client.get( - '/rest/json/cves/2.0+deb', - query_string={ - 'cveId': 'TEST-NONEXIST', - }, - ) - - assert resp.status_code == 200 - assert json.loads((await resp.data)) == { - 'format': 'NVD_CVE', - 'version': '2.0+deb', - 'vulnerabilities': [] - } diff --git a/tests/web/test_v1_cves.py b/tests/web/test_v1_cves.py new file mode 100644 index 0000000..6fedf9b --- /dev/null +++ b/tests/web/test_v1_cves.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: MIT + +import pytest + +import json + +from glvd.database import AllCve, DebCve, DistCpe + + +class TestCveId: + @pytest.fixture(autouse=True, scope='class') + async def setup_example(self, db_session_class): + db_session_class.add(AllCve( + cve_id='TEST-0', + data={ + 'id': 'TEST-0', + }, + )) + await db_session_class.flush() + + async def test_simple(self, client): + resp = await client.get( + '/v1/cves/TEST-0', + ) + + assert resp.status_code == 200 + assert json.loads((await resp.data)) == { + 'id': 'TEST-0', + } + + async def test_nonexist(self, client): + resp = await client.get( + '/v1/cves/TEST-NONEXIST', + ) + + assert resp.status_code == 404 + + +class TestCpeName: + @pytest.fixture(autouse=True, scope='class') + async def setup_example(self, db_session_class): + for i in ('TEST-fixed', 'TEST-vuln'): + db_session_class.add(AllCve( + cve_id=i, + data={ + 'id': i, + }, + )) + self.dist = DistCpe( + cpe_vendor='debian', + cpe_product='debian_linux', + cpe_version='13', + deb_codename='trixie', + ) + db_session_class.add(self.dist) + db_session_class.add(DebCve( + cve_id='TEST-fixed', + deb_source='test', + deb_version='1', + deb_version_fixed='1', + debsec_vulnerable=False, + dist=self.dist, + data_cpe_match={}, + )) + db_session_class.add(DebCve( + cve_id='TEST-vuln', + deb_source='test', + deb_version='1', + deb_version_fixed='2', + debsec_vulnerable=True, + dist=self.dist, + data_cpe_match={}, + )) + await db_session_class.flush() + + async def test_simple(self, client): + resp = await client.get( + r'/v1/cves/findByCpe?cpeName=cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:*', + ) + + assert resp.status_code == 200 + assert {i['id'] for i in json.loads((await resp.data))} == { + 'TEST-vuln', + } + + async def test_source(self, client): + resp = await client.get( + r'/v1/cves/findByCpe?cpeName=cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test', + ) + + assert resp.status_code == 200 + assert {i['id'] for i in json.loads((await resp.data))} == { + 'TEST-vuln', + } + + async def test_version(self, client): + resp = await client.get( + r'/v1/cves/findByCpe?cpeName=cpe:2.3:o:debian:debian_linux:13:*:*:*:*:*:*:deb_source\=test&debVersionEnd=0', + ) + + assert resp.status_code == 200 + assert {i['id'] for i in json.loads((await resp.data))} == { + 'TEST-fixed', + 'TEST-vuln', + } + + async def test_nonexist(self, client): + resp = await client.get( + r'/v1/cves/findByCpe?cpeName=cpe:2.3:o:debian:debian_linux:14:*:*:*:*:*:*:*', + ) + + assert resp.status_code == 200 + assert json.loads((await resp.data)) == []