Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to search by source packages #26

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions openapi-v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,47 @@ paths:
schema: {}
'400':
description: parameter is invalid
/cves/findBySources:
post:
tags:
- cve
summary: Finds CVE by source packages
parameters:
- name: cvssV3Severity
in: query
description: Not implemented
schema:
type: string
enum:
- LOW
- MEDIUM
- HIGH
- CRITICAL
requestBody:
required: true
content:
application/x-www-form-urlencoded:
schema:
type: object
properties:
source[]:
type: array
items:
type: string
example:
- 'debian_bookworm_glibc_2.36-9+deb12u3'
encoding:
source[]:
style: form
explode: true
responses:
'200':
description: successful operation
content:
application/json:
schema: {}
'400':
description: parameter is invalid
/cves/{cveId}:
get:
tags:
Expand Down
96 changes: 95 additions & 1 deletion src/glvd/web/v1_cves.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
# SPDX-License-Identifier: MIT

from typing import Any
from typing import (
Any,
cast,
)

from quart import Blueprint, current_app, request
import sqlalchemy as sa
from sqlalchemy import (
bindparam,
text,
)

from ..database import AllCve, DebCve, DistCpe
from ..database.types import DebVersion
from ..data.cpe import Cpe
from ..data.dist_cpe import DistCpeMapper

bp = Blueprint('nvd', __name__, url_prefix='/v1/cves')

Expand Down Expand Up @@ -91,6 +98,10 @@
)
)

headers_cors: dict[str, str] = {
'Access-Control-Allow-Origin': '*',
}


@bp.route('/<cve_id>')
async def get_cve_id(cve_id: str) -> tuple[Any, int]:
Expand Down Expand Up @@ -136,3 +147,86 @@ async def get_cpe_name() -> tuple[Any, int]:
(await conn.execute(stmt)).one()[0],
200
)


@bp.route('/findBySources', methods=['POST', 'OPTIONS'])
async def get_sources() -> tuple[Any, int, dict[str, str]]:
# Handle pre-flight request to allow CORS
if request.method == 'OPTIONS':
return ('', 204, headers_cors)

async with getattr(current_app, 'db_begin')() as conn:
# Aggregate by product/codename
source_by_dist: dict[tuple[str, str], set[tuple[str, str]]] = {}
for source in (await request.form).getlist('source[]', type=str):
s = cast(tuple[str, str, str, str], tuple(source.split('_', 4)))
source_by_dist.setdefault(s[0:2], set()).add(s[2:4])

# Create dynamic table (as CTE) to find many sources at the same time
# XXX: Replace with "real" temporary table, use COPY IN, check if this
# can remove the enormous time spent on compiling queries
stmts_source = []
for i, j in source_by_dist.items():
dist = DistCpeMapper.new(i[0])(i[1])
dist_id = (await conn.execute(
sa.select(DistCpe.id)
.where(DistCpe.cpe_vendor == dist.cpe_vendor)
.where(DistCpe.cpe_product == dist.cpe_product)
.where(DistCpe.cpe_version == dist.cpe_version)
)).one()[0]

for source, version in j:
stmts_source.append(
sa.select(
sa.literal(dist_id).label('dist_id'),
sa.literal(source).label('deb_source'),
sa.cast(sa.literal(version), DebVersion).label('deb_version'),
)
)

# If we found no source at all
if not stmts_source:
return ('', 400, headers_cors)

# We deduplicate source entries ourselves, so we can just ask the db to
# take all entries via "UNION ALL", instead of forcing it to first sort
# and remove duplicates by using "UNION"
subquery_source = sa.union_all(*stmts_source).cte(name='source')

# Find (unique) CVE ID for given sources
subquery_cve_id = (
sa.select(DebCve.cve_id)
.distinct()
.join(
subquery_source,
sa.and_(
DebCve.dist_id == subquery_source.c.dist_id,
DebCve.deb_source == subquery_source.c.deb_source,
sa.or_(
DebCve.deb_version_fixed > subquery_source.c.deb_version,
DebCve.deb_version_fixed.is_(None),
),
)
)
).cte()

# Find CVE data for given ID
subquery_cve_data = (
sa.select(AllCve.data)
.order_by(AllCve.cve_id)
.join(subquery_cve_id, AllCve.cve_id == subquery_cve_id.c.cve_id)
).cte()

# Generate JSON array
query = (
sa.select(
sa.func.coalesce(
# json_agg() creates a JSON array
sa.func.json_agg(subquery_cve_data.c.data),
# We never want NULL, so we generate an empty array
sa.text("'[]'::json")
)
)
)

return ((await conn.execute(query)).one()[0], 200, headers_cors)
Loading