Remove Redis JSON extension from MARC file generation (PP-1839) #2143

Merged
merged 7 commits into from
Oct 31, 2024
Changes from 5 commits
@@ -0,0 +1,28 @@
"""Add index ix_licensepools_collection_id_work_id

Revision ID: 3faa5bba3ddf
Revises: 1938277e993f
Create Date: 2024-10-29 15:29:56.588830+00:00

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "3faa5bba3ddf"
down_revision = "1938277e993f"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_index(
        "ix_licensepools_collection_id_work_id",
        "licensepools",
        ["collection_id", "work_id"],
        unique=False,
    )


def downgrade() -> None:
    op.drop_index("ix_licensepools_collection_id_work_id", table_name="licensepools")
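
For context, a minimal sketch (not code from this PR) of the keyset-paginated lookup that the new composite (collection_id, work_id) index is meant to serve when the export task batches works per collection. The model import path and the helper name are assumptions for illustration; the actual query lives in MarcExporter.query_works.

from sqlalchemy import select

# Import path assumed for illustration; adjust to wherever LicensePool is defined.
from palace.manager.sqlalchemy.model.licensepool import LicensePool


def next_work_ids(
    session, collection_id: int, last_work_id: int | None, batch_size: int = 500
) -> list[int]:
    # Filter on collection_id and order/limit on work_id, so the new
    # (collection_id, work_id) index can satisfy the whole query.
    query = (
        select(LicensePool.work_id)
        .where(LicensePool.collection_id == collection_id)
        .order_by(LicensePool.work_id)
        .limit(batch_size)
    )
    if last_work_id is not None:
        # Keyset pagination: resume after the last work handled previously.
        query = query.where(LicensePool.work_id > last_work_id)
    return list(session.scalars(query))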
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -94,12 +94,12 @@ module = [
"palace.manager.api.metadata.*",
"palace.manager.api.odl.*",
"palace.manager.api.opds_for_distributors",
"palace.manager.core.marc",
"palace.manager.core.opds2_import",
"palace.manager.core.opds_import",
"palace.manager.core.selftest",
"palace.manager.feed.*",
"palace.manager.integration.*",
"palace.manager.marc.*",
"palace.manager.opds.*",
"palace.manager.scripts.initialization",
"palace.manager.scripts.rotate_jwe_key",
266 changes: 178 additions & 88 deletions src/palace/manager/celery/tasks/marc.py
@@ -1,16 +1,20 @@
import datetime
from contextlib import ExitStack
from tempfile import TemporaryFile
from typing import Any

from celery import shared_task
from pydantic import TypeAdapter

from palace.manager.celery.task import Task
from palace.manager.marc.exporter import LibraryInfo, MarcExporter
from palace.manager.marc.uploader import MarcUploadManager
from palace.manager.marc.uploader import MarcUploadManager, UploadContext
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.marc import (
    MarcFileUploadSession,
    MarcFileUploadState,
)
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.service.redis.redis import Redis
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.marcfile import MarcFile
from palace.manager.sqlalchemy.util import create
from palace.manager.util.datetime_helpers import utc_now


@@ -26,73 +30,84 @@ def marc_export(task: Task, force: bool = False) -> None:
start_time = utc_now()
collections = MarcExporter.enabled_collections(session, registry)
for collection in collections:
# Collection.id should never be able to be None here, but mypy doesn't know that.
# So we assert it for mypy's benefit.
assert collection.id is not None
upload_session = MarcFileUploadSession(
task.services.redis.client(), collection.id
libraries_info = MarcExporter.enabled_libraries(
session, registry, collection.id
)
with upload_session.lock() as acquired:
if not acquired:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because another task holds its lock."
)
continue
needs_update = any(info.needs_update for info in libraries_info) or force

if (
upload_state := upload_session.state()
) != MarcFileUploadState.INITIAL:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it is already being "
f"processed (state: {upload_state})."
)
continue

libraries_info = MarcExporter.enabled_libraries(
session, registry, collection.id
if not needs_update:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it has been updated recently."
)
needs_update = (
any(info.needs_update for info in libraries_info) or force
continue

if not MarcExporter.query_works(
session,
collection.id,
batch_size=1,
):
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it has no works."
)
continue

if not needs_update:
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it has been updated recently."
)
continue
task.log.info(
f"Generating MARC records for collection {collection.name} ({collection.id})."
)

works = MarcExporter.query_works(
marc_export_collection.delay(
collection_id=collection.id,
collection_name=collection.name,
start_time=start_time,
libraries=[l.model_dump() for l in libraries_info],
)

needs_delta = [l.model_dump() for l in libraries_info if l.last_updated]
if needs_delta:
min_last_updated = min(
[l.last_updated for l in libraries_info if l.last_updated]
)
if not MarcExporter.query_works(
session,
collection.id,
work_id_offset=0,
batch_size=1,
)
if not works:
last_updated=min_last_updated,
):
task.log.info(
f"Skipping collection {collection.name} ({collection.id}) because it has no works."
f"Skipping delta for collection {collection.name} ({collection.id}) "
f"because no works have been updated."
)
else:
marc_export_collection.delay(
collection_id=collection.id,
collection_name=collection.name,
start_time=start_time,
libraries=needs_delta,
delta=True,
)
continue

task.log.info(
f"Generating MARC records for collection {collection.name} ({collection.id})."
)
upload_session.set_state(MarcFileUploadState.QUEUED)
marc_export_collection.delay(
collection_id=collection.id,
start_time=start_time,
libraries=[l.model_dump() for l in libraries_info],
)

def marc_export_collection_lock(
    client: Redis, collection_id: int, delta: bool = False
) -> RedisLock:
    return RedisLock(
        client,
        ["MarcUpload", Collection.redis_key_from_id(collection_id), f"Delta::{delta}"],
        lock_timeout=datetime.timedelta(minutes=20),
    )


@shared_task(queue=QueueNames.default, bind=True)
def marc_export_collection(
task: Task,
collection_id: int,
collection_name: str,
start_time: datetime.datetime,
libraries: list[dict[str, Any]],
batch_size: int = 500,
context: dict[int, dict[str, Any]] | None = None,
last_work_id: int | None = None,
update_number: int = 0,
batch_size: int = 1000,
delta: bool = False,
) -> None:
"""
Export MARC records for a single collection.
Expand All @@ -104,64 +119,139 @@ def marc_export_collection(

base_url = task.services.config.sitewide.base_url()
storage_service = task.services.storage.public()
libraries_info = [LibraryInfo.model_validate(l) for l in libraries]
upload_manager = MarcUploadManager(
storage_service,
MarcFileUploadSession(
task.services.redis.client(), collection_id, update_number
),

# Parse data into pydantic models
libraries_info = TypeAdapter(list[LibraryInfo]).validate_python(libraries)
context_parsed = TypeAdapter(dict[int, UploadContext]).validate_python(
context or {}
)
with upload_manager.begin():
if not upload_manager.locked:

lock = marc_export_collection_lock(
task.services.redis.client(), collection_id, delta
)

with lock.lock() as locked:
if not locked:
task.log.info(
f"Skipping collection {collection_id} because another task is already processing it."
)
return

with task.session() as session:
works = MarcExporter.query_works(
session,
collection_id,
work_id_offset=last_work_id,
batch_size=batch_size,
)
for work in works:
MarcExporter.process_work(
work, libraries_info, base_url, upload_manager=upload_manager
with ExitStack() as stack, task.transaction() as session:
files = {
library: stack.enter_context(TemporaryFile())
for library in libraries_info
}
uploads: dict[LibraryInfo, MarcUploadManager] = {
library: stack.enter_context(
MarcUploadManager(
storage_service,
collection_name,
library.library_short_name,
start_time,
library.last_updated if delta else None,
context_parsed.get(library.library_id),
)
)
for library in libraries_info
}

# Sync the upload_manager to ensure that all the data is written to storage.
upload_manager.sync()
min_last_updated = (
min([l.last_updated for l in libraries_info if l.last_updated])
if delta
else None
)

if len(works) != batch_size:
# We have finished generating MARC records. Cleanup and exit.
with task.transaction() as session:
collection = MarcExporter.collection(session, collection_id)
collection_name = collection.name if collection else "unknown"
completed_uploads = upload_manager.complete()
MarcExporter.create_marc_upload_records(
no_more_works = False
while not all(
[
file.tell() > storage_service.MINIMUM_MULTIPART_UPLOAD_SIZE
for file in files.values()
]
):
works = MarcExporter.query_works(
session,
start_time,
collection_id,
libraries_info,
completed_uploads,
batch_size=batch_size,
work_id_offset=last_work_id,
last_updated=min_last_updated,
)
upload_manager.remove_session()
task.log.info(
f"Finished generating MARC records for collection '{collection_name}' ({collection_id})."
)
return
if not works:
no_more_works = True
break

# Set this for the next iteration
last_work_id = works[-1].id

works_with_pools = [
(work, pool)
for work in works
if (pool := work.active_license_pool()) is not None
]

# Find ISBN for any work that needs it
isbns = MarcExporter.query_isbn_identifiers(
session,
{pool.identifier for work, pool in works_with_pools},
)

for work, pool in works_with_pools:
isbn_identifier = isbns.get(pool.identifier)
records = MarcExporter.process_work(
work, pool, isbn_identifier, libraries_info, base_url, delta
)
for library, record in records.items():
files[library].write(record)

# Upload part to s3, if there is anything to upload
for library, tmp_file in files.items():
upload = uploads[library]
if not upload.upload_part(tmp_file):
task.log.warning(
f"No data to upload to s3 '{upload.context.s3_key}'."
)

if no_more_works:
# Task is complete. Finalize the s3 uploads and create MarcFile records in DB.
for library, upload in uploads.items():
if upload.complete():
create(
session,
MarcFile,
id=upload.context.upload_uuid,
library_id=library.library_id,
collection_id=collection_id,
created=start_time,
key=upload.context.s3_key,
since=library.last_updated if delta else None,
)
task.log.info(f"Completed upload for '{upload.context.s3_key}'")
else:
task.log.warning(
f"No upload for '{upload.context.s3_key}', "
f"because there were no records."
)

task.log.info(
f"Finished generating MARC records for collection '{collection_name}' ({collection_id}) "
f"in {(utc_now() - start_time).seconds} seconds."
)
return

# This task is complete, but there are more works waiting to be exported. So we requeue ourselves
# to process the next batch.
raise task.replace(
marc_export_collection.s(
collection_id=collection_id,
collection_name=collection_name,
start_time=start_time,
libraries=[l.model_dump() for l in libraries_info],
context={
l.library_id: uploads[l].context.model_dump() for l in libraries_info
},
last_work_id=last_work_id,
batch_size=batch_size,
last_work_id=works[-1].id,
update_number=upload_manager.update_number,
delta=delta,
)
)
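
Stepping back, a hedged, self-contained sketch of the buffering pattern the task above implements: render records into a temporary file until it is large enough for one multipart-upload part, upload that part, and either finalize or requeue. The 5 MiB threshold and the fetch_batch/render/upload_part callables are illustrative stand-ins; the real values come from the storage service and MarcUploadManager.

from tempfile import TemporaryFile

# Illustrative threshold; the task above reads storage_service.MINIMUM_MULTIPART_UPLOAD_SIZE.
MINIMUM_PART_SIZE = 5 * 1024 * 1024


def export_one_part(fetch_batch, render, upload_part) -> bool:
    # Returns True when there is nothing left to export (caller finalizes the
    # multipart upload), False when more batches remain (caller requeues itself,
    # carrying the upload context forward to the next task invocation).
    with TemporaryFile() as tmp:
        finished = False
        while tmp.tell() <= MINIMUM_PART_SIZE:
            batch = fetch_batch()
            if not batch:
                finished = True
                break
            for record in batch:
                tmp.write(render(record))  # render() must return bytes
        upload_part(tmp)
        return finished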
