From 881a59049e1c60d82382bb24e199b0a82527ec46 Mon Sep 17 00:00:00 2001
From: Tim DiLauro
Date: Fri, 27 Sep 2024 20:02:33 -0400
Subject: [PATCH] Improve `opds2_feed_reaper` performance for large feeds.
 (PP-1756) (#2089)

* Improve `opds2_feed_reaper` performance for large feeds. (PP-1756)
---
 bin/opds2_reaper_monitor | 91 ++++++++++++++++++++++++++++------------
 1 file changed, 65 insertions(+), 26 deletions(-)

diff --git a/bin/opds2_reaper_monitor b/bin/opds2_reaper_monitor
index 109eec29a..dfc73f8c7 100755
--- a/bin/opds2_reaper_monitor
+++ b/bin/opds2_reaper_monitor
@@ -1,9 +1,11 @@
 #!/usr/bin/env python
 """Remove availability of items no longer present in OPDS 2.0 import collections."""
+import itertools
 import json
 from collections.abc import Generator
 from typing import Any, cast
 
+from sqlalchemy.orm import raiseload
 from webpub_manifest_parser.opds2 import OPDS2FeedParserFactory
 
 from palace.manager.core.coverage import CoverageFailure
@@ -103,53 +105,88 @@ class OPDS2ReaperMonitor(OPDS2ImportMonitor):
         :param progress: A TimestampData, ignored.
         """
         super().run_once(progress)
+        feed_id_count = len(self.seen_identifiers)
         self.log.info(
             f"Feed contained {self.publication_count} publication entries, "
-            f"{len(self.seen_identifiers)} unique identifiers, "
+            f"{feed_id_count} unique identifiers, "
             f"{self.missing_id_count} missing identifiers."
         )
 
+        # Number of ORM objects to buffer at a time.
+        query_batch_size = 500
+
         # Convert feed identifiers to our identifiers, so we can find them.
         # Unlike the import case, we don't want to create identifiers, if
         # they don't already exist.
-        identifiers, failures = Identifier.parse_urns(
-            self._db, self.seen_identifiers, autocreate=False
+        self.log.info(
+            f"Mapping {feed_id_count} feed identifiers to database identifiers."
+        )
+        failure_total = 0
+        id_looked_up_count = 0
+        db_identifiers: dict[str, Identifier] = {}
+
+        feed_id_generator = (id_ for id_ in self.seen_identifiers)
+        while _feed_id_batch := list(
+            itertools.islice(feed_id_generator, query_batch_size)
+        ):
+            _batch_size = len(_feed_id_batch)
+            _batch_db_ids, _batch_failures = Identifier.parse_urns(
+                self._db, _feed_id_batch, autocreate=False
+            )
+            db_identifiers |= _batch_db_ids
+            id_looked_up_count += _batch_size
+            _success_count = len(_batch_db_ids)
+            _failure_count = len(_batch_failures)
+            failure_total += _failure_count
+            self.log.info(
+                f"Mapped batch of {_batch_size} feed identifier(s) to database identifier(s) "
+                f"(cumulative: {id_looked_up_count} of {feed_id_count} feed ids) "
+                f"with {_success_count} success(es) and {_failure_count} failure(s)."
+            )
+
+        self.log.info(
+            f"Successfully mapped {len(db_identifiers)} feed identifier(s) to database identifier(s)."
         )
-        identifier_ids = [x.id for x in list(identifiers.values())]
-        if failures:
+        if failure_total > 0:
             self.log.warning(
-                f"Unable to parse {len(failures)} of {len(self.seen_identifiers)} identifiers."
+                f"Unable to parse {failure_total} of {feed_id_count} identifiers."
             )
 
         collection_license_pools_qu = self._db.query(LicensePool).filter(
             LicensePool.collection_id == self.collection.id
         )
-        collection_license_pools = collection_license_pools_qu.count()
+        collection_lp_count = collection_license_pools_qu.count()
 
-        unlimited_access_license_pools_qu = collection_license_pools_qu.filter(
+        eligible_license_pools_qu = collection_license_pools_qu.filter(
             LicensePool.licenses_available == LicensePool.UNLIMITED_ACCESS
         )
-        unlimited_access_license_pools = unlimited_access_license_pools_qu.count()
+        eligible_lp_count = eligible_license_pools_qu.count()
 
-        # At this point we've gone through the feed and collected all the identifiers.
-        # If there's anything we didn't see, we know it's no longer available.
-        to_be_reaped_qu = unlimited_access_license_pools_qu.join(Identifier).filter(
-            ~Identifier.id.in_(identifier_ids)
-        )
-        reap_count = to_be_reaped_qu.count()
         self.log.info(
-            f"Reaping {reap_count} of {unlimited_access_license_pools} unlimited (of {collection_license_pools} total) license pools from collection '{self.collection.name}'. "
+            f"{eligible_lp_count} of collection's {collection_lp_count} license pool(s) "
+            "are unlimited and eligible to be reaped, if missing from the feed."
         )
 
-        if self.dry_run:
-            # TODO: Need to prevent timestamp update for dry runs.
-            achievements = f"Dry run: {reap_count} license pools would have been removed. Failures parsing identifiers from feed: {len(failures)}."
-        else:
-            achievements = f"License pools removed: {reap_count}. Failures parsing identifiers from feed: {len(failures)}."
-            for pool in to_be_reaped_qu:
-                pool.unlimited_access = False
-        self.log.info(achievements)
-
+        reap_count = 0
+        pool: LicensePool
+        db_identifier_ids = {x.id for x in list(db_identifiers.values())}
+
+        # Note: We need to turn off eager loading, so that `yield_per` works safely.
+        # `raiseload` will let us know if we're accidentally accessing a joined table.
+        for pool in eligible_license_pools_qu.options(raiseload("*")).yield_per(
+            query_batch_size
+        ):
+            if pool.identifier_id not in db_identifier_ids:
+                reap_count += 1
+                # Don't actually reap, unless this is explicitly NOT a dry run.
+                if self.dry_run is False:
+                    pool.unlimited_access = False
+
+        achievements = (
+            f"Dry run: {reap_count} of {eligible_lp_count} eligible license pool(s) would have been marked unavailable. {failure_total} failures parsing identifiers from feed."
+            if self.dry_run
+            else f"{reap_count} of {eligible_lp_count} eligible license pool(s) marked unavailable. {failure_total} failures parsing identifiers from feed."
        )
         return TimestampData(achievements=achievements)
 
 
@@ -211,7 +248,9 @@ class OPDS2ReaperScript(CollectionInputScript):
             self.log.error("No collections specified.")
             return
 
-        self.log.info(f"Reaping books from {len(collections)} collections.")
+        self.log.info(
+            f"Reaping books from {len(collections)} collection{'s' if len(collections) != 1 else ''}."
+        )
         for collection in collections:
             self.run_monitor(
                 collection,
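
A note on the batching idiom in the new identifier-mapping loop: `itertools.islice` plus the walrus operator drains any iterable in fixed-size chunks, so `Identifier.parse_urns` is called with at most `query_batch_size` URNs at a time instead of the entire feed at once. The following is a minimal, self-contained sketch of that idiom, not Palace code: `parse_batch` is a hypothetical stand-in for `Identifier.parse_urns`, and the `urn:` prefix check merely simulates successes and failures.

    import itertools
    from collections.abc import Iterable


    def parse_batch(batch: list[str]) -> tuple[dict[str, str], list[str]]:
        # Hypothetical stand-in for Identifier.parse_urns: anything that
        # looks like a URN "parses"; everything else is a failure.
        parsed = {urn: urn for urn in batch if urn.startswith("urn:")}
        failures = [urn for urn in batch if not urn.startswith("urn:")]
        return parsed, failures


    def map_in_batches(feed_ids: Iterable[str], batch_size: int = 500) -> dict[str, str]:
        db_identifiers: dict[str, str] = {}
        failure_total = 0
        id_generator = iter(feed_ids)
        # islice pulls at most batch_size items per pass; the walrus-operator
        # condition ends the loop once the iterator is exhausted.
        while batch := list(itertools.islice(id_generator, batch_size)):
            parsed, failures = parse_batch(batch)
            db_identifiers |= parsed
            failure_total += len(failures)
        print(f"{len(db_identifiers)} mapped, {failure_total} failure(s)")
        return db_identifiers


    map_in_batches(["urn:isbn:111", "not-a-urn", "urn:isbn:222"], batch_size=2)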
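
The reaping loop relies on a second pattern: streaming the eligible `LicensePool` rows with `yield_per` while disabling eager loading, since joined eager loads are unsafe to combine with `yield_per`; `raiseload("*")` makes any accidental relationship access raise instead of silently lazy-loading. Below is a minimal sketch of that pattern, assuming plain SQLAlchemy 1.4+ and the legacy `Query.yield_per()` API the patch itself uses; the `Pool` model, in-memory SQLite engine, and `seen_identifier_ids` set are illustrative stand-ins, not the real Palace models.

    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.orm import Session, declarative_base, raiseload

    Base = declarative_base()


    class Pool(Base):
        __tablename__ = "pools"
        id = Column(Integer, primary_key=True)
        identifier_id = Column(Integer)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add_all([Pool(identifier_id=n) for n in range(1, 6)])
        session.flush()

        seen_identifier_ids = {1, 2, 3}  # identifiers that appeared in the feed
        reap_count = 0
        # yield_per buffers 500 ORM rows at a time instead of loading every
        # row up front; raiseload("*") turns any lazy relationship access
        # into an error, so a load that would undermine the streaming
        # surfaces immediately.
        for pool in session.query(Pool).options(raiseload("*")).yield_per(500):
            if pool.identifier_id not in seen_identifier_ids:
                reap_count += 1  # the real monitor would mark the pool unavailable
        print(f"{reap_count} of 5 pool(s) would be reaped")

With 2.0-style `select()` queries, the equivalent is the `yield_per` execution option, e.g. `session.scalars(select(Pool).execution_options(yield_per=500))`.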