diff --git a/src/palace/manager/celery/tasks/marc.py b/src/palace/manager/celery/tasks/marc.py index 2d528ca10..3462e39dd 100644 --- a/src/palace/manager/celery/tasks/marc.py +++ b/src/palace/manager/celery/tasks/marc.py @@ -3,7 +3,6 @@ from tempfile import TemporaryFile from typing import Any -import pyinstrument from celery import shared_task from pydantic import TypeAdapter @@ -124,7 +123,7 @@ def marc_export_collection( ) return - with ExitStack() as stack, task.session() as session, pyinstrument.profile(): + with ExitStack() as stack, task.transaction() as session: files = { library: stack.enter_context(TemporaryFile()) for library in libraries_info @@ -192,7 +191,11 @@ def marc_export_collection( # Upload part to s3, if there is anything to upload for library, tmp_file in files.items(): - uploads[library].upload_part(tmp_file) + upload = uploads[library] + if not upload.upload_part(tmp_file): + task.log.warning( + f"No data to upload to s3 '{upload.context.s3_key}'." + ) if no_more_works: # Task is complete. Finalize the s3 uploads and create MarcFile records in DB. @@ -208,6 +211,12 @@ def marc_export_collection( key=upload.context.s3_key, since=library.last_updated if delta else None, ) + task.log.info(f"Completed upload for '{upload.context.s3_key}'") + else: + task.log.warning( + f"No upload for '{upload.context.s3_key}', " + f"because there were no records." + ) task.log.info( f"Finished generating MARC records for collection '{collection_name}' ({collection_id}) " diff --git a/src/palace/manager/marc/uploader.py b/src/palace/manager/marc/uploader.py index 5218304d3..f6a19e0c7 100644 --- a/src/palace/manager/marc/uploader.py +++ b/src/palace/manager/marc/uploader.py @@ -16,7 +16,7 @@ class UploadContext(BaseModel): upload_uuid: uuid.UUID s3_key: str - upload_id: str + upload_id: str | None = None parts: list[MultipartS3UploadPart] = [] @@ -51,11 +51,9 @@ def __init__( upload_uuid, since_time, ) - upload_id = self.storage_service.multipart_create( - s3_key, content_type=Representation.MARC_MEDIA_TYPE - ) context = UploadContext( - upload_uuid=upload_uuid, s3_key=s3_key, upload_id=upload_id + upload_uuid=upload_uuid, + s3_key=s3_key, ) self.context = context @@ -114,17 +112,34 @@ def __exit__( self._in_context_manager = False return False + def begin_upload(self) -> str: + upload_id = self.storage_service.multipart_create( + self.context.s3_key, content_type=Representation.MARC_MEDIA_TYPE + ) + self.context.upload_id = upload_id + return upload_id + def upload_part(self, data: IO[bytes] | bytes) -> bool: if self._finalized: raise MarcUploadException("Upload is already finalized.") - length = len(data) if isinstance(data, bytes) else data.tell() + if isinstance(data, bytes): + length = len(data) + else: + length = data.tell() + data.seek(0) + if length == 0: return False + if self.context.upload_id is None: + upload_id = self.begin_upload() + else: + upload_id = self.context.upload_id + part_number = len(self.context.parts) + 1 upload_part = self.storage_service.multipart_upload( - self.context.s3_key, self.context.upload_id, part_number, data + self.context.s3_key, upload_id, part_number, data ) self.context.parts.append(upload_part) return True @@ -133,7 +148,7 @@ def complete(self) -> bool: if self._finalized: raise MarcUploadException("Upload is already finalized.") - if not self.context.parts: + if self.context.upload_id is None or not self.context.parts: self.abort() return False @@ -147,6 +162,10 @@ def abort(self) -> None: if self._finalized: return + if 
self.context.upload_id is None: + self._finalized = True + return + self.storage_service.multipart_abort( self.context.s3_key, self.context.upload_id ) diff --git a/tests/fixtures/marc.py b/tests/fixtures/marc.py index 8d2bea910..ca035a9c9 100644 --- a/tests/fixtures/marc.py +++ b/tests/fixtures/marc.py @@ -42,8 +42,6 @@ def __init__( self.collection2.libraries = [self.library1] self.collection3.libraries = [self.library2] - self.test_marc_file_key = "test-file-1.mrc" - def integration(self) -> IntegrationConfiguration: return self._db.integration_configuration( MarcExporter, Goals.CATALOG_GOAL, name="MARC Exporter" @@ -54,13 +52,13 @@ def work(self, collection: Collection | None = None) -> Work: edition = self._db.edition() self._db.licensepool(edition, collection=collection) work = self._db.work(presentation_edition=edition) - work.last_update_time = utc_now() + work.last_update_time = utc_now() - datetime.timedelta(days=1) return work def works(self, collection: Collection | None = None) -> list[Work]: return [self.work(collection) for _ in range(5)] - def configure_export(self, *, marc_file: bool = True) -> None: + def configure_export(self) -> None: marc_integration = self.integration() self._db.integration_library_configuration( marc_integration, @@ -77,12 +75,6 @@ def configure_export(self, *, marc_file: bool = True) -> None: self.collection2.export_marc_records = True self.collection3.export_marc_records = True - if marc_file: - self.marc_file( - key=self.test_marc_file_key, - created=utc_now() - datetime.timedelta(days=7), - ) - def enabled_libraries( self, collection: Collection | None = None ) -> Sequence[LibraryInfo]: diff --git a/tests/fixtures/s3.py b/tests/fixtures/s3.py index f5b3e2a6d..b25d2435b 100644 --- a/tests/fixtures/s3.py +++ b/tests/fixtures/s3.py @@ -81,7 +81,6 @@ def __init__( if isinstance(content, bytes): self.content = content else: - content.seek(0) self.content = content.read() diff --git a/tests/manager/celery/tasks/test_marc.py b/tests/manager/celery/tasks/test_marc.py index 54e9d2fee..f3ccc40ee 100644 --- a/tests/manager/celery/tasks/test_marc.py +++ b/tests/manager/celery/tasks/test_marc.py @@ -1,346 +1,369 @@ -# import datetime -# from typing import Any -# from unittest.mock import ANY, call, patch -# -# import pytest -# from pymarc import MARCReader -# from sqlalchemy import select -# -# from palace.manager.celery.tasks import marc -# from palace.manager.marc.exporter import MarcExporter -# from palace.manager.marc.uploader import MarcUploadManager -# from palace.manager.service.logging.configuration import LogLevel -# from palace.manager.sqlalchemy.model.collection import Collection -# from palace.manager.sqlalchemy.model.marcfile import MarcFile -# from palace.manager.sqlalchemy.model.work import Work -# from palace.manager.util.datetime_helpers import utc_now -# from tests.fixtures.celery import CeleryFixture -# from tests.fixtures.database import DatabaseTransactionFixture -# from tests.fixtures.marc import MarcExporterFixture -# from tests.fixtures.redis import RedisFixture -# from tests.fixtures.s3 import S3ServiceFixture, S3ServiceIntegrationFixture -# from tests.fixtures.services import ServicesFixture -# -# -# class TestMarcExport: -# def test_no_works( -# self, -# db: DatabaseTransactionFixture, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# celery_fixture: CeleryFixture, -# ): -# marc_exporter_fixture.configure_export() -# with patch.object(marc, "marc_export_collection") as marc_export_collection: -# # 
Because none of the collections have works, we should skip all of them. -# marc.marc_export.delay().wait() -# marc_export_collection.delay.assert_not_called() -# -# def test_normal_run( -# self, -# db: DatabaseTransactionFixture, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# celery_fixture: CeleryFixture, -# ): -# marc_exporter_fixture.configure_export() -# with patch.object(marc, "marc_export_collection") as marc_export_collection: -# # Runs against all the expected collections -# collections = [ -# marc_exporter_fixture.collection1, -# marc_exporter_fixture.collection2, -# marc_exporter_fixture.collection3, -# ] -# for collection in collections: -# marc_exporter_fixture.work(collection) -# marc.marc_export.delay().wait() -# marc_export_collection.delay.assert_has_calls( -# [ -# call( -# collection_id=collection.id, -# collection_name=collection.name, -# start_time=ANY, -# libraries=ANY, -# ) -# for collection in collections -# ], -# any_order=True, -# ) -# -# def test_skip_collections( -# self, -# db: DatabaseTransactionFixture, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# celery_fixture: CeleryFixture, -# ): -# marc_exporter_fixture.configure_export() -# collections = [ -# marc_exporter_fixture.collection1, -# marc_exporter_fixture.collection2, -# marc_exporter_fixture.collection3, -# ] -# for collection in collections: -# marc_exporter_fixture.work(collection) -# with patch.object(marc, "marc_export_collection") as marc_export_collection: -# # Collection 1 should be skipped because it is locked -# assert marc_exporter_fixture.collection1.id is not None -# MarcFileUploadSession( -# redis_fixture.client, marc_exporter_fixture.collection1.id -# ).acquire() -# -# # Collection 2 should be skipped because it was updated recently -# marc_exporter_fixture.marc_file( -# collection=marc_exporter_fixture.collection2 -# ) -# -# # Collection 3 should be skipped because its state is not INITIAL -# assert marc_exporter_fixture.collection3.id is not None -# upload_session = MarcFileUploadSession( -# redis_fixture.client, marc_exporter_fixture.collection3.id -# ) -# with upload_session.lock() as acquired: -# assert acquired -# upload_session.set_state(MarcFileUploadState.QUEUED) -# -# marc.marc_export.delay().wait() -# marc_export_collection.delay.assert_not_called() -# -# -# class MarcExportCollectionFixture: -# def __init__( -# self, -# db: DatabaseTransactionFixture, -# celery_fixture: CeleryFixture, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# s3_service_integration_fixture: S3ServiceIntegrationFixture, -# s3_service_fixture: S3ServiceFixture, -# services_fixture: ServicesFixture, -# ): -# self.db = db -# self.celery_fixture = celery_fixture -# self.redis_fixture = redis_fixture -# self.marc_exporter_fixture = marc_exporter_fixture -# self.s3_service_integration_fixture = s3_service_integration_fixture -# self.s3_service_fixture = s3_service_fixture -# self.services_fixture = services_fixture -# -# self.mock_s3 = self.s3_service_fixture.mock_service() -# self.mock_s3.MINIMUM_MULTIPART_UPLOAD_SIZE = 10 -# marc_exporter_fixture.configure_export() -# -# self.start_time = utc_now() -# -# def marc_files(self) -> list[MarcFile]: -# # We need to ignore the test-file-1.mrc file, which is created by our call to configure_export. 
-# return [ -# f -# for f in self.db.session.execute(select(MarcFile)).scalars().all() -# if f.key != self.marc_exporter_fixture.test_marc_file_key -# ] -# -# def redis_data(self, collection: Collection) -> dict[str, Any] | None: -# assert collection.id is not None -# uploads = MarcFileUploadSession(self.redis_fixture.client, collection.id) -# return self.redis_fixture.client.json().get(uploads.key) -# -# def setup_minio_storage(self) -> None: -# self.services_fixture.services.storage.override( -# self.s3_service_integration_fixture.container -# ) -# -# def setup_mock_storage(self) -> None: -# self.services_fixture.services.storage.public.override(self.mock_s3) -# -# def works(self, collection: Collection) -> list[Work]: -# return [self.marc_exporter_fixture.work(collection) for _ in range(15)] -# -# def export_collection(self, collection: Collection) -> None: -# service = self.services_fixture.services.integration_registry.catalog_services() -# assert collection.id is not None -# info = MarcExporter.enabled_libraries(self.db.session, service, collection.id) -# libraries = [l.model_dump() for l in info] -# marc.marc_export_collection.delay( -# collection.id, -# collection_name=collection.name, -# batch_size=5, -# start_time=self.start_time, -# libraries=libraries, -# ).wait() -# -# -# @pytest.fixture -# def marc_export_collection_fixture( -# db: DatabaseTransactionFixture, -# celery_fixture: CeleryFixture, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# s3_service_integration_fixture: S3ServiceIntegrationFixture, -# s3_service_fixture: S3ServiceFixture, -# services_fixture: ServicesFixture, -# ) -> MarcExportCollectionFixture: -# return MarcExportCollectionFixture( -# db, -# celery_fixture, -# redis_fixture, -# marc_exporter_fixture, -# s3_service_integration_fixture, -# s3_service_fixture, -# services_fixture, -# ) -# -# -# class TestMarcExportCollection: -# def test_normal_run( -# self, -# s3_service_integration_fixture: S3ServiceIntegrationFixture, -# marc_exporter_fixture: MarcExporterFixture, -# marc_export_collection_fixture: MarcExportCollectionFixture, -# ): -# marc_export_collection_fixture.setup_minio_storage() -# collection = marc_exporter_fixture.collection1 -# work_uris = [ -# work.license_pools[0].identifier.urn -# for work in marc_export_collection_fixture.works(collection) -# ] -# -# # Run the full end-to-end process for exporting a collection, this should generate -# # 3 batches of 5 works each, putting the results into minio. -# marc_export_collection_fixture.export_collection(collection) -# -# # Verify that we didn't leave anything in the redis cache. -# assert marc_export_collection_fixture.redis_data(collection) is None -# -# # Verify that the expected number of files were uploaded to minio. -# uploaded_files = s3_service_integration_fixture.list_objects("public") -# assert len(uploaded_files) == 3 -# -# # Verify that the expected number of marc files were created in the database. -# marc_files = marc_export_collection_fixture.marc_files() -# assert len(marc_files) == 3 -# filenames = [marc_file.key for marc_file in marc_files] -# -# # Verify that the uploaded files are the expected ones. -# assert set(uploaded_files) == set(filenames) -# -# # Verify that the marc files contain the expected works. 
-# for file in uploaded_files: -# data = s3_service_integration_fixture.get_object("public", file) -# records = list(MARCReader(data)) -# assert len(records) == len(work_uris) -# marc_uris = [record["001"].data for record in records] -# assert set(marc_uris) == set(work_uris) -# -# # Make sure the records have the correct organization code. -# expected_org = "library1-org" if "library1" in file else "library2-org" -# assert all(record["003"].data == expected_org for record in records) -# -# # Make sure records have the correct status -# expected_status = "c" if "delta" in file else "n" -# assert all( -# record.leader.record_status == expected_status for record in records -# ) -# -# def test_collection_no_works( -# self, -# marc_exporter_fixture: MarcExporterFixture, -# s3_service_integration_fixture: S3ServiceIntegrationFixture, -# marc_export_collection_fixture: MarcExportCollectionFixture, -# ): -# marc_export_collection_fixture.setup_minio_storage() -# collection = marc_exporter_fixture.collection2 -# marc_export_collection_fixture.export_collection(collection) -# -# assert marc_export_collection_fixture.marc_files() == [] -# assert s3_service_integration_fixture.list_objects("public") == [] -# assert marc_export_collection_fixture.redis_data(collection) is None -# -# def test_exception_handled( -# self, -# marc_exporter_fixture: MarcExporterFixture, -# marc_export_collection_fixture: MarcExportCollectionFixture, -# ): -# marc_export_collection_fixture.setup_mock_storage() -# collection = marc_exporter_fixture.collection1 -# marc_export_collection_fixture.works(collection) -# -# with patch.object(MarcUploadManager, "complete") as complete: -# complete.side_effect = Exception("Test Exception") -# with pytest.raises(Exception, match="Test Exception"): -# marc_export_collection_fixture.export_collection(collection) -# -# # After the exception, we should have aborted the multipart uploads and deleted the redis data. -# assert marc_export_collection_fixture.marc_files() == [] -# assert marc_export_collection_fixture.redis_data(collection) is None -# assert len(marc_export_collection_fixture.mock_s3.aborted) == 3 -# -# def test_locked( -# self, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# marc_export_collection_fixture: MarcExportCollectionFixture, -# caplog: pytest.LogCaptureFixture, -# ): -# caplog.set_level(LogLevel.info) -# collection = marc_exporter_fixture.collection1 -# assert collection.id is not None -# MarcFileUploadSession(redis_fixture.client, collection.id).acquire() -# marc_export_collection_fixture.setup_mock_storage() -# with patch.object(MarcExporter, "query_works") as query: -# marc_export_collection_fixture.export_collection(collection) -# query.assert_not_called() -# assert "another task is already processing it" in caplog.text -# -# def test_outdated_task_run( -# self, -# redis_fixture: RedisFixture, -# marc_exporter_fixture: MarcExporterFixture, -# marc_export_collection_fixture: MarcExportCollectionFixture, -# caplog: pytest.LogCaptureFixture, -# ): -# # In the case that an old task is run again for some reason, it should -# # detect that its update number is incorrect and exit. -# caplog.set_level(LogLevel.info) -# collection = marc_exporter_fixture.collection1 -# marc_export_collection_fixture.setup_mock_storage() -# assert collection.id is not None -# -# # Acquire the lock and start an upload, this simulates another task having done work -# # that the current task doesn't know about. 
-# uploads = MarcFileUploadSession(redis_fixture.client, collection.id) -# with uploads.lock() as locked: -# assert locked -# uploads.append_buffers({"test": "data"}) -# -# with pytest.raises(MarcFileUploadSessionError, match="Update number mismatch"): -# marc_export_collection_fixture.export_collection(collection) -# -# assert marc_export_collection_fixture.marc_files() == [] -# assert marc_export_collection_fixture.redis_data(collection) is None -# -# -# def test_marc_export_cleanup( -# db: DatabaseTransactionFixture, -# celery_fixture: CeleryFixture, -# s3_service_fixture: S3ServiceFixture, -# marc_exporter_fixture: MarcExporterFixture, -# services_fixture: ServicesFixture, -# ): -# marc_exporter_fixture.configure_export(marc_file=False) -# mock_s3 = s3_service_fixture.mock_service() -# services_fixture.services.storage.public.override(mock_s3) -# -# not_deleted_id = marc_exporter_fixture.marc_file(created=utc_now()).id -# deleted_keys = [ -# marc_exporter_fixture.marc_file( -# created=utc_now() - datetime.timedelta(days=d + 1) -# ).key -# for d in range(20) -# ] -# -# marc.marc_export_cleanup.delay(batch_size=5).wait() -# -# [not_deleted] = db.session.execute(select(MarcFile)).scalars().all() -# assert not_deleted.id == not_deleted_id -# assert mock_s3.deleted == deleted_keys +import datetime +from unittest.mock import ANY, call, patch + +import pytest +from pymarc import MARCReader +from sqlalchemy import select + +from palace.manager.celery.tasks import marc +from palace.manager.celery.tasks.marc import marc_export_collection_lock +from palace.manager.marc.exporter import MarcExporter +from palace.manager.marc.uploader import MarcUploadManager +from palace.manager.service.logging.configuration import LogLevel +from palace.manager.service.redis.models.lock import RedisLock +from palace.manager.sqlalchemy.model.collection import Collection +from palace.manager.sqlalchemy.model.marcfile import MarcFile +from palace.manager.sqlalchemy.model.work import Work +from palace.manager.util.datetime_helpers import utc_now +from tests.fixtures.celery import CeleryFixture +from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.marc import MarcExporterFixture +from tests.fixtures.redis import RedisFixture +from tests.fixtures.s3 import S3ServiceFixture, S3ServiceIntegrationFixture +from tests.fixtures.services import ServicesFixture + + +class TestMarcExport: + def test_no_works( + self, + db: DatabaseTransactionFixture, + redis_fixture: RedisFixture, + marc_exporter_fixture: MarcExporterFixture, + celery_fixture: CeleryFixture, + ): + marc_exporter_fixture.configure_export() + with patch.object(marc, "marc_export_collection") as marc_export_collection: + # Because none of the collections have works, we should skip all of them. 
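+            # So no marc_export_collection tasks should be queued.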
+ marc.marc_export.delay().wait() + marc_export_collection.delay.assert_not_called() + + def test_normal_run( + self, + db: DatabaseTransactionFixture, + redis_fixture: RedisFixture, + marc_exporter_fixture: MarcExporterFixture, + celery_fixture: CeleryFixture, + ): + marc_exporter_fixture.configure_export() + marc_exporter_fixture.marc_file( + collection=marc_exporter_fixture.collection1, + library=marc_exporter_fixture.library1, + created=utc_now() - datetime.timedelta(days=7), + ) + with patch.object(marc, "marc_export_collection") as marc_export_collection: + # Runs against all the expected collections + collections = [ + marc_exporter_fixture.collection1, + marc_exporter_fixture.collection2, + marc_exporter_fixture.collection3, + ] + for collection in collections: + marc_exporter_fixture.work(collection) + marc.marc_export.delay().wait() + + # We make the calls to generate a full export for every collection + marc_export_collection.delay.assert_has_calls( + [ + call( + collection_id=collection.id, + collection_name=collection.name, + start_time=ANY, + libraries=ANY, + ) + for collection in collections + ], + any_order=True, + ) + + # We make the calls to generate a delta export only for collection1 + marc_export_collection.delay.assert_any_call( + collection_id=marc_exporter_fixture.collection1.id, + collection_name=marc_exporter_fixture.collection1.name, + start_time=ANY, + libraries=ANY, + delta=True, + ) + + # Make sure the call was made with the correct library set + [delta_call] = [ + c + for c in marc_export_collection.delay.mock_calls + if "delta" in c.kwargs + ] + libraries_kwarg = delta_call.kwargs["libraries"] + assert len(libraries_kwarg) == 1 + assert ( + libraries_kwarg[0].get("library_id") + == marc_exporter_fixture.library1.id + ) + + def test_skip_collections( + self, + db: DatabaseTransactionFixture, + redis_fixture: RedisFixture, + marc_exporter_fixture: MarcExporterFixture, + celery_fixture: CeleryFixture, + ): + marc_exporter_fixture.configure_export() + with patch.object(marc, "marc_export_collection") as marc_export_collection: + # Collection 1 should be skipped because it has no works + + # Collection 2 should be skipped because it was updated recently + marc_exporter_fixture.work(marc_exporter_fixture.collection2) + marc_exporter_fixture.marc_file( + collection=marc_exporter_fixture.collection2, + library=marc_exporter_fixture.library1, + ) + + # Collection 3 should be skipped because it was updated recently + marc_exporter_fixture.work(marc_exporter_fixture.collection3) + marc_exporter_fixture.marc_file( + collection=marc_exporter_fixture.collection3, + library=marc_exporter_fixture.library2, + ) + + marc.marc_export.delay().wait() + marc_export_collection.delay.assert_not_called() + + +class MarcExportCollectionFixture: + def __init__( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + marc_exporter_fixture: MarcExporterFixture, + s3_service_integration_fixture: S3ServiceIntegrationFixture, + s3_service_fixture: S3ServiceFixture, + services_fixture: ServicesFixture, + ): + self.db = db + self.celery_fixture = celery_fixture + self.redis_fixture = redis_fixture + self.marc_exporter_fixture = marc_exporter_fixture + self.s3_service_integration_fixture = s3_service_integration_fixture + self.s3_service_fixture = s3_service_fixture + self.services_fixture = services_fixture + + self.mock_s3 = self.s3_service_fixture.mock_service() + self.mock_s3.MINIMUM_MULTIPART_UPLOAD_SIZE = 10 + 
marc_exporter_fixture.configure_export()
+
+        self.start_time = utc_now()
+
+    def marc_files(self) -> list[MarcFile]:
+        return self.db.session.execute(select(MarcFile)).scalars().all()
+
+    def setup_minio_storage(self) -> None:
+        self.services_fixture.services.storage.override(
+            self.s3_service_integration_fixture.container
+        )
+
+    def setup_mock_storage(self) -> None:
+        self.services_fixture.services.storage.public.override(self.mock_s3)
+
+    def works(self, collection: Collection) -> list[Work]:
+        return [self.marc_exporter_fixture.work(collection) for _ in range(15)]
+
+    def export_collection(self, collection: Collection, delta: bool = False) -> None:
+        service = self.services_fixture.services.integration_registry.catalog_services()
+        assert collection.id is not None
+        info = MarcExporter.enabled_libraries(self.db.session, service, collection.id)
+        libraries = [l.model_dump() for l in info]
+        marc.marc_export_collection.delay(
+            collection.id,
+            collection_name=collection.name,
+            batch_size=5,
+            start_time=self.start_time,
+            libraries=libraries,
+            delta=delta,
+        ).wait()
+
+    def redis_lock(self, collection: Collection, delta: bool = False) -> RedisLock:
+        return marc_export_collection_lock(
+            self.redis_fixture.client, collection.id, delta=delta
+        )
+
+
+@pytest.fixture
+def marc_export_collection_fixture(
+    db: DatabaseTransactionFixture,
+    celery_fixture: CeleryFixture,
+    redis_fixture: RedisFixture,
+    marc_exporter_fixture: MarcExporterFixture,
+    s3_service_integration_fixture: S3ServiceIntegrationFixture,
+    s3_service_fixture: S3ServiceFixture,
+    services_fixture: ServicesFixture,
+) -> MarcExportCollectionFixture:
+    return MarcExportCollectionFixture(
+        db,
+        celery_fixture,
+        redis_fixture,
+        marc_exporter_fixture,
+        s3_service_integration_fixture,
+        s3_service_fixture,
+        services_fixture,
+    )
+
+
+class TestMarcExportCollection:
+    def test_normal_run(
+        self,
+        s3_service_integration_fixture: S3ServiceIntegrationFixture,
+        marc_exporter_fixture: MarcExporterFixture,
+        marc_export_collection_fixture: MarcExportCollectionFixture,
+    ):
+        marc_export_collection_fixture.setup_minio_storage()
+        collection = marc_exporter_fixture.collection1
+        works = marc_export_collection_fixture.works(collection)
+        work_uris = [work.license_pools[0].identifier.urn for work in works]
+
+        # Run the full end-to-end process for exporting a collection. This should generate
+        # 3 batches of 5 works each, putting the results into minio.
+        marc_export_collection_fixture.export_collection(collection)
+
+        # Lock is released
+        assert not marc_export_collection_fixture.redis_lock(collection).locked()
+
+        # Verify that the expected number of files were uploaded to minio.
+        uploaded_files = s3_service_integration_fixture.list_objects("public")
+        assert len(uploaded_files) == 2
+
+        # Verify that the expected number of marc files were created in the database.
+        marc_files = marc_export_collection_fixture.marc_files()
+        assert len(marc_files) == 2
+        filenames = [marc_file.key for marc_file in marc_files]
+
+        # Verify that the uploaded files are the expected ones.
+        assert set(uploaded_files) == set(filenames)
+
+        # Verify that the marc files contain the expected works.
+        for file in uploaded_files:
+            data = s3_service_integration_fixture.get_object("public", file)
+            records = list(MARCReader(data))
+            assert len(records) == len(work_uris)
+            marc_uris = [record["001"].data for record in records]
+            assert set(marc_uris) == set(work_uris)
+
+            # Make sure the records have the correct organization code.
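+            # Each library's file should carry the organization code configured for that library.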
+            expected_org = "library1-org" if "library1" in file else "library2-org"
+            assert all(record["003"].data == expected_org for record in records)
+
+            # Make sure records have the correct status
+            assert all(record.leader.record_status == "n" for record in records)
+
+        # Try running a delta export now
+        marc_export_collection_fixture.export_collection(collection, delta=True)
+
+        # Because no works have been updated since the last run, no delta exports are generated
+        marc_files = marc_export_collection_fixture.marc_files()
+        assert len(marc_files) == 2
+
+        # Update the last_update_time on a couple of works
+        updated_works = [works[0], works[1]]
+        for work in updated_works:
+            work.last_update_time = utc_now()
+
+        marc_export_collection_fixture.export_collection(collection, delta=True)
+
+        # Now the delta marc files are generated
+        marc_files = marc_export_collection_fixture.marc_files()
+        assert len(marc_files) == 4
+        delta_marc_files = [
+            marc_file for marc_file in marc_files if "delta" in marc_file.key
+        ]
+        assert len(delta_marc_files) == 2
+
+        # Verify that the marc files contain the expected works.
+        for marc_file in delta_marc_files:
+            data = s3_service_integration_fixture.get_object("public", marc_file.key)
+            records = list(MARCReader(data))
+            assert len(records) == 2
+            marc_uris = [record["001"].data for record in records]
+            assert set(marc_uris) == {
+                work.license_pools[0].identifier.urn for work in updated_works
+            }
+
+            # Make sure the records have the correct organization code.
+            expected_org = (
+                "library1-org" if "library1" in marc_file.key else "library2-org"
+            )
+            assert all(record["003"].data == expected_org for record in records)
+
+            # Make sure records have the correct status
+            assert all(record.leader.record_status == "c" for record in records)
+
+    def test_collection_no_works(
+        self,
+        marc_exporter_fixture: MarcExporterFixture,
+        s3_service_integration_fixture: S3ServiceIntegrationFixture,
+        marc_export_collection_fixture: MarcExportCollectionFixture,
+    ):
+        marc_export_collection_fixture.setup_minio_storage()
+        collection = marc_exporter_fixture.collection2
+        marc_export_collection_fixture.export_collection(collection)
+
+        assert marc_export_collection_fixture.marc_files() == []
+        assert s3_service_integration_fixture.list_objects("public") == []
+        assert not marc_export_collection_fixture.redis_lock(collection).locked()
+
+    def test_exception_handled(
+        self,
+        marc_exporter_fixture: MarcExporterFixture,
+        marc_export_collection_fixture: MarcExportCollectionFixture,
+    ):
+        marc_export_collection_fixture.setup_mock_storage()
+        collection = marc_exporter_fixture.collection1
+        marc_export_collection_fixture.works(collection)
+
+        with patch.object(MarcUploadManager, "complete") as complete:
+            complete.side_effect = Exception("Test Exception")
+            with pytest.raises(Exception, match="Test Exception"):
+                marc_export_collection_fixture.export_collection(collection)
+
+        # After the exception, we should have aborted the multipart uploads and released the lock
+        assert marc_export_collection_fixture.marc_files() == []
+        assert len(marc_export_collection_fixture.mock_s3.aborted) == 2
+        assert not marc_export_collection_fixture.redis_lock(collection).locked()
+
+    def test_locked(
+        self,
+        redis_fixture: RedisFixture,
+        marc_exporter_fixture: MarcExporterFixture,
+        marc_export_collection_fixture: MarcExportCollectionFixture,
+        caplog: pytest.LogCaptureFixture,
+    ):
+        caplog.set_level(LogLevel.info)
+        collection = marc_exporter_fixture.collection1
+        
marc_export_collection_fixture.redis_lock(collection).acquire() + marc_export_collection_fixture.setup_mock_storage() + with patch.object(MarcExporter, "query_works") as query: + marc_export_collection_fixture.export_collection(collection) + query.assert_not_called() + assert "another task is already processing it" in caplog.text + + +def test_marc_export_cleanup( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + s3_service_fixture: S3ServiceFixture, + marc_exporter_fixture: MarcExporterFixture, + services_fixture: ServicesFixture, +): + marc_exporter_fixture.configure_export() + mock_s3 = s3_service_fixture.mock_service() + services_fixture.services.storage.public.override(mock_s3) + + not_deleted_id = marc_exporter_fixture.marc_file(created=utc_now()).id + deleted_keys = [ + marc_exporter_fixture.marc_file( + created=utc_now() - datetime.timedelta(days=d + 1) + ).key + for d in range(20) + ] + + marc.marc_export_cleanup.delay(batch_size=5).wait() + + [not_deleted] = db.session.execute(select(MarcFile)).scalars().all() + assert not_deleted.id == not_deleted_id + assert mock_s3.deleted == deleted_keys diff --git a/tests/manager/marc/test_exporter.py b/tests/manager/marc/test_exporter.py index 0c06cb901..76a0b1b4b 100644 --- a/tests/manager/marc/test_exporter.py +++ b/tests/manager/marc/test_exporter.py @@ -375,6 +375,10 @@ def test_collection(self, marc_exporter_fixture: MarcExporterFixture) -> None: def test_process_work(self, marc_exporter_fixture: MarcExporterFixture) -> None: marc_exporter_fixture.configure_export() + marc_exporter_fixture.marc_file( + library=marc_exporter_fixture.library1, + created=utc_now() - datetime.timedelta(days=14), + ) collection = marc_exporter_fixture.collection1 work = marc_exporter_fixture.work(collection) @@ -394,15 +398,15 @@ def test_process_work(self, marc_exporter_fixture: MarcExporterFixture) -> None: processed = process_work(False) assert list(processed.keys()) == enabled_libraries - # But only the first in a delta record, since the other library - # has had a delta generated recently - processed = process_work(True) - assert list(processed.keys()) == [enabled_libraries[0]] + # But we only get library1 in a delta record, since this is the first full marc export + # for library2, so there is no timestamp to create a delta record against. 
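+        # (The marc_file created above is what gives library1 a timestamp to delta against.)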
+ [processed] = process_work(True).keys() + assert processed.library_id == marc_exporter_fixture.library1.id def test_files_for_cleanup_deleted_disabled( self, marc_exporter_fixture: MarcExporterFixture ) -> None: - marc_exporter_fixture.configure_export(marc_file=False) + marc_exporter_fixture.configure_export() files_for_cleanup = partial( MarcExporter.files_for_cleanup, marc_exporter_fixture.session, @@ -460,7 +464,7 @@ def test_files_for_cleanup_deleted_disabled( def test_files_for_cleanup_outdated_full( self, marc_exporter_fixture: MarcExporterFixture ) -> None: - marc_exporter_fixture.configure_export(marc_file=False) + marc_exporter_fixture.configure_export() files_for_cleanup = partial( MarcExporter.files_for_cleanup, marc_exporter_fixture.session, @@ -484,7 +488,7 @@ def test_files_for_cleanup_outdated_full( def test_files_for_cleanup_outdated_delta( self, marc_exporter_fixture: MarcExporterFixture ) -> None: - marc_exporter_fixture.configure_export(marc_file=False) + marc_exporter_fixture.configure_export() files_for_cleanup = partial( MarcExporter.files_for_cleanup, marc_exporter_fixture.session, diff --git a/tests/manager/marc/test_uploader.py b/tests/manager/marc/test_uploader.py index e1a1fa085..0e9fb0578 100644 --- a/tests/manager/marc/test_uploader.py +++ b/tests/manager/marc/test_uploader.py @@ -74,23 +74,30 @@ def test__init_(self, marc_upload_manager_fixture: MarcUploadManagerFixture): # If we don't give a context, one is created and set uploader = marc_upload_manager_fixture.create_uploader() - assert len(marc_upload_manager_fixture.mock_s3_service.upload_in_progress) == 1 - [upload] = ( - marc_upload_manager_fixture.mock_s3_service.upload_in_progress.values() - ) - + assert uploader.context.upload_id is None assert uploader.context.s3_key.startswith( f"marc/{marc_upload_manager_fixture.library_short_name}/" f"{marc_upload_manager_fixture.collection_name}.full.2001-01-01." 
)
-        assert uploader.context.s3_key == upload.key
-        assert uploader.context.upload_id == upload.upload_id
         assert isinstance(uploader.context.upload_uuid, UUID)
         assert uploader.context.parts == []
 
+    def test_begin_upload(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
+        uploader = marc_upload_manager_fixture.create_uploader()
+        assert len(marc_upload_manager_fixture.mock_s3_service.upload_in_progress) == 0
+        uploader.begin_upload()
+        assert len(marc_upload_manager_fixture.mock_s3_service.upload_in_progress) == 1
+        [upload] = (
+            marc_upload_manager_fixture.mock_s3_service.upload_in_progress.values()
+        )
+        assert uploader.context.upload_id == upload.upload_id
+
     def test_upload_part(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
         uploader = marc_upload_manager_fixture.create_uploader()
 
+        # If begin_upload hasn't been called, it will be called by upload_part
+        assert len(marc_upload_manager_fixture.mock_s3_service.upload_in_progress) == 0
+
         # Can upload parts as a binary file, or a byte string
         assert uploader.upload_part(b"test")
         with TemporaryFile() as f:
@@ -101,6 +108,8 @@ def test_upload_part(self, marc_upload_manager_fixture: MarcUploadManagerFixture
         assert not uploader.upload_part(b"")
         assert not uploader.upload_part(BytesIO())
 
+        assert len(marc_upload_manager_fixture.mock_s3_service.upload_in_progress) == 1
+
         [upload_parts] = (
             marc_upload_manager_fixture.mock_s3_service.upload_in_progress.values()
         )
@@ -116,7 +125,16 @@ def test_upload_part(self, marc_upload_manager_fixture: MarcUploadManagerFixture
             uploader.upload_part(b"123")
 
     def test_abort(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
+        # If an upload hasn't been started, abort just sets finalized
+        uploader = marc_upload_manager_fixture.create_uploader()
+        uploader.abort()
+        assert uploader.finalized
+        assert len(marc_upload_manager_fixture.mock_s3_service.upload_in_progress) == 0
+        assert len(marc_upload_manager_fixture.mock_s3_service.aborted) == 0
+
+        # Otherwise, abort calls the API to abort the upload
         uploader = marc_upload_manager_fixture.create_uploader()
+        uploader.begin_upload()
         uploader.abort()
         assert uploader.finalized
         assert (
@@ -128,8 +146,14 @@ def test_abort(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
             uploader.abort()
 
     def test_complete(self, marc_upload_manager_fixture: MarcUploadManagerFixture):
+        # If the upload hasn't started, the upload isn't aborted, but it is finalized
+        uploader = marc_upload_manager_fixture.create_uploader()
+        assert not uploader.complete()
+        assert uploader.finalized
+
        # If the upload has no parts, it is aborted
         uploader = marc_upload_manager_fixture.create_uploader()
+        uploader.begin_upload()
         assert not uploader.complete()
         assert uploader.finalized
         assert (
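
Reviewer note: a minimal sketch of the deferred-upload contract introduced above (the `flush_batches` helper and its input are hypothetical; `MarcUploadManager`, `begin_upload`, `upload_part`, and `complete` are the methods from this diff). The multipart upload is no longer created in `__init__`: `upload_part` skips empty data and lazily calls `begin_upload` on the first non-empty part, so a collection that yields no records never creates anything on S3 that would need to be aborted.

```python
from palace.manager.marc.uploader import MarcUploadManager


def flush_batches(uploader: MarcUploadManager, batches: list[bytes]) -> bool:
    """Hypothetical driver illustrating the lazy-create upload flow."""
    for batch in batches:
        # upload_part() returns False for empty data and skips it; the
        # first non-empty part triggers begin_upload(), which creates
        # the multipart upload and records it on context.upload_id.
        uploader.upload_part(batch)
    # complete() finalizes the upload, or returns False when nothing was
    # uploaded (no S3 abort call is made in that case, since no multipart
    # upload was ever created).
    return uploader.complete()
```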