From 1607e2fcff12345328a02791af63555913abc1eb Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 15 Oct 2024 13:55:58 -0700 Subject: [PATCH 01/18] [PP-1358] Add hold expired, loan expired, hold converted, and loan revoked events. --- src/palace/manager/api/circulation.py | 9 ++++ src/palace/manager/api/circulation_manager.py | 6 +-- src/palace/manager/api/monitor.py | 28 ++++++++++++- src/palace/manager/celery/tasks/opds_odl.py | 42 +++++++++++++++++-- src/palace/manager/core/monitor.py | 7 ++++ .../sqlalchemy/model/circulationevent.py | 5 +++ 6 files changed, 90 insertions(+), 7 deletions(-) diff --git a/src/palace/manager/api/circulation.py b/src/palace/manager/api/circulation.py index 11d020d49..cc060cf55 100644 --- a/src/palace/manager/api/circulation.py +++ b/src/palace/manager/api/circulation.py @@ -1143,6 +1143,14 @@ def borrow( ) if existing_hold: # The book was on hold, and now we have a loan. + # collect cm event to commemorate the conversion: + self._collect_event( + patron=patron, + licensepool=licensepool, + name=CirculationEvent.CM_HOLD_CONVERTED_TO_LOAN, + include_neighborhood=True, + ) + # Delete the record of the hold. self._db.delete(existing_hold) __transaction.commit() @@ -1197,6 +1205,7 @@ def borrow( self._collect_event(patron, licensepool, CirculationEvent.CM_HOLD_PLACE) if existing_loan: + self._collect_event(patron, licensepool, CirculationEvent.CM_LOAN_REVOKED) self._db.delete(existing_loan) __transaction.commit() return None, hold, is_new diff --git a/src/palace/manager/api/circulation_manager.py b/src/palace/manager/api/circulation_manager.py index 94130e20d..c7f8d1928 100644 --- a/src/palace/manager/api/circulation_manager.py +++ b/src/palace/manager/api/circulation_manager.py @@ -197,9 +197,9 @@ def reload_settings_if_changed(self): CirculationManager's configuration from the database. 
""" last_update = Configuration.site_configuration_last_update(self._db) - if last_update > self.site_configuration_last_update: - self.load_settings() - self.site_configuration_last_update = last_update + # if last_update > self.site_configuration_last_update: + self.load_settings() + self.site_configuration_last_update = last_update def get_patron_web_domains(self) -> set[str]: """Return the set of patron web client domains.""" diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py index 073e2cf49..ec2a26c71 100644 --- a/src/palace/manager/api/monitor.py +++ b/src/palace/manager/api/monitor.py @@ -1,11 +1,19 @@ +from collections.abc import Callable + from sqlalchemy import and_, or_ from palace.manager.api.opds_for_distributors import OPDSForDistributorsAPI from palace.manager.core.monitor import ReaperMonitor +from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration from palace.manager.sqlalchemy.model.licensing import LicensePool -from palace.manager.sqlalchemy.model.patron import Annotation, Hold, Loan +from palace.manager.sqlalchemy.model.patron import ( + Annotation, + Hold, + Loan, + LoanAndHoldMixin, +) from palace.manager.util.datetime_helpers import utc_now @@ -44,6 +52,24 @@ def where_clause(self): ) return ~self.MODEL_CLASS.id.in_(source_of_truth_subquery) + def post_delete_op(self, row) -> Callable: + loan_like: LoanAndHoldMixin = row + + def post_delete(): + ce = CirculationEvent + event_type = ( + ce.CM_HOLD_EXPIRED + if isinstance(loan_like, Hold) + else ce.CM_LOAN_EXPIRED + ) + self.analytics.collect_event( + library=loan_like.library, + license_pool=loan_like.license_pool, + event_type=event_type, + ) + + return post_delete + class LoanReaper(LoanlikeReaperMonitor): """Remove expired and abandoned loans from the database.""" diff --git 
a/src/palace/manager/celery/tasks/opds_odl.py b/src/palace/manager/celery/tasks/opds_odl.py index f51931b8f..325631901 100644 --- a/src/palace/manager/celery/tasks/opds_odl.py +++ b/src/palace/manager/celery/tasks/opds_odl.py @@ -7,19 +7,41 @@ from palace.manager.api.odl.api import OPDS2WithODLApi from palace.manager.celery.task import Task +from palace.manager.service.analytics.analytics import Analytics from palace.manager.service.celery.celery import QueueNames from palace.manager.service.redis.models.lock import RedisLock from palace.manager.service.redis.redis import Redis +from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.licensing import License, LicensePool from palace.manager.sqlalchemy.model.patron import Hold from palace.manager.util.datetime_helpers import utc_now -def remove_expired_holds_for_collection(db: Session, collection_id: int) -> int: +def remove_expired_holds_for_collection( + db: Session, collection_id: int, analytics: Analytics +) -> int: """ Remove expired holds from the database for this collection. 
""" + + # generate expiration events for expired holds before deleting them + select_query = select(Hold).where( + Hold.position == 0, + Hold.end < utc_now(), + Hold.license_pool_id == LicensePool.id, + LicensePool.collection_id == collection_id, + ) + + expired_holds = db.scalars(select_query).all() + for hold in expired_holds: + analytics.collect_event( + library=hold.library, + license_pool=hold.license_pool, + event_type=CirculationEvent.CM_HOLD_EXPIRED, + patron=hold.patron, + ) + # delete the holds query = ( delete(Hold) .where( @@ -74,6 +96,7 @@ def lock_licenses(license_pool: LicensePool) -> None: def recalculate_holds_for_licensepool( license_pool: LicensePool, reservation_period: datetime.timedelta, + analytics: Analytics, ) -> int: # We take out row level locks on all the licenses and holds for this license pool, so that # everything is in a consistent state while we update the hold queue. This means we should be @@ -96,6 +119,12 @@ def recalculate_holds_for_licensepool( hold.position = 0 hold.end = utc_now() + reservation_period updated += 1 + analytics.collect_event( + library=hold.library, + license_pool=hold.license_pool, + event_type=CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT, + patron=hold.patron, + ) # Update the position for the remaining holds. 
for idx, hold in enumerate(waiting): @@ -115,6 +144,7 @@ def remove_expired_holds(task: Task) -> None: """ registry = task.services.integration_registry.license_providers() protocols = registry.get_protocols(OPDS2WithODLApi, default=False) + analytics = task.services.analytics.analytics() with task.session() as session: collections = [ (collection.id, collection.name) @@ -123,7 +153,9 @@ def remove_expired_holds(task: Task) -> None: ] for collection_id, collection_name in collections: with task.transaction() as session: - removed = remove_expired_holds_for_collection(session, collection_id) + removed = remove_expired_holds_for_collection( + session, collection_id, analytics + ) task.log.info( f"Removed {removed} expired holds for collection {collection_name} ({collection_id})." ) @@ -200,8 +232,12 @@ def recalculate_hold_queue_collection( f"Skipping license pool {license_pool_id} because it no longer exists." ) continue + + analytics = task.services.analytics.analytics() updated = recalculate_holds_for_licensepool( - license_pool, reservation_period + license_pool, + reservation_period, + analytics, ) edition = license_pool.presentation_edition title = edition.title if edition else None diff --git a/src/palace/manager/core/monitor.py b/src/palace/manager/core/monitor.py index a9022239b..65420529a 100644 --- a/src/palace/manager/core/monitor.py +++ b/src/palace/manager/core/monitor.py @@ -880,11 +880,15 @@ def run_once(self, *args, **kwargs): count = qu.count() self.log.info("Deleting %d row(s)", count) while count > 0: + post_delete_ops = [] for i in qu.limit(self.BATCH_SIZE): self.log.info("Deleting %r", i) + post_delete_ops = self.post_delete_op(i) self.delete(i) rows_deleted += 1 self._db.commit() + for op in post_delete_ops: + op() count = qu.count() return TimestampData(achievements="Items deleted: %d" % rows_deleted) @@ -897,6 +901,9 @@ def delete(self, row): """ self._db.delete(row) + def post_delete_op(self, row: MODEL_CLASS): + pass + def query(self): 
return self._db.query(self.MODEL_CLASS).filter(self.where_clause) diff --git a/src/palace/manager/sqlalchemy/model/circulationevent.py b/src/palace/manager/sqlalchemy/model/circulationevent.py index 91b9d5eed..4433893c2 100644 --- a/src/palace/manager/sqlalchemy/model/circulationevent.py +++ b/src/palace/manager/sqlalchemy/model/circulationevent.py @@ -99,6 +99,11 @@ class CirculationEvent(Base): CM_CHECKIN = "circulation_manager_check_in" CM_HOLD_PLACE = "circulation_manager_hold_place" CM_HOLD_RELEASE = "circulation_manager_hold_release" + CM_HOLD_EXPIRED = "circulation_manager_hold_expired" + CM_HOLD_READY_FOR_CHECKOUT = "circulation_manager_hold_ready" + CM_LOAN_EXPIRED = "circulation_manager_loan_expired" + CM_HOLD_CONVERTED_TO_LOAN = "circulation_manager_hold_converted_to_loan" + CM_LOAN_REVOKED = "circulation_manager_loan_revoked" CM_FULFILL = "circulation_manager_fulfill" # Events that we hear about from a distributor. From ad93d7fe56849dd8ec46ab987e6c42513dcbc44b Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Fri, 15 Nov 2024 22:41:08 -0800 Subject: [PATCH 02/18] Fix tests. 
--- src/palace/manager/api/monitor.py | 23 +++++------- src/palace/manager/core/monitor.py | 12 ++++-- tests/manager/celery/tasks/test_opds_odl.py | 41 +++++++++++++++++---- 3 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py index ec2a26c71..bdf701481 100644 --- a/src/palace/manager/api/monitor.py +++ b/src/palace/manager/api/monitor.py @@ -8,12 +8,7 @@ from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration from palace.manager.sqlalchemy.model.licensing import LicensePool -from palace.manager.sqlalchemy.model.patron import ( - Annotation, - Hold, - Loan, - LoanAndHoldMixin, -) +from palace.manager.sqlalchemy.model.patron import Annotation, Hold, Loan from palace.manager.util.datetime_helpers import utc_now @@ -52,20 +47,20 @@ def where_clause(self): ) return ~self.MODEL_CLASS.id.in_(source_of_truth_subquery) - def post_delete_op(self, row) -> Callable: - loan_like: LoanAndHoldMixin = row + def post_delete_op(self, row: Loan | Hold) -> Callable: - def post_delete(): + def post_delete() -> None: ce = CirculationEvent event_type = ( - ce.CM_HOLD_EXPIRED - if isinstance(loan_like, Hold) - else ce.CM_LOAN_EXPIRED + CirculationEvent.CM_LOAN_EXPIRED + if isinstance(row, Loan) + else CirculationEvent.CM_HOLD_EXPIRED ) self.analytics.collect_event( - library=loan_like.library, - license_pool=loan_like.license_pool, + library=row.library, + license_pool=row.license_pool, event_type=event_type, + patron=row.patron, ) return post_delete diff --git a/src/palace/manager/core/monitor.py b/src/palace/manager/core/monitor.py index 65420529a..e6cdc0bc0 100644 --- a/src/palace/manager/core/monitor.py +++ b/src/palace/manager/core/monitor.py @@ -3,6 +3,7 @@ import datetime import logging import traceback +from collections.abc import Callable from typing import TYPE_CHECKING from sqlalchemy.exc import 
InvalidRequestError @@ -883,12 +884,17 @@ def run_once(self, *args, **kwargs): post_delete_ops = [] for i in qu.limit(self.BATCH_SIZE): self.log.info("Deleting %r", i) - post_delete_ops = self.post_delete_op(i) + post_delete_op = self.post_delete_op(i) + if post_delete_op: + post_delete_ops.append(post_delete_op) self.delete(i) rows_deleted += 1 + self._db.commit() + for op in post_delete_ops: op() + count = qu.count() return TimestampData(achievements="Items deleted: %d" % rows_deleted) @@ -901,8 +907,8 @@ def delete(self, row): """ self._db.delete(row) - def post_delete_op(self, row: MODEL_CLASS): - pass + def post_delete_op(self, row: MODEL_CLASS) -> Callable | None: + return None def query(self): return self._db.query(self.MODEL_CLASS).filter(self.where_clause) diff --git a/tests/manager/celery/tasks/test_opds_odl.py b/tests/manager/celery/tasks/test_opds_odl.py index 213802ed1..8129061f1 100644 --- a/tests/manager/celery/tasks/test_opds_odl.py +++ b/tests/manager/celery/tasks/test_opds_odl.py @@ -2,6 +2,7 @@ from unittest.mock import call, patch import pytest +from fixtures.services import ServicesFixture from freezegun import freeze_time from sqlalchemy import func, select @@ -18,6 +19,7 @@ remove_expired_holds_for_collection, ) from palace.manager.service.logging.configuration import LogLevel +from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.licensing import License, LicensePool from palace.manager.sqlalchemy.model.patron import Hold, Patron @@ -29,8 +31,9 @@ class OpdsTaskFixture: - def __init__(self, db: DatabaseTransactionFixture): + def __init__(self, db: DatabaseTransactionFixture, services: ServicesFixture): self.db = db + self.services = services self.two_weeks_ago = utc_now() - timedelta(weeks=2) self.yesterday = utc_now() - timedelta(days=1) @@ -117,8 +120,10 @@ def pool_with_licenses( @pytest.fixture -def 
opds_task_fixture(db: DatabaseTransactionFixture) -> OpdsTaskFixture: - return OpdsTaskFixture(db) +def opds_task_fixture( + db: DatabaseTransactionFixture, services_fixture: ServicesFixture +) -> OpdsTaskFixture: + return OpdsTaskFixture(db, services_fixture) def _hold_sort_key(hold: Hold) -> int: @@ -128,7 +133,9 @@ def _hold_sort_key(hold: Hold) -> int: def test_remove_expired_holds_for_collection( - db: DatabaseTransactionFixture, opds_task_fixture: OpdsTaskFixture + db: DatabaseTransactionFixture, + opds_task_fixture: OpdsTaskFixture, + celery_fixture: CeleryFixture, ): collection = db.collection(protocol=OPDS2WithODLApi) decoy_collection = db.collection(protocol=OverdriveAPI) @@ -142,9 +149,13 @@ def test_remove_expired_holds_for_collection( select(func.count()).select_from(LicensePool) ).one() + analytics = opds_task_fixture.services.analytics_fixture.analytics_mock + # Remove the expired holds assert collection.id is not None - removed = remove_expired_holds_for_collection(db.session, collection.id) + removed = remove_expired_holds_for_collection( + db.session, collection.id, analytics=analytics + ) # Assert that the correct holds were removed current_holds = {h.id for h in db.session.scalars(select(Hold))} @@ -163,6 +174,12 @@ def test_remove_expired_holds_for_collection( # Make sure the license pools for those holds were not deleted assert pools_before == pools_after + # verify that the correct analytics calls were made + call_args_list = analytics.collect_event.call_args_list + assert len(call_args_list) == 10 + for call_args in call_args_list: + assert call_args.kwargs["event_type"] == CirculationEvent.CM_HOLD_EXPIRED + def test_licensepools_with_holds( db: DatabaseTransactionFixture, opds_task_fixture: OpdsTaskFixture @@ -209,8 +226,9 @@ def test_recalculate_holds_for_licensepool( collection = db.collection(protocol=OPDS2WithODLApi) pool, [license1, license2] = opds_task_fixture.pool_with_licenses(collection) + analytics = 
opds_task_fixture.services.analytics_fixture.analytics_mock # Recalculate the hold queue - recalculate_holds_for_licensepool(pool, timedelta(days=5)) + recalculate_holds_for_licensepool(pool, timedelta(days=5), analytics=analytics) current_holds = pool.get_active_holds() assert len(current_holds) == 20 @@ -221,7 +239,7 @@ def test_recalculate_holds_for_licensepool( license1.checkouts_available = 1 license2.checkouts_available = 2 reservation_time = timedelta(days=5) - recalculate_holds_for_licensepool(pool, reservation_time) + recalculate_holds_for_licensepool(pool, reservation_time, analytics) assert pool.licenses_reserved == 3 assert pool.licenses_available == 0 @@ -250,6 +268,15 @@ def test_recalculate_holds_for_licensepool( ) assert hold.start and expected_start and hold.start >= expected_start + # verify that the correct analytics calls were made + call_args_list = analytics.collect_event.call_args_list + assert len(call_args_list) == 3 + for call_args in call_args_list: + assert ( + call_args.kwargs["event_type"] + == CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT + ) + def test_remove_expired_holds( celery_fixture: CeleryFixture, From 1f37950c1120581024f7f96af8f591888b4d8af8 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 18 Nov 2024 12:37:19 -0800 Subject: [PATCH 03/18] Fix tests and mypy --- src/palace/manager/api/monitor.py | 2 +- src/palace/manager/core/monitor.py | 2 +- tests/manager/celery/tasks/test_opds_odl.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py index bdf701481..9c5c54635 100644 --- a/src/palace/manager/api/monitor.py +++ b/src/palace/manager/api/monitor.py @@ -56,7 +56,7 @@ def post_delete() -> None: if isinstance(row, Loan) else CirculationEvent.CM_HOLD_EXPIRED ) - self.analytics.collect_event( + self.services.analytics.collect_event( library=row.library, license_pool=row.license_pool, event_type=event_type, diff --git 
a/src/palace/manager/core/monitor.py b/src/palace/manager/core/monitor.py index e6cdc0bc0..9182f2abd 100644 --- a/src/palace/manager/core/monitor.py +++ b/src/palace/manager/core/monitor.py @@ -907,7 +907,7 @@ def delete(self, row): """ self._db.delete(row) - def post_delete_op(self, row: MODEL_CLASS) -> Callable | None: + def post_delete_op(self, row) -> Callable | None: return None def query(self): diff --git a/tests/manager/celery/tasks/test_opds_odl.py b/tests/manager/celery/tasks/test_opds_odl.py index 8129061f1..54bea5dc3 100644 --- a/tests/manager/celery/tasks/test_opds_odl.py +++ b/tests/manager/celery/tasks/test_opds_odl.py @@ -2,7 +2,6 @@ from unittest.mock import call, patch import pytest -from fixtures.services import ServicesFixture from freezegun import freeze_time from sqlalchemy import func, select @@ -28,6 +27,7 @@ from tests.fixtures.celery import CeleryFixture from tests.fixtures.database import DatabaseTransactionFixture from tests.fixtures.redis import RedisFixture +from tests.fixtures.services import ServicesFixture class OpdsTaskFixture: From 658cb8866958fa6620972516d37ea4a70e228e8c Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 18 Nov 2024 13:27:29 -0800 Subject: [PATCH 04/18] Add test for hold converted to loan and test for loan converted to hold. 
--- src/palace/manager/api/circulation.py | 6 ++- .../sqlalchemy/model/circulationevent.py | 2 +- tests/manager/api/test_authenticator.py | 2 +- tests/manager/api/test_circulationapi.py | 43 ++++++++++++++++--- tests/manager/api/test_monitor.py | 24 ++++++++++- tests/mocks/analytics_provider.py | 5 ++- 6 files changed, 72 insertions(+), 10 deletions(-) diff --git a/src/palace/manager/api/circulation.py b/src/palace/manager/api/circulation.py index cc060cf55..b7e423da0 100644 --- a/src/palace/manager/api/circulation.py +++ b/src/palace/manager/api/circulation.py @@ -1205,7 +1205,11 @@ def borrow( self._collect_event(patron, licensepool, CirculationEvent.CM_HOLD_PLACE) if existing_loan: - self._collect_event(patron, licensepool, CirculationEvent.CM_LOAN_REVOKED) + # Send out analytics event capturing the unusual circumstance that a loan was converted to a hold + # TODO: Do we know what the conditions under which this situation can occur? + self._collect_event( + patron, licensepool, CirculationEvent.CM_LOAN_CONVERTED_TO_HOLD + ) self._db.delete(existing_loan) __transaction.commit() return None, hold, is_new diff --git a/src/palace/manager/sqlalchemy/model/circulationevent.py b/src/palace/manager/sqlalchemy/model/circulationevent.py index 4433893c2..c2c1c33f9 100644 --- a/src/palace/manager/sqlalchemy/model/circulationevent.py +++ b/src/palace/manager/sqlalchemy/model/circulationevent.py @@ -103,7 +103,7 @@ class CirculationEvent(Base): CM_HOLD_READY_FOR_CHECKOUT = "circulation_manager_hold_ready" CM_LOAN_EXPIRED = "circulation_manager_loan_expired" CM_HOLD_CONVERTED_TO_LOAN = "circulation_manager_hold_converted_to_loan" - CM_LOAN_REVOKED = "circulation_manager_loan_revoked" + CM_LOAN_CONVERTED_TO_HOLD = "circulation_manager_loan_converted_to_hold" CM_FULFILL = "circulation_manager_fulfill" # Events that we hear about from a distributor. 
diff --git a/tests/manager/api/test_authenticator.py b/tests/manager/api/test_authenticator.py index 69b2ea3b2..659692de7 100644 --- a/tests/manager/api/test_authenticator.py +++ b/tests/manager/api/test_authenticator.py @@ -420,7 +420,7 @@ def test_get_or_create_patron( assert "2" == patron.authorization_identifier assert default_library == patron.library assert True == is_new - assert CirculationEvent.NEW_PATRON == analytics.event_type + assert CirculationEvent.NEW_PATRON == analytics.last_event_type assert 1 == analytics.count # Patron.neighborhood was set, even though there is no diff --git a/tests/manager/api/test_circulationapi.py b/tests/manager/api/test_circulationapi.py index 27042a399..ece8a7954 100644 --- a/tests/manager/api/test_circulationapi.py +++ b/tests/manager/api/test_circulationapi.py @@ -135,7 +135,7 @@ def test_borrow_sends_analytics_event(self, circulation_api: CirculationAPIFixtu # An analytics event was created. assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_CHECKOUT == circulation_api.analytics.event_type + assert CirculationEvent.CM_CHECKOUT == circulation_api.analytics.last_event_type # Try to 'borrow' the same book again. circulation_api.remote.queue_checkout(AlreadyCheckedOut()) @@ -362,7 +362,9 @@ def test_hold_sends_analytics_event(self, circulation_api: CirculationAPIFixture # An analytics event was created. assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_HOLD_PLACE == circulation_api.analytics.event_type + assert ( + CirculationEvent.CM_HOLD_PLACE == circulation_api.analytics.last_event_type + ) # Try to 'borrow' the same book again. circulation_api.remote.queue_checkout(AlreadyOnHold()) @@ -373,6 +375,34 @@ def test_hold_sends_analytics_event(self, circulation_api: CirculationAPIFixture # sent. 
assert 1 == circulation_api.analytics.count + def test_hold_is_ready_converts_to_loan_on_borrow( + self, circulation_api: CirculationAPIFixture + ): + now = utc_now() + loaninfo = LoanInfo.from_license_pool( + circulation_api.pool, + start_date=now, + end_date=now + timedelta(seconds=3600), + external_identifier=circulation_api.db.fresh_str(), + ) + circulation_api.remote.queue_checkout(loaninfo) + circulation_api.pool.on_hold_to(patron=circulation_api.patron, position=0) + loan, hold, is_new = self.borrow(circulation_api) + + # The Hold is gone and there is a new loan. + assert loan is not None + assert hold is None + assert is_new is True + + assert circulation_api.analytics.count == 2 + # A hold converted analytics event was recorded + assert ( + circulation_api.analytics.event_types[0] + == CirculationEvent.CM_HOLD_CONVERTED_TO_LOAN + ) + # A check event was recorded + assert circulation_api.analytics.event_types[1] == CirculationEvent.CM_CHECKOUT + def test_borrow_with_expired_card_fails( self, circulation_api: CirculationAPIFixture ): @@ -812,7 +842,7 @@ def test_fulfill(self, circulation_api: CirculationAPIFixture): # An analytics event was created. assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_FULFILL == circulation_api.analytics.event_type + assert CirculationEvent.CM_FULFILL == circulation_api.analytics.last_event_type def test_fulfill_without_loan(self, circulation_api: CirculationAPIFixture): # By default, a title cannot be fulfilled unless there is an active @@ -856,7 +886,7 @@ def test_revoke_loan(self, circulation_api: CirculationAPIFixture, open_access): # An analytics event was created. 
assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_CHECKIN == circulation_api.analytics.event_type + assert CirculationEvent.CM_CHECKIN == circulation_api.analytics.last_event_type @pytest.mark.parametrize("open_access", [True, False]) def test_release_hold(self, circulation_api: CirculationAPIFixture, open_access): @@ -872,7 +902,10 @@ def test_release_hold(self, circulation_api: CirculationAPIFixture, open_access) # An analytics event was created. assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_HOLD_RELEASE == circulation_api.analytics.event_type + assert ( + CirculationEvent.CM_HOLD_RELEASE + == circulation_api.analytics.last_event_type + ) def test__collect_event(self, circulation_api: CirculationAPIFixture): # Test the _collect_event method, which gathers information diff --git a/tests/manager/api/test_monitor.py b/tests/manager/api/test_monitor.py index ba4086759..fe74c15c2 100644 --- a/tests/manager/api/test_monitor.py +++ b/tests/manager/api/test_monitor.py @@ -8,11 +8,13 @@ LoanReaper, ) from palace.manager.api.opds_for_distributors import OPDSForDistributorsAPI +from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.datasource import DataSource from palace.manager.sqlalchemy.model.patron import Annotation from palace.manager.sqlalchemy.util import get_one_or_create from palace.manager.util.datetime_helpers import utc_now from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.services import ServicesFixture class TestLoanlikeReaperMonitor: @@ -26,7 +28,9 @@ def test_source_of_truth_protocols(self): OPDSForDistributorsAPI.label() ] - def test_reaping(self, db: DatabaseTransactionFixture): + def test_reaping( + self, db: DatabaseTransactionFixture, services_fixture: ServicesFixture + ): # This patron stopped using the circulation manager a long time # ago. 
inactive_patron = db.patron() @@ -152,6 +156,7 @@ def test_reaping(self, db: DatabaseTransactionFixture): # Now we fire up the loan reaper. monitor = LoanReaper(db.session) + monitor.services.analytics = services_fixture.analytics_fixture.analytics_mock monitor.run() # All of the inactive patron's loans have been reaped, @@ -171,8 +176,18 @@ def test_reaping(self, db: DatabaseTransactionFixture): assert 2 == len(current_patron.loans) assert 2 == len(current_patron.holds) + call_args_list = ( + services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list + ) + assert len(call_args_list) == 2 + for call_args in call_args_list: + assert call_args.kwargs["event_type"] == CirculationEvent.CM_LOAN_EXPIRED + # Now fire up the hold reaper. hold_monitor = HoldReaper(db.session) + hold_monitor.services.analytics = ( + services_fixture.analytics_fixture.analytics_mock + ) hold_monitor.run() # All of the inactive patron's holds have been reaped, @@ -181,6 +196,13 @@ def test_reaping(self, db: DatabaseTransactionFixture): assert [sot_hold] == inactive_patron.holds assert 2 == len(current_patron.holds) + call_args_list = ( + services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list + ) + assert len(call_args_list) == 2 + for call_args in call_args_list: + assert call_args.kwargs["event_type"] == CirculationEvent.CM_HOLD_EXPIRED + class TestIdlingAnnotationReaper: def test_where_clause(self, db: DatabaseTransactionFixture): diff --git a/tests/mocks/analytics_provider.py b/tests/mocks/analytics_provider.py index e2805565e..c9b821379 100644 --- a/tests/mocks/analytics_provider.py +++ b/tests/mocks/analytics_provider.py @@ -13,10 +13,13 @@ def __init__(self, integration=None, services=None, library=None): """ self.count = 0 self.event = None + self.event_types = [] + self.last_event_type = None if integration: self.url = integration.url def collect_event(self, library, lp, event_type, time=None, **kwargs): self.count = self.count + 1 - 
self.event_type = event_type + self.last_event_type = event_type + self.event_types.append(event_type) self.time = time From 7ee4d16cdaf12ae3a8bbee443d1bd2d5a5981995 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 19 Nov 2024 09:11:51 -0800 Subject: [PATCH 05/18] Revert unintended change. --- src/palace/manager/api/circulation_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/palace/manager/api/circulation_manager.py b/src/palace/manager/api/circulation_manager.py index c7f8d1928..94130e20d 100644 --- a/src/palace/manager/api/circulation_manager.py +++ b/src/palace/manager/api/circulation_manager.py @@ -197,9 +197,9 @@ def reload_settings_if_changed(self): CirculationManager's configuration from the database. """ last_update = Configuration.site_configuration_last_update(self._db) - # if last_update > self.site_configuration_last_update: - self.load_settings() - self.site_configuration_last_update = last_update + if last_update > self.site_configuration_last_update: + self.load_settings() + self.site_configuration_last_update = last_update def get_patron_web_domains(self) -> set[str]: """Return the set of patron web client domains.""" From ee5567b94f62af75b3d4cfca403e0302d6082ae0 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 19 Nov 2024 09:30:35 -0800 Subject: [PATCH 06/18] Formatting. 
--- tests/manager/api/test_authenticator.py | 19 +++++----- tests/manager/api/test_circulationapi.py | 46 ++++++++++++------------ 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/tests/manager/api/test_authenticator.py b/tests/manager/api/test_authenticator.py index 659692de7..d6726673b 100644 --- a/tests/manager/api/test_authenticator.py +++ b/tests/manager/api/test_authenticator.py @@ -417,15 +417,15 @@ def test_get_or_create_patron( patron, is_new = patron_data.get_or_create_patron( db.session, default_library.id, analytics ) - assert "2" == patron.authorization_identifier + assert patron.authorization_identifier == "2" assert default_library == patron.library - assert True == is_new - assert CirculationEvent.NEW_PATRON == analytics.last_event_type - assert 1 == analytics.count + assert is_new == True + assert analytics.last_event_type == CirculationEvent.NEW_PATRON + assert analytics.count == 1 # Patron.neighborhood was set, even though there is no # value and that's not a database field. - assert None == patron.neighborhood + assert patron.neighborhood is None # Set a neighborhood and try again. 
patron_data.neighborhood = "Achewood" @@ -435,15 +435,14 @@ def test_get_or_create_patron( patron, is_new = patron_data.get_or_create_patron( db.session, default_library.id, analytics ) - assert "2" == patron.authorization_identifier - assert False == is_new - assert "Achewood" == patron.neighborhood - assert 1 == analytics.count + assert patron.authorization_identifier == "2" + assert is_new == False + assert patron.neighborhood == "Achewood" + assert analytics.count == 1 def test_to_response_parameters(self, patron_data: PatronData): params = patron_data.to_response_parameters assert dict(name="4") == params - patron_data.personal_name = None params = patron_data.to_response_parameters assert dict() == params diff --git a/tests/manager/api/test_circulationapi.py b/tests/manager/api/test_circulationapi.py index ece8a7954..5dd707a36 100644 --- a/tests/manager/api/test_circulationapi.py +++ b/tests/manager/api/test_circulationapi.py @@ -129,18 +129,18 @@ def test_borrow_sends_analytics_event(self, circulation_api: CirculationAPIFixtu # The Loan looks good. assert loaninfo.identifier == loan.license_pool.identifier.identifier assert circulation_api.patron == loan.patron - assert None == hold - assert True == is_new + assert hold is None + assert is_new == True assert loaninfo.external_identifier == loan.external_identifier # An analytics event was created. - assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_CHECKOUT == circulation_api.analytics.last_event_type + assert circulation_api.analytics.count == 1 + assert circulation_api.analytics.last_event_type == CirculationEvent.CM_CHECKOUT # Try to 'borrow' the same book again. 
circulation_api.remote.queue_checkout(AlreadyCheckedOut()) loan, hold, is_new = self.borrow(circulation_api) - assert False == is_new + assert is_new == False assert loaninfo.external_identifier == loan.external_identifier # Since the loan already existed, no new analytics event was @@ -150,18 +150,18 @@ def test_borrow_sends_analytics_event(self, circulation_api: CirculationAPIFixtu # Now try to renew the book. circulation_api.remote.queue_checkout(loaninfo) loan, hold, is_new = self.borrow(circulation_api) - assert False == is_new + assert is_new == False # Renewals are counted as loans, since from an accounting # perspective they _are_ loans. - assert 2 == circulation_api.analytics.count + assert circulation_api.analytics.count == 2 # Loans of open-access books go through a different code # path, but they count as loans nonetheless. circulation_api.pool.open_access = True circulation_api.remote.queue_checkout(loaninfo) loan, hold, is_new = self.borrow(circulation_api) - assert 3 == circulation_api.analytics.count + assert circulation_api.analytics.count == 3 @freeze_time() def test_attempt_borrow_with_existing_remote_loan( @@ -357,23 +357,23 @@ def test_hold_sends_analytics_event(self, circulation_api: CirculationAPIFixture # The Hold looks good. assert holdinfo.identifier == hold.license_pool.identifier.identifier assert circulation_api.patron == hold.patron - assert None == loan - assert True == is_new + assert loan is None + assert is_new == True # An analytics event was created. assert 1 == circulation_api.analytics.count assert ( - CirculationEvent.CM_HOLD_PLACE == circulation_api.analytics.last_event_type + circulation_api.analytics.last_event_type == CirculationEvent.CM_HOLD_PLACE ) # Try to 'borrow' the same book again. circulation_api.remote.queue_checkout(AlreadyOnHold()) loan, hold, is_new = self.borrow(circulation_api) - assert False == is_new + assert is_new == False # Since the hold already existed, no new analytics event was # sent. 
- assert 1 == circulation_api.analytics.count + assert circulation_api.analytics.count == 1 def test_hold_is_ready_converts_to_loan_on_borrow( self, circulation_api: CirculationAPIFixture @@ -838,11 +838,11 @@ def test_fulfill(self, circulation_api: CirculationAPIFixture): ) # The fulfillment looks good. - assert fulfillment == result + assert result == fulfillment # An analytics event was created. - assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_FULFILL == circulation_api.analytics.last_event_type + assert circulation_api.analytics.count == 1 + assert circulation_api.analytics.last_event_type == CirculationEvent.CM_FULFILL def test_fulfill_without_loan(self, circulation_api: CirculationAPIFixture): # By default, a title cannot be fulfilled unless there is an active @@ -882,11 +882,11 @@ def test_revoke_loan(self, circulation_api: CirculationAPIFixture, open_access): result = circulation_api.circulation.revoke_loan( circulation_api.patron, "1234", circulation_api.pool ) - assert True == result + assert result == True # An analytics event was created. - assert 1 == circulation_api.analytics.count - assert CirculationEvent.CM_CHECKIN == circulation_api.analytics.last_event_type + assert circulation_api.analytics.count == 1 + assert circulation_api.analytics.last_event_type == CirculationEvent.CM_CHECKIN @pytest.mark.parametrize("open_access", [True, False]) def test_release_hold(self, circulation_api: CirculationAPIFixture, open_access): @@ -898,13 +898,13 @@ def test_release_hold(self, circulation_api: CirculationAPIFixture, open_access) result = circulation_api.circulation.release_hold( circulation_api.patron, "1234", circulation_api.pool ) - assert True == result + assert result == True # An analytics event was created. 
- assert 1 == circulation_api.analytics.count + assert circulation_api.analytics.count == 1 assert ( - CirculationEvent.CM_HOLD_RELEASE - == circulation_api.analytics.last_event_type + circulation_api.analytics.last_event_type + == CirculationEvent.CM_HOLD_RELEASE ) def test__collect_event(self, circulation_api: CirculationAPIFixture): From 9d9001b752a80e047818e95b3f6e7160649e1d47 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 19 Nov 2024 09:46:30 -0800 Subject: [PATCH 07/18] Fix broken tests. --- src/palace/manager/api/monitor.py | 31 +++++++++++++----------------- src/palace/manager/core/monitor.py | 7 ++----- tests/manager/api/test_monitor.py | 25 ++++++++++++------------ 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py index 9c5c54635..b22014a36 100644 --- a/src/palace/manager/api/monitor.py +++ b/src/palace/manager/api/monitor.py @@ -1,5 +1,3 @@ -from collections.abc import Callable - from sqlalchemy import and_, or_ from palace.manager.api.opds_for_distributors import OPDSForDistributorsAPI @@ -47,23 +45,20 @@ def where_clause(self): ) return ~self.MODEL_CLASS.id.in_(source_of_truth_subquery) - def post_delete_op(self, row: Loan | Hold) -> Callable: - - def post_delete() -> None: - ce = CirculationEvent - event_type = ( - CirculationEvent.CM_LOAN_EXPIRED - if isinstance(row, Loan) - else CirculationEvent.CM_HOLD_EXPIRED - ) - self.services.analytics.collect_event( - library=row.library, - license_pool=row.license_pool, - event_type=event_type, - patron=row.patron, - ) + def post_delete(self, row: Loan | Hold) -> None: + ce = CirculationEvent + event_type = ( + CirculationEvent.CM_LOAN_EXPIRED + if isinstance(row, Loan) + else CirculationEvent.CM_HOLD_EXPIRED + ) - return post_delete + self.services.analytics.collect_event( + library=row.library, + license_pool=row.license_pool, + event_type=event_type, + patron=row.patron, + ) class 
LoanReaper(LoanlikeReaperMonitor): diff --git a/src/palace/manager/core/monitor.py b/src/palace/manager/core/monitor.py index 9182f2abd..914f61276 100644 --- a/src/palace/manager/core/monitor.py +++ b/src/palace/manager/core/monitor.py @@ -3,7 +3,6 @@ import datetime import logging import traceback -from collections.abc import Callable from typing import TYPE_CHECKING from sqlalchemy.exc import InvalidRequestError @@ -884,10 +883,8 @@ def run_once(self, *args, **kwargs): post_delete_ops = [] for i in qu.limit(self.BATCH_SIZE): self.log.info("Deleting %r", i) - post_delete_op = self.post_delete_op(i) - if post_delete_op: - post_delete_ops.append(post_delete_op) self.delete(i) + self.post_delete(i) rows_deleted += 1 self._db.commit() @@ -907,7 +904,7 @@ def delete(self, row): """ self._db.delete(row) - def post_delete_op(self, row) -> Callable | None: + def post_delete(self, row) -> None: return None def query(self): diff --git a/tests/manager/api/test_monitor.py b/tests/manager/api/test_monitor.py index fe74c15c2..3ecbec9a8 100644 --- a/tests/manager/api/test_monitor.py +++ b/tests/manager/api/test_monitor.py @@ -168,20 +168,13 @@ def test_reaping( assert {open_access_loan, sot_loan, unlimited_access_loan} == set( inactive_patron.loans ) - assert 3 == len(inactive_patron.holds) + assert len(inactive_patron.holds) == 3 # The active patron's loans and holds are unaffected, either # because they have not expired or because they have no known # expiration date and were created relatively recently. - assert 2 == len(current_patron.loans) - assert 2 == len(current_patron.holds) - - call_args_list = ( - services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list - ) - assert len(call_args_list) == 2 - for call_args in call_args_list: - assert call_args.kwargs["event_type"] == CirculationEvent.CM_LOAN_EXPIRED + assert len(current_patron.loans) == 2 + assert len(current_patron.holds) == 2 # Now fire up the hold reaper. 
hold_monitor = HoldReaper(db.session) @@ -196,12 +189,18 @@ def test_reaping( assert [sot_hold] == inactive_patron.holds assert 2 == len(current_patron.holds) + # verify expected circ event count and order for two monitor operations. call_args_list = ( services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list ) - assert len(call_args_list) == 2 - for call_args in call_args_list: - assert call_args.kwargs["event_type"] == CirculationEvent.CM_HOLD_EXPIRED + assert len(call_args_list) == 4 + event_types = [call_args.kwargs["event_type"] for call_args in call_args_list] + assert event_types == [ + CirculationEvent.CM_LOAN_EXPIRED, + CirculationEvent.CM_LOAN_EXPIRED, + CirculationEvent.CM_HOLD_EXPIRED, + CirculationEvent.CM_HOLD_EXPIRED, + ] class TestIdlingAnnotationReaper: From e8d9eaa8d6021cb5bcb25242041da48b3c82f99e Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 19 Nov 2024 10:08:27 -0800 Subject: [PATCH 08/18] Make loan.patron_id and hold.patron_id fields non-nullable. --- ...make_loan_patron_id_and_hold_patron_id_.py | 43 +++++++++++++++++++ src/palace/manager/sqlalchemy/model/patron.py | 6 +++ 2 files changed, 49 insertions(+) create mode 100644 alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py diff --git a/alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py b/alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py new file mode 100644 index 000000000..cfd409637 --- /dev/null +++ b/alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py @@ -0,0 +1,43 @@ +"""Make Loan.patron_id and Hold.patron_id non-nullable. + +Revision ID: 58b0ae7f5b67 +Revises: 272da5f400de +Create Date: 2024-11-19 18:04:24.182444+00:00 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "58b0ae7f5b67" +down_revision = "272da5f400de" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.alter_column( + table_name="loans", + column_name="patron_id", + nullable=False, + ) + + op.alter_column( + table_name="holds", + column_name="patron_id", + nullable=False, + ) + + +def downgrade() -> None: + op.alter_column( + table_name="loans", + column_name="patron_id", + nullable=True, + ) + + op.alter_column( + table_name="holds", + column_name="patron_id", + nullable=True, + ) diff --git a/src/palace/manager/sqlalchemy/model/patron.py b/src/palace/manager/sqlalchemy/model/patron.py index 88c7f5912..186809304 100644 --- a/src/palace/manager/sqlalchemy/model/patron.py +++ b/src/palace/manager/sqlalchemy/model/patron.py @@ -571,6 +571,7 @@ class Hold(Base, LoanAndHoldMixin): """A patron is in line to check out a book.""" __tablename__ = "holds" +<<<<<<< HEAD id: Mapped[int] = Column(Integer, primary_key=True) patron_id: Mapped[int] = Column( Integer, ForeignKey("patrons.id"), index=True, nullable=False @@ -581,6 +582,11 @@ class Hold(Base, LoanAndHoldMixin): license_pool_id: Mapped[int] = Column( Integer, ForeignKey("licensepools.id"), index=True, nullable=False ) +======= + id = Column(Integer, primary_key=True) + patron_id = Column(Integer, ForeignKey("patrons.id"), index=True, nullable=False) + license_pool_id = Column(Integer, ForeignKey("licensepools.id"), index=True) +>>>>>>> 0e3659672 (Make loan.patron_id and hold.patron_id fields non-nullable.) license_pool: Mapped[LicensePool] = relationship( "LicensePool", back_populates="holds" ) From 3f6a31ede9187de7ad1d2197489634f587d4104c Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 3 Dec 2024 10:16:16 -0800 Subject: [PATCH 09/18] Improve test formatting. 
--- tests/manager/api/test_authenticator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/manager/api/test_authenticator.py b/tests/manager/api/test_authenticator.py index d6726673b..2f892d83a 100644 --- a/tests/manager/api/test_authenticator.py +++ b/tests/manager/api/test_authenticator.py @@ -404,7 +404,7 @@ def test_apply_on_incomplete_information(self, db: DatabaseTransactionFixture): ) authenticated_by_weird_identifier.apply(patron) assert "1234" == patron.authorization_identifier - assert None == patron.last_external_sync + assert patron.last_external_sync is None def test_get_or_create_patron( self, patron_data: PatronData, db: DatabaseTransactionFixture @@ -419,7 +419,7 @@ def test_get_or_create_patron( ) assert patron.authorization_identifier == "2" assert default_library == patron.library - assert is_new == True + assert is_new is True assert analytics.last_event_type == CirculationEvent.NEW_PATRON assert analytics.count == 1 @@ -436,7 +436,7 @@ def test_get_or_create_patron( db.session, default_library.id, analytics ) assert patron.authorization_identifier == "2" - assert is_new == False + assert is_new is False assert patron.neighborhood == "Achewood" assert analytics.count == 1 From 463ba28e97f0b57bfc74aca54b87938e6f5058da Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 3 Dec 2024 12:00:55 -0800 Subject: [PATCH 10/18] Create separate tasks for reaping holds by collection. Also use with_for_update to lock expired holds before deletion Also ensure that transaction is closed before collecting analytics events. 
--- src/palace/manager/api/monitor.py | 24 +++-- src/palace/manager/celery/tasks/opds_odl.py | 101 ++++++++++++++------ src/palace/manager/core/monitor.py | 14 +-- tests/manager/celery/tasks/test_opds_odl.py | 68 +++++++------ 4 files changed, 131 insertions(+), 76 deletions(-) diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py index b22014a36..ed112a6ee 100644 --- a/src/palace/manager/api/monitor.py +++ b/src/palace/manager/api/monitor.py @@ -15,6 +15,10 @@ class LoanlikeReaperMonitor(ReaperMonitor): OPDSForDistributorsAPI.label(), ] + def __init__(self, *args, **kwargs): + super().__init__(args, kwargs) + self._events_to_be_logged = [] + @property def where_clause(self): """We never want to automatically reap loans or holds for situations @@ -45,20 +49,24 @@ def where_clause(self): ) return ~self.MODEL_CLASS.id.in_(source_of_truth_subquery) - def post_delete(self, row: Loan | Hold) -> None: + def delete(self, row) -> None: ce = CirculationEvent - event_type = ( - CirculationEvent.CM_LOAN_EXPIRED - if isinstance(row, Loan) - else CirculationEvent.CM_HOLD_EXPIRED - ) - - self.services.analytics.collect_event( + event_type = ce.CM_LOAN_EXPIRED if isinstance(row, Loan) else ce.CM_HOLD_EXPIRED + event = dict( library=row.library, license_pool=row.license_pool, event_type=event_type, patron=row.patron, ) + super().delete(row) + self.events_to_be_logged.append(event) + + def after_commit(self) -> None: + super().after_commit() + copy_of_list = list(self._events_to_be_logged) + for event in copy_of_list: + self.services.analytics.collect_event(**event) + self._events_to_be_logged.remove(event) class LoanReaper(LoanlikeReaperMonitor): diff --git a/src/palace/manager/celery/tasks/opds_odl.py b/src/palace/manager/celery/tasks/opds_odl.py index 325631901..cce437386 100644 --- a/src/palace/manager/celery/tasks/opds_odl.py +++ b/src/palace/manager/celery/tasks/opds_odl.py @@ -1,4 +1,5 @@ import datetime +from typing import Any from celery import 
shared_task from sqlalchemy import delete, select @@ -7,7 +8,6 @@ from palace.manager.api.odl.api import OPDS2WithODLApi from palace.manager.celery.task import Task -from palace.manager.service.analytics.analytics import Analytics from palace.manager.service.celery.celery import QueueNames from palace.manager.service.redis.models.lock import RedisLock from palace.manager.service.redis.redis import Redis @@ -19,13 +19,31 @@ def remove_expired_holds_for_collection( - db: Session, collection_id: int, analytics: Analytics -) -> int: + db: Session, + collection_id: int, +) -> tuple[int, dict[str, Any]]: """ Remove expired holds from the database for this collection. """ # generate expiration events for expired holds before deleting them + # lock rows + lock_query = ( + select(Hold.id) + .where( + Hold.position == 0, + Hold.end < utc_now(), + Hold.license_pool_id == LicensePool.id, + LicensePool.collection_id == collection_id, + ) + .with_for_update() + ) + + db.execute(lock_query).all() + + # a separate query is required to get around the + # "FOR UPDATE cannot be applied to the nullable side of an outer join" issue when trying to use with_for_update + # on the Hold object. 
select_query = select(Hold).where( Hold.position == 0, Hold.end < utc_now(), @@ -34,13 +52,17 @@ def remove_expired_holds_for_collection( ) expired_holds = db.scalars(select_query).all() + expired_hold_events: [dict[str, Any]] = [] for hold in expired_holds: - analytics.collect_event( - library=hold.library, - license_pool=hold.license_pool, - event_type=CirculationEvent.CM_HOLD_EXPIRED, - patron=hold.patron, + expired_hold_events.append( + dict( + library=hold.library, + license_pool=hold.license_pool, + event_type=CirculationEvent.CM_HOLD_EXPIRED, + patron=hold.patron, + ) ) + # delete the holds query = ( delete(Hold) @@ -53,11 +75,12 @@ def remove_expired_holds_for_collection( .execution_options(synchronize_session="fetch") ) result = db.execute(query) + # We need the type ignores here because result doesn't always have # a rowcount, but the sqlalchemy docs swear it will in the case of # a delete statement. # https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#getting-affected-row-count-from-update-delete - return result.rowcount # type: ignore[attr-defined,no-any-return] + return result.rowcount, expired_hold_events # type: ignore[attr-defined,no-any-return] def licensepool_ids_with_holds( @@ -96,8 +119,7 @@ def lock_licenses(license_pool: LicensePool) -> None: def recalculate_holds_for_licensepool( license_pool: LicensePool, reservation_period: datetime.timedelta, - analytics: Analytics, -) -> int: +) -> tuple[int, dict[str, Any]]: # We take out row level locks on all the licenses and holds for this license pool, so that # everything is in a consistent state while we update the hold queue. This means we should be # quickly committing the transaction, to avoid contention or deadlocks. @@ -111,6 +133,8 @@ def recalculate_holds_for_licensepool( waiting = holds[reserved:] updated = 0 + events: [dict[str, Any]] = [] + # These holds have a copy reserved for them. for hold in ready: # If this hold isn't already in position 0, the hold just became available. 
@@ -119,11 +143,13 @@ def recalculate_holds_for_licensepool( hold.position = 0 hold.end = utc_now() + reservation_period updated += 1 - analytics.collect_event( - library=hold.library, - license_pool=hold.license_pool, - event_type=CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT, - patron=hold.patron, + events.append( + dict( + library=hold.library, + license_pool=hold.license_pool, + event_type=CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT, + patron=hold.patron, + ) ) # Update the position for the remaining holds. @@ -134,17 +160,37 @@ def recalculate_holds_for_licensepool( hold.end = None updated += 1 - return updated + return updated, events + + +@shared_task(queue=QueueNames.default, bind=True) +def remove_expired_holds_for_collection_task(task: Task, collection_id: int) -> None: + """ + A shared task for removing expired holds from the database for a collection + """ + analytics = task.services.analytics.analytics() + with task.transaction() as session: + collection = Collection.by_id(session, collection_id) + removed, events = remove_expired_holds_for_collection( + session, + collection_id, + ) + task.log.info( + f"Removed {removed} expired holds for collection {collection.name} ({collection_id})." + ) + + # publish events only after successful commit + for event in events: + analytics.collect_event(**event) @shared_task(queue=QueueNames.default, bind=True) def remove_expired_holds(task: Task) -> None: """ - Remove expired holds from the database. 
+ Issue remove expired hold tasks for eligible collections """ registry = task.services.integration_registry.license_providers() protocols = registry.get_protocols(OPDS2WithODLApi, default=False) - analytics = task.services.analytics.analytics() with task.session() as session: collections = [ (collection.id, collection.name) @@ -152,13 +198,7 @@ def remove_expired_holds(task: Task) -> None: if collection.id is not None ] for collection_id, collection_name in collections: - with task.transaction() as session: - removed = remove_expired_holds_for_collection( - session, collection_id, analytics - ) - task.log.info( - f"Removed {removed} expired holds for collection {collection_name} ({collection_id})." - ) + remove_expired_holds_for_collection.delay(collection_id) @shared_task(queue=QueueNames.default, bind=True) @@ -191,6 +231,7 @@ def recalculate_hold_queue_collection( Recalculate the hold queue for a collection. """ lock = _redis_lock_recalculate_holds(task.services.redis.client(), collection_id) + analytics = task.services.analytics.analytics() with lock.lock() as locked: if not locked: task.log.info( @@ -233,11 +274,9 @@ def recalculate_hold_queue_collection( ) continue - analytics = task.services.analytics.analytics() - updated = recalculate_holds_for_licensepool( + updated, events = recalculate_holds_for_licensepool( license_pool, reservation_period, - analytics, ) edition = license_pool.presentation_edition title = edition.title if edition else None @@ -247,6 +286,10 @@ def recalculate_hold_queue_collection( f"{updated} holds out of date." ) + # fire events after successful database update + for event in events: + analytics.collect_event(**event) + if len(license_pool_ids) == batch_size: # We are done this batch, but there is probably more work to do, we queue up the next batch. 
raise task.replace( diff --git a/src/palace/manager/core/monitor.py b/src/palace/manager/core/monitor.py index 914f61276..444f04579 100644 --- a/src/palace/manager/core/monitor.py +++ b/src/palace/manager/core/monitor.py @@ -880,22 +880,21 @@ def run_once(self, *args, **kwargs): count = qu.count() self.log.info("Deleting %d row(s)", count) while count > 0: - post_delete_ops = [] for i in qu.limit(self.BATCH_SIZE): self.log.info("Deleting %r", i) self.delete(i) - self.post_delete(i) rows_deleted += 1 - self._db.commit() - for op in post_delete_ops: - op() + self.after_commit() count = qu.count() return TimestampData(achievements="Items deleted: %d" % rows_deleted) - def delete(self, row): + def after_commit(self) -> None: + return None + + def delete(self, row) -> None: """Delete a row from the database. CAUTION: If you override this method such that it doesn't @@ -904,9 +903,6 @@ def delete(self, row): """ self._db.delete(row) - def post_delete(self, row) -> None: - return None - def query(self): return self._db.query(self.MODEL_CLASS).filter(self.where_clause) diff --git a/tests/manager/celery/tasks/test_opds_odl.py b/tests/manager/celery/tasks/test_opds_odl.py index 54bea5dc3..8c245cd96 100644 --- a/tests/manager/celery/tasks/test_opds_odl.py +++ b/tests/manager/celery/tasks/test_opds_odl.py @@ -16,6 +16,7 @@ recalculate_holds_for_licensepool, remove_expired_holds, remove_expired_holds_for_collection, + remove_expired_holds_for_collection_task, ) from palace.manager.service.logging.configuration import LogLevel from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent @@ -149,12 +150,11 @@ def test_remove_expired_holds_for_collection( select(func.count()).select_from(LicensePool) ).one() - analytics = opds_task_fixture.services.analytics_fixture.analytics_mock - # Remove the expired holds assert collection.id is not None - removed = remove_expired_holds_for_collection( - db.session, collection.id, analytics=analytics + removed, events = 
remove_expired_holds_for_collection( + db.session, + collection.id, ) # Assert that the correct holds were removed @@ -175,10 +175,9 @@ def test_remove_expired_holds_for_collection( assert pools_before == pools_after # verify that the correct analytics calls were made - call_args_list = analytics.collect_event.call_args_list - assert len(call_args_list) == 10 - for call_args in call_args_list: - assert call_args.kwargs["event_type"] == CirculationEvent.CM_HOLD_EXPIRED + assert len(events) == 10 + for event in events: + assert event["event_type"] == CirculationEvent.CM_HOLD_EXPIRED def test_licensepools_with_holds( @@ -228,7 +227,7 @@ def test_recalculate_holds_for_licensepool( analytics = opds_task_fixture.services.analytics_fixture.analytics_mock # Recalculate the hold queue - recalculate_holds_for_licensepool(pool, timedelta(days=5), analytics=analytics) + recalculate_holds_for_licensepool(pool, timedelta(days=5)) current_holds = pool.get_active_holds() assert len(current_holds) == 20 @@ -239,7 +238,7 @@ def test_recalculate_holds_for_licensepool( license1.checkouts_available = 1 license2.checkouts_available = 2 reservation_time = timedelta(days=5) - recalculate_holds_for_licensepool(pool, reservation_time, analytics) + _, events = recalculate_holds_for_licensepool(pool, reservation_time) assert pool.licenses_reserved == 3 assert pool.licenses_available == 0 @@ -268,42 +267,51 @@ def test_recalculate_holds_for_licensepool( ) assert hold.start and expected_start and hold.start >= expected_start - # verify that the correct analytics calls were made - call_args_list = analytics.collect_event.call_args_list - assert len(call_args_list) == 3 - for call_args in call_args_list: - assert ( - call_args.kwargs["event_type"] - == CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT - ) + # verify that the correct analytics events were returned + assert len(events) == 3 + for event in events: + assert event["event_type"] == CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT -def 
test_remove_expired_holds( +def test_remove_expired_holds_for_collection( celery_fixture: CeleryFixture, db: DatabaseTransactionFixture, opds_task_fixture: OpdsTaskFixture, ): collection1 = db.collection(protocol=OPDS2WithODLApi) - collection2 = db.collection(protocol=OPDS2WithODLApi) - decoy_collection = db.collection(protocol=OverdriveAPI) expired_holds1, non_expired_holds1 = opds_task_fixture.holds(collection1) - expired_holds2, non_expired_holds2 = opds_task_fixture.holds(collection2) - decoy_expired_holds, decoy_non_expired_holds = opds_task_fixture.holds( - decoy_collection - ) # Remove the expired holds - remove_expired_holds.delay().wait() + remove_expired_holds_for_collection_task.delay(collection1.id).wait() + + assert len( + opds_task_fixture.services.analytics_fixture.analytics_mock.method_calls + ) == len(expired_holds1) current_holds = {h.id for h in db.session.scalars(select(Hold))} assert expired_holds1.isdisjoint(current_holds) - assert expired_holds2.isdisjoint(current_holds) - assert decoy_non_expired_holds.issubset(current_holds) - assert decoy_expired_holds.issubset(current_holds) assert non_expired_holds1.issubset(current_holds) - assert non_expired_holds2.issubset(current_holds) + + +def test_remove_expired_holds( + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + db: DatabaseTransactionFixture, + opds_task_fixture: OpdsTaskFixture, +): + collection1 = db.collection(protocol=OPDS2WithODLApi) + collection2 = db.collection(protocol=OPDS2WithODLApi) + decoy_collection = db.collection(protocol=OverdriveAPI) + + with patch.object(opds_odl, "remove_expired_holds_for_collection") as mock_remove: + remove_expired_holds.delay().wait() + + assert mock_remove.delay.call_count == 2 + mock_remove.delay.assert_has_calls( + [call(collection1.id), call(collection2.id)], any_order=True + ) def test_recalculate_hold_queue( From 8e84469747da8357e1f8b8c59d0072bc76d97ff6 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Wed, 4 Dec 2024 08:59:36 
-0800 Subject: [PATCH 11/18] Change date on revision to ensure it reflects the order of application. --- ...58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename alembic/versions/{20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py => 20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py} (87%) diff --git a/alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py b/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py similarity index 87% rename from alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py rename to alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py index cfd409637..0b051e3d3 100644 --- a/alembic/versions/20241119_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py +++ b/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py @@ -1,8 +1,8 @@ """Make Loan.patron_id and Hold.patron_id non-nullable. Revision ID: 58b0ae7f5b67 -Revises: 272da5f400de -Create Date: 2024-11-19 18:04:24.182444+00:00 +Revises: c3458e1ef9aa +Create Date: 2024-12-04 08:04:24.182444+00:00 """ @@ -10,7 +10,7 @@ # revision identifiers, used by Alembic. revision = "58b0ae7f5b67" -down_revision = "272da5f400de" +down_revision = "c3458e1ef9aa" branch_labels = None depends_on = None From 3ae5a00d495a213ccb17f9d2f910f87368e6b711 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Wed, 4 Dec 2024 10:06:26 -0800 Subject: [PATCH 12/18] Fix a couple of bugs. 
--- src/palace/manager/api/monitor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py index ed112a6ee..3f9eed858 100644 --- a/src/palace/manager/api/monitor.py +++ b/src/palace/manager/api/monitor.py @@ -16,7 +16,7 @@ class LoanlikeReaperMonitor(ReaperMonitor): ] def __init__(self, *args, **kwargs): - super().__init__(args, kwargs) + super().__init__(*args, **kwargs) self._events_to_be_logged = [] @property @@ -59,7 +59,7 @@ def delete(self, row) -> None: patron=row.patron, ) super().delete(row) - self.events_to_be_logged.append(event) + self._events_to_be_logged.append(event) def after_commit(self) -> None: super().after_commit() From c7b68831039489f31deed73bd1107a150a063262 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 9 Dec 2024 09:40:15 -0800 Subject: [PATCH 13/18] Fix mypy --- src/palace/manager/celery/tasks/opds_odl.py | 16 +++++++++------- tests/manager/celery/tasks/test_opds_odl.py | 6 ++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/palace/manager/celery/tasks/opds_odl.py b/src/palace/manager/celery/tasks/opds_odl.py index cce437386..a05db97b7 100644 --- a/src/palace/manager/celery/tasks/opds_odl.py +++ b/src/palace/manager/celery/tasks/opds_odl.py @@ -21,7 +21,7 @@ def remove_expired_holds_for_collection( db: Session, collection_id: int, -) -> tuple[int, dict[str, Any]]: +) -> tuple[int, list[dict[str, Any]]]: """ Remove expired holds from the database for this collection. """ @@ -52,7 +52,7 @@ def remove_expired_holds_for_collection( ) expired_holds = db.scalars(select_query).all() - expired_hold_events: [dict[str, Any]] = [] + expired_hold_events: list[dict[str, Any]] = [] for hold in expired_holds: expired_hold_events.append( dict( @@ -80,7 +80,7 @@ def remove_expired_holds_for_collection( # a rowcount, but the sqlalchemy docs swear it will in the case of # a delete statement. 
# https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#getting-affected-row-count-from-update-delete - return result.rowcount, expired_hold_events # type: ignore[attr-defined,no-any-return] + return result.rowcount, expired_hold_events # type: ignore def licensepool_ids_with_holds( @@ -119,7 +119,7 @@ def lock_licenses(license_pool: LicensePool) -> None: def recalculate_holds_for_licensepool( license_pool: LicensePool, reservation_period: datetime.timedelta, -) -> tuple[int, dict[str, Any]]: +) -> tuple[int, list[dict[str, Any]]]: # We take out row level locks on all the licenses and holds for this license pool, so that # everything is in a consistent state while we update the hold queue. This means we should be # quickly committing the transaction, to avoid contention or deadlocks. @@ -133,7 +133,7 @@ def recalculate_holds_for_licensepool( waiting = holds[reserved:] updated = 0 - events: [dict[str, Any]] = [] + events: list[dict[str, Any]] = [] # These holds have a copy reserved for them. for hold in ready: @@ -175,8 +175,10 @@ def remove_expired_holds_for_collection_task(task: Task, collection_id: int) -> session, collection_id, ) + + collection_name = None if not collection else collection.name task.log.info( - f"Removed {removed} expired holds for collection {collection.name} ({collection_id})." + f"Removed {removed} expired holds for collection {collection_name} ({collection_id})." 
) # publish events only after successful commit @@ -198,7 +200,7 @@ def remove_expired_holds(task: Task) -> None: if collection.id is not None ] for collection_id, collection_name in collections: - remove_expired_holds_for_collection.delay(collection_id) + remove_expired_holds_for_collection_task.delay(collection_id) @shared_task(queue=QueueNames.default, bind=True) diff --git a/tests/manager/celery/tasks/test_opds_odl.py b/tests/manager/celery/tasks/test_opds_odl.py index 8c245cd96..9996a96f3 100644 --- a/tests/manager/celery/tasks/test_opds_odl.py +++ b/tests/manager/celery/tasks/test_opds_odl.py @@ -273,7 +273,7 @@ def test_recalculate_holds_for_licensepool( assert event["event_type"] == CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT -def test_remove_expired_holds_for_collection( +def test_remove_expired_holds_for_collection_task( celery_fixture: CeleryFixture, db: DatabaseTransactionFixture, opds_task_fixture: OpdsTaskFixture, @@ -305,7 +305,9 @@ def test_remove_expired_holds( collection2 = db.collection(protocol=OPDS2WithODLApi) decoy_collection = db.collection(protocol=OverdriveAPI) - with patch.object(opds_odl, "remove_expired_holds_for_collection") as mock_remove: + with patch.object( + opds_odl, "remove_expired_holds_for_collection_task" + ) as mock_remove: remove_expired_holds.delay().wait() assert mock_remove.delay.call_count == 2 From 7b54daf9e998ce97d7b24b3c5478fa4a68a5e6ba Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 9 Dec 2024 10:04:48 -0800 Subject: [PATCH 14/18] Merge in mistakenly unmerged changes. 
--- src/palace/manager/sqlalchemy/model/patron.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/palace/manager/sqlalchemy/model/patron.py b/src/palace/manager/sqlalchemy/model/patron.py index 186809304..88c7f5912 100644 --- a/src/palace/manager/sqlalchemy/model/patron.py +++ b/src/palace/manager/sqlalchemy/model/patron.py @@ -571,7 +571,6 @@ class Hold(Base, LoanAndHoldMixin): """A patron is in line to check out a book.""" __tablename__ = "holds" -<<<<<<< HEAD id: Mapped[int] = Column(Integer, primary_key=True) patron_id: Mapped[int] = Column( Integer, ForeignKey("patrons.id"), index=True, nullable=False @@ -582,11 +581,6 @@ class Hold(Base, LoanAndHoldMixin): license_pool_id: Mapped[int] = Column( Integer, ForeignKey("licensepools.id"), index=True, nullable=False ) -======= - id = Column(Integer, primary_key=True) - patron_id = Column(Integer, ForeignKey("patrons.id"), index=True, nullable=False) - license_pool_id = Column(Integer, ForeignKey("licensepools.id"), index=True) ->>>>>>> 0e3659672 (Make loan.patron_id and hold.patron_id fields non-nullable.) license_pool: Mapped[LicensePool] = relationship( "LicensePool", back_populates="holds" ) From 96c620bf7fea2b9167c3452da09888759f361a15 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 9 Dec 2024 10:22:36 -0800 Subject: [PATCH 15/18] Fix migration target. --- ...04_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py b/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py index 0b051e3d3..e2a08ac91 100644 --- a/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py +++ b/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py @@ -1,7 +1,7 @@ """Make Loan.patron_id and Hold.patron_id non-nullable. 
Revision ID: 58b0ae7f5b67 -Revises: c3458e1ef9aa +Revises: 8dde64eab209 Create Date: 2024-12-04 08:04:24.182444+00:00 """ @@ -10,7 +10,7 @@ # revision identifiers, used by Alembic. revision = "58b0ae7f5b67" -down_revision = "c3458e1ef9aa" +down_revision = "8dde64eab209" branch_labels = None depends_on = None From 55b14f4bcd2782d9d54e12c211b8a9c86d7d7aee Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 10 Dec 2024 14:28:13 -0800 Subject: [PATCH 16/18] Remove unnecessary migration. --- ...make_loan_patron_id_and_hold_patron_id_.py | 43 ------------------- 1 file changed, 43 deletions(-) delete mode 100644 alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py diff --git a/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py b/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py deleted file mode 100644 index e2a08ac91..000000000 --- a/alembic/versions/20241204_58b0ae7f5b67_make_loan_patron_id_and_hold_patron_id_.py +++ /dev/null @@ -1,43 +0,0 @@ -"""Make Loan.patron_id and Hold.patron_id non-nullable. - -Revision ID: 58b0ae7f5b67 -Revises: 8dde64eab209 -Create Date: 2024-12-04 08:04:24.182444+00:00 - -""" - -from alembic import op - -# revision identifiers, used by Alembic. 
-revision = "58b0ae7f5b67" -down_revision = "8dde64eab209" -branch_labels = None -depends_on = None - - -def upgrade() -> None: - op.alter_column( - table_name="loans", - column_name="patron_id", - nullable=False, - ) - - op.alter_column( - table_name="holds", - column_name="patron_id", - nullable=False, - ) - - -def downgrade() -> None: - op.alter_column( - table_name="loans", - column_name="patron_id", - nullable=True, - ) - - op.alter_column( - table_name="holds", - column_name="patron_id", - nullable=True, - ) From 5c942dd5d4133841bff0cf9e33a2e741eca1276d Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Tue, 10 Dec 2024 16:05:44 -0800 Subject: [PATCH 17/18] Ensure that event data is attached to a database session when passed to analytics collector. --- src/palace/manager/celery/tasks/opds_odl.py | 89 +++++++++++---------- tests/manager/celery/tasks/test_opds_odl.py | 9 +-- 2 files changed, 50 insertions(+), 48 deletions(-) diff --git a/src/palace/manager/celery/tasks/opds_odl.py b/src/palace/manager/celery/tasks/opds_odl.py index a05db97b7..49e60050e 100644 --- a/src/palace/manager/celery/tasks/opds_odl.py +++ b/src/palace/manager/celery/tasks/opds_odl.py @@ -1,4 +1,5 @@ import datetime +from dataclasses import dataclass from typing import Any from celery import shared_task @@ -8,42 +9,34 @@ from palace.manager.api.odl.api import OPDS2WithODLApi from palace.manager.celery.task import Task +from palace.manager.service.analytics.analytics import Analytics from palace.manager.service.celery.celery import QueueNames from palace.manager.service.redis.models.lock import RedisLock from palace.manager.service.redis.redis import Redis from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.collection import Collection +from palace.manager.sqlalchemy.model.library import Library from palace.manager.sqlalchemy.model.licensing import License, LicensePool -from palace.manager.sqlalchemy.model.patron import Hold 
+from palace.manager.sqlalchemy.model.patron import Hold, Patron from palace.manager.util.datetime_helpers import utc_now +@dataclass +class CirculationEventData: + library: Library + license_pool: LicensePool + event_type: str + patron: Patron + + def remove_expired_holds_for_collection( db: Session, collection_id: int, -) -> tuple[int, list[dict[str, Any]]]: +) -> list[CirculationEventData]: """ Remove expired holds from the database for this collection. """ - # generate expiration events for expired holds before deleting them - # lock rows - lock_query = ( - select(Hold.id) - .where( - Hold.position == 0, - Hold.end < utc_now(), - Hold.license_pool_id == LicensePool.id, - LicensePool.collection_id == collection_id, - ) - .with_for_update() - ) - - db.execute(lock_query).all() - - # a separate query is required to get around the - # "FOR UPDATE cannot be applied to the nullable side of an outer join" issue when trying to use with_for_update - # on the Hold object. select_query = select(Hold).where( Hold.position == 0, Hold.end < utc_now(), @@ -55,7 +48,7 @@ def remove_expired_holds_for_collection( expired_hold_events: list[dict[str, Any]] = [] for hold in expired_holds: expired_hold_events.append( - dict( + CirculationEventData( library=hold.library, license_pool=hold.license_pool, event_type=CirculationEvent.CM_HOLD_EXPIRED, @@ -66,21 +59,13 @@ def remove_expired_holds_for_collection( # delete the holds query = ( delete(Hold) - .where( - Hold.position == 0, - Hold.end < utc_now(), - Hold.license_pool_id == LicensePool.id, - LicensePool.collection_id == collection_id, - ) + .where(Hold.id.in_(h.id for h in expired_holds)) .execution_options(synchronize_session="fetch") ) - result = db.execute(query) - # We need the type ignores here because result doesn't always have - # a rowcount, but the sqlalchemy docs swear it will in the case of - # a delete statement. 
- # https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#getting-affected-row-count-from-update-delete - return result.rowcount, expired_hold_events # type: ignore + db.execute(query) + + return expired_hold_events def licensepool_ids_with_holds( @@ -119,7 +104,7 @@ def lock_licenses(license_pool: LicensePool) -> None: def recalculate_holds_for_licensepool( license_pool: LicensePool, reservation_period: datetime.timedelta, -) -> tuple[int, list[dict[str, Any]]]: +) -> tuple[int, list[CirculationEventData]]: # We take out row level locks on all the licenses and holds for this license pool, so that # everything is in a consistent state while we update the hold queue. This means we should be # quickly committing the transaction, to avoid contention or deadlocks. @@ -144,7 +129,7 @@ def recalculate_holds_for_licensepool( hold.end = utc_now() + reservation_period updated += 1 events.append( - dict( + CirculationEventData( library=hold.library, license_pool=hold.license_pool, event_type=CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT, @@ -169,21 +154,20 @@ def remove_expired_holds_for_collection_task(task: Task, collection_id: int) -> A shared task for removing expired holds from the database for a collection """ analytics = task.services.analytics.analytics() + with task.transaction() as session: collection = Collection.by_id(session, collection_id) - removed, events = remove_expired_holds_for_collection( + events = remove_expired_holds_for_collection( session, collection_id, ) collection_name = None if not collection else collection.name task.log.info( - f"Removed {removed} expired holds for collection {collection_name} ({collection_id})." + f"Removed {len(events)} expired holds for collection {collection_name} ({collection_id})." 
) # publish events only after successful commit - for event in events: - analytics.collect_event(**event) + collect_events(task, events, analytics) @shared_task(queue=QueueNames.default, bind=True) @@ -225,6 +209,27 @@ def _redis_lock_recalculate_holds(client: Redis, collection_id: int) -> RedisLoc ) +def collect_events( + task: Task, events: list[CirculationEventData], analytics: Analytics +) -> None: + """ + Collect events after the database commit has succeeded and any row locks have been released. + We perform this operation after the transaction has completed to ensure that any row locks + are held for the shortest possible duration in case writing to the S3 analytics provider is slow. + """ + with task.session() as session: + for e in events: + session.refresh(e.library) + session.refresh(e.license_pool) + session.refresh(e.patron) + analytics.collect_event( + event_type=e.event_type, + library=e.library, + license_pool=e.license_pool, + patron=e.patron, + ) + + @shared_task(queue=QueueNames.default, bind=True) def recalculate_hold_queue_collection( task: Task, collection_id: int, batch_size: int = 100, after_id: int | None = None @@ -288,9 +293,7 @@ def recalculate_hold_queue_collection( f"{updated} holds out of date." ) - # fire events after successful database update - for event in events: - analytics.collect_event(**event) + collect_events(task, events, analytics) if len(license_pool_ids) == batch_size: # We are done this batch, but there is probably more work to do, we queue up the next batch.
diff --git a/tests/manager/celery/tasks/test_opds_odl.py b/tests/manager/celery/tasks/test_opds_odl.py index 9996a96f3..829a1677d 100644 --- a/tests/manager/celery/tasks/test_opds_odl.py +++ b/tests/manager/celery/tasks/test_opds_odl.py @@ -152,7 +152,7 @@ def test_remove_expired_holds_for_collection( # Remove the expired holds assert collection.id is not None - removed, events = remove_expired_holds_for_collection( + events = remove_expired_holds_for_collection( db.session, collection.id, ) @@ -165,8 +165,6 @@ def test_remove_expired_holds_for_collection( assert decoy_non_expired_holds.issubset(current_holds) assert decoy_expired_holds.issubset(current_holds) - assert removed == 10 - pools_after = db.session.scalars( select(func.count()).select_from(LicensePool) ).one() @@ -177,7 +175,8 @@ def test_remove_expired_holds_for_collection( # verify that the correct analytics calls were made assert len(events) == 10 for event in events: - assert event["event_type"] == CirculationEvent.CM_HOLD_EXPIRED + assert event.event_type == CirculationEvent.CM_HOLD_EXPIRED + assert event.library == db.default_library() def test_licensepools_with_holds( @@ -270,7 +269,7 @@ def test_recalculate_holds_for_licensepool( # verify that the correct analytics events were returned assert len(events) == 3 for event in events: - assert event["event_type"] == CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT + assert event.event_type == CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT def test_remove_expired_holds_for_collection_task( From 94d8de07586b352a10ddc24e02ae8f231fcfa60e Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Wed, 11 Dec 2024 10:19:42 -0800 Subject: [PATCH 18/18] Fix mypy --- src/palace/manager/celery/tasks/opds_odl.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/palace/manager/celery/tasks/opds_odl.py b/src/palace/manager/celery/tasks/opds_odl.py index 49e60050e..03d8a1b45 100644 --- a/src/palace/manager/celery/tasks/opds_odl.py +++ 
b/src/palace/manager/celery/tasks/opds_odl.py @@ -1,6 +1,5 @@ import datetime from dataclasses import dataclass -from typing import Any from celery import shared_task from sqlalchemy import delete, select @@ -45,7 +44,7 @@ def remove_expired_holds_for_collection( ) expired_holds = db.scalars(select_query).all() - expired_hold_events: list[dict[str, Any]] = [] + expired_hold_events: list[CirculationEventData] = [] for hold in expired_holds: expired_hold_events.append( CirculationEventData( @@ -118,7 +117,7 @@ def recalculate_holds_for_licensepool( waiting = holds[reserved:] updated = 0 - events: list[dict[str, Any]] = [] + events: list[CirculationEventData] = [] # These holds have a copy reserved for them. for hold in ready: