Merge pull request #24 from aiven/aiqin-s3-mirror
Cache repomd.xml and use the cache when overwriting the repo
facetoe authored Jul 8, 2022
2 parents 98a53be + 91f8f61 commit 48fe87f
Showing 4 changed files with 73 additions and 62 deletions.
116 changes: 63 additions & 53 deletions rpm_s3_mirror/mirror.py
@@ -10,7 +10,7 @@
 from collections import namedtuple, defaultdict
 from urllib.parse import urlparse
 
-from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml
+from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml, Metadata
 from rpm_s3_mirror.s3 import S3, S3DirectoryNotFound
 from rpm_s3_mirror.statsd import StatsClient
 from rpm_s3_mirror.util import (
@@ -40,6 +40,7 @@ def __init__(self, config):
         self.session = get_requests_session()
         self.log = logging.getLogger(type(self).__name__)
         self.stats = StatsClient()
+        # S3 bucket
         self.s3 = S3(
             aws_secret_access_key=self.config.aws_secret_access_key,
             aws_access_key_id=self.config.aws_access_key_id,
@@ -49,6 +50,7 @@
             max_workers=self.config.max_workers,
             scratch_dir=self.config.scratch_dir,
         )
+        # upstream repos
         self.repositories = [RPMRepository(base_url=url) for url in config.upstream_repositories]
 
     def sync(self):
@@ -69,63 +71,71 @@ def sync(self):
         self.stats.gauge(metric="s3_mirror_sync_seconds_total", value=time.monotonic() - start)
         return sync_success
 
-    def _sync_repository(self, upstream_repository):
+    def _sync_repository(self, upstream_repository: RPMRepository):
         mirror_start = time.monotonic()
         update_time = now()
-        upstream_metadata = upstream_repository.parse_metadata()
-        mirror_repository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
-        bootstrap = not mirror_repository.exists()
-        if bootstrap:
-            self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
-            new_packages = upstream_metadata.package_list
-        else:
-            self.log.info("Syncing repository: %s", upstream_repository.base_url)
-            # If the upstream repomd.xml file was updated after the last time we updated our
-            # mirror repomd.xml file then there is probably some work to do.
-            last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
-            if not upstream_repository.has_updates(since=last_check_time):
-                self.log.info("Skipping repository with no updates since: %s", last_check_time)
-                self.stats.gauge(
-                    metric="s3_mirror_sync_seconds",
-                    value=time.monotonic() - mirror_start,
-                    tags={"repo": upstream_repository.path},
-                )
-                return
-
-            # Extract our metadata and detect any new/updated packages.
-            mirror_metadata = mirror_repository.parse_metadata()
-            new_packages = set(upstream_metadata.package_list).difference(set(mirror_metadata.package_list))
-
-        # Sync our mirror with upstream.
-        self.s3.sync_packages(
-            base_url=upstream_metadata.base_url,
-            upstream_repodata=upstream_metadata.repodata,
-            upstream_packages=new_packages,
-            # If we are bootstrapping the s3 repo, it is worth checking if the package already exists, as if the
-            # process is interrupted halfway through we would have to do a lot of potentially useless work. Once
-            # we have completed bootstrapping and are just running a sync we don't benefit from checking, as it
-            # slows things down for no good reason (we expect the packages to be there already, and if not
-            # it is a bug of some kind).
-            skip_existing=bootstrap,
-        )
-
-        # If we are not bootstrapping, store a manifest that describes the changes synced in this run.
-        if not bootstrap:
-            manifest_location = self._build_manifest_location(base_url=upstream_repository.base_url)
-            repomd_path = join(manifest_location, "repomd.xml")
-            self.s3.archive_repomd(base_url=upstream_repository.base_url, location=repomd_path)
-            manifest = Manifest(
-                update_time=update_time,
-                upstream_repository=upstream_repository.base_url,
-                previous_repomd=repomd_path,
-                synced_packages=[package.to_dict() for package in new_packages],
-            )
-            manifest_path = join(manifest_location, "manifest.json")
-            self.s3.put_manifest(location=manifest_path, manifest=manifest)
-
-        # Finally, overwrite the repomd.xml file to make our changes live
-        self.s3.overwrite_repomd(base_url=upstream_repository.base_url)
+        upstream_metadata: Metadata = upstream_repository.parse_metadata()
+        # Cache the repomd.xml when first reading from the upstream repo.
+        with TemporaryDirectory(prefix=self.config.scratch_dir) as temp_dir:
+            base_url = upstream_repository.base_url
+            url = f"{base_url}repodata/repomd.xml"
+            cache_xml_path = download_file(temp_dir=temp_dir, url=url, session=self.session)
+            self.log.info("Cache repomd.xml to local path: %s", cache_xml_path)
+            # Our S3 mirror. Hostname based on S3 bucket region & name. Path based on upstream path.
+            mirror_repository: RPMRepository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
+            bootstrap = not mirror_repository.exists()
+            if bootstrap:
+                self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
+                new_packages = upstream_metadata.package_list
+            else:
+                self.log.info("Syncing repository: %s", upstream_repository.base_url)
+                # If the upstream repomd.xml file was updated after the last time we updated our
+                # mirror repomd.xml file then there is probably some work to do.
+                last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
+                if not upstream_repository.has_updates(since=last_check_time):
+                    self.log.info("Skipping repository with no updates since: %s", last_check_time)
+                    self.stats.gauge(
+                        metric="s3_mirror_sync_seconds",
+                        value=time.monotonic() - mirror_start,
+                        tags={"repo": upstream_repository.path},
+                    )
+                    return
+
+                # Extract our metadata and detect any new/updated packages.
+                mirror_metadata = mirror_repository.parse_metadata()
+                new_packages = set(upstream_metadata.package_list).difference(set(mirror_metadata.package_list))
+
+            # Sync our mirror with upstream.
+            self.s3.sync_packages(
+                base_url=upstream_metadata.base_url,
+                upstream_repodata=upstream_metadata.repodata,
+                upstream_packages=new_packages,
+                # If we are bootstrapping the s3 repo, it is worth checking if the package already exists, as if the
+                # process is interrupted halfway through we would have to do a lot of potentially useless work. Once
+                # we have completed bootstrapping and are just running a sync we don't benefit from checking, as it
+                # slows things down for no good reason (we expect the packages to be there already, and if not
+                # it is a bug of some kind).
+                skip_existing=bootstrap,
+            )
+
+            # If we are not bootstrapping, store a manifest that describes the changes synced in this run.
+            if not bootstrap:
+                manifest_location = self._build_manifest_location(base_url=upstream_repository.base_url)
+                repomd_path = join(manifest_location, "repomd.xml")
+                self.s3.archive_repomd(base_url=upstream_repository.base_url, location=repomd_path)
+                manifest = Manifest(
+                    update_time=update_time,
+                    upstream_repository=upstream_repository.base_url,
+                    previous_repomd=repomd_path,
+                    synced_packages=[package.to_dict() for package in new_packages],
+                )
+                manifest_path = join(manifest_location, "manifest.json")
+                self.s3.put_manifest(location=manifest_path, manifest=manifest)
+
+            # Finally, overwrite the repomd.xml file to make our changes live.
+            # Copy from the cached file in /var/tmp, because sync_packages may take a bit of time and repomd.xml
+            # may have changed upstream in the meantime.
+            self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
         self.log.info("Updated mirror with %s packages", len(new_packages))
         self.stats.gauge(
             metric="s3_mirror_sync_seconds",
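Why the cached copy matters: sync_packages can run for a long time, and upstream can publish a new repomd.xml while it runs. The old code re-downloaded repomd.xml at publish time, so the mirror could end up advertising packages it never uploaded. A minimal, self-contained sketch of the two orderings (the fetch/upload/publish helpers below are illustrative stand-ins, not the rpm_s3_mirror API):

upstream = {"repomd": "A"}                    # upstream repo state
mirror = {"repomd": None, "uploaded": set()}  # our S3 mirror state

def fetch_repomd() -> str:
    return upstream["repomd"]

def upload_packages(repomd: str) -> None:
    # The slow step: while it runs, upstream moves on to a new package set.
    mirror["uploaded"].add(repomd)
    upstream["repomd"] = "B"

def publish(repomd: str) -> None:
    mirror["repomd"] = repomd

# Racy ordering (before this commit): repomd.xml is fetched again at publish time.
upload_packages(fetch_repomd())
publish(fetch_repomd())  # publishes "B" although only "A" was uploaded
assert mirror["repomd"] not in mirror["uploaded"]  # mirror is now inconsistent

# Safe ordering (this commit): fetch once, sync and publish the cached snapshot.
upstream = {"repomd": "A"}
mirror = {"repomd": None, "uploaded": set()}
cached = fetch_repomd()
upload_packages(cached)
publish(cached)  # metadata matches the packages actually synced
assert mirror["repomd"] in mirror["uploaded"]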
4 changes: 3 additions & 1 deletion rpm_s3_mirror/repository.py
@@ -143,6 +143,8 @@ def __iter__(self) -> Iterator[Package]:
 
 
 class RPMRepository:
+    """Upstream repository. This is not necessarily an S3 bucket."""
+
     def __init__(self, base_url: str):
         if not base_url.startswith("https://"):
             raise ValueError("Only https upstream repositories can be synced from")
@@ -238,7 +240,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
                 location=f"repodata/{basename(local_path)}",
             )
 
-    def _rewrite_repomd(self, repomd_xml, snapshot: SnapshotPrimary):
+    def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
         for element in repomd_xml.findall("repo:*", namespaces=namespaces):
             # We only support *.xml.gz files currently
             if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
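For context, the constructor shown in this hunk enforces the HTTPS-only rule behind the new docstring; a quick usage sketch (the URLs are placeholders):

from rpm_s3_mirror.repository import RPMRepository

repo = RPMRepository(base_url="https://mirror.example.com/fedora/35/x86_64/")

try:
    RPMRepository(base_url="http://mirror.example.com/fedora/35/x86_64/")
except ValueError as error:
    print(error)  # Only https upstream repositories can be synced from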
12 changes: 5 additions & 7 deletions rpm_s3_mirror/s3.py
@@ -78,13 +78,11 @@ def sync_packages(
         )
         self._sync_objects(temp_dir=temp_dir, repo_objects=upstream_repodata.values(), skip_existing=skip_existing)
 
-    def overwrite_repomd(self, base_url):
-        with TemporaryDirectory(prefix=self.scratch_dir) as temp_dir:
-            url = f"{base_url}repodata/repomd.xml"
-            repomd_xml = download_file(temp_dir=temp_dir, url=url, session=self.session)
-            path = urlparse(url).path
-            self.log.info("Overwriting repomd.xml")
-            self.put_object(repomd_xml, path, cache_age=0)
+    def overwrite_repomd(self, repomd_xml_local_path, base_url):
+        url = f"{base_url}repodata/repomd.xml"
+        remote_path = urlparse(url).path
+        self.log.info("Overwriting repomd.xml")
+        self.put_object(repomd_xml_local_path, remote_path, cache_age=0)
 
     def archive_repomd(self, base_url, location):
         self.log.debug("Archiving repomd.xml to %s", location)
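Unrelated to the signature change but worth noting: put_object is called with cache_age=0 so the freshly overwritten repomd.xml is not served stale by HTTP caches. Assuming cache_age maps onto the Cache-Control header (an assumption about rpm_s3_mirror internals, not confirmed by this diff), the equivalent raw boto3 call would look roughly like this (bucket and key are placeholders):

import boto3

s3_client = boto3.client("s3")
with open("repomd.xml", "rb") as body:
    s3_client.put_object(
        Bucket="my-mirror-bucket",                    # placeholder bucket
        Key="fedora/35/x86_64/repodata/repomd.xml",   # placeholder key
        Body=body,
        CacheControl="max-age=0",  # cache_age=0: clients must revalidate the metadata
    )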
3 changes: 2 additions & 1 deletion rpm_s3_mirror/util.py
@@ -35,6 +35,7 @@ def get_requests_session() -> Session:
 
 
 def download_file(temp_dir: str, url: str, session: Session = None) -> str:
+    """Download a file and return the local path."""
     session = session or get_requests_session()
     try:
         return _download_file(session, temp_dir, url)
@@ -60,7 +61,7 @@ def _escape_s3_url(url: str) -> str:
     )
 
 
-def _download_file(session, temp_dir, url):
+def _download_file(session, temp_dir, url) -> str:
     with session.get(url, stream=True) as request:
         request.raise_for_status()
         out_path = join(temp_dir, os.path.basename(url))
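A short usage sketch for the newly documented helper (the URL is a placeholder and network access is required):

from tempfile import TemporaryDirectory

from rpm_s3_mirror.util import download_file

with TemporaryDirectory() as temp_dir:
    local_path = download_file(temp_dir=temp_dir, url="https://example.com/repodata/repomd.xml")
    # The file is written to <temp_dir>/repomd.xml; the basename comes from the URL.
    print(local_path)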
