feat: RHEL CSAF provider #758

Draft · wants to merge 9 commits into main
2 changes: 1 addition & 1 deletion .github/workflows/nightly-quality-gate.yaml
@@ -33,7 +33,7 @@ jobs:
           echo "providers=$content" >> $GITHUB_OUTPUT
 
   validate-provider:
-    runs-on: ubuntu-22.04
+    runs-on: ubuntu-22.04-4core-16gb
     needs: select-providers
     strategy:
       matrix:
2 changes: 1 addition & 1 deletion .github/workflows/pr-quality-gate.yaml
@@ -39,7 +39,7 @@ jobs:
           echo "providers=$content" >> $GITHUB_OUTPUT
 
   validate-provider:
-    runs-on: ubuntu-22.04
+    runs-on: ubuntu-22.04-4core-16gb
     needs: select-providers
     if: contains(github.event.pull_request.labels.*.name, 'run-pr-quality-gate')
240 changes: 143 additions & 97 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions pyproject.toml
@@ -59,6 +59,8 @@ pytest-snapshot = "^0.9.0"
 mashumaro = "^3.10"
 iso8601 = "^2.1.0"
 zstandard = ">=0.22,<0.24"
+packageurl-python = "^0.16.0"
+deepdiff = "^8.0.1"
 
 [tool.poetry.group.dev.dependencies]
 pytest = ">=7.2.2,<9.0.0"
@@ -110,8 +112,8 @@ exclude = '''(?x)(
   | ^src/vunnel/providers/mariner/model/ # generated code
   | ^src/vunnel/providers/nvd/parser\.py$ # ported from enterprise, never had type hints
   | ^src/vunnel/providers/oracle/parser\.py$ # ported from enterprise, never had type hints
-  | ^src/vunnel/providers/rhel/parser\.py$ # ported from enterprise, never had type hints
-  | ^src/vunnel/providers/rhel/oval_parser\.py$ # ported from enterprise, never had type hints
+  | ^src/vunnel/providers/rhel_legacy/parser\.py$ # ported from enterprise, never had type hints
+  | ^src/vunnel/providers/rhel_legacy/oval_parser\.py$ # ported from enterprise, never had type hints
   | ^src/vunnel/providers/sles/parser\.py$ # ported from enterprise, never had type hints
   | ^src/vunnel/providers/ubuntu/git\.py$ # ported from enterprise, never had type hints
   | ^src/vunnel/providers/ubuntu/parser\.py$ # ported from enterprise, never had type hints
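For context on the two new runtime dependencies: RHEL CSAF/VEX documents identify affected products with purl strings, which packageurl-python can parse, and deepdiff does structural comparison of nested records (likely for tests). A minimal sketch of both, with hypothetical values; these calls do not appear in this diff:

    # Illustration only -- hypothetical values, not code from this PR.
    from deepdiff import DeepDiff
    from packageurl import PackageURL

    # packageurl-python: parse a purl such as those in RHEL CSAF product trees
    purl = PackageURL.from_string("pkg:rpm/redhat/openssl@3.0.7-16.el9?arch=x86_64")
    print(purl.name, purl.version)  # openssl 3.0.7-16.el9

    # deepdiff: structural diff of two nested records
    old = {"Name": "CVE-2024-0001", "Severity": "Medium"}
    new = {"Name": "CVE-2024-0001", "Severity": "High"}
    print(DeepDiff(old, new))  # reports values_changed for root['Severity']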
77 changes: 41 additions & 36 deletions src/vunnel/providers/rhel/__init__.py
@@ -1,15 +1,14 @@
 from __future__ import annotations
 
-import os
+import datetime
 from dataclasses import dataclass, field
-from typing import TYPE_CHECKING
 
-from vunnel import provider, result, schema
-
-from .parser import Parser
+from vunnel import provider, result
+from vunnel.providers.rhel_csaf import Config as CSAFConfig
+from vunnel.providers.rhel_csaf import Provider as RHEL_CSAFProvider
+from vunnel.providers.rhel_legacy import Config as LegacyConfig
+from vunnel.providers.rhel_legacy import Provider as LegacyRHELProvider
 
-if TYPE_CHECKING:
-    import datetime
+CSAF_PROVIDER = "csaf"
+LEGACY_PROVIDER = "legacy"
 
 
 @dataclass
@@ -24,43 +23,49 @@ class Config:
     parallelism: int = 4
     full_sync_interval: int = 2
     skip_namespaces: list[str] = field(default_factory=lambda: ["rhel:3", "rhel:4"])
+    provider_type: str = CSAF_PROVIDER  # "legacy" or "csaf"
+
+    def to_specific_config(self) -> LegacyConfig | CSAFConfig:
+        if self.provider_type == LEGACY_PROVIDER:
+            return LegacyConfig(
+                runtime=self.runtime,
+                request_timeout=self.request_timeout,
+                parallelism=self.parallelism,
+                full_sync_interval=self.full_sync_interval,
+                skip_namespaces=self.skip_namespaces,
+            )
+        if self.provider_type == CSAF_PROVIDER:
+            # other fields don't make sense for CSAF Provider
+            return CSAFConfig(
+                runtime=self.runtime,
+                skip_namespaces=self.skip_namespaces,
+            )
+        raise ValueError(f"invalid config type {self.provider_type}, expected '{LEGACY_PROVIDER}' or '{CSAF_PROVIDER}'")
 
-class Provider(provider.Provider):
-
-    __schema__ = schema.OSSchema()
-    __distribution_version__ = int(__schema__.major_version)
 
 class Provider(provider.Provider):
     def __init__(self, root: str, config: Config | None = None):
         if not config:
             config = Config()
         super().__init__(root, runtime_cfg=config.runtime)
+        self.logger.info(f"config: {config}")
+        self._provider_instance: RHEL_CSAFProvider | LegacyRHELProvider | None = None
+        specific_config = config.to_specific_config()
+        if isinstance(specific_config, CSAFConfig):
+            self.logger.info(f"Using provider: {config.provider_type}")
+            self._provider_instance = RHEL_CSAFProvider(root, specific_config)
+        elif isinstance(specific_config, LegacyConfig):
+            self.logger.info(f"Using provider: {config.provider_type}")
+            self._provider_instance = LegacyRHELProvider(root, specific_config)
+        else:
+            raise ValueError(f"unknown provider type {config.provider_type}, expected {LEGACY_PROVIDER} or {CSAF_PROVIDER}")
         self.config = config
 
-        self.logger.debug(f"config: {config}")
-
-        self.parser = Parser(
-            workspace=self.workspace,
-            download_timeout=self.config.request_timeout,
-            max_workers=self.config.parallelism,
-            full_sync_interval=self.config.full_sync_interval,
-            skip_namespaces=self.config.skip_namespaces,
-            logger=self.logger,
-        )
-
     @classmethod
     def name(cls) -> str:
         return "rhel"
 
-    def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
-        with self.results_writer() as writer:
-            for namespace, vuln_id, record in self.parser.get(skip_if_exists=self.config.runtime.skip_if_exists):
-                namespace = namespace.lower()
-                vuln_id = vuln_id.lower()
-                writer.write(
-                    identifier=os.path.join(namespace, vuln_id),
-                    schema=self.__schema__,
-                    payload=record,
-                )
-
-        return self.parser.urls, len(writer)
+    def update(self, last_updated: datetime.datetime | None = None) -> tuple[list[str], int]:
+        if not self._provider_instance:
+            raise ValueError("attempt to call update on delegating provider with no underlying provider")
+        return self._provider_instance.update(last_updated)
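Taken together, the rewritten rhel provider is now a thin façade: Config.provider_type selects the implementation, to_specific_config() narrows the config, and update() forwards to whichever underlying provider was constructed. A sketch of how a caller might drive it; the workspace root path is a placeholder, not from this PR:

    # Sketch of driving the delegating provider; "/tmp/vunnel-root" is a placeholder.
    from vunnel.providers.rhel import Config, Provider

    csaf = Provider("/tmp/vunnel-root", Config(provider_type="csaf"))      # the default
    legacy = Provider("/tmp/vunnel-root", Config(provider_type="legacy"))  # old OVAL-based path

    urls, count = csaf.update()  # forwards to RHEL_CSAFProvider.update()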
50 changes: 50 additions & 0 deletions src/vunnel/providers/rhel_csaf/__init__.py
@@ -0,0 +1,50 @@
import datetime
import os
from dataclasses import dataclass, field

from vunnel import provider, result, schema

from .parser import Parser


@dataclass
class Config:
    runtime: provider.RuntimeConfig = field(
        default_factory=lambda: provider.RuntimeConfig(
            result_store=result.StoreStrategy.SQLITE,
            existing_results=result.ResultStatePolicy.DELETE_BEFORE_WRITE,
        ),
    )
    skip_namespaces: list[str] = field(default_factory=lambda: ["rhel:3", "rhel:4"])


class Provider(provider.Provider):

    __schema__ = schema.OSSchema()
    __distribution_version__ = int(__schema__.major_version)

    def __init__(self, root: str, config: Config | None = None):
        if not config:
            config = Config()
        super().__init__(root, runtime_cfg=config.runtime)
        self.config = config

        self.logger.debug(f"config: {config}")
        self.parser = Parser(workspace=self.workspace, logger=self.logger)

    @classmethod
    def name(cls) -> str:
        return "rhel"

    def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
        with self.results_writer() as writer:
            for namespace, vuln_id, record in self.parser.get(skip_if_exists=self.config.runtime.skip_if_exists):
                namespace = namespace.lower()
                vuln_id = vuln_id.lower()
                writer.write(
                    identifier=os.path.join(namespace, vuln_id),
                    schema=self.__schema__,
                    payload=record,
                )

        return self.parser.urls, len(writer)
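The writer keys each record by a lowercased namespace/vulnerability-id path, so downstream consumers see stable identifiers. A tiny sketch of the identifier shape; the CVE id is an arbitrary example:

    import os

    # Records are keyed by lowercased namespace and vulnerability id, e.g.:
    namespace, vuln_id = "rhel:9".lower(), "CVE-2024-21626".lower()
    print(os.path.join(namespace, vuln_id))  # rhel:9/cve-2024-21626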
146 changes: 146 additions & 0 deletions src/vunnel/providers/rhel_csaf/parser.py
@@ -0,0 +1,146 @@
import concurrent.futures
import contextlib
import csv
import logging
import os
from collections.abc import Generator
from datetime import datetime, timezone
from typing import Any

from vunnel.providers.rhel_csaf.transformer import vulnerabilities_by_namespace
from vunnel.utils import http
from vunnel.utils.archive import extract
from vunnel.utils.csaf_types import from_path
from vunnel.utils.vulnerability import Vulnerability
from vunnel.workspace import Workspace

VEX_LATEST_URL = "https://security.access.redhat.com/data/csaf/v2/vex/archive_latest.txt"
VEX_CHANGES_URL = "https://security.access.redhat.com/data/csaf/v2/vex/changes.csv"
VEX_DELETIONS_URL = "https://security.access.redhat.com/data/csaf/v2/vex/deletions.csv"
ADVISORIES_LATEST_URL = "https://security.access.redhat.com/data/csaf/v2/advisories/archive_latest.txt"


class Parser:
    def __init__(
        self,
        workspace: Workspace,
        download_timeout: int | None = None,
        skip_namespaces: list[str] | None = None,
        logger: logging.Logger | None = None,
    ):
        self.workspace = workspace
        self.download_timeout = download_timeout if isinstance(download_timeout, int) else 125
        self.skip_namespaces = skip_namespaces if isinstance(skip_namespaces, list) else ["rhel:3", "rhel:4"]
        self.rhsa_dict = None
        self.urls: list[str] = []
        self.vex_archive_date: datetime | None = None

        self.download_path = os.path.join(self.workspace.input_path, "vex_archive.tar.zst")
        self.advisory_download_path = os.path.join(self.workspace.input_path, "advisory_archive.tar.zst")
        self.csaf_path = os.path.join(self.workspace.input_path, "csaf")
        os.makedirs(self.csaf_path, exist_ok=True)
        self.advisories_path = os.path.join(self.workspace.input_path, "advisories")

        if not logger:
            logger = logging.getLogger(self.__class__.__name__)
        self.logger = logger
        self.logger.debug("starting RHEL CSAF parser")

    def get_archive_date(self, archive_filename: str) -> datetime:
        date_part = archive_filename.removeprefix("csaf_vex_").removesuffix(".tar.zst")
        return datetime.strptime(date_part, "%Y-%m-%d").replace(tzinfo=timezone.utc)  # noqa: UP017

    def download_stream(self, url: str, dest: str) -> None:
        with http.get(url, logger=self.logger, stream=True) as resp, open(dest, "wb") as fh:
            for chunk in resp.iter_content(chunk_size=65536):  # 64k chunks
                if chunk:
                    fh.write(chunk)

    def download_vex_archive(self) -> None:
        latest_resp = http.get(url=VEX_LATEST_URL, logger=self.logger)
        archive_filename = latest_resp.content.decode()
        self.vex_archive_date = self.get_archive_date(archive_filename)
        archive_url = VEX_LATEST_URL.replace("archive_latest.txt", archive_filename)
        self.urls = [archive_url]
        self.download_stream(archive_url, self.download_path)
        changes_path = os.path.join(self.csaf_path, "changes.csv")
        self.download_stream(VEX_CHANGES_URL, changes_path)
        deletions_path = os.path.join(self.csaf_path, "deletions.csv")
        self.download_stream(VEX_DELETIONS_URL, deletions_path)

    def extract_all(self) -> None:
        os.makedirs(self.csaf_path, exist_ok=True)
        extract(self.download_path, self.csaf_path)

    def process_changes_and_deletions(self) -> None:
        """Process the changes and deletions published alongside the archive.
        deletions.csv lists CSAF JSON files that have been deleted: loop over it,
        deleting every referenced file. changes.csv is a date-sorted record of
        when each CSAF JSON file changed: loop over its rows until we get back to
        the date of the archive, keeping a set of unique files, to obtain the set
        of files that have changed since the archive was published. Re-download
        all of them, overwriting whatever data was in the archive."""
        changes_path = os.path.join(self.csaf_path, "changes.csv")
        deletions_path = os.path.join(self.csaf_path, "deletions.csv")
        with open(deletions_path, newline="") as fh:
            reader = csv.reader(fh)
            for row in reader:
                deleted_fragment = row[0]
                # suppress FileNotFoundError because deleting the same file twice
                # should no-op rather than raise an error
                with contextlib.suppress(FileNotFoundError):
                    os.remove(os.path.join(self.csaf_path, deleted_fragment))
        seen_files = set()
        with open(changes_path, newline="") as fh:
            reader = csv.reader(fh)
            for row in reader:
                # row is like "2021/cve-2021-47265.json","2024-11-08T18:28:22+00:00"
                changed_file = row[0]
                date_str = row[1]
                change_date = datetime.fromisoformat(date_str)
                if self.vex_archive_date and change_date < self.vex_archive_date:
                    break
                if changed_file in seen_files:
                    continue
                seen_files.add(changed_file)
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = {
                executor.submit(
                    self.download_stream,
                    url=VEX_LATEST_URL.replace("archive_latest.txt", changed_file),
                    dest=os.path.join(self.csaf_path, changed_file),
                ): changed_file
                for changed_file in seen_files
            }
            concurrent.futures.wait(futures.keys())
            for future, changed_file in futures.items():
                if future.exception() is not None:
                    self.logger.warning(f"Failed to download {changed_file}: {future.exception()}")

    def fetch(self) -> None:
        self.download_vex_archive()
        self.extract_all()
        self.process_changes_and_deletions()

    def _csaf_vex_files(self) -> Generator[str]:
        for root, _, files in os.walk(self.csaf_path):
            for file in files:
                if file.endswith(".json"):
                    yield os.path.join(root, file)

    def process(self) -> Generator[tuple[str, str, Vulnerability]]:
        for file_path in self._csaf_vex_files():
            try:
                self.logger.debug(f"processing {file_path}")
                c = from_path(file_path)
                ns_to_vulns = vulnerabilities_by_namespace(c, set(self.skip_namespaces))
                for ns, vuln in ns_to_vulns.items():
                    yield ns, vuln.Name, vuln

            except Exception as e:
                self.logger.warning(f"failed to process {file_path}: {e}")

    def get(self, skip_if_exists: bool = False) -> Generator[tuple[str, str, dict[str, Any]]]:
        self.fetch()
        for namespace, vuln_id, record in self.process():
            yield namespace, vuln_id, record.to_payload()
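For intuition, the incremental logic above hinges on get_archive_date: the archive filename yields a UTC cutoff, and the walk over date-sorted changes.csv stops at the first row older than that cutoff. A standalone sketch with made-up rows; names and dates are illustrative, not from vunnel:

    from datetime import datetime, timezone

    def archive_date(archive_filename: str) -> datetime:
        # "csaf_vex_2024-11-08.tar.zst" -> 2024-11-08 00:00:00+00:00
        date_part = archive_filename.removeprefix("csaf_vex_").removesuffix(".tar.zst")
        return datetime.strptime(date_part, "%Y-%m-%d").replace(tzinfo=timezone.utc)

    cutoff = archive_date("csaf_vex_2024-11-08.tar.zst")
    rows = [  # newest first, as in changes.csv
        ("2021/cve-2021-47265.json", "2024-11-10T09:00:00+00:00"),
        ("2020/cve-2020-12345.json", "2024-11-07T12:00:00+00:00"),
    ]
    changed = []
    for path, stamp in rows:
        if datetime.fromisoformat(stamp) < cutoff:
            break  # everything older is already in the archive
        changed.append(path)
    print(changed)  # ['2021/cve-2021-47265.json']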