Merge pull request #25 from aiven/will-strip-metadata
Support stripping metadata from updates.
lionbee authored Aug 16, 2022
2 parents 48fe87f + 40654a1 commit 18c5aa9
Showing 5 changed files with 199 additions and 35 deletions.
1 change: 1 addition & 0 deletions rpm_s3_mirror.spec
@@ -14,6 +14,7 @@ Requires: python3-dateutil
Requires: python3-boto3
Requires: python3-lxml
Requires: systemd
Requires: zchunk

%undefine _missing_build_ids_terminate_build

4 changes: 3 additions & 1 deletion rpm_s3_mirror/config.py
@@ -16,6 +16,7 @@
DEFAULTS = {
"scratch_dir": "/var/tmp/",
"max_workers": 4,
"trim_updates_to_arches": [],
}


@@ -31,6 +32,7 @@ class Config:
scratch_dir = None
max_workers = None
upstream_repositories = None
trim_updates_to_arches = None
_config = DEFAULTS

def __init__(self):
@@ -63,7 +65,7 @@ def _populate_required(self):
raise ConfigError(f"Missing required environment variable: {key.upper()}")
else:
continue
elif key == "upstream_repositories":
elif key in ("upstream_repositories", "trim_updates_to_arches"):
value = value.split(",")
elif key == "max_workers":
value = int(value)
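As a rough illustration of the new option (a sketch only): the value arrives as a comma-separated string and is split exactly like upstream_repositories, per the hunk above. The TRIM_UPDATES_TO_ARCHES variable name below follows the key.upper() convention used for required settings and is an assumption here.

# Hypothetical input, e.g. TRIM_UPDATES_TO_ARCHES=x86_64,noarch
value = "x86_64,noarch"
trim_updates_to_arches = value.split(",")  # -> ["x86_64", "noarch"]
# Left unset, the DEFAULTS entry keeps it as an empty list,
# so the trimming path in mirror.py is skipped entirely.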
32 changes: 27 additions & 5 deletions rpm_s3_mirror/mirror.py
@@ -1,9 +1,10 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import logging
import os
import re
from contextlib import suppress
from os.path import join
from os.path import join, basename
from tempfile import TemporaryDirectory

import time
@@ -132,10 +133,31 @@ def _sync_repository(self, upstream_repository: RPMRepository):
manifest_path = join(manifest_location, "manifest.json")
self.s3.put_manifest(location=manifest_path, manifest=manifest)

# Finally, overwrite the repomd.xml file to make our changes live
# Copy from the cached file in /var/tmp, because sync_packages may take a bit of time, and repomd.xml may
# change in upstream.
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
# The metadata can be quite large and cause dnf to use excessive memory processing
# the updates. As an optimization, support dropping architectures that we know
# we don't need.
if self.config.trim_updates_to_arches:
self.log.info("Trimming metadata to: %s", self.config.trim_updates_to_arches)
metadata_scratch = os.path.join(temp_dir, "metadata")
os.makedirs(metadata_scratch, exist_ok=True)
with open(cache_xml_path, "rb") as f:
repodata_files = upstream_repository.strip_metadata(
xml_bytes=f.read(),
target_arches=self.config.trim_updates_to_arches,
scratch_dir=metadata_scratch,
)
base_path = urlparse(upstream_repository.base_url).path[1:] # need to strip the leading slash
for file_path in repodata_files.upload_files:
dest_path = join(base_path, "repodata", basename(file_path))
self.log.info("Uploading: %s -> %s", file_path, dest_path)
self.s3.put_object(
local_path=file_path,
key=dest_path,
)
else:
# Overwrite the repomd.xml file from upstream to make our changes live.
self.log.info("Overwriting repomd.xml")
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
self.log.info("Updated mirror with %s packages", len(new_packages))
self.stats.gauge(
metric="s3_mirror_sync_seconds",
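For readers following the path handling in the new upload loop, a small standalone sketch with a hypothetical mirror URL and scratch file (neither is taken from the repository):

from os.path import basename, join
from urllib.parse import urlparse

base_url = "https://mirror.example.com/fedora/updates/36/x86_64/"  # hypothetical
base_path = urlparse(base_url).path[1:]  # "fedora/updates/36/x86_64/", leading slash stripped
file_path = "/var/tmp/metadata/abcd1234-updateinfo.xml.zck"  # hypothetical scratch file
dest_path = join(base_path, "repodata", basename(file_path))
# dest_path == "fedora/updates/36/x86_64/repodata/abcd1234-updateinfo.xml.zck"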
196 changes: 168 additions & 28 deletions rpm_s3_mirror/repository.py
@@ -1,9 +1,14 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import dataclasses
import lzma
import re
import subprocess
from abc import abstractmethod
from collections import namedtuple
from datetime import datetime
from typing import Iterator, Dict
from typing import Iterator, Dict, Optional, Tuple
from urllib.parse import urlparse
from xml.etree.ElementTree import ElementTree

from lxml.etree import fromstring, Element, tostring # pylint: disable=no-name-in-module
from lxml.etree import XMLParser # pylint: disable=no-name-in-module
@@ -126,20 +131,128 @@ def __iter__(self) -> Iterator[Package]:
"checksum",
],
)
SnapshotPrimary = namedtuple(
"SnapshotPrimary",
[
"open_checksum",
"checksum",
"checksum_type",
"size",
"open_size",
"local_path",
"location",
],
)

Snapshot = namedtuple("Snapshot", ["sync_files", "upload_files"])
RepoDataFiles = namedtuple("RepoDataFiles", ["sync_files", "upload_files"])


@dataclasses.dataclass
class SectionMetadata:
size: int
open_size: int
open_checksum: str
checksum: str
local_path: str
location: str
checksum_type: str = "sha256"
header_checksum: Optional[str] = None
header_size: Optional[int] = None


class UpdateInfoSection:
def __init__(self, path: str, scratch_dir):
self.path = path
self.scratch_dir = scratch_dir

@classmethod
def from_path(cls, path: str, scratch_dir):
if path.endswith(".zck"):
return ZCKUpdateInfoSection(path, scratch_dir)
elif path.endswith(".xz"):
return XZUpdateInfoSection(path, scratch_dir)
else:
raise ValueError("Only xz and zck files supported")

@abstractmethod
def _read(self) -> bytes:
pass

@abstractmethod
def _compress(self, root, open_size, open_checksum):
pass

def strip_to_arches(self, arches):
xml_bytes = self._read()
root = safe_parse_xml(xml_bytes)
self._strip(root, arches)
open_size = len(xml_bytes)
open_checksum = sha256(xml_bytes)
return self._compress(root, open_size, open_checksum)

def _strip(self, root, arches):
for update_element in root:
for collection in update_element.find("pkglist"):
for package in collection.getchildren():
arch = package.get("arch")
if arch is not None and arch not in arches:
collection.remove(package)


class XZUpdateInfoSection(UpdateInfoSection):
def _read(self) -> bytes:
with lzma.open(self.path, mode="rb") as f:
return f.read()

def _compress(self, root, open_size, open_checksum):
compressed_xml = lzma.compress(tostring(root, encoding="utf-8"))
compressed_sha256 = sha256(compressed_xml)
compressed_size = len(compressed_xml)

local_path = os.path.join(self.scratch_dir, f"{compressed_sha256}-updateinfo.xml.xz")
with open(local_path, "wb+") as out:
out.write(compressed_xml)
return SectionMetadata(
open_checksum=open_checksum,
checksum=compressed_sha256,
checksum_type="sha256",
size=compressed_size,
open_size=open_size,
local_path=local_path,
location=f"repodata/{basename(local_path)}",
)


class ZCKUpdateInfoSection(UpdateInfoSection):
def _read(self):
return subprocess.check_output(["unzck", self.path, "--stdout"])

def _compress(self, root, open_size, open_checksum):
stripped_path = os.path.join(self.scratch_dir, "stripped.xml")
ElementTree(root).write(stripped_path)

# Now compress and take compressed checksum,size.
compressed_stripped_path = os.path.join(self.scratch_dir, "stripped.xml.zck")
subprocess.check_call(["zck", stripped_path, "-o", compressed_stripped_path])
sha256_compressed_out = subprocess.check_output(["sha256sum", compressed_stripped_path], text=True)
checksum = sha256_compressed_out.split()[0]
size = os.path.getsize(compressed_stripped_path)

# We also need some ZChunk specific metadata.
header_out = subprocess.check_output(["zck_read_header", compressed_stripped_path], text=True)
header_checksum, header_size = self._parse_zck_read_header(output=header_out)
final_path = os.path.join(self.scratch_dir, f"{checksum}-updateinfo.xml.zck")

# Rename it in the same format as the other sections.
os.rename(compressed_stripped_path, final_path)

return SectionMetadata(
size=size,
open_size=open_size,
header_size=header_size,
header_checksum=header_checksum,
open_checksum=open_checksum,
checksum=checksum,
local_path=final_path,
location=f"repodata/{os.path.basename(final_path)}",
)

def _parse_zck_read_header(self, output):
checksum_match = re.search("Header checksum: (?P<checksum>.*$)", output, flags=re.MULTILINE)
if not checksum_match:
raise ValueError(f"Failed to locate checksum in output: {output}")
size_match = re.search("Header size:(?P<size>.*$)", output, flags=re.MULTILINE)
if not size_match:
raise ValueError(f"Failed to locate size in output: {output}")
return checksum_match.groupdict()["checksum"], int(size_match.groupdict()["size"])


class RPMRepository:
@@ -176,17 +289,45 @@ def exists(self):
return True
return False

def get_repodata(self):
response = self._req(self.session.get, "repodata/repomd.xml")
repodata = self.parse_repomd(xml=safe_parse_xml(response.content))
def get_repodata(self, xml_bytes=None):
if xml_bytes is None:
xml_bytes = self._req(self.session.get, "repodata/repomd.xml").content
repodata = self.parse_repomd(xml=safe_parse_xml(xml_bytes))
return repodata

def strip_metadata(
self,
xml_bytes: bytes,
target_arches: Tuple[str],
scratch_dir: str,
):
sync_files, upload_files = [], []
repomd_xml = safe_parse_xml(xml_bytes)
repodata = self.parse_repomd(xml=repomd_xml)
for key, section in repodata.items():
if key.startswith("updateinfo"):
with self._req(self.session.get, path=section.location, stream=True) as request:
local_path = download_repodata_section(section, request, destination_dir=scratch_dir)
update_section = UpdateInfoSection.from_path(path=local_path, scratch_dir=scratch_dir)
rewritten_section = update_section.strip_to_arches(arches=target_arches)
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=rewritten_section, section_name=key)
upload_files.append(rewritten_section.local_path)
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml, encoding="utf-8"))
upload_files.append(repomd_xml_path)

return RepoDataFiles(
sync_files=sync_files,
upload_files=upload_files,
)

def create_snapshot(self, scratch_dir):
response = self._req(self.session.get, "repodata/repomd.xml")
repomd_xml = safe_parse_xml(response.content)
repodata = self.parse_repomd(xml=repomd_xml)
snapshot_primary = self._rewrite_primary(temp_dir=scratch_dir, primary=repodata["primary"])
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary)
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary, section_name="primary")
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml, encoding="utf-8"))
@@ -199,7 +340,7 @@ def create_snapshot(self, scratch_dir):
or section.location.endswith("modules.yaml.gz")
):
sync_files.append(urlparse(join(self.base_url, section.location)).path)
return Snapshot(
return RepoDataFiles(
sync_files=sync_files,
upload_files=[repomd_xml_path, snapshot_primary.local_path],
)
@@ -230,7 +371,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
with open(local_path, "wb+") as out:
out.write(compressed_xml)

return SnapshotPrimary(
return SectionMetadata(
open_checksum=open_checksum,
checksum=compressed_sha256,
checksum_type="sha256",
@@ -240,14 +381,9 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
location=f"repodata/{basename(local_path)}",
)

def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
for element in repomd_xml.findall("repo:*", namespaces=namespaces):
# We only support *.xml.gz files currently
if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
repomd_xml.remove(element)

def _rewrite_repomd(self, repomd_xml: Element, snapshot: SectionMetadata, section_name: str):
# Rewrite the XML with correct metadata for our changed primary.xml
for element in repomd_xml.find("repo:data[@type='primary']", namespaces=namespaces):
for element in repomd_xml.find(f"repo:data[@type='{section_name}']", namespaces=namespaces):
_, _, key = element.tag.partition("}")
if key == "checksum":
element.text = snapshot.checksum
@@ -259,6 +395,10 @@ def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
element.text = str(snapshot.size)
elif key == "open-size":
element.text = str(snapshot.open_size)
elif key == "header-size" and snapshot.header_size is not None:
element.text = str(snapshot.header_size)
elif key == "header-checksum" and snapshot.header_checksum is not None:
element.text = snapshot.header_checksum

def _extract_package_list(self, primary: RepodataSection) -> PackageList:
with self._req(self.session.get, path=primary.location, stream=True) as request:
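To make the trimming in UpdateInfoSection._strip above concrete, here is a minimal self-contained sketch against a hypothetical updateinfo fragment; it assumes the standard update > pkglist > collection > package layout that the loop expects and is not code from the repository:

from lxml.etree import fromstring, tostring

updateinfo = b"""
<updates>
  <update>
    <pkglist>
      <collection>
        <package arch="x86_64" name="foo"/>
        <package arch="aarch64" name="foo"/>
        <package arch="noarch" name="foo-docs"/>
      </collection>
    </pkglist>
  </update>
</updates>
"""

arches = {"x86_64", "noarch"}
root = fromstring(updateinfo)
for update_element in root:
    for collection in update_element.find("pkglist"):
        for package in list(collection):  # copy the children so we can remove while iterating
            arch = package.get("arch")
            if arch is not None and arch not in arches:
                collection.remove(package)

print(tostring(root).decode())  # the aarch64 entry is gone; x86_64 and noarch remain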
1 change: 0 additions & 1 deletion rpm_s3_mirror/s3.py
@@ -81,7 +81,6 @@ def sync_packages(
def overwrite_repomd(self, repomd_xml_local_path, base_url):
url = f"{base_url}repodata/repomd.xml"
remote_path = urlparse(url).path
self.log.info("Overwriting repomd.xml")
self.put_object(repomd_xml_local_path, remote_path, cache_age=0)

def archive_repomd(self, base_url, location):
