diff --git a/openapi-v1.yaml b/openapi-v1.yaml index c083b31..42ce6a4 100644 --- a/openapi-v1.yaml +++ b/openapi-v1.yaml @@ -46,6 +46,47 @@ paths: schema: {} '400': description: parameter is invalid + /cves/findBySources: + post: + tags: + - cve + summary: Finds CVE by source packages + parameters: + - name: cvssV3Severity + in: query + description: Not implemented + schema: + type: string + enum: + - LOW + - MEDIUM + - HIGH + - CRITICAL + requestBody: + required: true + content: + application/x-www-form-urlencoded: + schema: + type: object + properties: + source[]: + type: array + items: + type: string + example: + - 'debian_bookworm_glibc_2.36-9+deb12u3' + encoding: + source[]: + style: form + explode: true + responses: + '200': + description: successful operation + content: + application/json: + schema: {} + '400': + description: parameter is invalid /cves/{cveId}: get: tags: diff --git a/src/glvd/web/v1_cves.py b/src/glvd/web/v1_cves.py index 734bc63..d0a37cf 100644 --- a/src/glvd/web/v1_cves.py +++ b/src/glvd/web/v1_cves.py @@ -1,14 +1,21 @@ # SPDX-License-Identifier: MIT -from typing import Any +from typing import ( + Any, + cast, +) from quart import Blueprint, current_app, request +import sqlalchemy as sa from sqlalchemy import ( bindparam, text, ) +from ..database import AllCve, DebCve, DistCpe +from ..database.types import DebVersion from ..data.cpe import Cpe +from ..data.dist_cpe import DistCpeMapper bp = Blueprint('nvd', __name__, url_prefix='/v1/cves') @@ -91,6 +98,10 @@ ) ) +headers_cors: dict[str, str] = { + 'Access-Control-Allow-Origin': '*', +} + @bp.route('/') async def get_cve_id(cve_id: str) -> tuple[Any, int]: @@ -136,3 +147,86 @@ async def get_cpe_name() -> tuple[Any, int]: (await conn.execute(stmt)).one()[0], 200 ) + + +@bp.route('/findBySources', methods=['POST', 'OPTIONS']) +async def get_sources() -> tuple[Any, int, dict[str, str]]: + # Handle pre-flight request to allow CORS + if request.method == 'OPTIONS': + return ('', 204, headers_cors) + + async with getattr(current_app, 'db_begin')() as conn: + # Aggregate by product/codename + source_by_dist: dict[tuple[str, str], set[tuple[str, str]]] = {} + for source in (await request.form).getlist('source[]', type=str): + s = cast(tuple[str, str, str, str], tuple(source.split('_', 4))) + source_by_dist.setdefault(s[0:2], set()).add(s[2:4]) + + # Create dynamic table (as CTE) to find many sources at the same time + # XXX: Replace with "real" temporary table, use COPY IN, check if this + # can remove the enormous time spent on compiling queries + stmts_source = [] + for i, j in source_by_dist.items(): + dist = DistCpeMapper.new(i[0])(i[1]) + dist_id = (await conn.execute( + sa.select(DistCpe.id) + .where(DistCpe.cpe_vendor == dist.cpe_vendor) + .where(DistCpe.cpe_product == dist.cpe_product) + .where(DistCpe.cpe_version == dist.cpe_version) + )).one()[0] + + for source, version in j: + stmts_source.append( + sa.select( + sa.literal(dist_id).label('dist_id'), + sa.literal(source).label('deb_source'), + sa.cast(sa.literal(version), DebVersion).label('deb_version'), + ) + ) + + # If we found no source at all + if not stmts_source: + return ('', 400, headers_cors) + + # We deduplicate source entries ourselves, so we can just ask the db to + # take all entries via "UNION ALL", instead of forcing it to first sort + # and remove duplicates by using "UNION" + subquery_source = sa.union_all(*stmts_source).cte(name='source') + + # Find (unique) CVE ID for given sources + subquery_cve_id = ( + sa.select(DebCve.cve_id) + .distinct() + .join( + subquery_source, + sa.and_( + DebCve.dist_id == subquery_source.c.dist_id, + DebCve.deb_source == subquery_source.c.deb_source, + sa.or_( + DebCve.deb_version_fixed > subquery_source.c.deb_version, + DebCve.deb_version_fixed.is_(None), + ), + ) + ) + ).cte() + + # Find CVE data for given ID + subquery_cve_data = ( + sa.select(AllCve.data) + .order_by(AllCve.cve_id) + .join(subquery_cve_id, AllCve.cve_id == subquery_cve_id.c.cve_id) + ).cte() + + # Generate JSON array + query = ( + sa.select( + sa.func.coalesce( + # json_agg() creates a JSON array + sa.func.json_agg(subquery_cve_data.c.data), + # We never want NULL, so we generate an empty array + sa.text("'[]'::json") + ) + ) + ) + + return ((await conn.execute(query)).one()[0], 200, headers_cors)