From 74d2cde36bcb49de563e4d9d0d2fb823a9eee681 Mon Sep 17 00:00:00 2001 From: William Coe Date: Sun, 14 Aug 2022 19:35:28 +0800 Subject: [PATCH 1/2] Support stripping metadata from updates. Updates metadata can be quite large and cause systems with low memory to OOM refreshing it. Add support for dropping from the metadata architectures that are not in use in any system the mirror serves. This is achieved in much the same way as snapshotting is implemneted: pull metadata, rewrite, update the various values like checksum and size so DNF is happy to consume it and add to repomd.xml. Some refactoring was performed as much of the code used for snapshotting is also useful here. --- rpm_s3_mirror.spec | 1 + rpm_s3_mirror/config.py | 4 +- rpm_s3_mirror/mirror.py | 32 ++++++-- rpm_s3_mirror/repository.py | 152 +++++++++++++++++++++++++++++------- rpm_s3_mirror/s3.py | 1 - 5 files changed, 155 insertions(+), 35 deletions(-) diff --git a/rpm_s3_mirror.spec b/rpm_s3_mirror.spec index 2747253..ecee73a 100644 --- a/rpm_s3_mirror.spec +++ b/rpm_s3_mirror.spec @@ -14,6 +14,7 @@ Requires: python3-dateutil Requires: python3-boto3 Requires: python3-lxml Requires: systemd +Requires: zchunk %undefine _missing_build_ids_terminate_build diff --git a/rpm_s3_mirror/config.py b/rpm_s3_mirror/config.py index ad25310..834adbd 100644 --- a/rpm_s3_mirror/config.py +++ b/rpm_s3_mirror/config.py @@ -16,6 +16,7 @@ DEFAULTS = { "scratch_dir": "/var/tmp/", "max_workers": 4, + "trim_updates_to_arches": [], } @@ -31,6 +32,7 @@ class Config: scratch_dir = None max_workers = None upstream_repositories = None + trim_updates_to_arches = None _config = DEFAULTS def __init__(self): @@ -63,7 +65,7 @@ def _populate_required(self): raise ConfigError(f"Missing required environment variable: {key.upper()}") else: continue - elif key == "upstream_repositories": + elif key in ("upstream_repositories", "trim_updates_to_arches"): value = value.split(",") elif key == "max_workers": value = int(value) diff --git a/rpm_s3_mirror/mirror.py b/rpm_s3_mirror/mirror.py index 70a86c6..256619f 100644 --- a/rpm_s3_mirror/mirror.py +++ b/rpm_s3_mirror/mirror.py @@ -1,9 +1,10 @@ # Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/ import logging +import os import re from contextlib import suppress -from os.path import join +from os.path import join, basename from tempfile import TemporaryDirectory import time @@ -132,10 +133,31 @@ def _sync_repository(self, upstream_repository: RPMRepository): manifest_path = join(manifest_location, "manifest.json") self.s3.put_manifest(location=manifest_path, manifest=manifest) - # Finally, overwrite the repomd.xml file to make our changes live - # Copy from the cached file in /var/tmp, because sync_packages may take a bit of time, and repomd.xml may - # change in upstream. - self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url) + # The metadata can be quite large and cause dnf to use excessive memory processing + # the updates. As an optimization, support dropping architectures that we know + # we don't need. + if self.config.trim_updates_to_arches: + self.log.info("Trimming metadata to: %s", self.config.trim_updates_to_arches) + metadata_scratch = os.path.join(temp_dir, "metadata") + os.makedirs(metadata_scratch, exist_ok=True) + with open(cache_xml_path, "rb") as f: + repodata_files = upstream_repository.strip_metadata( + xml_bytes=f.read(), + target_arches=self.config.trim_updates_to_arches, + scratch_dir=metadata_scratch, + ) + base_path = urlparse(upstream_repository.base_url).path[1:] # need to strip the leading slash + for file_path in repodata_files.upload_files: + dest_path = join(base_path, "repodata", basename(file_path)) + self.log.info("Uploading: %s -> %s", file_path, dest_path) + self.s3.put_object( + local_path=file_path, + key=dest_path, + ) + else: + # Overwrite the repomd.xml file from upstream to make our changes live. + self.log.info("Overwriting repomd.xml") + self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url) self.log.info("Updated mirror with %s packages", len(new_packages)) self.stats.gauge( metric="s3_mirror_sync_seconds", diff --git a/rpm_s3_mirror/repository.py b/rpm_s3_mirror/repository.py index b63469b..2a94a59 100644 --- a/rpm_s3_mirror/repository.py +++ b/rpm_s3_mirror/repository.py @@ -1,9 +1,12 @@ # Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/ - +import dataclasses +import re +import subprocess from collections import namedtuple from datetime import datetime -from typing import Iterator, Dict +from typing import Iterator, Dict, Optional, Tuple from urllib.parse import urlparse +from xml.etree.ElementTree import ElementTree from lxml.etree import fromstring, Element, tostring # pylint: disable=no-name-in-module from lxml.etree import XMLParser # pylint: disable=no-name-in-module @@ -126,20 +129,86 @@ def __iter__(self) -> Iterator[Package]: "checksum", ], ) -SnapshotPrimary = namedtuple( - "SnapshotPrimary", - [ - "open_checksum", - "checksum", - "checksum_type", - "size", - "open_size", - "local_path", - "location", - ], -) -Snapshot = namedtuple("Snapshot", ["sync_files", "upload_files"]) +RepoDataFiles = namedtuple("RepoDataFiles", ["sync_files", "upload_files"]) + + +@dataclasses.dataclass +class SectionMetadata: + size: int + open_size: int + open_checksum: str + checksum: str + local_path: str + location: str + checksum_type: str = "sha256" + header_checksum: Optional[str] = None + header_size: Optional[int] = None + + +class UpdateInfoSection: + def __init__(self, path: str): + # TOD: Add support for .xz metadata. + if not path.endswith(".zck"): + raise ValueError("Only .zck files are supported") + self.path = path + + def strip_to_arches(self, arches, scratch_dir): + result = subprocess.run(["unzck", self.path, "--stdout"], stdout=subprocess.PIPE, check=True) + root = safe_parse_xml(result.stdout) + self._strip(root, arches) + return self._update_metadata(root, scratch_dir) + + def _update_metadata(self, root, scratch_dir): + stripped_path = os.path.join(scratch_dir, "stripped.xml") + ElementTree(root).write(stripped_path) + + # Take open checksum and size. + sha256_out = subprocess.check_output(["sha256sum", stripped_path], text=True) + open_checksum = sha256_out.split()[0] + open_size = os.path.getsize(stripped_path) + + # Now compress and take compressed checksum,size. + compressed_stripped_path = os.path.join(scratch_dir, "stripped.xml.zck") + subprocess.check_call(["zck", stripped_path, "-o", compressed_stripped_path]) + sha256_compressed_out = subprocess.check_output(["sha256sum", compressed_stripped_path], text=True) + checksum = sha256_compressed_out.split()[0] + size = os.path.getsize(compressed_stripped_path) + + # We also need some ZChunk specific metadata. + header_out = subprocess.check_output(["zck_read_header", compressed_stripped_path], text=True) + header_checksum, header_size = self._parse_zck_read_header(output=header_out) + final_path = os.path.join(scratch_dir, f"{checksum}-updateinfo.xml.zck") + + # Rename it in the same format as the other sections. + os.rename(compressed_stripped_path, final_path) + + return SectionMetadata( + size=size, + open_size=open_size, + header_size=header_size, + header_checksum=header_checksum, + open_checksum=open_checksum, + checksum=checksum, + local_path=final_path, + location=f"repodata/{os.path.basename(final_path)}", + ) + + def _strip(self, root, arches): + for update_element in root: + for collection in update_element.find("pkglist"): + for package in collection.getchildren(): + if package.get("arch") not in arches: + collection.remove(package) + + def _parse_zck_read_header(self, output): + checksum_match = re.search("Header checksum: (?P.*$)", output, flags=re.MULTILINE) + if not checksum_match: + raise ValueError(f"Failed to locate checksum in output: {output}") + size_match = re.search("Header size:(?P.*$)", output, flags=re.MULTILINE) + if not size_match: + raise ValueError(f"Failed to locate size in output: {output}") + return checksum_match.groupdict()["checksum"], int(size_match.groupdict()["size"]) class RPMRepository: @@ -176,17 +245,45 @@ def exists(self): return True return False - def get_repodata(self): - response = self._req(self.session.get, "repodata/repomd.xml") - repodata = self.parse_repomd(xml=safe_parse_xml(response.content)) + def get_repodata(self, xml_bytes=None): + if xml_bytes is None: + xml_bytes = self._req(self.session.get, "repodata/repomd.xml").content + repodata = self.parse_repomd(xml=safe_parse_xml(xml_bytes)) return repodata + def strip_metadata( + self, + xml_bytes: bytes, + target_arches: Tuple[str], + scratch_dir: str, + ): + sync_files, upload_files = [], [] + repomd_xml = safe_parse_xml(xml_bytes) + repodata = self.parse_repomd(xml=repomd_xml) + for key, section in repodata.items(): + if key.startswith("updateinfo_zck"): + with self._req(self.session.get, path=section.location, stream=True) as request: + local_path = download_repodata_section(section, request, destination_dir=scratch_dir) + update_section = UpdateInfoSection(path=local_path) + rewritten_section = update_section.strip_to_arches(arches=target_arches, scratch_dir=scratch_dir) + self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=rewritten_section, section_name=key) + upload_files.append(rewritten_section.local_path) + repomd_xml_path = join(scratch_dir, "repomd.xml") + with open(repomd_xml_path, "wb+") as out: + out.write(tostring(repomd_xml, encoding="utf-8")) + upload_files.append(repomd_xml_path) + + return RepoDataFiles( + sync_files=sync_files, + upload_files=upload_files, + ) + def create_snapshot(self, scratch_dir): response = self._req(self.session.get, "repodata/repomd.xml") repomd_xml = safe_parse_xml(response.content) repodata = self.parse_repomd(xml=repomd_xml) snapshot_primary = self._rewrite_primary(temp_dir=scratch_dir, primary=repodata["primary"]) - self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary) + self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary, section_name="primary") repomd_xml_path = join(scratch_dir, "repomd.xml") with open(repomd_xml_path, "wb+") as out: out.write(tostring(repomd_xml, encoding="utf-8")) @@ -199,7 +296,7 @@ def create_snapshot(self, scratch_dir): or section.location.endswith("modules.yaml.gz") ): sync_files.append(urlparse(join(self.base_url, section.location)).path) - return Snapshot( + return RepoDataFiles( sync_files=sync_files, upload_files=[repomd_xml_path, snapshot_primary.local_path], ) @@ -230,7 +327,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection): with open(local_path, "wb+") as out: out.write(compressed_xml) - return SnapshotPrimary( + return SectionMetadata( open_checksum=open_checksum, checksum=compressed_sha256, checksum_type="sha256", @@ -240,14 +337,9 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection): location=f"repodata/{basename(local_path)}", ) - def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary): - for element in repomd_xml.findall("repo:*", namespaces=namespaces): - # We only support *.xml.gz files currently - if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}: - repomd_xml.remove(element) - + def _rewrite_repomd(self, repomd_xml: Element, snapshot: SectionMetadata, section_name: str): # Rewrite the XML with correct metadata for our changed primary.xml - for element in repomd_xml.find("repo:data[@type='primary']", namespaces=namespaces): + for element in repomd_xml.find(f"repo:data[@type='{section_name}']", namespaces=namespaces): _, _, key = element.tag.partition("}") if key == "checksum": element.text = snapshot.checksum @@ -259,6 +351,10 @@ def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary): element.text = str(snapshot.size) elif key == "open-size": element.text = str(snapshot.open_size) + elif key == "header-size" and snapshot.header_size is not None: + element.text = str(snapshot.header_size) + elif key == "header-checksum" and snapshot.header_checksum is not None: + element.text = snapshot.header_checksum def _extract_package_list(self, primary: RepodataSection) -> PackageList: with self._req(self.session.get, path=primary.location, stream=True) as request: diff --git a/rpm_s3_mirror/s3.py b/rpm_s3_mirror/s3.py index 7eea595..ea3f6ef 100644 --- a/rpm_s3_mirror/s3.py +++ b/rpm_s3_mirror/s3.py @@ -81,7 +81,6 @@ def sync_packages( def overwrite_repomd(self, repomd_xml_local_path, base_url): url = f"{base_url}repodata/repomd.xml" remote_path = urlparse(url).path - self.log.info("Overwriting repomd.xml") self.put_object(repomd_xml_local_path, remote_path, cache_age=0) def archive_repomd(self, base_url, location): From 40654a1476e2b0b1fba775cd34a25b36d41e0585 Mon Sep 17 00:00:00 2001 From: William Coe Date: Sun, 14 Aug 2022 20:27:32 +0800 Subject: [PATCH 2/2] Add support for stripping xz metadata Follow on from previous commit and support stripping xz metadata also. --- rpm_s3_mirror/repository.py | 100 ++++++++++++++++++++++++++---------- 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/rpm_s3_mirror/repository.py b/rpm_s3_mirror/repository.py index 2a94a59..d81cbb9 100644 --- a/rpm_s3_mirror/repository.py +++ b/rpm_s3_mirror/repository.py @@ -1,7 +1,9 @@ # Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/ import dataclasses +import lzma import re import subprocess +from abc import abstractmethod from collections import namedtuple from datetime import datetime from typing import Iterator, Dict, Optional, Tuple @@ -147,29 +149,78 @@ class SectionMetadata: class UpdateInfoSection: - def __init__(self, path: str): - # TOD: Add support for .xz metadata. - if not path.endswith(".zck"): - raise ValueError("Only .zck files are supported") + def __init__(self, path: str, scratch_dir): self.path = path - - def strip_to_arches(self, arches, scratch_dir): - result = subprocess.run(["unzck", self.path, "--stdout"], stdout=subprocess.PIPE, check=True) - root = safe_parse_xml(result.stdout) + self.scratch_dir = scratch_dir + + @classmethod + def from_path(cls, path: str, scratch_dir): + if path.endswith(".zck"): + return ZCKUpdateInfoSection(path, scratch_dir) + elif path.endswith(".xz"): + return XZUpdateInfoSection(path, scratch_dir) + else: + raise ValueError("Only xz and zck files supported") + + @abstractmethod + def _read(self) -> bytes: + pass + + @abstractmethod + def _compress(self, root, open_size, open_checksum): + pass + + def strip_to_arches(self, arches): + xml_bytes = self._read() + root = safe_parse_xml(xml_bytes) self._strip(root, arches) - return self._update_metadata(root, scratch_dir) + open_size = len(xml_bytes) + open_checksum = sha256(xml_bytes) + return self._compress(root, open_size, open_checksum) - def _update_metadata(self, root, scratch_dir): - stripped_path = os.path.join(scratch_dir, "stripped.xml") - ElementTree(root).write(stripped_path) + def _strip(self, root, arches): + for update_element in root: + for collection in update_element.find("pkglist"): + for package in collection.getchildren(): + arch = package.get("arch") + if arch is not None and arch not in arches: + collection.remove(package) + + +class XZUpdateInfoSection(UpdateInfoSection): + def _read(self) -> bytes: + with lzma.open(self.path, mode="rb") as f: + return f.read() + + def _compress(self, root, open_size, open_checksum): + compressed_xml = lzma.compress(tostring(root, encoding="utf-8")) + compressed_sha256 = sha256(compressed_xml) + compressed_size = len(compressed_xml) + + local_path = os.path.join(self.scratch_dir, f"{compressed_sha256}-updateinfo.xml.xz") + with open(local_path, "wb+") as out: + out.write(compressed_xml) + return SectionMetadata( + open_checksum=open_checksum, + checksum=compressed_sha256, + checksum_type="sha256", + size=compressed_size, + open_size=open_size, + local_path=local_path, + location=f"repodata/{basename(local_path)}", + ) - # Take open checksum and size. - sha256_out = subprocess.check_output(["sha256sum", stripped_path], text=True) - open_checksum = sha256_out.split()[0] - open_size = os.path.getsize(stripped_path) + +class ZCKUpdateInfoSection(UpdateInfoSection): + def _read(self): + return subprocess.check_output(["unzck", self.path, "--stdout"]) + + def _compress(self, root, open_size, open_checksum): + stripped_path = os.path.join(self.scratch_dir, "stripped.xml") + ElementTree(root).write(stripped_path) # Now compress and take compressed checksum,size. - compressed_stripped_path = os.path.join(scratch_dir, "stripped.xml.zck") + compressed_stripped_path = os.path.join(self.scratch_dir, "stripped.xml.zck") subprocess.check_call(["zck", stripped_path, "-o", compressed_stripped_path]) sha256_compressed_out = subprocess.check_output(["sha256sum", compressed_stripped_path], text=True) checksum = sha256_compressed_out.split()[0] @@ -178,7 +229,7 @@ def _update_metadata(self, root, scratch_dir): # We also need some ZChunk specific metadata. header_out = subprocess.check_output(["zck_read_header", compressed_stripped_path], text=True) header_checksum, header_size = self._parse_zck_read_header(output=header_out) - final_path = os.path.join(scratch_dir, f"{checksum}-updateinfo.xml.zck") + final_path = os.path.join(self.scratch_dir, f"{checksum}-updateinfo.xml.zck") # Rename it in the same format as the other sections. os.rename(compressed_stripped_path, final_path) @@ -194,13 +245,6 @@ def _update_metadata(self, root, scratch_dir): location=f"repodata/{os.path.basename(final_path)}", ) - def _strip(self, root, arches): - for update_element in root: - for collection in update_element.find("pkglist"): - for package in collection.getchildren(): - if package.get("arch") not in arches: - collection.remove(package) - def _parse_zck_read_header(self, output): checksum_match = re.search("Header checksum: (?P.*$)", output, flags=re.MULTILINE) if not checksum_match: @@ -261,11 +305,11 @@ def strip_metadata( repomd_xml = safe_parse_xml(xml_bytes) repodata = self.parse_repomd(xml=repomd_xml) for key, section in repodata.items(): - if key.startswith("updateinfo_zck"): + if key.startswith("updateinfo"): with self._req(self.session.get, path=section.location, stream=True) as request: local_path = download_repodata_section(section, request, destination_dir=scratch_dir) - update_section = UpdateInfoSection(path=local_path) - rewritten_section = update_section.strip_to_arches(arches=target_arches, scratch_dir=scratch_dir) + update_section = UpdateInfoSection.from_path(path=local_path, scratch_dir=scratch_dir) + rewritten_section = update_section.strip_to_arches(arches=target_arches) self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=rewritten_section, section_name=key) upload_files.append(rewritten_section.local_path) repomd_xml_path = join(scratch_dir, "repomd.xml")