Skip to content

Commit

Permalink
fix: remove sha256 verification for oval parser (#505)
Browse files Browse the repository at this point in the history
fix: disable validation of checksum given PULP_MANIFEST inconsistencies

note: the PULP_MANIFEST for oval files has inconsistent inclusions of the sha256 for their verification and comparrison
---------
Signed-off-by: Christopher Phillips <[email protected]>
  • Loading branch information
spiffcs authored Mar 8, 2024
1 parent 1416dd1 commit 477011a
Showing 1 changed file with 20 additions and 97 deletions.
117 changes: 20 additions & 97 deletions src/vunnel/providers/rhel/oval_parser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from __future__ import annotations

import bz2
import hashlib
import logging
import os
import re

import requests

Expand Down Expand Up @@ -52,97 +50,28 @@ def urls(self) -> list[str]:
def _get_workspace_xml_filepath(self, oval_url_path: str) -> str:
return os.path.join(self.workspace.input_path, oval_url_path.rstrip(".bz2"))

def _get_sha256_map(self, base_url: str, manifest_path: str, oval_paths: list[str]) -> dict[str, str]:
@utils.retry_with_backoff()
def _download_oval_file(self, base_url: str, oval_url_path: str):
xml_file_path = self._get_workspace_xml_filepath(oval_url_path=oval_url_path)
try:
manifest_url = f"{base_url}/{manifest_path}"
self.logger.info(f"begin processing manifest from {manifest_url}")
path_to_sha = {}
r = requests.get(manifest_url, timeout=self.download_timeout)
oval_url = f"{base_url}/{oval_url_path}"
self.logger.info(f"begin downloading OVAL file from {oval_url}")
r = requests.get(oval_url, stream=True, timeout=self.download_timeout)
if r.status_code == 200:
unmatched_oval_paths = set(oval_paths.copy())

for line in r.text.splitlines():
matched_path = None
for p in unmatched_oval_paths:
pattern = re.compile(rf"{p},([^,]+),.*")
match = pattern.match(line.strip())
if match:
path_to_sha[p] = match.group(1)
matched_path = p
break

if matched_path:
unmatched_oval_paths.remove(matched_path)

if len(unmatched_oval_paths) == 0:
break

if len(unmatched_oval_paths) > 0:
error = f"no matching sha256 found for {unmatched_oval_paths}"
self.logger.error(error)
raise Exception(error)

self.logger.info(f"finish processing manifest from {manifest_url}")
return path_to_sha
# ensure any nested directories get created
os.makedirs(os.path.dirname(xml_file_path), exist_ok=True)
with open(xml_file_path, "wb") as extracted:
decompressor = bz2.BZ2Decompressor()
for chunk in r.iter_content(chunk_size=1024):
uncchunk = decompressor.decompress(chunk)
extracted.write(uncchunk)

self.logger.info(f"finish downloading OVAL file from {oval_url}")
else: # noqa: RET505
error = f"GET {manifest_url} failed with HTTP error {r.status_code}"
self.logger.error(error)
raise Exception(error)
except Exception as e:
raise Exception("Error fetching/processing sha256") from e

@utils.retry_with_backoff()
def _download_oval_file(self, base_url: str, oval_url_path: str, path_to_sha: dict[str, str]) -> str:
download = True
xml_file_path = self._get_workspace_xml_filepath(oval_url_path=oval_url_path)
xml_sha256_file_path = f"{xml_file_path}.sha256sum"
previous_sha256 = None
self.logger.info(f"comparing hashes for oval file {xml_file_path}")

if os.path.exists(xml_file_path) and os.path.exists(xml_sha256_file_path):
with open(xml_sha256_file_path) as fp:
previous_sha256 = fp.read().strip()

latest_sha256 = path_to_sha.get(oval_url_path)

if previous_sha256 and latest_sha256:
self.logger.debug(f"{oval_url_path}: previous sha256: {previous_sha256}, latest sha256: {latest_sha256}")
download = previous_sha256.lower() != latest_sha256.lower()

if download:
try:
oval_url = f"{base_url}/{oval_url_path}"
self.logger.info(f"begin downloading OVAL file from {oval_url}")
r = requests.get(oval_url, stream=True, timeout=self.download_timeout)
if r.status_code == 200:
# compute the sha256 as the file is decompressed
sha256 = hashlib.sha256()
# ensure any nested directories get created
os.makedirs(os.path.dirname(xml_file_path), exist_ok=True)
with open(xml_file_path, "wb") as extracted:
decompressor = bz2.BZ2Decompressor()
for chunk in r.iter_content(chunk_size=1024):
uncchunk = decompressor.decompress(chunk)
extracted.write(uncchunk)
sha256.update(uncchunk)

self.logger.info(f"finish downloading OVAL file from {oval_url}")
sha256sum = str(sha256.hexdigest()).lower()
self.logger.debug(f"sha256 for {xml_file_path}: {sha256sum}")

# save the sha256 to another file
with open(xml_sha256_file_path, "w") as fp:
fp.write(sha256sum)

return sha256sum
else: # noqa: RET505
raise Exception(f"GET {oval_url} failed with HTTP error {r.status_code}")
except Exception:
self.logger.exception("error downloading OVAL file")
raise Exception("error downloading OVAL file") # noqa: B904
else:
self.logger.info(f"stored checksum matches server checksum for {xml_file_path}. Skipping download")
return latest_sha256
raise Exception(f"GET {oval_url} failed with HTTP error {r.status_code}")
except Exception:
self.logger.exception("error downloading OVAL file")
raise Exception("error downloading OVAL file") # noqa: B904

@utils.retry_with_backoff()
def _download(self):
Expand All @@ -151,18 +80,12 @@ def _download(self):
manifest_path = m["manifest_path"]
oval_paths = m["oval_paths"]
skip_download = m.get("skip_download", False)
path_to_sha = None

if not skip_download:
path_to_sha = self._get_sha256_map(base_url, manifest_path, oval_paths)

self._urls.add(f"{base_url}/{manifest_path}")

for p in oval_paths:
self._urls.add(f"{base_url}/{p}")

if not skip_download:
self._download_oval_file(base_url, p, path_to_sha)
self._download_oval_file(base_url, p)

def xml_paths(self):
paths = []
Expand Down

0 comments on commit 477011a

Please sign in to comment.