From 41ad23b7dd2ea65a81db936997cd9dd1fb436e11 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 15 Sep 2023 21:40:36 +0000 Subject: [PATCH 1/4] chore(deps-dev): Bump ruff from 0.0.254 to 0.0.290 Bumps [ruff](https://github.com/astral-sh/ruff) from 0.0.254 to 0.0.290. - [Release notes](https://github.com/astral-sh/ruff/releases) - [Changelog](https://github.com/astral-sh/ruff/blob/main/BREAKING_CHANGES.md) - [Commits](https://github.com/astral-sh/ruff/compare/v0.0.254...v0.0.290) --- updated-dependencies: - dependency-name: ruff dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- poetry.lock | 38 +++++++++++++++++++------------------- pyproject.toml | 2 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9691dcc2..0dbe3e1a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1618,28 +1618,28 @@ files = [ [[package]] name = "ruff" -version = "0.0.254" +version = "0.0.290" description = "An extremely fast Python linter, written in Rust." optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.0.254-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:dd58c500d039fb381af8d861ef456c3e94fd6855c3d267d6c6718c9a9fe07be0"}, - {file = "ruff-0.0.254-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:688379050ae05394a6f9f9c8471587fd5dcf22149bd4304a4ede233cc4ef89a1"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ac1429be6d8bd3db0bf5becac3a38bd56f8421447790c50599cd90fd53417ec4"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:059a380c08e849b6f312479b18cc63bba2808cff749ad71555f61dd930e3c9a2"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b3f15d5d033fd3dcb85d982d6828ddab94134686fac2c02c13a8822aa03e1321"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:8deba44fd563361c488dedec90dc330763ee0c01ba54e17df54ef5820079e7e0"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ef20bf798ffe634090ad3dc2e8aa6a055f08c448810a2f800ab716cc18b80107"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0deb1d7226ea9da9b18881736d2d96accfa7f328c67b7410478cc064ad1fa6aa"}, - {file = "ruff-0.0.254-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:27d39d697fdd7df1f2a32c1063756ee269ad8d5345c471ee3ca450636d56e8c6"}, - {file = "ruff-0.0.254-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2fc21d060a3197ac463596a97d9b5db2d429395938b270ded61dd60f0e57eb21"}, - {file = "ruff-0.0.254-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:f70dc93bc9db15cccf2ed2a831938919e3e630993eeea6aba5c84bc274237885"}, - {file = "ruff-0.0.254-py3-none-musllinux_1_2_i686.whl", hash = "sha256:09c764bc2bd80c974f7ce1f73a46092c286085355a5711126af351b9ae4bea0c"}, - {file = "ruff-0.0.254-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:d4385cdd30153b7aa1d8f75dfd1ae30d49c918ead7de07e69b7eadf0d5538a1f"}, - {file = "ruff-0.0.254-py3-none-win32.whl", hash = "sha256:c38291bda4c7b40b659e8952167f386e86ec29053ad2f733968ff1d78b4c7e15"}, - {file = "ruff-0.0.254-py3-none-win_amd64.whl", hash = "sha256:e15742df0f9a3615fbdc1ee9a243467e97e75bf88f86d363eee1ed42cedab1ec"}, - {file = "ruff-0.0.254-py3-none-win_arm64.whl", hash = "sha256:b435afc4d65591399eaf4b2af86e441a71563a2091c386cadf33eaa11064dc09"}, - {file = "ruff-0.0.254.tar.gz", hash = "sha256:0eb66c9520151d3bd950ea43b3a088618a8e4e10a5014a72687881e6f3606312"}, + {file = "ruff-0.0.290-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:0e2b09ac4213b11a3520221083866a5816616f3ae9da123037b8ab275066fbac"}, + {file = "ruff-0.0.290-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:4ca6285aa77b3d966be32c9a3cd531655b3d4a0171e1f9bf26d66d0372186767"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:35e3550d1d9f2157b0fcc77670f7bb59154f223bff281766e61bdd1dd854e0c5"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d748c8bd97874f5751aed73e8dde379ce32d16338123d07c18b25c9a2796574a"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:982af5ec67cecd099e2ef5e238650407fb40d56304910102d054c109f390bf3c"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:bbd37352cea4ee007c48a44c9bc45a21f7ba70a57edfe46842e346651e2b995a"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1d9be6351b7889462912e0b8185a260c0219c35dfd920fb490c7f256f1d8313e"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:75cdc7fe32dcf33b7cec306707552dda54632ac29402775b9e212a3c16aad5e6"}, + {file = "ruff-0.0.290-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb07f37f7aecdbbc91d759c0c09870ce0fb3eed4025eebedf9c4b98c69abd527"}, + {file = "ruff-0.0.290-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2ab41bc0ba359d3f715fc7b705bdeef19c0461351306b70a4e247f836b9350ed"}, + {file = "ruff-0.0.290-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:150bf8050214cea5b990945b66433bf9a5e0cef395c9bc0f50569e7de7540c86"}, + {file = "ruff-0.0.290-py3-none-musllinux_1_2_i686.whl", hash = "sha256:75386ebc15fe5467248c039f5bf6a0cfe7bfc619ffbb8cd62406cd8811815fca"}, + {file = "ruff-0.0.290-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:ac93eadf07bc4ab4c48d8bb4e427bf0f58f3a9c578862eb85d99d704669f5da0"}, + {file = "ruff-0.0.290-py3-none-win32.whl", hash = "sha256:461fbd1fb9ca806d4e3d5c745a30e185f7cf3ca77293cdc17abb2f2a990ad3f7"}, + {file = "ruff-0.0.290-py3-none-win_amd64.whl", hash = "sha256:f1f49f5ec967fd5778813780b12a5650ab0ebcb9ddcca28d642c689b36920796"}, + {file = "ruff-0.0.290-py3-none-win_arm64.whl", hash = "sha256:ae5a92dfbdf1f0c689433c223f8dac0782c2b2584bd502dfdbc76475669f1ba1"}, + {file = "ruff-0.0.290.tar.gz", hash = "sha256:949fecbc5467bb11b8db810a7fa53c7e02633856ee6bd1302b2f43adcd71b88d"}, ] [[package]] @@ -2095,4 +2095,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "26f42c13f138b04e1bf7df0741a95735f0705d9fac1732d9094be69c61e3d10d" +content-hash = "b731a5aac6183e0bf9c777ef272fb2ddf01236f5034af6e23fdf770bc75e05d7" diff --git a/pyproject.toml b/pyproject.toml index fb0779d6..e94dfc73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,7 @@ types-requests = "^2.28.11.7" mypy = "^1.1" radon = ">=5.1,<7.0" dunamai = "^1.15.0" -ruff = "^0.0.254" +ruff = ">=0.0.254,<0.0.291" yardstick = {git = "https://github.com/anchore/yardstick", rev = "v0.7.0"} tabulate = "0.9.0" From f83b3c8edeecccfd59bde678f9811e14aab0dae8 Mon Sep 17 00:00:00 2001 From: Will Murphy Date: Fri, 22 Sep 2023 09:52:47 -0400 Subject: [PATCH 2/4] big refactor of debian parser to appease ruff Signed-off-by: Will Murphy --- src/vunnel/cli/cli.py | 2 +- src/vunnel/cli/config.py | 3 +- src/vunnel/providers/amazon/parser.py | 3 +- src/vunnel/providers/debian/parser.py | 202 ++++++++++++++------------ src/vunnel/providers/github/parser.py | 11 +- src/vunnel/providers/wolfi/parser.py | 2 +- 6 files changed, 121 insertions(+), 102 deletions(-) diff --git a/src/vunnel/cli/cli.py b/src/vunnel/cli/cli.py index 66e528d5..87638fa9 100644 --- a/src/vunnel/cli/cli.py +++ b/src/vunnel/cli/cli.py @@ -179,7 +179,7 @@ def clear_provider(cfg: config.Application, provider_names: str, _input: bool, r @click.argument("provider_names", metavar="PROVIDER", nargs=-1) @click.option("--show-empty", default=False, is_flag=True, help="show providers with no state") @click.pass_obj -def status_provider(cfg: config.Application, provider_names: str, show_empty: bool) -> None: +def status_provider(cfg: config.Application, provider_names: str, show_empty: bool) -> None: # noqa: C901 print(cfg.root) selected_names = provider_names if provider_names else providers.names() diff --git a/src/vunnel/cli/config.py b/src/vunnel/cli/config.py index 1cc1105b..e5d1c258 100644 --- a/src/vunnel/cli/config.py +++ b/src/vunnel/cli/config.py @@ -40,7 +40,8 @@ def _normalize_name(name: str) -> str: @dataclass class Log: slim: bool = os.environ.get("VUNNEL_LOG_SLIM", default="false") == "true" - level: str = os.environ.get("VUNNEL_LOG_LEVEL", default="INFO") + level: str = os.environ.get("VUNNEL_LOG_LEVEL", default="INFO") # noqa: RUF009, str is immutable so RUF009 does not apply + # see https://docs.astral.sh/ruff/rules/function-call-in-dataclass-default-argument/ show_timestamp: bool = os.environ.get("VUNNEL_LOG_SHOW_TIMESTAMP", default="false") == "true" show_level: bool = os.environ.get("VUNNEL_LOG_SHOW_LEVEL", default="true") == "true" diff --git a/src/vunnel/providers/amazon/parser.py b/src/vunnel/providers/amazon/parser.py index ee5efb4b..97e21a7c 100644 --- a/src/vunnel/providers/amazon/parser.py +++ b/src/vunnel/providers/amazon/parser.py @@ -96,8 +96,7 @@ def _get_alas_html(self, alas_url, alas_file, skip_if_exists=True): if skip_if_exists and os.path.exists(alas_file): # read alas from disk if its available self.logger.debug(f"loading existing ALAS from {alas_file}") with open(alas_file, encoding="utf-8") as fp: - content = fp.read() - return content + return fp.read() try: self.logger.debug(f"downloading ALAS from {alas_url}") diff --git a/src/vunnel/providers/debian/parser.py b/src/vunnel/providers/debian/parser.py index ab57f630..560ffaad 100644 --- a/src/vunnel/providers/debian/parser.py +++ b/src/vunnel/providers/debian/parser.py @@ -129,7 +129,7 @@ def _get_cve_to_dsalist(self, dsa): return ns_cve_dsalist # noqa - def _parse_dsa_record(self, dsa_lines): + def _parse_dsa_record(self, dsa_lines): # noqa: C901 """ :param dsa_lines: @@ -281,8 +281,18 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915 if ns_cve_dsalist is None: ns_cve_dsalist = {} - vuln_records = {} + vuln_records = self.get_vuln_records(ns_cve_dsalist, adv_mets, data) + + adv_mets.clear() + # all_dsas.clear() + # all_matched_dsas.clear() + + return vuln_records + + def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): + vuln_records = {} + # TODO: refactor such that each loop iteration sets a key in vuln records for pkg in data: for vid in data[pkg]: # skip non CVE vids @@ -326,37 +336,13 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915 vuln_record = vuln_records[relno][vid] # populate the static information about the new vuln record - vuln_record["Vulnerability"]["Description"] = vulnerability_data.get("description", "") - vuln_record["Vulnerability"]["Name"] = str(vid) - vuln_record["Vulnerability"]["NamespaceName"] = "debian:" + str(relno) - vuln_record["Vulnerability"]["Link"] = "https://security-tracker.debian.org/tracker/" + str(vid) - vuln_record["Vulnerability"]["Severity"] = "Unknown" + self.populate_static_information(vid, vulnerability_data, relno, vuln_record) else: vuln_record = vuln_records[relno][vid] # set severity # from https://anonscm.debian.org/viewvc/secure-testing/bin/tracker_service.py - sev = None - if "urgency" in distro_record: - if distro_record["urgency"] in ["low", "low**"]: - sev = "Low" - elif distro_record["urgency"] in ["medium", "medium**"]: - sev = "Medium" - elif distro_record["urgency"] in ["high", "high**"]: - sev = "High" - elif distro_record["urgency"] in [ - "unimportant", - "end-of-life", - ]: - sev = "Negligible" - elif nvd_severity: # no match to urgency found - sev = nvd_severity # fallback to nvd severity - else: - sev = "Unknown" - elif nvd_severity: # urgency element is not present - sev = nvd_severity # fallback to nvd severity - else: - sev = "Unknown" + sev = self.get_severity(nvd_severity, distro_record) if ( sev @@ -366,72 +352,19 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915 vuln_record["Vulnerability"]["Severity"] = sev # add fixedIn - skip_fixedin = False - fixed_el = { - "Name": pkg, - "NamespaceName": "debian:" + str(relno), - "VersionFormat": "dpkg", - } - - if "fixed_version" in distro_record: - fixed_el["Version"] = distro_record["fixed_version"] - if distro_record["fixed_version"] == "0": - # version == 0 should mean that the - # package was determined to not be - # vulnerable in the distro namespace - # (from reviewing - # https://security-tracker.debian.org/tracker/) - skip_fixedin = True - else: - fixed_el["Version"] = "None" + skip_fixedin, fixed_el = self.add_fixedin_info(pkg, distro_record, relno) if not skip_fixedin: # collect metrics for vendor advisory - met_ns = vuln_record["Vulnerability"]["NamespaceName"] - met_sev = vuln_record["Vulnerability"]["Severity"] - - if met_ns not in adv_mets: - adv_mets[met_ns] = { - met_sev: { - "dsa": {"fixed": 0, "notfixed": 0}, - "nodsa": {"fixed": 0, "notfixed": 0}, - "neither": {"fixed": 0, "notfixed": 0}, - }, - } - - if met_sev not in adv_mets[met_ns]: - adv_mets[met_ns][met_sev] = { - "dsa": {"fixed": 0, "notfixed": 0}, - "nodsa": {"fixed": 0, "notfixed": 0}, - "neither": {"fixed": 0, "notfixed": 0}, - } + met_ns, met_sev = self.collect_vuln_metrics(adv_mets, vuln_record) # find DSAs associated with the CVE and package in the namespace matched_dsas = [dsa for dsa in ns_cve_dsalist.get(rel, {}).get(vid, []) if dsa.pkg == pkg] + sev_count_key = "notfixed" if fixed_el["Version"] == "None" else "fixed" # add vendor advisory information to the fixed in record - if matched_dsas: - fixed_el["VendorAdvisory"] = { - "NoAdvisory": False, - "AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas], - } - # all_matched_dsas |= set([x.dsa for x in matched_dsas]) - adv_mets[met_ns][met_sev]["dsa"][ - "notfixed" if fixed_el["Version"] == "None" else "fixed" - ] += 1 - elif "nodsa" in distro_record: - fixed_el["VendorAdvisory"] = {"NoAdvisory": True} - adv_mets[met_ns][met_sev]["nodsa"][ - "notfixed" if fixed_el["Version"] == "None" else "fixed" - ] += 1 - else: - fixed_el["VendorAdvisory"] = { - "NoAdvisory": False, - "AdvisorySummary": [], - } - adv_mets[met_ns][met_sev]["neither"][ - "notfixed" if fixed_el["Version"] == "None" else "fixed" - ] += 1 + vendor_advisory = self.add_advisory_info(adv_mets, distro_record, met_ns, met_sev, matched_dsas, sev_count_key) + fixed_el["VendorAdvisory"] = vendor_advisory # append fixed in record to vulnerability vuln_record["Vulnerability"]["FixedIn"].append(fixed_el) @@ -448,12 +381,101 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915 self.logger.exception(f"ignoring error parsing vuln: {vid}, pkg: {pkg}, rel: {rel}") self.logger.debug(f"metrics for advisory information: {json.dumps(adv_mets)}") + return vuln_records - adv_mets.clear() - # all_dsas.clear() - # all_matched_dsas.clear() + def add_advisory_info(self, adv_mets, distro_record, met_ns, met_sev, matched_dsas, sev_count_key): + vendor_advisory = None + if matched_dsas: + vendor_advisory = { + "NoAdvisory": False, + "AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas], + } + # all_matched_dsas |= set([x.dsa for x in matched_dsas]) + adv_mets[met_ns][met_sev]["dsa"][sev_count_key] += 1 + elif "nodsa" in distro_record: + vendor_advisory = {"NoAdvisory": True} + adv_mets[met_ns][met_sev]["nodsa"][sev_count_key] += 1 + else: + vendor_advisory = { + "NoAdvisory": False, + "AdvisorySummary": [], + } + adv_mets[met_ns][met_sev]["neither"][sev_count_key] += 1 + return vendor_advisory - return vuln_records + def collect_vuln_metrics(self, adv_mets, vuln_record): + met_ns = vuln_record["Vulnerability"]["NamespaceName"] + met_sev = vuln_record["Vulnerability"]["Severity"] + + if met_ns not in adv_mets: + adv_mets[met_ns] = { + met_sev: { + "dsa": {"fixed": 0, "notfixed": 0}, + "nodsa": {"fixed": 0, "notfixed": 0}, + "neither": {"fixed": 0, "notfixed": 0}, + }, + } + + if met_sev not in adv_mets[met_ns]: + adv_mets[met_ns][met_sev] = { + "dsa": {"fixed": 0, "notfixed": 0}, + "nodsa": {"fixed": 0, "notfixed": 0}, + "neither": {"fixed": 0, "notfixed": 0}, + } + + return met_ns,met_sev + + def add_fixedin_info(self, pkg, distro_record, relno): + skip_fixedin = False + fixed_el = { + "Name": pkg, + "NamespaceName": "debian:" + str(relno), + "VersionFormat": "dpkg", + } + + if "fixed_version" in distro_record: + fixed_el["Version"] = distro_record["fixed_version"] + if distro_record["fixed_version"] == "0": + # version == 0 should mean that the + # package was determined to not be + # vulnerable in the distro namespace + # (from reviewing + # https://security-tracker.debian.org/tracker/) + skip_fixedin = True + else: + fixed_el["Version"] = "None" + return skip_fixedin,fixed_el + + def populate_static_information(self, vid, vulnerability_data, relno, vuln_record): + vuln_record["Vulnerability"]["Description"] = vulnerability_data.get("description", "") + vuln_record["Vulnerability"]["Name"] = str(vid) + vuln_record["Vulnerability"]["NamespaceName"] = "debian:" + str(relno) + vuln_record["Vulnerability"]["Link"] = "https://security-tracker.debian.org/tracker/" + str(vid) + vuln_record["Vulnerability"]["Severity"] = "Unknown" + + def get_severity(self, nvd_severity, distro_record): + sev = None + if "urgency" in distro_record: + if distro_record["urgency"] in ["low", "low**"]: + sev = "Low" + elif distro_record["urgency"] in ["medium", "medium**"]: + sev = "Medium" + elif distro_record["urgency"] in ["high", "high**"]: + sev = "High" + elif distro_record["urgency"] in [ + "unimportant", + "end-of-life", + ]: + sev = "Negligible" + elif nvd_severity: # no match to urgency found + sev = nvd_severity # fallback to nvd severity + else: + sev = "Unknown" + elif nvd_severity: # urgency element is not present + sev = nvd_severity # fallback to nvd severity + else: + sev = "Unknown" + return sev def _get_legacy_records(self): legacy_records = {} diff --git a/src/vunnel/providers/github/parser.py b/src/vunnel/providers/github/parser.py index 95e9d4cb..e3791731 100644 --- a/src/vunnel/providers/github/parser.py +++ b/src/vunnel/providers/github/parser.py @@ -427,9 +427,9 @@ def graphql_advisories(cursor=None, timestamp=None, vuln_cursor=None): "%sclassifications: [GENERAL, MALWARE], first: 100, orderBy: {field: UPDATED_AT, direction: ASC}" % vuln_after ) - return """ + return f""" {{ - {} {{ + {caller} {{ nodes {{ ghsaId classification @@ -446,7 +446,7 @@ def graphql_advisories(cursor=None, timestamp=None, vuln_cursor=None): references {{ url }} - vulnerabilities({}) {{ + vulnerabilities({vulnerabilities}) {{ pageInfo {{ endCursor hasNextPage @@ -474,10 +474,7 @@ def graphql_advisories(cursor=None, timestamp=None, vuln_cursor=None): }} }} }} - """.format( - caller, - vulnerabilities, - ) + """ class NodeParser(dict): diff --git a/src/vunnel/providers/wolfi/parser.py b/src/vunnel/providers/wolfi/parser.py index 5edc294a..84d25e81 100644 --- a/src/vunnel/providers/wolfi/parser.py +++ b/src/vunnel/providers/wolfi/parser.py @@ -23,7 +23,7 @@ def __init__( # noqa: PLR0913 url: str, namespace: str, download_timeout: int = 125, - logger: logging.Logger = None, # noqa: PLR0913 + logger: logging.Logger | None = None, # noqa: PLR0913 ): self.download_timeout = download_timeout self.secdb_dir_path = os.path.join(workspace.input_path, self._secdb_dir_) From b207d53b05c4501cabf6c3bfd15aeff28a26e025 Mon Sep 17 00:00:00 2001 From: Will Murphy Date: Fri, 22 Sep 2023 11:06:30 -0400 Subject: [PATCH 3/4] more linter fixes Signed-off-by: Will Murphy --- src/vunnel/providers/amazon/parser.py | 3 +- src/vunnel/providers/debian/parser.py | 72 +++++++++++++++------------ 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/src/vunnel/providers/amazon/parser.py b/src/vunnel/providers/amazon/parser.py index 97e21a7c..1fe5d4fa 100644 --- a/src/vunnel/providers/amazon/parser.py +++ b/src/vunnel/providers/amazon/parser.py @@ -5,6 +5,7 @@ import re from collections import namedtuple from html.parser import HTMLParser +from typing import ClassVar import defusedxml.ElementTree as ET import requests @@ -206,7 +207,7 @@ def __init__(self): class PackagesHTMLParser(HTMLParser): _new_packages_tuple_ = ("id", "new_packages") - _arch_list_ = ["x86_64:", "noarch:", "src:"] + _arch_list_: ClassVar[list[str]] = ["x86_64:", "noarch:", "src:"] def __init__(self): self.fixes = [] diff --git a/src/vunnel/providers/debian/parser.py b/src/vunnel/providers/debian/parser.py index 560ffaad..b14b49dc 100644 --- a/src/vunnel/providers/debian/parser.py +++ b/src/vunnel/providers/debian/parser.py @@ -281,7 +281,6 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915 if ns_cve_dsalist is None: ns_cve_dsalist = {} - vuln_records = self.get_vuln_records(ns_cve_dsalist, adv_mets, data) adv_mets.clear() @@ -363,7 +362,14 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): sev_count_key = "notfixed" if fixed_el["Version"] == "None" else "fixed" # add vendor advisory information to the fixed in record - vendor_advisory = self.add_advisory_info(adv_mets, distro_record, met_ns, met_sev, matched_dsas, sev_count_key) + vendor_advisory = self.add_advisory_info( + adv_mets, + distro_record, + met_ns, + met_sev, + matched_dsas, + sev_count_key, + ) fixed_el["VendorAdvisory"] = vendor_advisory # append fixed in record to vulnerability @@ -387,19 +393,19 @@ def add_advisory_info(self, adv_mets, distro_record, met_ns, met_sev, matched_ds vendor_advisory = None if matched_dsas: vendor_advisory = { - "NoAdvisory": False, - "AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas], - } - # all_matched_dsas |= set([x.dsa for x in matched_dsas]) + "NoAdvisory": False, + "AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas], + } + # all_matched_dsas |= set([x.dsa for x in matched_dsas]) adv_mets[met_ns][met_sev]["dsa"][sev_count_key] += 1 elif "nodsa" in distro_record: vendor_advisory = {"NoAdvisory": True} adv_mets[met_ns][met_sev]["nodsa"][sev_count_key] += 1 else: vendor_advisory = { - "NoAdvisory": False, - "AdvisorySummary": [], - } + "NoAdvisory": False, + "AdvisorySummary": [], + } adv_mets[met_ns][met_sev]["neither"][sev_count_key] += 1 return vendor_advisory @@ -409,42 +415,42 @@ def collect_vuln_metrics(self, adv_mets, vuln_record): if met_ns not in adv_mets: adv_mets[met_ns] = { - met_sev: { - "dsa": {"fixed": 0, "notfixed": 0}, - "nodsa": {"fixed": 0, "notfixed": 0}, - "neither": {"fixed": 0, "notfixed": 0}, - }, - } + met_sev: { + "dsa": {"fixed": 0, "notfixed": 0}, + "nodsa": {"fixed": 0, "notfixed": 0}, + "neither": {"fixed": 0, "notfixed": 0}, + }, + } if met_sev not in adv_mets[met_ns]: adv_mets[met_ns][met_sev] = { - "dsa": {"fixed": 0, "notfixed": 0}, - "nodsa": {"fixed": 0, "notfixed": 0}, - "neither": {"fixed": 0, "notfixed": 0}, - } + "dsa": {"fixed": 0, "notfixed": 0}, + "nodsa": {"fixed": 0, "notfixed": 0}, + "neither": {"fixed": 0, "notfixed": 0}, + } - return met_ns,met_sev + return met_ns, met_sev def add_fixedin_info(self, pkg, distro_record, relno): skip_fixedin = False fixed_el = { - "Name": pkg, - "NamespaceName": "debian:" + str(relno), - "VersionFormat": "dpkg", - } + "Name": pkg, + "NamespaceName": "debian:" + str(relno), + "VersionFormat": "dpkg", + } if "fixed_version" in distro_record: fixed_el["Version"] = distro_record["fixed_version"] if distro_record["fixed_version"] == "0": - # version == 0 should mean that the - # package was determined to not be - # vulnerable in the distro namespace - # (from reviewing - # https://security-tracker.debian.org/tracker/) + # version == 0 should mean that the + # package was determined to not be + # vulnerable in the distro namespace + # (from reviewing + # https://security-tracker.debian.org/tracker/) skip_fixedin = True else: fixed_el["Version"] = "None" - return skip_fixedin,fixed_el + return skip_fixedin, fixed_el def populate_static_information(self, vid, vulnerability_data, relno, vuln_record): vuln_record["Vulnerability"]["Description"] = vulnerability_data.get("description", "") @@ -463,9 +469,9 @@ def get_severity(self, nvd_severity, distro_record): elif distro_record["urgency"] in ["high", "high**"]: sev = "High" elif distro_record["urgency"] in [ - "unimportant", - "end-of-life", - ]: + "unimportant", + "end-of-life", + ]: sev = "Negligible" elif nvd_severity: # no match to urgency found sev = nvd_severity # fallback to nvd severity From a978b6169dba6c34538e00a459cf18a4961546c8 Mon Sep 17 00:00:00 2001 From: Will Murphy Date: Fri, 22 Sep 2023 11:36:17 -0400 Subject: [PATCH 4/4] more linter appeasing Signed-off-by: Will Murphy --- src/vunnel/providers/debian/parser.py | 30 +++++++++++--------------- src/vunnel/providers/mariner/parser.py | 2 +- src/vunnel/utils/__init__.py | 2 +- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/vunnel/providers/debian/parser.py b/src/vunnel/providers/debian/parser.py index b14b49dc..9896aeb9 100644 --- a/src/vunnel/providers/debian/parser.py +++ b/src/vunnel/providers/debian/parser.py @@ -289,18 +289,14 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915 return vuln_records - def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): + def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): # noqa: PLR0912, C901 vuln_records = {} - # TODO: refactor such that each loop iteration sets a key in vuln records for pkg in data: - for vid in data[pkg]: - # skip non CVE vids - if not re.match("^CVE.*", vid): - continue - + # only process CVEs + for vid in filter(lambda x: re.match("^CVE.*", x), data[pkg]): # gather NVD data for this CVE. Pulling this logic out of the for loop as NVD data remains the same # regardless of the debian release. - nvd_severity = None + # nvd_severity = None # if session: # try: # nvd_severity = nvd.get_severity(vid, session=session) @@ -341,7 +337,7 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): # set severity # from https://anonscm.debian.org/viewvc/secure-testing/bin/tracker_service.py - sev = self.get_severity(nvd_severity, distro_record) + sev = self.get_severity(distro_record) if ( sev @@ -356,21 +352,19 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): if not skip_fixedin: # collect metrics for vendor advisory met_ns, met_sev = self.collect_vuln_metrics(adv_mets, vuln_record) + sev_dict = adv_mets[met_ns][met_sev] # find DSAs associated with the CVE and package in the namespace matched_dsas = [dsa for dsa in ns_cve_dsalist.get(rel, {}).get(vid, []) if dsa.pkg == pkg] sev_count_key = "notfixed" if fixed_el["Version"] == "None" else "fixed" # add vendor advisory information to the fixed in record - vendor_advisory = self.add_advisory_info( - adv_mets, + fixed_el["VendorAdvisory"] = self.add_advisory_info( + sev_dict, distro_record, - met_ns, - met_sev, matched_dsas, sev_count_key, ) - fixed_el["VendorAdvisory"] = vendor_advisory # append fixed in record to vulnerability vuln_record["Vulnerability"]["FixedIn"].append(fixed_el) @@ -389,7 +383,7 @@ def get_vuln_records(self, ns_cve_dsalist, adv_mets, data): self.logger.debug(f"metrics for advisory information: {json.dumps(adv_mets)}") return vuln_records - def add_advisory_info(self, adv_mets, distro_record, met_ns, met_sev, matched_dsas, sev_count_key): + def add_advisory_info(self, sev_dict, distro_record, matched_dsas, sev_count_key): vendor_advisory = None if matched_dsas: vendor_advisory = { @@ -397,16 +391,16 @@ def add_advisory_info(self, adv_mets, distro_record, met_ns, met_sev, matched_ds "AdvisorySummary": [{"ID": x.dsa, "Link": x.link} for x in matched_dsas], } # all_matched_dsas |= set([x.dsa for x in matched_dsas]) - adv_mets[met_ns][met_sev]["dsa"][sev_count_key] += 1 + sev_dict["dsa"][sev_count_key] += 1 elif "nodsa" in distro_record: vendor_advisory = {"NoAdvisory": True} - adv_mets[met_ns][met_sev]["nodsa"][sev_count_key] += 1 + sev_dict["nodsa"][sev_count_key] += 1 else: vendor_advisory = { "NoAdvisory": False, "AdvisorySummary": [], } - adv_mets[met_ns][met_sev]["neither"][sev_count_key] += 1 + sev_dict["neither"][sev_count_key] += 1 return vendor_advisory def collect_vuln_metrics(self, adv_mets, vuln_record): diff --git a/src/vunnel/providers/mariner/parser.py b/src/vunnel/providers/mariner/parser.py index 35b301b4..a42e2050 100644 --- a/src/vunnel/providers/mariner/parser.py +++ b/src/vunnel/providers/mariner/parser.py @@ -31,7 +31,7 @@ def __init__(self, oval_file_path: str, logger: logging.Logger): fail_on_unknown_properties=False, ) xml_parser = XmlParser(config=parser_config) - root = etree.parse(oval_file_path) + root = etree.parse(oval_file_path) # noqa: S320 # not parsing untrusted input nsmap = etree.XPath("/*")(root)[0].nsmap default = nsmap[None] nsmap["default"] = default diff --git a/src/vunnel/utils/__init__.py b/src/vunnel/utils/__init__.py index b73b41e9..9604e758 100644 --- a/src/vunnel/utils/__init__.py +++ b/src/vunnel/utils/__init__.py @@ -32,7 +32,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: logger.exception(f"failed after {retries} retries") raise - sleep = backoff_in_seconds * 2**attempt + random.uniform(0, 1) # nosec + sleep = backoff_in_seconds * 2**attempt + random.uniform(0, 1) # nosec # noqa: S311 logger.warning(f"{f} failed. Retrying in {int(sleep)} seconds (attempt {attempt+1} of {retries})") time.sleep(sleep) attempt += 1