Skip to content

Commit

Permalink
chore: remove file scoped noqa in RHEL parser
Browse files Browse the repository at this point in the history
Suppress lints that would require control flow changes; accept some
automatic fixes.

Signed-off-by: Will Murphy <[email protected]>
  • Loading branch information
willmurphyscode committed Oct 26, 2023
1 parent b3ff497 commit 536c233
Showing 1 changed file with 35 additions and 30 deletions.
65 changes: 35 additions & 30 deletions src/vunnel/providers/rhel/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# flake8: noqa
from __future__ import annotations

import concurrent.futures
Expand Down Expand Up @@ -45,8 +44,14 @@ class Parser:
__full_dir_name__ = "full"
__last_full_sync_filename__ = "last_full_sync"

def __init__(
self, workspace, download_timeout=None, max_workers=None, full_sync_interval=None, skip_namespaces=None, logger=None
def __init__( # noqa: PLR0913
self,
workspace,
download_timeout=None,
max_workers=None,
full_sync_interval=None,
skip_namespaces=None,
logger=None,
):
self.workspace = workspace
self.cve_dir_path = os.path.join(workspace.input_path, self.__cve_dir_name__)
Expand All @@ -68,7 +73,7 @@ def _download_minimal_cves(self, page, limit=100):
path_params = {"per_page": str(limit), "page": page}

self.logger.info(
f"downloading CVE list from url={self.__summary_url__} count={path_params['per_page']} page={path_params['page']}"
f"downloading CVE list from url={self.__summary_url__} count={path_params['per_page']} page={path_params['page']}",
)
r = requests.get(
self.__summary_url__,
Expand Down Expand Up @@ -141,7 +146,7 @@ def _sync_cves(self, skip_if_exists=False, do_full_sync=True): # noqa
:return:
"""

now = dt.utcnow()
now = dt.utcnow() # noqa: DTZ003

# setup workspace for full cves
full_cve_dir = os.path.join(self.cve_dir_path, self.__full_dir_name__)
Expand Down Expand Up @@ -245,12 +250,12 @@ def _sync_cves(self, skip_if_exists=False, do_full_sync=True): # noqa

if api_cve_set.difference(fs_min_cve_set):
self.logger.warning(
f"CVEs reported by api missing min content on fs: {api_cve_set.difference(fs_min_cve_set)}"
f"CVEs reported by api missing min content on fs: {api_cve_set.difference(fs_min_cve_set)}",
)

if api_cve_set.difference(fs_full_cve_set):
self.logger.warning(
f"CVEs reported by api missing full content on fs: {api_cve_set.difference(fs_min_cve_set)}"
f"CVEs reported by api missing full content on fs: {api_cve_set.difference(fs_min_cve_set)}",
)
except Exception:
self.logger.debug("ignoring errors reconciling api cves with fs content", exc_info=True)
Expand Down Expand Up @@ -391,7 +396,7 @@ def _get_name_version(package):
else: # not compliant with rpm filename spec, could be an app stream
name = colon_comps[0] # best guess for name, fall back to rhsa for version lookup

else: # no epoch foo-bar-2.3.4-5.el6_7.8 or something else totally different
else: # no epoch foo-bar-2.3.4-5.el6_7.8 or something else totally different # noqa: PLR5501
if package.count("-") >= 2: #
name_other_comps = package.rsplit("-", 2) # split name-version-release.arch.rpm into max 3 chunks
name = name_other_comps[0] # only the name matters
Expand All @@ -401,7 +406,7 @@ def _get_name_version(package):

return name, version

def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]:
def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]: # noqa: C901, PLR0912, PLR0915
fixed_ins = []
ars = content.get("affected_release", [])

Expand Down Expand Up @@ -459,20 +464,20 @@ def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]:
final_m = rhsa_module
else:
self.logger.debug(
f"{cve_id}, platform={ar_obj.platform} : no matches found for {ar_obj.rhsa_id} and package={ar_obj.name} Falling back to CVE version {ar_obj.version}"
f"{cve_id}, platform={ar_obj.platform} : no matches found for {ar_obj.rhsa_id} and package={ar_obj.name} Falling back to CVE version {ar_obj.version}", # noqa: E501
)
final_v = ar_obj.version
final_m = rhsa_module
else:
self.logger.debug(
f"{cve_id}, platform={ar_obj.platform} : no associated RHSA for package={ar_obj.name} Falling back to CVE version {ar_obj.version}"
f"{cve_id}, platform={ar_obj.platform} : no associated RHSA for package={ar_obj.name} Falling back to CVE version {ar_obj.version}", # noqa: E501
)
final_v = ar_obj.version
final_m = None

elif ar_obj.rhsa_id: # package name missing but there's at least an rhsa ID to go off
self.logger.debug(
f"{cve_id}, platform={ar_obj.platform} : missing package, trying to find a match using {ar_obj.rhsa_id} and other affected releases"
f"{cve_id}, platform={ar_obj.platform} : missing package, trying to find a match using {ar_obj.rhsa_id} and other affected releases", # noqa: E501
)

possible_packages = (
Expand All @@ -484,19 +489,19 @@ def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]:

if rhsa_version:
self.logger.debug(
f"{cve_id} platform={ar_obj.platform} : found RHSA match package={pkg_name} version={rhsa_version}"
f"{cve_id} platform={ar_obj.platform} : found RHSA match package={pkg_name} version={rhsa_version}",
)
final_v = rhsa_version
final_m = rhsa_module

platform_packages[ar_obj.platform].add(
pkg_name
pkg_name,
) # add it to guessed package names to avoid repeats
ar_obj.name = pkg_name
break
else:
self.logger.debug(
f"{cve_id}, platform={ar_obj.platform} : no package name matches found using {ar_obj.rhsa_id} and other affected releases"
f"{cve_id}, platform={ar_obj.platform} : no package name matches found using {ar_obj.rhsa_id} and other affected releases", # noqa: E501
)
final_v = None
final_m = None
Expand All @@ -507,7 +512,7 @@ def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]:

if not ar_obj.name or not final_v:
self.logger.debug(
f"{cve_id}, platform={ar_obj.platform} : skipping affected release record as all attempts to deduce package name and or version were futile"
f"{cve_id}, platform={ar_obj.platform} : skipping affected release record as all attempts to deduce package name and or version were futile", # noqa: E501
)
continue

Expand All @@ -518,12 +523,12 @@ def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]:
if prev_ar_obj:
if rpm.compare_versions(prev_ar_obj.version, ar_obj.version) < 0:
self.logger.debug(
f"{cve_id}, platform={prev_ar_obj.platform}, package={prev_ar_obj.name}, module={prev_ar_obj.module} : multiple fix versions found, {ar_obj.version} > {prev_ar_obj.version}"
f"{cve_id}, platform={prev_ar_obj.platform}, package={prev_ar_obj.name}, module={prev_ar_obj.module} : multiple fix versions found, {ar_obj.version} > {prev_ar_obj.version}", # noqa: E501
)
final_ar_objs[(ar_obj.name, ar_obj.platform, ar_obj.module)] = ar_obj
else:
self.logger.debug(
f"{cve_id}, platform={prev_ar_obj.platform}, package={prev_ar_obj.name}, module={prev_ar_obj.module} : multiple fix versions found, {ar_obj.version} <= {prev_ar_obj.version}"
f"{cve_id}, platform={prev_ar_obj.platform}, package={prev_ar_obj.name}, module={prev_ar_obj.module} : multiple fix versions found, {ar_obj.version} <= {prev_ar_obj.version}", # noqa: E501
)
else:
final_ar_objs[(ar_obj.name, ar_obj.platform, ar_obj.module)] = ar_obj
Expand Down Expand Up @@ -558,12 +563,12 @@ def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]:
del platform_packages
del final_ar_objs
del all_ar_objs
except Exception: # nosec
pass
except Exception:
self.logger.info("exception freeing up intermediate data structures", exc_info=True)

return fixed_ins

def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> list[FixedIn]:
def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> list[FixedIn]: # noqa: C901
affected: list[FixedIn] = []
out_of_support: list[FixedIn] = [] # Track items out of support to be able to add them if others are affected
pss = content.get("package_state", [])
Expand Down Expand Up @@ -602,7 +607,7 @@ def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> li
version="None",
module=module,
advisory=Advisory(wont_fix=False, rhsa_id=None, link=None, severity=None),
)
),
)
elif state in ["Will not fix"]:
affected.append(
Expand All @@ -612,7 +617,7 @@ def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> li
version="None",
module=module,
advisory=Advisory(wont_fix=True, rhsa_id=None, link=None, severity=None),
)
),
)
elif state in ["Out of support scope"]:
out_of_support.append(
Expand All @@ -622,7 +627,7 @@ def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> li
version="None",
module=module,
advisory=Advisory(wont_fix=True, rhsa_id=None, link=None, severity=None),
)
),
)
elif state in [
"New",
Expand All @@ -638,7 +643,7 @@ def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> li

return affected + out_of_support

def _parse_cve(self, cve_id, content):
def _parse_cve(self, cve_id, content): # noqa: C901, PLR0912, PLR0915
# logger.debug('Parsing {}'.format(cve_id))

results = []
Expand Down Expand Up @@ -667,7 +672,7 @@ def _parse_cve(self, cve_id, content):
sev = "Unknown"

details = content.get("details", [])
if details and isinstance(details, list):
if details and isinstance(details, list): # noqa: SIM108
description = details[-1]
else:
description = "" # leaving this empty to be compatible with some old client side logic that expects it
Expand Down Expand Up @@ -697,7 +702,7 @@ def _parse_cve(self, cve_id, content):
item.module,
) in platform_package_module_tuples:
self.logger.debug(
f"{cve_id}, platform={item.platform}, package={item.package}, module={item.module} : partial fix found but package is still vulnerable. Ignoring fix version {item.version}"
f"{cve_id}, platform={item.platform}, package={item.package}, module={item.module} : partial fix found but package is still vulnerable. Ignoring fix version {item.version}", # noqa: E501
)
continue

Expand Down Expand Up @@ -735,7 +740,7 @@ def _parse_cve(self, cve_id, content):
{
"ID": artifact.advisory.rhsa_id,
"Link": artifact.advisory.link,
}
},
)

v["Vulnerability"]["FixedIn"].append(
Expand All @@ -746,7 +751,7 @@ def _parse_cve(self, cve_id, content):
"VersionFormat": "rpm", # hard code version format for now
"NamespaceName": ns,
"VendorAdvisory": a,
}
},
)

results.append(NamespacePayload(namespace=ns, payload=v))
Expand Down Expand Up @@ -796,7 +801,7 @@ def get(self, skip_if_exists=False):


class AffectedRelease:
def __init__(self, name=None, version=None, platform=None, rhsa_id=None, module=None):
def __init__(self, name=None, version=None, platform=None, rhsa_id=None, module=None): # noqa: PLR0913
self.name = name
self.version = version
self.platform = platform
Expand Down

0 comments on commit 536c233

Please sign in to comment.