Skip to content

Commit

Permalink
[PP-1358] add new events (#2174)
Browse files Browse the repository at this point in the history
* Add hold expired, loan expired, hold converted, and loan revoked events.
* Create separate tasks for reaping holds by collection.
* Ensure that transaction is closed before collecting analytics events.
* Ensure that event data is attached to a database session when passed to analytics collector.
  • Loading branch information
dbernstein authored Dec 12, 2024
1 parent c31677f commit 48260da
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 83 deletions.
13 changes: 13 additions & 0 deletions src/palace/manager/api/circulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,14 @@ def borrow(
)
if existing_hold:
# The book was on hold, and now we have a loan.
# collect cm event to commemorate the conversion:
self._collect_event(
patron=patron,
licensepool=licensepool,
name=CirculationEvent.CM_HOLD_CONVERTED_TO_LOAN,
include_neighborhood=True,
)

# Delete the record of the hold.
self._db.delete(existing_hold)
__transaction.commit()
Expand Down Expand Up @@ -1197,6 +1205,11 @@ def borrow(
self._collect_event(patron, licensepool, CirculationEvent.CM_HOLD_PLACE)

if existing_loan:
# Send out analytics event capturing the unusual circumstance that a loan was converted to a hold
# TODO: Do we know what the conditions under which this situation can occur?
self._collect_event(
patron, licensepool, CirculationEvent.CM_LOAN_CONVERTED_TO_HOLD
)
self._db.delete(existing_loan)
__transaction.commit()
return None, hold, is_new
Expand Down
24 changes: 24 additions & 0 deletions src/palace/manager/api/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from palace.manager.api.opds_for_distributors import OPDSForDistributorsAPI
from palace.manager.core.monitor import ReaperMonitor
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration
from palace.manager.sqlalchemy.model.licensing import LicensePool
Expand All @@ -14,6 +15,10 @@ class LoanlikeReaperMonitor(ReaperMonitor):
OPDSForDistributorsAPI.label(),
]

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._events_to_be_logged = []

@property
def where_clause(self):
"""We never want to automatically reap loans or holds for situations
Expand Down Expand Up @@ -44,6 +49,25 @@ def where_clause(self):
)
return ~self.MODEL_CLASS.id.in_(source_of_truth_subquery)

def delete(self, row) -> None:
ce = CirculationEvent
event_type = ce.CM_LOAN_EXPIRED if isinstance(row, Loan) else ce.CM_HOLD_EXPIRED
event = dict(
library=row.library,
license_pool=row.license_pool,
event_type=event_type,
patron=row.patron,
)
super().delete(row)
self._events_to_be_logged.append(event)

def after_commit(self) -> None:
super().after_commit()
copy_of_list = list(self._events_to_be_logged)
for event in copy_of_list:
self.services.analytics.collect_event(**event)
self._events_to_be_logged.remove(event)


class LoanReaper(LoanlikeReaperMonitor):
"""Remove expired and abandoned loans from the database."""
Expand Down
131 changes: 107 additions & 24 deletions src/palace/manager/celery/tasks/opds_odl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from dataclasses import dataclass

from celery import shared_task
from sqlalchemy import delete, select
Expand All @@ -7,35 +8,63 @@

from palace.manager.api.odl.api import OPDS2WithODLApi
from palace.manager.celery.task import Task
from palace.manager.service.analytics.analytics import Analytics
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.service.redis.redis import Redis
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.library import Library
from palace.manager.sqlalchemy.model.licensing import License, LicensePool
from palace.manager.sqlalchemy.model.patron import Hold
from palace.manager.sqlalchemy.model.patron import Hold, Patron
from palace.manager.util.datetime_helpers import utc_now


def remove_expired_holds_for_collection(db: Session, collection_id: int) -> int:
@dataclass
class CirculationEventData:
library: Library
license_pool: LicensePool
event_type: str
patron: Patron


def remove_expired_holds_for_collection(
db: Session,
collection_id: int,
) -> list[CirculationEventData]:
"""
Remove expired holds from the database for this collection.
"""

select_query = select(Hold).where(
Hold.position == 0,
Hold.end < utc_now(),
Hold.license_pool_id == LicensePool.id,
LicensePool.collection_id == collection_id,
)

expired_holds = db.scalars(select_query).all()
expired_hold_events: list[CirculationEventData] = []
for hold in expired_holds:
expired_hold_events.append(
CirculationEventData(
library=hold.library,
license_pool=hold.license_pool,
event_type=CirculationEvent.CM_HOLD_EXPIRED,
patron=hold.patron,
)
)

# delete the holds
query = (
delete(Hold)
.where(
Hold.position == 0,
Hold.end < utc_now(),
Hold.license_pool_id == LicensePool.id,
LicensePool.collection_id == collection_id,
)
.where(Hold.id.in_(h.id for h in expired_holds))
.execution_options(synchronize_session="fetch")
)
result = db.execute(query)
# We need the type ignores here because result doesn't always have
# a rowcount, but the sqlalchemy docs swear it will in the case of
# a delete statement.
# https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#getting-affected-row-count-from-update-delete
return result.rowcount # type: ignore[attr-defined,no-any-return]

db.execute(query)

return expired_hold_events


def licensepool_ids_with_holds(
Expand Down Expand Up @@ -74,7 +103,7 @@ def lock_licenses(license_pool: LicensePool) -> None:
def recalculate_holds_for_licensepool(
license_pool: LicensePool,
reservation_period: datetime.timedelta,
) -> int:
) -> tuple[int, list[CirculationEventData]]:
# We take out row level locks on all the licenses and holds for this license pool, so that
# everything is in a consistent state while we update the hold queue. This means we should be
# quickly committing the transaction, to avoid contention or deadlocks.
Expand All @@ -88,6 +117,8 @@ def recalculate_holds_for_licensepool(
waiting = holds[reserved:]
updated = 0

events: list[CirculationEventData] = []

# These holds have a copy reserved for them.
for hold in ready:
# If this hold isn't already in position 0, the hold just became available.
Expand All @@ -96,6 +127,14 @@ def recalculate_holds_for_licensepool(
hold.position = 0
hold.end = utc_now() + reservation_period
updated += 1
events.append(
CirculationEventData(
library=hold.library,
license_pool=hold.license_pool,
event_type=CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT,
patron=hold.patron,
)
)

# Update the position for the remaining holds.
for idx, hold in enumerate(waiting):
Expand All @@ -105,13 +144,35 @@ def recalculate_holds_for_licensepool(
hold.end = None
updated += 1

return updated
return updated, events


@shared_task(queue=QueueNames.default, bind=True)
def remove_expired_holds_for_collection_task(task: Task, collection_id: int) -> None:
"""
A shared task for removing expired holds from the database for a collection
"""
analytics = task.services.analytics.analytics()

with task.transaction() as session:
collection = Collection.by_id(session, collection_id)
events = remove_expired_holds_for_collection(
session,
collection_id,
)

collection_name = None if not collection else collection.name
task.log.info(
f"Removed {len(events)} expired holds for collection {collection_name} ({collection_id})."
)

collect_events(task, events, analytics)


@shared_task(queue=QueueNames.default, bind=True)
def remove_expired_holds(task: Task) -> None:
"""
Remove expired holds from the database.
Issue remove expired hold tasks for eligible collections
"""
registry = task.services.integration_registry.license_providers()
protocols = registry.get_protocols(OPDS2WithODLApi, default=False)
Expand All @@ -122,11 +183,7 @@ def remove_expired_holds(task: Task) -> None:
if collection.id is not None
]
for collection_id, collection_name in collections:
with task.transaction() as session:
removed = remove_expired_holds_for_collection(session, collection_id)
task.log.info(
f"Removed {removed} expired holds for collection {collection_name} ({collection_id})."
)
remove_expired_holds_for_collection_task.delay(collection_id)


@shared_task(queue=QueueNames.default, bind=True)
Expand All @@ -151,6 +208,27 @@ def _redis_lock_recalculate_holds(client: Redis, collection_id: int) -> RedisLoc
)


def collect_events(
task: Task, events: list[CirculationEventData], analytics: Analytics
) -> None:
"""
Collect events after successful database is commit and any row locks are removed.
We perform this operation outside after completed the transaction to ensure that any row locks
are held for the shortest possible duration in case writing to the s3 analytics provider is slow.
"""
with task.session() as session:
for e in events:
session.refresh(e.library)
session.refresh(e.license_pool)
session.refresh(e.patron)
analytics.collect_event(
event_type=e.event_type,
library=e.library,
license_pool=e.license_pool,
patron=e.patron,
)


@shared_task(queue=QueueNames.default, bind=True)
def recalculate_hold_queue_collection(
task: Task, collection_id: int, batch_size: int = 100, after_id: int | None = None
Expand All @@ -159,6 +237,7 @@ def recalculate_hold_queue_collection(
Recalculate the hold queue for a collection.
"""
lock = _redis_lock_recalculate_holds(task.services.redis.client(), collection_id)
analytics = task.services.analytics.analytics()
with lock.lock() as locked:
if not locked:
task.log.info(
Expand Down Expand Up @@ -200,8 +279,10 @@ def recalculate_hold_queue_collection(
f"Skipping license pool {license_pool_id} because it no longer exists."
)
continue
updated = recalculate_holds_for_licensepool(
license_pool, reservation_period

updated, events = recalculate_holds_for_licensepool(
license_pool,
reservation_period,
)
edition = license_pool.presentation_edition
title = edition.title if edition else None
Expand All @@ -211,6 +292,8 @@ def recalculate_hold_queue_collection(
f"{updated} holds out of date."
)

collect_events(task, events, analytics)

if len(license_pool_ids) == batch_size:
# We are done this batch, but there is probably more work to do, we queue up the next batch.
raise task.replace(
Expand Down
8 changes: 7 additions & 1 deletion src/palace/manager/core/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,10 +885,16 @@ def run_once(self, *args, **kwargs):
self.delete(i)
rows_deleted += 1
self._db.commit()

self.after_commit()

count = qu.count()
return TimestampData(achievements="Items deleted: %d" % rows_deleted)

def delete(self, row):
def after_commit(self) -> None:
return None

def delete(self, row) -> None:
"""Delete a row from the database.
CAUTION: If you override this method such that it doesn't
Expand Down
5 changes: 5 additions & 0 deletions src/palace/manager/sqlalchemy/model/circulationevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ class CirculationEvent(Base):
CM_CHECKIN = "circulation_manager_check_in"
CM_HOLD_PLACE = "circulation_manager_hold_place"
CM_HOLD_RELEASE = "circulation_manager_hold_release"
CM_HOLD_EXPIRED = "circulation_manager_hold_expired"
CM_HOLD_READY_FOR_CHECKOUT = "circulation_manager_hold_ready"
CM_LOAN_EXPIRED = "circulation_manager_loan_expired"
CM_HOLD_CONVERTED_TO_LOAN = "circulation_manager_hold_converted_to_loan"
CM_LOAN_CONVERTED_TO_HOLD = "circulation_manager_loan_converted_to_hold"
CM_FULFILL = "circulation_manager_fulfill"

# Events that we hear about from a distributor.
Expand Down
21 changes: 10 additions & 11 deletions tests/manager/api/test_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def test_apply_on_incomplete_information(self, db: DatabaseTransactionFixture):
)
authenticated_by_weird_identifier.apply(patron)
assert "1234" == patron.authorization_identifier
assert None == patron.last_external_sync
assert patron.last_external_sync is None

def test_get_or_create_patron(
self, patron_data: PatronData, db: DatabaseTransactionFixture
Expand All @@ -417,15 +417,15 @@ def test_get_or_create_patron(
patron, is_new = patron_data.get_or_create_patron(
db.session, default_library.id, analytics
)
assert "2" == patron.authorization_identifier
assert patron.authorization_identifier == "2"
assert default_library == patron.library
assert True == is_new
assert CirculationEvent.NEW_PATRON == analytics.event_type
assert 1 == analytics.count
assert is_new is True
assert analytics.last_event_type == CirculationEvent.NEW_PATRON
assert analytics.count == 1

# Patron.neighborhood was set, even though there is no
# value and that's not a database field.
assert None == patron.neighborhood
assert patron.neighborhood is None

# Set a neighborhood and try again.
patron_data.neighborhood = "Achewood"
Expand All @@ -435,15 +435,14 @@ def test_get_or_create_patron(
patron, is_new = patron_data.get_or_create_patron(
db.session, default_library.id, analytics
)
assert "2" == patron.authorization_identifier
assert False == is_new
assert "Achewood" == patron.neighborhood
assert 1 == analytics.count
assert patron.authorization_identifier == "2"
assert is_new is False
assert patron.neighborhood == "Achewood"
assert analytics.count == 1

def test_to_response_parameters(self, patron_data: PatronData):
params = patron_data.to_response_parameters
assert dict(name="4") == params

patron_data.personal_name = None
params = patron_data.to_response_parameters
assert dict() == params
Expand Down
Loading

0 comments on commit 48260da

Please sign in to comment.