Support stripping metadata from updates.
Updates metadata can be quite large and can cause systems with low
memory to OOM while refreshing it. Add support for dropping
architectures from the metadata when they are not in use by any system
the mirror serves. This is achieved in much the same way as
snapshotting is implemented: pull the metadata, rewrite it, update the
various values like checksum and size so DNF is happy to consume it,
and add it to repomd.xml.

Some refactoring was performed, as much of the code used for
snapshotting is also useful here.
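
As a rough illustration of how the new option might be supplied (a hedged sketch,
not taken from the commit: the TRIM_UPDATES_TO_ARCHES variable name follows the
key.upper() environment-variable convention in config.py below, and the
architecture values are examples):

    # Sketch: read the comma-separated list from the environment and split it,
    # mirroring the handling added to Config._populate_required.
    import os

    raw = os.environ.get("TRIM_UPDATES_TO_ARCHES")  # e.g. "x86_64,noarch"
    trim_updates_to_arches = raw.split(",") if raw else []
    print(trim_updates_to_arches)  # -> ['x86_64', 'noarch'] when set as above
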
facetoe committed Aug 14, 2022
1 parent 48fe87f commit 74d2cde
Showing 5 changed files with 155 additions and 35 deletions.
1 change: 1 addition & 0 deletions rpm_s3_mirror.spec
@@ -14,6 +14,7 @@ Requires: python3-dateutil
Requires: python3-boto3
Requires: python3-lxml
Requires: systemd
Requires: zchunk

%undefine _missing_build_ids_terminate_build

4 changes: 3 additions & 1 deletion rpm_s3_mirror/config.py
@@ -16,6 +16,7 @@
DEFAULTS = {
"scratch_dir": "/var/tmp/",
"max_workers": 4,
"trim_updates_to_arches": [],
}


@@ -31,6 +32,7 @@ class Config:
scratch_dir = None
max_workers = None
upstream_repositories = None
trim_updates_to_arches = None
_config = DEFAULTS

def __init__(self):
@@ -63,7 +65,7 @@ def _populate_required(self):
raise ConfigError(f"Missing required environment variable: {key.upper()}")
else:
continue
elif key == "upstream_repositories":
elif key in ("upstream_repositories", "trim_updates_to_arches"):
value = value.split(",")
elif key == "max_workers":
value = int(value)
32 changes: 27 additions & 5 deletions rpm_s3_mirror/mirror.py
@@ -1,9 +1,10 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import logging
import os
import re
from contextlib import suppress
from os.path import join
from os.path import join, basename
from tempfile import TemporaryDirectory

import time
@@ -132,10 +133,31 @@ def _sync_repository(self, upstream_repository: RPMRepository):
manifest_path = join(manifest_location, "manifest.json")
self.s3.put_manifest(location=manifest_path, manifest=manifest)

# Finally, overwrite the repomd.xml file to make our changes live
# Copy from the cached file in /var/tmp, because sync_packages may take a bit of time, and repomd.xml may
# change in upstream.
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
# The metadata can be quite large and cause dnf to use excessive memory processing
# the updates. As an optimization, support dropping architectures that we know
# we don't need.
if self.config.trim_updates_to_arches:
self.log.info("Trimming metadata to: %s", self.config.trim_updates_to_arches)
metadata_scratch = os.path.join(temp_dir, "metadata")
os.makedirs(metadata_scratch, exist_ok=True)
with open(cache_xml_path, "rb") as f:
repodata_files = upstream_repository.strip_metadata(
xml_bytes=f.read(),
target_arches=self.config.trim_updates_to_arches,
scratch_dir=metadata_scratch,
)
base_path = urlparse(upstream_repository.base_url).path[1:] # need to strip the leading slash
for file_path in repodata_files.upload_files:
dest_path = join(base_path, "repodata", basename(file_path))
self.log.info("Uploading: %s -> %s", file_path, dest_path)
self.s3.put_object(
local_path=file_path,
key=dest_path,
)
else:
# Overwrite the repomd.xml file from upstream to make our changes live.
self.log.info("Overwriting repomd.xml")
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
self.log.info("Updated mirror with %s packages", len(new_packages))
self.stats.gauge(
metric="s3_mirror_sync_seconds",
152 changes: 124 additions & 28 deletions rpm_s3_mirror/repository.py
@@ -1,9 +1,12 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import dataclasses
import re
import subprocess
from collections import namedtuple
from datetime import datetime
from typing import Iterator, Dict
from typing import Iterator, Dict, Optional, Tuple
from urllib.parse import urlparse
from xml.etree.ElementTree import ElementTree

from lxml.etree import fromstring, Element, tostring # pylint: disable=no-name-in-module
from lxml.etree import XMLParser # pylint: disable=no-name-in-module
@@ -126,20 +129,86 @@ def __iter__(self) -> Iterator[Package]:
"checksum",
],
)
SnapshotPrimary = namedtuple(
"SnapshotPrimary",
[
"open_checksum",
"checksum",
"checksum_type",
"size",
"open_size",
"local_path",
"location",
],
)

Snapshot = namedtuple("Snapshot", ["sync_files", "upload_files"])
RepoDataFiles = namedtuple("RepoDataFiles", ["sync_files", "upload_files"])


@dataclasses.dataclass
class SectionMetadata:
size: int
open_size: int
open_checksum: str
checksum: str
local_path: str
location: str
checksum_type: str = "sha256"
header_checksum: Optional[str] = None
header_size: Optional[int] = None


class UpdateInfoSection:
def __init__(self, path: str):
# TODO: Add support for .xz metadata.
if not path.endswith(".zck"):
raise ValueError("Only .zck files are supported")
self.path = path

def strip_to_arches(self, arches, scratch_dir):
result = subprocess.run(["unzck", self.path, "--stdout"], stdout=subprocess.PIPE, check=True)
root = safe_parse_xml(result.stdout)
self._strip(root, arches)
return self._update_metadata(root, scratch_dir)

def _update_metadata(self, root, scratch_dir):
stripped_path = os.path.join(scratch_dir, "stripped.xml")
ElementTree(root).write(stripped_path)

# Take open checksum and size.
sha256_out = subprocess.check_output(["sha256sum", stripped_path], text=True)
open_checksum = sha256_out.split()[0]
open_size = os.path.getsize(stripped_path)

# Now compress and take the compressed checksum and size.
compressed_stripped_path = os.path.join(scratch_dir, "stripped.xml.zck")
subprocess.check_call(["zck", stripped_path, "-o", compressed_stripped_path])
sha256_compressed_out = subprocess.check_output(["sha256sum", compressed_stripped_path], text=True)
checksum = sha256_compressed_out.split()[0]
size = os.path.getsize(compressed_stripped_path)

# We also need some ZChunk specific metadata.
header_out = subprocess.check_output(["zck_read_header", compressed_stripped_path], text=True)
header_checksum, header_size = self._parse_zck_read_header(output=header_out)
final_path = os.path.join(scratch_dir, f"{checksum}-updateinfo.xml.zck")

# Rename it in the same format as the other sections.
os.rename(compressed_stripped_path, final_path)

return SectionMetadata(
size=size,
open_size=open_size,
header_size=header_size,
header_checksum=header_checksum,
open_checksum=open_checksum,
checksum=checksum,
local_path=final_path,
location=f"repodata/{os.path.basename(final_path)}",
)

def _strip(self, root, arches):
for update_element in root:
for collection in update_element.find("pkglist"):
for package in collection.getchildren():
if package.get("arch") not in arches:
collection.remove(package)

def _parse_zck_read_header(self, output):
checksum_match = re.search("Header checksum: (?P<checksum>.*$)", output, flags=re.MULTILINE)
if not checksum_match:
raise ValueError(f"Failed to locate checksum in output: {output}")
size_match = re.search("Header size:(?P<size>.*$)", output, flags=re.MULTILINE)
if not size_match:
raise ValueError(f"Failed to locate size in output: {output}")
return checksum_match.groupdict()["checksum"], int(size_match.groupdict()["size"])
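
(Aside, illustrative only: a minimal self-contained check of the header parsing
above against a hypothetical zck_read_header output. The exact CLI output format
is an assumption; only the two lines matched by the regexes matter here.)

    import re

    output = "Version: 1\nHeader checksum: 0a1b2c3d\nHeader size: 512\n"
    checksum = re.search("Header checksum: (?P<checksum>.*$)", output, flags=re.MULTILINE)
    size = re.search("Header size:(?P<size>.*$)", output, flags=re.MULTILINE)
    print(checksum.group("checksum"), int(size.group("size")))  # prints: 0a1b2c3d 512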


class RPMRepository:
@@ -176,17 +245,45 @@ def exists(self):
return True
return False

def get_repodata(self):
response = self._req(self.session.get, "repodata/repomd.xml")
repodata = self.parse_repomd(xml=safe_parse_xml(response.content))
def get_repodata(self, xml_bytes=None):
if xml_bytes is None:
xml_bytes = self._req(self.session.get, "repodata/repomd.xml").content
repodata = self.parse_repomd(xml=safe_parse_xml(xml_bytes))
return repodata

def strip_metadata(
self,
xml_bytes: bytes,
target_arches: Tuple[str],
scratch_dir: str,
):
sync_files, upload_files = [], []
repomd_xml = safe_parse_xml(xml_bytes)
repodata = self.parse_repomd(xml=repomd_xml)
for key, section in repodata.items():
if key.startswith("updateinfo_zck"):
with self._req(self.session.get, path=section.location, stream=True) as request:
local_path = download_repodata_section(section, request, destination_dir=scratch_dir)
update_section = UpdateInfoSection(path=local_path)
rewritten_section = update_section.strip_to_arches(arches=target_arches, scratch_dir=scratch_dir)
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=rewritten_section, section_name=key)
upload_files.append(rewritten_section.local_path)
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml, encoding="utf-8"))
upload_files.append(repomd_xml_path)

return RepoDataFiles(
sync_files=sync_files,
upload_files=upload_files,
)

def create_snapshot(self, scratch_dir):
response = self._req(self.session.get, "repodata/repomd.xml")
repomd_xml = safe_parse_xml(response.content)
repodata = self.parse_repomd(xml=repomd_xml)
snapshot_primary = self._rewrite_primary(temp_dir=scratch_dir, primary=repodata["primary"])
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary)
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary, section_name="primary")
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml, encoding="utf-8"))
@@ -199,7 +296,7 @@ def create_snapshot(self, scratch_dir):
or section.location.endswith("modules.yaml.gz")
):
sync_files.append(urlparse(join(self.base_url, section.location)).path)
return Snapshot(
return RepoDataFiles(
sync_files=sync_files,
upload_files=[repomd_xml_path, snapshot_primary.local_path],
)
@@ -230,7 +327,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
with open(local_path, "wb+") as out:
out.write(compressed_xml)

return SnapshotPrimary(
return SectionMetadata(
open_checksum=open_checksum,
checksum=compressed_sha256,
checksum_type="sha256",
Expand All @@ -240,14 +337,9 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
location=f"repodata/{basename(local_path)}",
)

def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
for element in repomd_xml.findall("repo:*", namespaces=namespaces):
# We only support *.xml.gz files currently
if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
repomd_xml.remove(element)

def _rewrite_repomd(self, repomd_xml: Element, snapshot: SectionMetadata, section_name: str):
# Rewrite the XML with correct metadata for our changed primary.xml
for element in repomd_xml.find("repo:data[@type='primary']", namespaces=namespaces):
for element in repomd_xml.find(f"repo:data[@type='{section_name}']", namespaces=namespaces):
_, _, key = element.tag.partition("}")
if key == "checksum":
element.text = snapshot.checksum
@@ -259,6 +351,10 @@ def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
element.text = str(snapshot.size)
elif key == "open-size":
element.text = str(snapshot.open_size)
elif key == "header-size" and snapshot.header_size is not None:
element.text = str(snapshot.header_size)
elif key == "header-checksum" and snapshot.header_checksum is not None:
element.text = snapshot.header_checksum

def _extract_package_list(self, primary: RepodataSection) -> PackageList:
with self._req(self.session.get, path=primary.location, stream=True) as request:
1 change: 0 additions & 1 deletion rpm_s3_mirror/s3.py
@@ -81,7 +81,6 @@ def sync_packages(
def overwrite_repomd(self, repomd_xml_local_path, base_url):
url = f"{base_url}repodata/repomd.xml"
remote_path = urlparse(url).path
self.log.info("Overwriting repomd.xml")
self.put_object(repomd_xml_local_path, remote_path, cache_age=0)

def archive_repomd(self, base_url, location):
