Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove file scoped noqa for ubuntu parser #372

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 67 additions & 65 deletions src/vunnel/providers/ubuntu/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# flake8: noqa
from __future__ import annotations

import concurrent.futures
Expand All @@ -10,14 +9,17 @@
import re
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Generator
from typing import TYPE_CHECKING, Any

import orjson

from vunnel.workspace import Workspace

from .git import GitWrapper

if TYPE_CHECKING:
from collections.abc import Generator

from vunnel.workspace import Workspace

namespace = "ubuntu"

default_max_workers = 8
Expand Down Expand Up @@ -45,15 +47,15 @@
"pending": True,
# Indicates a vuln with release coming in the expected version. We ignore here for now assuming it will be in 'released' status shortly.
"active": True, # The package is affected, needs fixing and is being worked on.
"deferred": True
"deferred": True,
# The package is affected by work on fix is deferred for some reason. In this case, ignore any version info as it may be random (e.g. a date)
}

patch_merge_criteria = {
"status": re.compile(r"ignored"),
# match end-of-life, end-of-standard-support, out-of-standard-support anywhere in the line
"version": re.compile(
r"(^|.*\s+)(end[\s-]of[\s-]life|end[\s-]of[\s-]standard[\s-]support|out[\s-]of[\s-]standard[\s-]support)($|\s+|\,.*)"
r"(^|.*\s+)(end[\s-]of[\s-]life|end[\s-]of[\s-]standard[\s-]support|out[\s-]of[\s-]standard[\s-]support)($|\s+|\,.*)",
),
}

Expand Down Expand Up @@ -99,22 +101,22 @@ class JsonifierMixin:
def json(self):
jsonified = {}
for k, v in vars(self).items():
if not k[0] == "_":
if type(v) == list or type(v) == set:
if k[0] != "_":
if isinstance(v, (list, set)):
jsonified[k] = [x.json() if hasattr(x, "json") and callable(x.json) else x for x in v]
elif type(v) == dict:
elif isinstance(v, dict):
jsonified[k] = {x: y.json() if hasattr(y, "json") and callable(y.json) else y for x, y in v.items()}
elif hasattr(v, "json"):
jsonified[k] = v.json()
else:
if hasattr(v, "json"):
jsonified[k] = v.json()
else:
jsonified[k] = v
jsonified[k] = v
return jsonified


# Driver V2 Design Changes:
# Because the ubuntu data is removed when a release hits EOL (patch records changed from a state to 'ignored (reached end-of-life)'), the way to keep the actual
# state for all cves is to track the last update prior to EOL, which requires traversing revision history. The v2 driver does this to maintain an accurate state
# Because the ubuntu data is removed when a release hits EOL (patch records changed from a state to 'ignored (reached end-of-life)'),
# the way to keep the actual state for all cves is to track the last update prior to EOL, which requires traversing revision history.
# The v2 driver does this to maintain an accurate state
# for each cve. It will be very slow on initial run but can be optimized by pre-populating the cache of merged state


Expand Down Expand Up @@ -211,9 +213,8 @@ def check_header(expected: str, lines: list[str]):

# This is not a full match, only prefix to handle cases where the header isn't quite within spec (e.g. notes added etc).
if not lines[0].strip().startswith(expected + ":"):
raise ValueError("Expected header {}, found {}".format(expected, lines[0]))
else:
lines.pop(0)
raise ValueError(f"Expected header {expected}, found {lines[0]}")
lines.pop(0)


def parse_list(header: str, lines: list[str]) -> list[str]:
Expand All @@ -237,7 +238,7 @@ def parse_list(header: str, lines: list[str]) -> list[str]:
return refs


def parse_patch(header: str, lines: list[str]) -> list[Patch]:
def parse_patch(header: str, lines: list[str]) -> list[Patch]: # noqa: C901
"""
Parse a patch spec of the form:
Patches_<packagename>:
Expand Down Expand Up @@ -275,7 +276,7 @@ def parse_patch(header: str, lines: list[str]) -> list[Patch]:
patch.priority = priority
lines.pop(0)
continue
else:
else: # noqa: RET507
match = _patches_regex.match(line)
if match:
version = None
Expand Down Expand Up @@ -344,8 +345,7 @@ def get_patch_section(header_line: str) -> str | None:
match = _patches_header_regex.match(header_line)
if match:
return match.group(1)
else:
return None
return None


def check_release(releasename: str) -> bool:
Expand Down Expand Up @@ -431,11 +431,11 @@ def parse_cve_file(cve_id: str, content_lines: list[str]) -> CVEFile:
else:
patch_name = get_patch_section(section)
p_match = _patches_regex.match(line)
if patch_name:
if patch_name: # noqa: SIM114
patches = parse_patch(section, lines)
parsed.patches += patches
continue
elif p_match and map_namespace(p_match.group(1)):
elif p_match and map_namespace(p_match.group(1)): # noqa: RET507
patches = parse_patch(section, lines)
parsed.patches += patches
continue
Expand All @@ -461,7 +461,7 @@ def map_namespace(release_name: str) -> str | None:
return None


def map_parsed(parsed_cve: CVEFile, logger: logging.Logger | None = None):
def map_parsed(parsed_cve: CVEFile, logger: logging.Logger | None = None): # noqa: C901, PLR0912
"""
Maps a parsed CVE dict into a Vulnerability object.

Expand All @@ -481,7 +481,7 @@ def map_parsed(parsed_cve: CVEFile, logger: logging.Logger | None = None):
# Map keyed by namespace name
vulns = {}
if not (parsed_cve.name):
logger.error("could not find a Name for parsed cve: {}".format(asdict(parsed_cve)))
logger.error(f"could not find a Name for parsed cve: {asdict(parsed_cve)}")
return []

for p in parsed_cve.patches:
Expand Down Expand Up @@ -519,9 +519,11 @@ def map_parsed(parsed_cve: CVEFile, logger: logging.Logger | None = None):
pkg.Version = p.version
if pkg.Version is None:
logger.debug(
'found CVE {} in ubuntu version {} with "released" status for pkg {} but no version for release. Released patches should have version info, but missing in source data. Marking package as not vulnerable'.format(
r.Name, r.NamespaceName, pkg.Name
)
'found CVE {} in ubuntu version {} with "released" status for pkg {} but no version for release. Released patches should have version info, but missing in source data. Marking package as not vulnerable'.format( # noqa: E501, G001
r.Name,
r.NamespaceName,
pkg.Name,
),
)
continue
# Strange condition where a release was done but no version found. In this case, we'll omit the FixedIn record.
Expand All @@ -539,8 +541,7 @@ def map_parsed(parsed_cve: CVEFile, logger: logging.Logger | None = None):
if pkg_sev > r.Severity:
r.Severity = pkg_sev

s = set(vulns.values())
return s
return set(vulns.values())


def filter_resolved_patches(cve: CVEFile, dpt_list: list[DistroPkg]) -> dict[DistroPkg, Patch]:
Expand All @@ -551,7 +552,7 @@ def filter_resolved_patches(cve: CVEFile, dpt_list: list[DistroPkg]) -> dict[Dis
:param dpt_list: list of DistroPkg objects
:return:
"""
filtered_map = dict()
filtered_map = {}
for dpt in dpt_list:
matched_p = next(
(p for p in cve.patches if dpt.distro == p.distro and dpt.pkg == p.package and not check_merge(p)),
Expand All @@ -571,7 +572,7 @@ def filter_merged_patches(cve: CVEFile, dpt_list: list[DistroPkg]) -> dict[Distr
:param dpt_list: list of DistroPkg objects
:return:
"""
filtered_map: dict[DistroPkg, Patch] = dict()
filtered_map: dict[DistroPkg, Patch] = {}
for dpt in dpt_list:
matched_p = next(
(p for p in cve.patches if dpt.distro == p.distro and dpt.pkg == p.package),
Expand Down Expand Up @@ -602,7 +603,7 @@ class Parser:

# Revision 9000 on the sec bzr repo before trusty eol. revno 9000 on 2015-01-27.

def __init__(
def __init__( # noqa: PLR0913
self,
workspace: Workspace,
logger: logging.Logger | None = None,
Expand Down Expand Up @@ -651,7 +652,7 @@ def fetch(self, skip_if_exists=False):
yield from map_parsed(merged_cve, self.logger)
self.logger.info("finish loading processed CVE content and transforming into vulnerabilities")

def _process_data(self, vc_dir: str, to_rev: str, from_rev: str | None = None):
def _process_data(self, vc_dir: str, to_rev: str, from_rev: str | None = None): # noqa: C901
self.logger.info(f"processing data from git repository: {vc_dir}, from revision: {from_rev}, to revision: {to_rev}")

self.git_wrapper.prepare_cve_revision_history()
Expand All @@ -660,12 +661,12 @@ def _process_data(self, vc_dir: str, to_rev: str, from_rev: str | None = None):
updated_paths = []
deleted_ids = []
if from_rev and to_rev and from_rev != to_rev:
self.logger.debug("fetching changes to CVEs between revisions {} and {}".format(from_rev, to_rev))
self.logger.debug(f"fetching changes to CVEs between revisions {from_rev} and {to_rev}")
modified, removed = self.git_wrapper.get_merged_change_set(from_rev=from_rev, to_rev=to_rev)
updated_paths = list(modified.values()) if modified else []
deleted_ids = list(removed.keys()) if removed else []
self.logger.info("detected {} CVE updates (add/modify/rename)".format(len(updated_paths)))
self.logger.info("detected {} CVE deletions".format(len(deleted_ids)))
self.logger.info(f"detected {len(updated_paths)} CVE updates (add/modify/rename)")
self.logger.info(f"detected {len(deleted_ids)} CVE deletions")

# Load cves from active and retired directories and spool merged state to disk
# note: this is an IO bound operation, so a thread pool will suffice for now
Expand All @@ -678,7 +679,7 @@ def worker(fn, cve_id: str, *args, **kwargs):
try:
return fn(cve_id, *args, **kwargs)
except:
self.logger.exception("error processing {}".format(cve_id))
self.logger.exception(f"error processing {cve_id}")
raise

futures = []
Expand Down Expand Up @@ -713,11 +714,18 @@ def worker(fn, cve_id: str, *args, **kwargs):
# Remove merged state of deleted cves
self.logger.info("begin processing deletes")
for cve_id in deleted_ids:
self.logger.debug("{} is no longer relevant, deleting merged CVE state if any".format(cve_id))
self.logger.debug(f"{cve_id} is no longer relevant, deleting merged CVE state if any")
self._delete_merged_cve(cve_id)
self.logger.info("finish processing deletes")

def _process_cve(self, cve_id: str, cve_rel_path: str, f: str, to_rev: str, updated_paths: list[str]) -> CVEFile | None:
def _process_cve( # noqa: PLR0913
self,
cve_id: str,
cve_rel_path: str,
f: str,
to_rev: str,
updated_paths: list[str],
) -> CVEFile | None:
self.logger.debug(f"begin processing {cve_id} to rev {to_rev}")

if cve_rel_path in updated_paths:
Expand Down Expand Up @@ -746,14 +754,13 @@ def _load_last_processed_rev(self):
last_processed_rev_path = os.path.join(self.norm_workspace, self._last_processed_rev_file)
if os.path.exists(last_processed_rev_path):
return self._bzr_to_git_transition_commit
else:
return None
return None

def _save_last_processed_rev(self, revno: str):
last_processed_rev_path = os.path.join(self.norm_workspace, self._last_processed_rev_file_git)

with open(last_processed_rev_path, "w") as f:
f.write("{}".format(revno))
f.write(f"{revno}")

def _load_merged_cve(self, cve_id: str) -> CVEFile | None:
if os.path.exists(os.path.join(self.norm_workspace, cve_id)):
Expand All @@ -766,8 +773,8 @@ def _load_merged_cve(self, cve_id: str) -> CVEFile | None:
def _save_merged_cve(self, cve_id: str, merged_cve: CVEFile):
filepath = os.path.join(self.norm_workspace, cve_id)
with open(filepath, "wb") as f:
self.logger.trace(f"writing record to {filepath!r}") # type: ignore
f.write(orjson.dumps(asdict(merged_cve), f)) # type: ignore
self.logger.trace(f"writing record to {filepath!r}") # type: ignore # noqa: PGH003
f.write(orjson.dumps(asdict(merged_cve), f)) # type: ignore # noqa: PGH003

def _delete_merged_cve(self, cve_id):
if os.path.exists(os.path.join(self.norm_workspace, cve_id)):
Expand Down Expand Up @@ -795,7 +802,7 @@ def _reprocess_merged_cve(self, cve_id: str, cve_rel_path: str):

if not saved_state:
self.logger.debug(f"no saved state found for {cve_id}")
return
return None

# reprocess only ignored patches
merged_patches, ignored_patches, to_be_merged_map = self._categorize_patches(saved_state.ignored_patches)
Expand All @@ -804,10 +811,9 @@ def _reprocess_merged_cve(self, cve_id: str, cve_rel_path: str):
if merged_patches or to_be_merged_map:
self.logger.debug("found unresolved patches in previously merged state, could be a new distro")
# Process revision history for eol-ed packages that need to be merged
saved_state.priority if not None else "Unknown"
if to_be_merged_map:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth trying to figure out what this line was meant to do (ruff flagged it as an expression whose result is never assigned). Does anyone have some more context here?

if self.enable_rev_history:
self.logger.debug("attempting to resolve patches using revision history for {}".format(cve_rel_path))
self.logger.debug(f"attempting to resolve patches using revision history for {cve_rel_path}")
(
resolved_patches,
pending_dpt_list,
Expand All @@ -821,10 +827,10 @@ def _reprocess_merged_cve(self, cve_id: str, cve_rel_path: str):
merged_patches.extend(resolved_patches)
if pending_dpt_list:
self.logger.debug(
"exhausted all revisions for {} but could not resolve patches: {}".format(
"exhausted all revisions for {} but could not resolve patches: {}".format( # noqa: G001
cve_rel_path,
[to_be_merged_map[x] for x in pending_dpt_list],
)
),
)
merged_patches.extend([to_be_merged_map[x] for x in pending_dpt_list])

Expand All @@ -837,7 +843,7 @@ def _reprocess_merged_cve(self, cve_id: str, cve_rel_path: str):
self.logger.debug("revision history processing is disabled. Merging unresolved patches as they are")
merged_patches.extend(to_be_merged_map.values())

# pulling this outside of the revision history block for fixing ENTERPRISE-195. saved state should be updated if there are mergeable or to-be-merged packages
# pulling this outside of the revision history block for fixing ENTERPRISE-195. saved state should be updated if there are mergeable or to-be-merged packages # noqa: E501
# there might already be resolved patches, extend it with merged patches, don't overwrite it
if saved_state.patches:
saved_state.patches.extend(merged_patches)
Expand Down Expand Up @@ -889,7 +895,7 @@ def _merge_cve(self, cve_id: str, cve_rel_path: str, cve_abs_path: str, repo_cur
:param repo_current_rev:
:return:
"""
self.logger.debug("merging CVE {}".format(cve_rel_path))
self.logger.debug(f"merging CVE {cve_rel_path}")

with open(cve_abs_path) as cve_file:
raw_content = cve_file.readlines()
Expand Down Expand Up @@ -918,17 +924,14 @@ def _merge_cve(self, cve_id: str, cve_rel_path: str, cve_abs_path: str, repo_cur
merged_patches.extend(resolved_patches)
if pending_dpt_list:
self.logger.debug(
"exhausted all revisions for {} but could not resolve patches: {}".format(
cve_rel_path,
[to_be_merged_map[x] for x in pending_dpt_list],
)
f"exhausted all revisions for {cve_rel_path} but could not resolve patches: {[to_be_merged_map[x] for x in pending_dpt_list]}", # noqa: E501
)
merged_patches.extend([to_be_merged_map[x] for x in pending_dpt_list])

del resolved_patches[:]
del pending_dpt_list[:]

else: # merge with saved state if any
else: # merge with saved state if any # noqa: PLR5501
if saved_cve:
self.logger.debug("revision history processing is disabled. Resolving patches using saved cve state")
rev_matched_map = filter_merged_patches(saved_cve, list(to_be_merged_map.keys()))
Expand All @@ -939,7 +942,7 @@ def _merge_cve(self, cve_id: str, cve_rel_path: str, cve_abs_path: str, repo_cur
rev_matched_map.clear()
else:
self.logger.debug(
"revision history processing is disabled and no saved state found. Skipping patch resolution"
"revision history processing is disabled and no saved state found. Skipping patch resolution",
)
merged_patches.extend(list(to_be_merged_map.values()))

Expand All @@ -955,7 +958,7 @@ def _merge_cve(self, cve_id: str, cve_rel_path: str, cve_abs_path: str, repo_cur

return parsed_cve

def _resolve_patches_using_history(
def _resolve_patches_using_history( # noqa: C901, PLR0912, PLR0915, PLR0913
self,
cve_id: str,
cve_rel_path: str,
Expand All @@ -981,10 +984,9 @@ def _resolve_patches_using_history(
pending_dpt_list: list[DistroPkg] = copy.deepcopy(to_be_merged_dpt_list)

# last processed commit
if saved_state and saved_state.git_last_processed_rev:
saved_cve_last_processed_rev = saved_state.git_last_processed_rev
else:
saved_cve_last_processed_rev = None
saved_cve_last_processed_rev = (
saved_state.git_last_processed_rev if saved_state and saved_state.git_last_processed_rev else None
)

# fetch log of revision history for this file, its in the most recent - least recent order
since_revs = self.git_wrapper.get_revision_history(cve_id, cve_rel_path, saved_cve_last_processed_rev)
Expand Down Expand Up @@ -1082,7 +1084,7 @@ def _resolve_patches_using_history(
self.logger.debug("Merge with saved state resolved all relevant patches")

metrics["time_elapsed"] = int((time.time() - t) * 1000) / float(1000)
self.logger.trace("metrics from processing revision history for {}: {}".format(cve_rel_path, metrics))
self.logger.trace(f"metrics from processing revision history for {cve_rel_path}: {metrics}")

return resolved_patches, pending_dpt_list, cve_latest_rev

Expand Down
Loading