Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert #284 + fix namespace resolution for quality gate testing #307

Merged
merged 5 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,020 changes: 509 additions & 511 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ types-requests = "^2.28.11.7"
mypy = "^1.1"
radon = ">=5.1,<7.0"
dunamai = "^1.15.0"
ruff = ">=0.0.254,<0.0.292"
ruff = "^0.0.254"
yardstick = {git = "https://github.com/anchore/yardstick", rev = "v0.7.0"}
tabulate = "0.9.0"

Expand Down
2 changes: 1 addition & 1 deletion src/vunnel/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def clear_provider(cfg: config.Application, provider_names: str, _input: bool, r
@click.argument("provider_names", metavar="PROVIDER", nargs=-1)
@click.option("--show-empty", default=False, is_flag=True, help="show providers with no state")
@click.pass_obj
def status_provider(cfg: config.Application, provider_names: str, show_empty: bool) -> None: # noqa: C901
def status_provider(cfg: config.Application, provider_names: str, show_empty: bool) -> None:
print(cfg.root)
selected_names = provider_names if provider_names else providers.names()

Expand Down
3 changes: 1 addition & 2 deletions src/vunnel/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ def _normalize_name(name: str) -> str:
@dataclass
class Log:
slim: bool = os.environ.get("VUNNEL_LOG_SLIM", default="false") == "true"
level: str = os.environ.get("VUNNEL_LOG_LEVEL", default="INFO") # noqa: RUF009, str is immutable so RUF009 does not apply
# see https://docs.astral.sh/ruff/rules/function-call-in-dataclass-default-argument/
level: str = os.environ.get("VUNNEL_LOG_LEVEL", default="INFO")
show_timestamp: bool = os.environ.get("VUNNEL_LOG_SHOW_TIMESTAMP", default="false") == "true"
show_level: bool = os.environ.get("VUNNEL_LOG_SHOW_LEVEL", default="true") == "true"

Expand Down
6 changes: 3 additions & 3 deletions src/vunnel/providers/amazon/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import re
from collections import namedtuple
from html.parser import HTMLParser
from typing import ClassVar

import defusedxml.ElementTree as ET
import requests
Expand Down Expand Up @@ -97,7 +96,8 @@ def _get_alas_html(self, alas_url, alas_file, skip_if_exists=True):
if skip_if_exists and os.path.exists(alas_file): # read alas from disk if its available
self.logger.debug(f"loading existing ALAS from {alas_file}")
with open(alas_file, encoding="utf-8") as fp:
return fp.read()
content = fp.read()
return content

try:
self.logger.debug(f"downloading ALAS from {alas_url}")
Expand Down Expand Up @@ -210,7 +210,7 @@ def __init__(self):

class PackagesHTMLParser(HTMLParser):
_new_packages_tuple_ = ("id", "new_packages")
_arch_list_: ClassVar[list[str]] = ["x86_64:", "noarch:", "src:"]
_arch_list_ = ["x86_64:", "noarch:", "src:"]

def __init__(self):
self.fixes = []
Expand Down
216 changes: 97 additions & 119 deletions src/vunnel/providers/debian/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


# Only releases presenting this mapping will be output by the driver, maintain it with new releases.
# Can also be extended via configuration
# Can also be extended via configuration.
debian_distro_map = {
"trixie": "13",
"bookworm": "12",
Expand Down Expand Up @@ -129,7 +129,7 @@ def _get_cve_to_dsalist(self, dsa):
return ns_cve_dsalist

# noqa
def _parse_dsa_record(self, dsa_lines): # noqa: C901
def _parse_dsa_record(self, dsa_lines):
"""

:param dsa_lines:
Expand Down Expand Up @@ -281,22 +281,17 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915
if ns_cve_dsalist is None:
ns_cve_dsalist = {}

vuln_records = self.get_vuln_records(ns_cve_dsalist, adv_mets, data)

adv_mets.clear()
# all_dsas.clear()
# all_matched_dsas.clear()

return vuln_records

def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): # noqa: PLR0912, C901
vuln_records = {}

for pkg in data:
# only process CVEs
for vid in filter(lambda x: re.match("^CVE.*", x), data[pkg]):
for vid in data[pkg]:
# skip non CVE vids
if not re.match("^CVE.*", vid):
continue

# gather NVD data for this CVE. Pulling this logic out of the for loop as NVD data remains the same
# regardless of the debian release.
# nvd_severity = None
nvd_severity = None
# if session:
# try:
# nvd_severity = nvd.get_severity(vid, session=session)
Expand Down Expand Up @@ -331,13 +326,37 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): # noqa: PLR0912, C9
vuln_record = vuln_records[relno][vid]

# populate the static information about the new vuln record
self.populate_static_information(vid, vulnerability_data, relno, vuln_record)
vuln_record["Vulnerability"]["Description"] = vulnerability_data.get("description", "")
vuln_record["Vulnerability"]["Name"] = str(vid)
vuln_record["Vulnerability"]["NamespaceName"] = "debian:" + str(relno)
vuln_record["Vulnerability"]["Link"] = "https://security-tracker.debian.org/tracker/" + str(vid)
vuln_record["Vulnerability"]["Severity"] = "Unknown"
else:
vuln_record = vuln_records[relno][vid]

# set severity
# from https://anonscm.debian.org/viewvc/secure-testing/bin/tracker_service.py
sev = self.get_severity(distro_record)
sev = None
if "urgency" in distro_record:
if distro_record["urgency"] in ["low", "low**"]:
sev = "Low"
elif distro_record["urgency"] in ["medium", "medium**"]:
sev = "Medium"
elif distro_record["urgency"] in ["high", "high**"]:
sev = "High"
elif distro_record["urgency"] in [
"unimportant",
"end-of-life",
]:
sev = "Negligible"
elif nvd_severity: # no match to urgency found
sev = nvd_severity # fallback to nvd severity
else:
sev = "Unknown"
elif nvd_severity: # urgency element is not present
sev = nvd_severity # fallback to nvd severity
else:
sev = "Unknown"

if (
sev
Expand All @@ -347,24 +366,72 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): # noqa: PLR0912, C9
vuln_record["Vulnerability"]["Severity"] = sev

# add fixedIn
skip_fixedin, fixed_el = self.add_fixedin_info(pkg, distro_record, relno)
skip_fixedin = False
fixed_el = {
"Name": pkg,
"NamespaceName": "debian:" + str(relno),
"VersionFormat": "dpkg",
}

if "fixed_version" in distro_record:
fixed_el["Version"] = distro_record["fixed_version"]
if distro_record["fixed_version"] == "0":
# version == 0 should mean that the
# package was determined to not be
# vulnerable in the distro namespace
# (from reviewing
# https://security-tracker.debian.org/tracker/)
skip_fixedin = True
else:
fixed_el["Version"] = "None"

if not skip_fixedin:
# collect metrics for vendor advisory
met_ns, met_sev = self.collect_vuln_metrics(adv_mets, vuln_record)
sev_dict = adv_mets[met_ns][met_sev]
met_ns = vuln_record["Vulnerability"]["NamespaceName"]
met_sev = vuln_record["Vulnerability"]["Severity"]

if met_ns not in adv_mets:
adv_mets[met_ns] = {
met_sev: {
"dsa": {"fixed": 0, "notfixed": 0},
"nodsa": {"fixed": 0, "notfixed": 0},
"neither": {"fixed": 0, "notfixed": 0},
},
}

if met_sev not in adv_mets[met_ns]:
adv_mets[met_ns][met_sev] = {
"dsa": {"fixed": 0, "notfixed": 0},
"nodsa": {"fixed": 0, "notfixed": 0},
"neither": {"fixed": 0, "notfixed": 0},
}

# find DSAs associated with the CVE and package in the namespace
matched_dsas = [dsa for dsa in ns_cve_dsalist.get(rel, {}).get(vid, []) if dsa.pkg == pkg]
sev_count_key = "notfixed" if fixed_el["Version"] == "None" else "fixed"

# add vendor advisory information to the fixed in record
fixed_el["VendorAdvisory"] = self.add_advisory_info(
sev_dict,
distro_record,
matched_dsas,
sev_count_key,
)
if matched_dsas:
fixed_el["VendorAdvisory"] = {
"NoAdvisory": False,
"AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas],
}
# all_matched_dsas |= set([x.dsa for x in matched_dsas])
adv_mets[met_ns][met_sev]["dsa"][
"notfixed" if fixed_el["Version"] == "None" else "fixed"
] += 1
elif "nodsa" in distro_record:
fixed_el["VendorAdvisory"] = {"NoAdvisory": True}
adv_mets[met_ns][met_sev]["nodsa"][
"notfixed" if fixed_el["Version"] == "None" else "fixed"
] += 1
else:
fixed_el["VendorAdvisory"] = {
"NoAdvisory": False,
"AdvisorySummary": [],
}
adv_mets[met_ns][met_sev]["neither"][
"notfixed" if fixed_el["Version"] == "None" else "fixed"
] += 1

# append fixed in record to vulnerability
vuln_record["Vulnerability"]["FixedIn"].append(fixed_el)
Expand All @@ -381,101 +448,12 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): # noqa: PLR0912, C9
self.logger.exception(f"ignoring error parsing vuln: {vid}, pkg: {pkg}, rel: {rel}")

self.logger.debug(f"metrics for advisory information: {json.dumps(adv_mets)}")
return vuln_records

def add_advisory_info(self, sev_dict, distro_record, matched_dsas, sev_count_key):
vendor_advisory = None
if matched_dsas:
vendor_advisory = {
"NoAdvisory": False,
"AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas],
}
# all_matched_dsas |= set([x.dsa for x in matched_dsas])
sev_dict["dsa"][sev_count_key] += 1
elif "nodsa" in distro_record:
vendor_advisory = {"NoAdvisory": True}
sev_dict["nodsa"][sev_count_key] += 1
else:
vendor_advisory = {
"NoAdvisory": False,
"AdvisorySummary": [],
}
sev_dict["neither"][sev_count_key] += 1
return vendor_advisory

def collect_vuln_metrics(self, adv_mets, vuln_record):
met_ns = vuln_record["Vulnerability"]["NamespaceName"]
met_sev = vuln_record["Vulnerability"]["Severity"]

if met_ns not in adv_mets:
adv_mets[met_ns] = {
met_sev: {
"dsa": {"fixed": 0, "notfixed": 0},
"nodsa": {"fixed": 0, "notfixed": 0},
"neither": {"fixed": 0, "notfixed": 0},
},
}

if met_sev not in adv_mets[met_ns]:
adv_mets[met_ns][met_sev] = {
"dsa": {"fixed": 0, "notfixed": 0},
"nodsa": {"fixed": 0, "notfixed": 0},
"neither": {"fixed": 0, "notfixed": 0},
}
adv_mets.clear()
# all_dsas.clear()
# all_matched_dsas.clear()

return met_ns, met_sev

def add_fixedin_info(self, pkg, distro_record, relno):
skip_fixedin = False
fixed_el = {
"Name": pkg,
"NamespaceName": "debian:" + str(relno),
"VersionFormat": "dpkg",
}

if "fixed_version" in distro_record:
fixed_el["Version"] = distro_record["fixed_version"]
if distro_record["fixed_version"] == "0":
# version == 0 should mean that the
# package was determined to not be
# vulnerable in the distro namespace
# (from reviewing
# https://security-tracker.debian.org/tracker/)
skip_fixedin = True
else:
fixed_el["Version"] = "None"
return skip_fixedin, fixed_el

def populate_static_information(self, vid, vulnerability_data, relno, vuln_record):
vuln_record["Vulnerability"]["Description"] = vulnerability_data.get("description", "")
vuln_record["Vulnerability"]["Name"] = str(vid)
vuln_record["Vulnerability"]["NamespaceName"] = "debian:" + str(relno)
vuln_record["Vulnerability"]["Link"] = "https://security-tracker.debian.org/tracker/" + str(vid)
vuln_record["Vulnerability"]["Severity"] = "Unknown"

def get_severity(self, nvd_severity, distro_record):
sev = None
if "urgency" in distro_record:
if distro_record["urgency"] in ["low", "low**"]:
sev = "Low"
elif distro_record["urgency"] in ["medium", "medium**"]:
sev = "Medium"
elif distro_record["urgency"] in ["high", "high**"]:
sev = "High"
elif distro_record["urgency"] in [
"unimportant",
"end-of-life",
]:
sev = "Negligible"
elif nvd_severity: # no match to urgency found
sev = nvd_severity # fallback to nvd severity
else:
sev = "Unknown"
elif nvd_severity: # urgency element is not present
sev = nvd_severity # fallback to nvd severity
else:
sev = "Unknown"
return sev
return vuln_records

def _get_legacy_records(self):
legacy_records = {}
Expand Down
11 changes: 7 additions & 4 deletions src/vunnel/providers/github/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,9 @@ def graphql_advisories(cursor=None, timestamp=None, vuln_cursor=None):
"%sclassifications: [GENERAL, MALWARE], first: 100, orderBy: {field: UPDATED_AT, direction: ASC}" % vuln_after
)

return f"""
return """
{{
{caller} {{
{} {{
nodes {{
ghsaId
classification
Expand All @@ -448,7 +448,7 @@ def graphql_advisories(cursor=None, timestamp=None, vuln_cursor=None):
references {{
url
}}
vulnerabilities({vulnerabilities}) {{
vulnerabilities({}) {{
pageInfo {{
endCursor
hasNextPage
Expand Down Expand Up @@ -476,7 +476,10 @@ def graphql_advisories(cursor=None, timestamp=None, vuln_cursor=None):
}}
}}
}}
"""
""".format(
caller,
vulnerabilities,
)


class NodeParser(dict):
Expand Down
2 changes: 1 addition & 1 deletion src/vunnel/providers/mariner/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, oval_file_path: str, logger: logging.Logger):
fail_on_unknown_properties=False,
)
xml_parser = XmlParser(config=parser_config)
root = etree.parse(oval_file_path) # noqa: S320 # not parsing untrusted input
root = etree.parse(oval_file_path)
nsmap = etree.XPath("/*")(root)[0].nsmap
default = nsmap[None]
nsmap["default"] = default
Expand Down
2 changes: 1 addition & 1 deletion src/vunnel/providers/wolfi/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__( # noqa: PLR0913
url: str,
namespace: str,
download_timeout: int = 125,
logger: logging.Logger | None = None, # noqa: PLR0913
logger: logging.Logger = None, # noqa: PLR0913
):
self.download_timeout = download_timeout
self.secdb_dir_path = os.path.join(workspace.input_path, self._secdb_dir_)
Expand Down
2 changes: 1 addition & 1 deletion src/vunnel/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
logger.exception(f"failed after {retries} retries")
raise

sleep = backoff_in_seconds * 2**attempt + random.uniform(0, 1) # nosec # noqa: S311
sleep = backoff_in_seconds * 2**attempt + random.uniform(0, 1) # nosec
logger.warning(f"{f} failed. Retrying in {int(sleep)} seconds (attempt {attempt+1} of {retries})")
time.sleep(sleep)
attempt += 1
Expand Down
Loading