Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epss provider prototype #634

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/vunnel/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
amazon,
chainguard,
debian,
epss,
github,
mariner,
nvd,
Expand Down Expand Up @@ -40,6 +41,7 @@
ubuntu.Provider.name(): ubuntu.Provider,
wolfi.Provider.name(): wolfi.Provider,
chainguard.Provider.name(): chainguard.Provider,
epss.Provider.name(): epss.Provider,
}


Expand Down
91 changes: 91 additions & 0 deletions src/vunnel/providers/epss/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from vunnel import provider, result, schema

from .parser import Parser

if TYPE_CHECKING:
import datetime

# unique provider name; used as the key in the provider registry
PROVIDER_NAME = "epss"

# the schema that every record written by this provider conforms to
SCHEMA = schema.EPSSSchema()


@dataclass
class Config:
    """Runtime configuration for the EPSS provider.

    Attributes:
        runtime: vunnel runtime behavior; results are stored in SQLite and
            existing results are deleted before each write, since every run
            ingests a complete daily EPSS snapshot.
        request_timeout: timeout (seconds) for the HTTP download of the
            EPSS CSV bundle.
    """

    runtime: provider.RuntimeConfig = field(
        default_factory=lambda: provider.RuntimeConfig(
            result_store=result.StoreStrategy.SQLITE,
            existing_results=result.ResultStatePolicy.DELETE_BEFORE_WRITE,
        ),
    )
    request_timeout: int = 125


class Provider(provider.Provider):
    """vunnel provider that publishes EPSS scores, one record per CVE."""

    def __init__(self, root: str, config: Config | None = None):
        if not config:
            config = Config()

        super().__init__(root, runtime_cfg=config.runtime)
        self.config = config
        self.logger.debug(f"config: {config}")

        self.parser = Parser(
            ws=self.workspace,
            download_timeout=self.config.request_timeout,
            logger=self.logger,
        )

        # NOTE(review): this policy call (and its original comment claiming the
        # provider "requires the previous state from former runs") appears copied
        # from the NVD provider; since EPSS downloads a complete daily snapshot,
        # confirm this is the intended existing-input policy.
        provider.disallow_existing_input_policy(config.runtime)

    @classmethod
    def name(cls) -> str:
        return PROVIDER_NAME

    def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
        """Fetch the latest EPSS snapshot and write one result per CVE.

        Args:
            last_updated: time of the previous successful run; unused here
                because the EPSS CSV bundle is a complete daily snapshot,
                not an incremental feed.

        Returns:
            The list of URLs fetched and the number of records written.
        """
        with self.results_writer() as writer:
            for vuln_id, record in self.parser.get():
                if not vuln_id:
                    # defensively skip malformed rows with no CVE id
                    continue

                writer.write(
                    identifier=vuln_id.lower(),
                    schema=SCHEMA,
                    payload=record,
                )

        return self.parser.urls, len(writer)
134 changes: 134 additions & 0 deletions src/vunnel/providers/epss/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from __future__ import annotations

import datetime
import gzip

# import json
import logging
import os
from io import BytesIO
from typing import TYPE_CHECKING

import requests

from vunnel import utils, workspace

if TYPE_CHECKING:
from collections.abc import Generator


NAMESPACE = "epss"


class Parser:
    """Downloads the daily EPSS scores CSV bundle and yields one record per CVE.

    The same data is also available from the EPSS API
    (https://api.first.org/data/v1/epss) as paginated JSON; the CSV bundle is
    used here because it is a single compact download updated once per day.
    """

    # daily CSV bundle URL template; the placeholder is filled with a YYYY-MM-DD date
    _csv_url_ = "https://epss.cyentia.com/epss_scores-{}.csv.gz"
    _csv_file_ = "epss_data.csv"

    def __init__(self, ws: workspace.Workspace, download_timeout: int = 125, logger: logging.Logger | None = None):
        """
        Args:
            ws: provider workspace used to resolve the input file location.
            download_timeout: timeout (seconds) for the HTTP download.
            logger: optional logger; defaults to one named after this class.
        """
        self.workspace = ws
        self.download_timeout = download_timeout
        self.csv_file_path = os.path.join(ws.input_path, self._csv_file_)

        # take a single timestamp so the date parts cannot straddle a midnight
        # boundary across separate now() calls
        today = datetime.datetime.now(tz=datetime.timezone.utc).date()
        self.datestring = today.isoformat()  # YYYY-MM-DD
        # instance attribute shadows the class-level template with the concrete URL
        self._csv_url_ = self._csv_url_.format(self.datestring)

        self.urls = [self._csv_url_]

        if not logger:
            logger = logging.getLogger(self.__class__.__name__)
        self.logger = logger

    def get(self) -> Generator[tuple[str | None, dict[str, str]], None, None]:
        """Download, then parse and normalize the EPSS data.

        Yields:
            (cve_id, record) tuples; record holds cve/epss/percentile/date strings.
        """
        self._download()
        yield from self._normalize()

    @utils.retry_with_backoff()
    def _download(self) -> None:
        """Fetch today's gzipped CSV bundle and decompress it into the workspace."""
        self.logger.info(f"downloading vulnerability data from {self._csv_url_}")

        # NOTE: consider falling back to yesterday's bundle if today's CSV does
        # not exist yet (runs near a day boundary), and skipping the download
        # entirely when today's CSV was already processed — the EPSS bundle is
        # only updated daily

        r = requests.get(self._csv_url_, timeout=self.download_timeout)
        r.raise_for_status()

        gzbuf = BytesIO(r.content)
        with gzip.GzipFile(fileobj=gzbuf, mode="rb") as gz_fh, open(self.csv_file_path, "wb") as out_fh:
            out_fh.write(gz_fh.read())

    def _normalize(self) -> Generator[tuple[str | None, dict[str, str]], None, None]:
        """Parse the downloaded CSV, yielding one (cve, record) pair per data row."""
        with open(self.csv_file_path, encoding="utf-8") as fh:
            # stream line-by-line instead of materializing the whole file
            for raw_line in fh:
                # skip the "#model_version..." comment header and the column-name header
                if not raw_line.startswith("CVE"):
                    continue
                try:
                    # e.g. {"cve": "CVE-2024-6775", "epss": "0.000430000", "percentile": "0.092910000", "date": "2024-07-17"}
                    # strip the trailing newline so the last column (percentile)
                    # does not carry a "\n" into the record
                    toks = raw_line.strip().split(",")
                    input_record = {
                        "cve": toks[0],
                        "epss": toks[1],
                        "percentile": toks[2],
                        "date": self.datestring,
                    }
                except Exception as err:
                    self.logger.warning(f"couldn't parse CSV line from input - {raw_line}: {err}")
                    continue
                yield input_record["cve"], input_record
8 changes: 8 additions & 0 deletions src/vunnel/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
OS_SCHEMA_VERSION = "1.0.0"
NVD_SCHEMA_VERSION = "1.0.0"
OSV_SCHEMA_VERSION = "1.6.1"
EPSS_SCHEMA_VERSION = "1.0.0"


@dataclass(frozen=True)
Expand All @@ -37,6 +38,13 @@ def ProviderListingSchema(version: str = PROVIDER_ARCHIVE_LISTING_SCHEMA_VERSION
)


def EPSSSchema(version: str = EPSS_SCHEMA_VERSION) -> Schema:
    """Return the Schema describing EPSS provider records."""
    base_url = "https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/epss"
    return Schema(version=version, url=f"{base_url}/schema-{version}.json")


def ProviderStateSchema(version: str = PROVIDER_WORKSPACE_STATE_SCHEMA_VERSION) -> Schema:
return Schema(
version=version,
Expand Down
Loading