Removed Redis JSON from MARC generation
jonathangreen committed Oct 29, 2024
1 parent 853bcda commit 2955774
Showing 18 changed files with 819 additions and 2,095 deletions.
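
Only the first two file diffs loaded below, but they carry the substance of the change: MARC generation no longer stages partial output in a Redis-backed MarcFileUploadSession; each task run now buffers records in local temporary files and flushes them to storage as S3 multipart-upload parts, with Redis reduced to a plain lock. A rough sketch of that buffer-then-upload-part pattern, written against boto3 directly, with hypothetical bucket/key names and a stand-in record generator (the real code routes this through its MarcUploadManager abstraction):

from tempfile import TemporaryFile

import boto3

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 rejects non-final multipart parts smaller than 5 MiB


def fake_records():
    # Stand-in for the per-work MARC records produced by MarcExporter.process_work().
    for i in range(200_000):
        yield f"record {i}\n".encode()


s3 = boto3.client("s3")
bucket, key = "example-bucket", "marc/example.mrc"  # hypothetical bucket and key
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
parts: list[dict] = []

with TemporaryFile() as buffer:

    def flush_part() -> None:
        # Ship the buffered bytes as the next part, then reset the buffer.
        buffer.seek(0)
        number = len(parts) + 1
        resp = s3.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=mpu["UploadId"],
            PartNumber=number,
            Body=buffer.read(),
        )
        parts.append({"ETag": resp["ETag"], "PartNumber": number})
        buffer.seek(0)
        buffer.truncate()

    for record in fake_records():
        buffer.write(record)
        if buffer.tell() >= MIN_PART_SIZE:  # mirrors MINIMUM_MULTIPART_UPLOAD_SIZE below
            flush_part()
    if buffer.tell():  # the final part may be smaller than 5 MiB
        flush_part()

s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=mpu["UploadId"],
    MultipartUpload={"Parts": parts},
)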
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -94,12 +94,12 @@ module = [
     "palace.manager.api.metadata.*",
     "palace.manager.api.odl.*",
     "palace.manager.api.opds_for_distributors",
-    "palace.manager.core.marc",
     "palace.manager.core.opds2_import",
     "palace.manager.core.opds_import",
     "palace.manager.core.selftest",
     "palace.manager.feed.*",
     "palace.manager.integration.*",
+    "palace.manager.marc.*",
     "palace.manager.opds.*",
     "palace.manager.scripts.initialization",
     "palace.manager.scripts.rotate_jwe_key",
254 changes: 165 additions & 89 deletions src/palace/manager/celery/tasks/marc.py
@@ -1,16 +1,20 @@
import datetime
from contextlib import ExitStack
from tempfile import TemporaryFile
from typing import Any

from celery import shared_task
from pydantic import TypeAdapter

from palace.manager.celery.task import Task
from palace.manager.marc.exporter import LibraryInfo, MarcExporter
from palace.manager.marc.uploader import MarcUploadManager
from palace.manager.marc.uploader import MarcUploadManager, UploadContext
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.marc import (
MarcFileUploadSession,
MarcFileUploadState,
)
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.service.redis.redis import Redis
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.marcfile import MarcFile
from palace.manager.sqlalchemy.util import create
from palace.manager.util.datetime_helpers import utc_now


@@ -26,73 +30,70 @@ def marc_export(task: Task, force: bool = False) -> None:
         start_time = utc_now()
         collections = MarcExporter.enabled_collections(session, registry)
         for collection in collections:
             # Collection.id should never be able to be None here, but mypy doesn't know that.
             # So we assert it for mypy's benefit.
             assert collection.id is not None
-            upload_session = MarcFileUploadSession(
-                task.services.redis.client(), collection.id
+            libraries_info = MarcExporter.enabled_libraries(
+                session, registry, collection.id
             )
-            with upload_session.lock() as acquired:
-                if not acquired:
-                    task.log.info(
-                        f"Skipping collection {collection.name} ({collection.id}) because another task holds its lock."
-                    )
-                    continue
-
-                if (
-                    upload_state := upload_session.state()
-                ) != MarcFileUploadState.INITIAL:
-                    task.log.info(
-                        f"Skipping collection {collection.name} ({collection.id}) because it is already being "
-                        f"processed (state: {upload_state})."
-                    )
-                    continue
-
-                libraries_info = MarcExporter.enabled_libraries(
-                    session, registry, collection.id
-                )
-                needs_update = (
-                    any(info.needs_update for info in libraries_info) or force
-                )
-
-                if not needs_update:
-                    task.log.info(
-                        f"Skipping collection {collection.name} ({collection.id}) because it has been updated recently."
-                    )
-                    continue
-
-                works = MarcExporter.query_works(
-                    session,
-                    collection.id,
-                    work_id_offset=0,
-                    batch_size=1,
-                )
-                if not works:
-                    task.log.info(
-                        f"Skipping collection {collection.name} ({collection.id}) because it has no works."
-                    )
-                    continue
-
-                task.log.info(
-                    f"Generating MARC records for collection {collection.name} ({collection.id})."
-                )
-                upload_session.set_state(MarcFileUploadState.QUEUED)
-                marc_export_collection.delay(
-                    collection_id=collection.id,
-                    collection_name=collection.name,
-                    start_time=start_time,
-                    libraries=[l.model_dump() for l in libraries_info],
-                )
+            needs_update = any(info.needs_update for info in libraries_info) or force
+
+            if not needs_update:
+                task.log.info(
+                    f"Skipping collection {collection.name} ({collection.id}) because it has been updated recently."
+                )
+                continue
+
+            if not MarcExporter.query_works(
+                session,
+                collection.id,
+                batch_size=1,
+            ):
+                task.log.info(
+                    f"Skipping collection {collection.name} ({collection.id}) because it has no works."
+                )
+                continue
+
+            task.log.info(
+                f"Generating MARC records for collection {collection.name} ({collection.id})."
+            )
+            marc_export_collection.delay(
+                collection_id=collection.id,
+                collection_name=collection.name,
+                start_time=start_time,
+                libraries=[l.model_dump() for l in libraries_info],
+            )
+
+            needs_delta = [l.model_dump() for l in libraries_info if l.last_updated]
+            if needs_delta:
+                marc_export_collection.delay(
+                    collection_id=collection.id,
+                    collection_name=collection.name,
+                    start_time=start_time,
+                    libraries=needs_delta,
+                    delta=True,
+                )
+
+
+def marc_export_collection_lock(
+    client: Redis, collection_id: int, delta: bool = False
+) -> RedisLock:
+    return RedisLock(
+        client,
+        ["MarcUpload", Collection.redis_key_from_id(collection_id), f"Delta::{delta}"],
+        lock_timeout=datetime.timedelta(minutes=20),
+    )
 
 
 @shared_task(queue=QueueNames.default, bind=True)
 def marc_export_collection(
     task: Task,
     collection_id: int,
     collection_name: str,
     start_time: datetime.datetime,
     libraries: list[dict[str, Any]],
-    batch_size: int = 500,
+    context: dict[int, dict[str, Any]] | None = None,
     last_work_id: int | None = None,
-    update_number: int = 0,
+    batch_size: int = 1000,
+    delta: bool = False,
 ) -> None:
     """
     Export MARC records for a single collection.
@@ -104,64 +105,139 @@ def marc_export_collection(
     base_url = task.services.config.sitewide.base_url()
     storage_service = task.services.storage.public()
-    libraries_info = [LibraryInfo.model_validate(l) for l in libraries]
-    upload_manager = MarcUploadManager(
-        storage_service,
-        MarcFileUploadSession(
-            task.services.redis.client(), collection_id, update_number
-        ),
+
+    # Parse data into pydantic models
+    libraries_info = TypeAdapter(list[LibraryInfo]).validate_python(libraries)
+    context_parsed = TypeAdapter(dict[int, UploadContext]).validate_python(
+        context or {}
     )
 
-    with upload_manager.begin():
-        if not upload_manager.locked:
+    lock = marc_export_collection_lock(
+        task.services.redis.client(), collection_id, delta
+    )
+
+    with lock.lock() as locked:
+        if not locked:
             task.log.info(
                 f"Skipping collection {collection_id} because another task is already processing it."
            )
             return
 
-        with task.session() as session:
-            works = MarcExporter.query_works(
-                session,
-                collection_id,
-                work_id_offset=last_work_id,
-                batch_size=batch_size,
-            )
-            for work in works:
-                MarcExporter.process_work(
-                    work, libraries_info, base_url, upload_manager=upload_manager
-                )
-
-        # Sync the upload_manager to ensure that all the data is written to storage.
-        upload_manager.sync()
-
-        if len(works) != batch_size:
-            # We have finished generating MARC records. Cleanup and exit.
-            with task.transaction() as session:
-                collection = MarcExporter.collection(session, collection_id)
-                collection_name = collection.name if collection else "unknown"
-                completed_uploads = upload_manager.complete()
-                MarcExporter.create_marc_upload_records(
-                    session,
-                    start_time,
-                    collection_id,
-                    libraries_info,
-                    completed_uploads,
-                )
-                upload_manager.remove_session()
-            task.log.info(
-                f"Finished generating MARC records for collection '{collection_name}' ({collection_id})."
-            )
-            return
+        with ExitStack() as stack, task.transaction() as session:
+            files = {
+                library: stack.enter_context(TemporaryFile())
+                for library in libraries_info
+            }
+            uploads: dict[LibraryInfo, MarcUploadManager] = {
+                library: stack.enter_context(
+                    MarcUploadManager(
+                        storage_service,
+                        collection_name,
+                        library.library_short_name,
+                        start_time,
+                        library.last_updated if delta else None,
+                        context_parsed.get(library.library_id),
+                    )
+                )
+                for library in libraries_info
+            }
+
+            min_last_updated = (
+                min([l.last_updated for l in libraries_info if l.last_updated])
+                if delta
+                else None
+            )
+
+            no_more_works = False
+            while not all(
+                [
+                    file.tell() > storage_service.MINIMUM_MULTIPART_UPLOAD_SIZE
+                    for file in files.values()
+                ]
+            ):
+                works = MarcExporter.query_works(
+                    session,
+                    collection_id,
+                    batch_size=batch_size,
+                    work_id_offset=last_work_id,
+                    last_updated=min_last_updated,
+                )
+                if not works:
+                    no_more_works = True
+                    break
+
+                # Set this for the next iteration
+                last_work_id = works[-1].id
+
+                works_with_pools = [
+                    (work, pool)
+                    for work in works
+                    if (pool := work.active_license_pool()) is not None
+                ]
+
+                # Find ISBN for any work that needs it
+                isbns = MarcExporter.query_isbn_identifiers(
+                    session,
+                    {pool.identifier for work, pool in works_with_pools},
+                )
+
+                for work, pool in works_with_pools:
+                    isbn_identifier = isbns.get(pool.identifier)
+                    records = MarcExporter.process_work(
+                        work, pool, isbn_identifier, libraries_info, base_url, delta
+                    )
+                    for library, record in records.items():
+                        files[library].write(record)
+
+            # Upload part to s3, if there is anything to upload
+            for library, tmp_file in files.items():
+                upload = uploads[library]
+                if not upload.upload_part(tmp_file):
+                    task.log.warning(
+                        f"No data to upload to s3 '{upload.context.s3_key}'."
+                    )
+
+            if no_more_works:
+                # Task is complete. Finalize the s3 uploads and create MarcFile records in DB.
+                for library, upload in uploads.items():
+                    if upload.complete():
+                        create(
+                            session,
+                            MarcFile,
+                            id=upload.context.upload_uuid,
+                            library_id=library.library_id,
+                            collection_id=collection_id,
+                            created=start_time,
+                            key=upload.context.s3_key,
+                            since=library.last_updated if delta else None,
+                        )
+                        task.log.info(f"Completed upload for '{upload.context.s3_key}'")
+                    else:
+                        task.log.warning(
+                            f"No upload for '{upload.context.s3_key}', "
+                            f"because there were no records."
+                        )
+
+                task.log.info(
+                    f"Finished generating MARC records for collection '{collection_name}' ({collection_id}) "
+                    f"in {(utc_now() - start_time).seconds} seconds."
+                )
+                return
 
     # This task is complete, but there are more works waiting to be exported. So we requeue ourselves
     # to process the next batch.
     raise task.replace(
         marc_export_collection.s(
             collection_id=collection_id,
             collection_name=collection_name,
             start_time=start_time,
             libraries=[l.model_dump() for l in libraries_info],
-            batch_size=batch_size,
-            last_work_id=works[-1].id,
-            update_number=upload_manager.update_number,
+            context={
+                l.library_id: uploads[l].context.model_dump() for l in libraries_info
+            },
+            last_work_id=last_work_id,
+            batch_size=batch_size,
+            delta=delta,
        )
    )
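
Two illustrative sketches follow. First, the new marc_export_collection_lock helper above keys the lock on both the collection and the delta flag, so a full export and a delta export of the same collection can run concurrently while two exports of the same kind exclude each other. A rough equivalent in plain redis-py (the key shape here is assumed for illustration; the real RedisLock class builds its own key):

import redis

client = redis.Redis()


def try_acquire_export_lock(collection_id: int, delta: bool) -> bool:
    # The delta flag is baked into the key, so the full and delta exports of one
    # collection hold independent locks (key shape assumed for illustration).
    key = f"MarcUpload::Collection::{collection_id}::Delta::{delta}"
    # SET NX with a 20-minute expiry mirrors lock_timeout=timedelta(minutes=20) above.
    return bool(client.set(key, "owner-token", nx=True, ex=20 * 60))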

Expand Down
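Second, the raise task.replace(...) tail above is the continuation idiom that replaced the Redis update_number bookkeeping: all resume state (context, last_work_id) now travels inside the task signature itself. A generic sketch of that self-requeueing pattern, with a hypothetical load_batch step standing in for the real work:

from celery import Celery

app = Celery("example")

DATA = list(range(2500))  # stand-in dataset


def load_batch(offset: int, n: int) -> list[int]:
    return DATA[offset : offset + n]


@app.task(bind=True)
def process(self, offset: int = 0, batch_size: int = 1000) -> None:
    batch = load_batch(offset, batch_size)
    # ... handle the batch here ...
    if len(batch) == batch_size:
        # A full batch means more rows may remain: replace this task with a new
        # invocation that carries the cursor forward instead of looping in-process.
        raise self.replace(process.s(offset=offset + batch_size, batch_size=batch_size))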