Skip to content

Commit

Permalink
Merge pull request #26 from gardenlinux/find-sources
Browse files Browse the repository at this point in the history
Allow to search by source packages
  • Loading branch information
waldiTM authored Dec 12, 2023
2 parents 152ad93 + a332439 commit 264d7b1
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
41 changes: 41 additions & 0 deletions openapi-v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,47 @@ paths:
schema: {}
'400':
description: parameter is invalid
/cves/findBySources:
post:
tags:
- cve
summary: Finds CVE by source packages
parameters:
- name: cvssV3Severity
in: query
description: Not implemented
schema:
type: string
enum:
- LOW
- MEDIUM
- HIGH
- CRITICAL
requestBody:
required: true
content:
application/x-www-form-urlencoded:
schema:
type: object
properties:
source[]:
type: array
items:
type: string
example:
- 'debian_bookworm_glibc_2.36-9+deb12u3'
encoding:
source[]:
style: form
explode: true
responses:
'200':
description: successful operation
content:
application/json:
schema: {}
'400':
description: parameter is invalid
/cves/{cveId}:
get:
tags:
Expand Down
96 changes: 95 additions & 1 deletion src/glvd/web/v1_cves.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
# SPDX-License-Identifier: MIT

from typing import Any
from typing import (
Any,
cast,
)

from quart import Blueprint, current_app, request
import sqlalchemy as sa
from sqlalchemy import (
bindparam,
text,
)

from ..database import AllCve, DebCve, DistCpe
from ..database.types import DebVersion
from ..data.cpe import Cpe
from ..data.dist_cpe import DistCpeMapper

bp = Blueprint('nvd', __name__, url_prefix='/v1/cves')

Expand Down Expand Up @@ -91,6 +98,10 @@
)
)

headers_cors: dict[str, str] = {
'Access-Control-Allow-Origin': '*',
}


@bp.route('/<cve_id>')
async def get_cve_id(cve_id: str) -> tuple[Any, int]:
Expand Down Expand Up @@ -136,3 +147,86 @@ async def get_cpe_name() -> tuple[Any, int]:
(await conn.execute(stmt)).one()[0],
200
)


@bp.route('/findBySources', methods=['POST', 'OPTIONS'])
async def get_sources() -> tuple[Any, int, dict[str, str]]:
# Handle pre-flight request to allow CORS
if request.method == 'OPTIONS':
return ('', 204, headers_cors)

async with getattr(current_app, 'db_begin')() as conn:
# Aggregate by product/codename
source_by_dist: dict[tuple[str, str], set[tuple[str, str]]] = {}
for source in (await request.form).getlist('source[]', type=str):
s = cast(tuple[str, str, str, str], tuple(source.split('_', 4)))
source_by_dist.setdefault(s[0:2], set()).add(s[2:4])

# Create dynamic table (as CTE) to find many sources at the same time
# XXX: Replace with "real" temporary table, use COPY IN, check if this
# can remove the enormous time spent on compiling queries
stmts_source = []
for i, j in source_by_dist.items():
dist = DistCpeMapper.new(i[0])(i[1])
dist_id = (await conn.execute(
sa.select(DistCpe.id)
.where(DistCpe.cpe_vendor == dist.cpe_vendor)
.where(DistCpe.cpe_product == dist.cpe_product)
.where(DistCpe.cpe_version == dist.cpe_version)
)).one()[0]

for source, version in j:
stmts_source.append(
sa.select(
sa.literal(dist_id).label('dist_id'),
sa.literal(source).label('deb_source'),
sa.cast(sa.literal(version), DebVersion).label('deb_version'),
)
)

# If we found no source at all
if not stmts_source:
return ('', 400, headers_cors)

# We deduplicate source entries ourselves, so we can just ask the db to
# take all entries via "UNION ALL", instead of forcing it to first sort
# and remove duplicates by using "UNION"
subquery_source = sa.union_all(*stmts_source).cte(name='source')

# Find (unique) CVE ID for given sources
subquery_cve_id = (
sa.select(DebCve.cve_id)
.distinct()
.join(
subquery_source,
sa.and_(
DebCve.dist_id == subquery_source.c.dist_id,
DebCve.deb_source == subquery_source.c.deb_source,
sa.or_(
DebCve.deb_version_fixed > subquery_source.c.deb_version,
DebCve.deb_version_fixed.is_(None),
),
)
)
).cte()

# Find CVE data for given ID
subquery_cve_data = (
sa.select(AllCve.data)
.order_by(AllCve.cve_id)
.join(subquery_cve_id, AllCve.cve_id == subquery_cve_id.c.cve_id)
).cte()

# Generate JSON array
query = (
sa.select(
sa.func.coalesce(
# json_agg() creates a JSON array
sa.func.json_agg(subquery_cve_data.c.data),
# We never want NULL, so we generate an empty array
sa.text("'[]'::json")
)
)
)

return ((await conn.execute(query)).one()[0], 200, headers_cors)

0 comments on commit 264d7b1

Please sign in to comment.