From c35cd784fa19103c8b8699d9d59ba3b12afc96f3 Mon Sep 17 00:00:00 2001 From: Bastian Blank Date: Fri, 15 Dec 2023 13:06:29 +0100 Subject: [PATCH] Add client tool to get single CVE --- src/glvd/cli/__main__.py | 1 + src/glvd/cli/client_cve.py | 76 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 src/glvd/cli/client_cve.py diff --git a/src/glvd/cli/__main__.py b/src/glvd/cli/__main__.py index ef299a0..a94b916 100644 --- a/src/glvd/cli/__main__.py +++ b/src/glvd/cli/__main__.py @@ -6,6 +6,7 @@ # Import to register all the commands from . import ( # noqa: F401 + client_cve, combine_all, combine_deb, ingest_debsec, diff --git a/src/glvd/cli/client_cve.py b/src/glvd/cli/client_cve.py new file mode 100644 index 0000000..c7cadf5 --- /dev/null +++ b/src/glvd/cli/client_cve.py @@ -0,0 +1,76 @@ +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import json +import logging +import sys +import urllib.parse +from datetime import datetime, timedelta, timezone +from typing import Any + +import asyncio +from sqlalchemy import func, select +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.ext.asyncio import ( + AsyncConnection, + AsyncEngine, + create_async_engine, +) + +from ..database import Base, NvdCve +from ..util import requests +from . import cli + + +logger = logging.getLogger(__name__) + + +class ClientCve: + server: str + + @staticmethod + @cli.register( + 'cve', + arguments=[ + cli.add_argument( + 'cve', + help='the CVE to look up', + metavar='CVE', + ), + cli.add_argument( + '--server', + default='http://localhost:5000', + help='the server to use', + ), + cli.add_argument( + '--debug', + action='store_true', + help='enable debug output', + ), + ] + ) + def run(cve: str, server: str, debug: bool) -> None: + logging.basicConfig(level=debug and logging.DEBUG or logging.INFO) + ClientCve(cve, server)() + + def __init__(self, cve: str, server: str) -> None: + self.cve = cve + self.server = server + + def __call__(self) -> None: + with requests.RetrySession() as rsession: + resp = rsession.get( + urllib.parse.urljoin(self.server, f'v1/cves/{self.cve}'), + ) + if resp.status_code == 200: + data = resp.json() + json.dump(data, sys.stdout, indent=2) + elif resp.status_code == 404: + print(f'{self.cve} not found', file=sys.stderr) + else: + resp.raise_for_status() + + +if __name__ == '__main__': + ClientCve.run()