diff --git a/bin/search_index_refresh b/bin/search_index_refresh
deleted file mode 100755
index 9c151f3a18..0000000000
--- a/bin/search_index_refresh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env python
-"""Re-index any Works whose entries in the search index have become
-out of date.
-"""
-
-from palace.manager.scripts.coverage_provider import RunWorkCoverageProviderScript
-from palace.manager.search.coverage_provider import SearchIndexCoverageProvider
-
-RunWorkCoverageProviderScript(SearchIndexCoverageProvider).run()
diff --git a/docker/services/cron/cron.d/circulation b/docker/services/cron/cron.d/circulation
index 76be17bbb7..2534049d6b 100644
--- a/docker/services/cron/cron.d/circulation
+++ b/docker/services/cron/cron.d/circulation
@@ -11,7 +11,6 @@ HOME=/var/www/circulation
 
 # These scripts update internal caches.
 #
-*/30 * * * * root bin/run -d 15 search_index_refresh >> /var/log/cron.log 2>&1
 10 0 * * * root bin/run search_index_clear >> /var/log/cron.log 2>&1
 0 0 * * * root bin/run update_custom_list_size >> /var/log/cron.log 2>&1
 0 2 * * * root bin/run update_lane_size >> /var/log/cron.log 2>&1
diff --git a/src/palace/manager/api/admin/controller/custom_lists.py b/src/palace/manager/api/admin/controller/custom_lists.py
index 3d24cdb3e3..17533eb262 100644
--- a/src/palace/manager/api/admin/controller/custom_lists.py
+++ b/src/palace/manager/api/admin/controller/custom_lists.py
@@ -289,8 +289,9 @@ def _create_or_update_list(
         if membership_change:
             # We need to update the search index entries for works that caused a membership change,
             # so the upstream counts can be calculated correctly.
-            documents = self.search_engine.create_search_documents_from_works(
-                works_to_update_in_search
+            documents = Work.to_search_documents(
+                self._db,
+                [w.id for w in works_to_update_in_search],
             )
             index = self.search_engine.start_updating_search_documents()
             index.add_documents(documents)
diff --git a/src/palace/manager/celery/tasks/search.py b/src/palace/manager/celery/tasks/search.py
new file mode 100644
index 0000000000..0b17b32df3
--- /dev/null
+++ b/src/palace/manager/celery/tasks/search.py
@@ -0,0 +1,93 @@
+from random import randrange
+
+from celery import chain, shared_task
+from sqlalchemy import select
+
+from palace.manager.celery.task import Task
+from palace.manager.service.celery.celery import QueueNames
+from palace.manager.sqlalchemy.model.work import Work
+from palace.manager.util.log import elapsed_time_logging
+
+
+def exponential_backoff(retries: int) -> int:
+    return 3**retries + randrange(0, 3)
+
+
+@shared_task(queue=QueueNames.default, bind=True, max_retries=5)
+def search_reindex(task: Task, offset: int = 0, batch_size: int = 500) -> None:
+    index = task.services.search.index()
+
+    task.log.info(
+        f"Running search reindex at offset {offset} with batch size {batch_size}."
+    )
+
+    with (
+        task.session() as session,
+        elapsed_time_logging(
+            log_method=task.log.info,
+            message_prefix="Works queried from database",
+            skip_start=True,
+        ),
+    ):
+        works = [
+            w.id
+            for w in session.execute(
+                select(Work.id)
+                .where(Work.presentation_ready == True)
+                .order_by(Work.id)
+                .limit(batch_size)
+                .offset(offset)
+            )
+        ]
+        documents = Work.to_search_documents(session, works)
+
+    with elapsed_time_logging(
+        log_method=task.log.info, message_prefix="Works added to index", skip_start=True
+    ):
+        failed_documents = index.add_documents(documents=documents)
+
+    if failed_documents:
+        wait_time = exponential_backoff(task.request.retries)
+        task.log.error(
+            f"Failed to index {len(failed_documents)} works. Retrying in {wait_time} seconds."
+        )
+        raise task.retry(countdown=wait_time)
+
+    if len(works) == batch_size:
+        # This task is complete, but there are more works waiting to be indexed. Requeue ourselves
+        # to process the next batch.
+        raise task.replace(
+            search_reindex.s(offset=offset + batch_size, batch_size=batch_size)
+        )
+
+    task.log.info("Finished search reindex.")
+
+
+@shared_task(queue=QueueNames.default, bind=True)
+def update_read_pointer(task: Task) -> None:
+    task.log.info("Updating read pointer.")
+    service = task.services.search.service()
+    revision_directory = task.services.search.revision_directory()
+    revision = revision_directory.highest()
+    service.read_pointer_set(revision)
+    task.log.info(
+        f"Updated read pointer ({service.base_revision_name()} v{revision.version})."
+    )
+
+
+@shared_task(queue=QueueNames.default, bind=True)
+def index_work(task: Task, work_id: int) -> None:
+    index = task.services.search.index()
+    with task.session() as session:
+        [document] = Work.to_search_documents(session, [work_id])
+
+    error = index.add_document(document=document)
+    if error:
+        wait_time = exponential_backoff(task.request.retries)
+        task.log.error(
+            f"Failed to index work {work_id}: {error}. Retrying in {wait_time} seconds."
+        )
+        raise task.retry(countdown=wait_time)
+
+    task.log.info(f"Indexed work {work_id}.")
+
+
+do_migration = chain(search_reindex.si(), update_read_pointer.si())
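# Illustrative usage sketch (not part of the diff): how the new tasks above might
# be queued from application code. Assumes a configured Celery broker and the
# service container wiring added elsewhere in this change; the work id is a
# placeholder.
from palace.manager.celery.tasks.search import do_migration, index_work, search_reindex

# Full reindex followed by a read-pointer update, as initialize_search() does below.
migration_result = do_migration.apply_async()

# Re-index a single work asynchronously, e.g. after its metadata changes.
index_work.delay(work_id=123)

# Start a reindex at a specific offset with a smaller batch size.
search_reindex.apply_async(kwargs={"offset": 1000, "batch_size": 100})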
diff --git a/src/palace/manager/scripts/initialization.py b/src/palace/manager/scripts/initialization.py
index 13c101c67d..e3c5f1c450 100644
--- a/src/palace/manager/scripts/initialization.py
+++ b/src/palace/manager/scripts/initialization.py
@@ -9,6 +9,9 @@
 from sqlalchemy.engine import Connection, Engine
 from sqlalchemy.orm import Session
 
+from palace.manager.celery.tasks.search import do_migration
+from palace.manager.search.revision import SearchSchemaRevision
+from palace.manager.search.service import SearchService
 from palace.manager.service.container import container_instance
 from palace.manager.sqlalchemy.session import SessionManager
 from palace.manager.sqlalchemy.util import LOCK_ID_DB_INIT, pg_advisory_lock
@@ -56,7 +59,7 @@ def migrate_database(self, connection: Connection) -> None:
         alembic_conf = self._get_alembic_config(connection, self._config_file)
         command.upgrade(alembic_conf, "head")
 
-    def initialize_database(self, connection: Connection) -> None:
+    def initialize_database_schema(self, connection: Connection) -> None:
         """
         Initialize the database, creating tables, loading default data and then
         stamping the most recent migration as the current state of the DB.
@@ -71,11 +74,7 @@ def initialize_database(self, connection: Connection) -> None:
         alembic_conf = self._get_alembic_config(connection, self._config_file)
         command.stamp(alembic_conf, "head")
 
-    def initialize_search_indexes(self) -> bool:
-        search = self._container.search.index()
-        return search.initialize_indices()
-
-    def initialize(self, connection: Connection):
+    def initialize_database(self, connection: Connection) -> None:
        """Initialize the database if necessary."""
         inspector = inspect(connection)
         if inspector.has_table("alembic_version"):
@@ -91,10 +90,54 @@ def initialize(self, connection: Connection):
             )
         else:
             self.log.info("Database schema does not exist. Initializing.")
-            self.initialize_database(connection)
+            self.initialize_database_schema(connection)
             self.log.info("Initialization complete.")
 
-        self.initialize_search_indexes()
+    def create_search_index(
+        self, service: SearchService, revision: SearchSchemaRevision
+    ) -> None:
+        # Initialize a new search index by creating the index, setting the mapping,
+        # and setting the read and write pointers.
+        service.index_create(revision)
+        service.index_set_mapping(revision)
+        service.write_pointer_set(revision)
+
+        if not service.read_pointer():
+            # A read pointer does not exist. We set it to the most recent.
+            service.read_pointer_set(revision)
+
+    def initialize_search(self) -> None:
+        service = self._container.search.service()
+        revision_directory = self._container.search.revision_directory()
+        revision = revision_directory.highest()
+
+        if not service.write_pointer():
+            # A write pointer does not exist. This is a fresh index.
+            self.log.info("Search index does not exist. Creating a new index.")
+            return self.create_search_index(service, revision)
+
+        # The index already exists. We need to check if the revision is the most recent.
+        write_pointer = service.write_pointer()
+        if write_pointer.version != revision.version:
+            # The revision is not the most recent. We need to create a new index
+            # and start reindexing our data into it asynchronously. When the reindex
+            # is complete, we will switch the read pointer to the new index.
+            self.log.info(
+                f"Search index is out-of-date ({service.base_revision_name()} v{write_pointer.version}). "
+                f"Creating a new index for revision {revision.version}."
+            )
+            self.create_search_index(service, revision)
+            task = do_migration.apply_async()
+            self.log.info(f"Reindexing data into new search index. Task ID: {task.id}")
+        else:
+            self.log.info(
+                f"Search index is up-to-date ({service.base_revision_name()} v{revision.version})."
+            )
+
+    def initialize(self, connection: Connection):
+        """Initialize database and search index."""
+        self.initialize_database(connection)
+        self.initialize_search()
 
     def run(self) -> None:
         """
diff --git a/src/palace/manager/scripts/search.py b/src/palace/manager/scripts/search.py
index fd76e392bf..781db9bc2a 100644
--- a/src/palace/manager/scripts/search.py
+++ b/src/palace/manager/scripts/search.py
@@ -1,4 +1,5 @@
 from palace.manager.core.metadata_layer import TimestampData
+from palace.manager.scripts.base import Script
 from palace.manager.scripts.coverage_provider import RunWorkCoverageProviderScript
 from palace.manager.scripts.timestamp import TimestampScript
 from palace.manager.search.coverage_provider import SearchIndexCoverageProvider
@@ -6,12 +7,14 @@
 from palace.manager.search.external_search import ExternalSearchIndex
 
 
-class RebuildSearchIndexScript(RunWorkCoverageProviderScript, RemovesSearchCoverage):
+class RebuildSearchIndexScript(Script):
     """Completely delete the search index and recreate it."""
 
+    # TODO: This still needs to be cleaned up.
+
     def __init__(self, *args, **kwargs):
         search = kwargs.pop("search_index_client", None)
-        super().__init__(SearchIndexCoverageProvider, *args, **kwargs)
+        super().__init__(*args, **kwargs)
         self.search: ExternalSearchIndex = search or self.services.search.index()
 
     def do_run(self):
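# Illustrative sketch (not part of the diff): one way the TODO above could be
# resolved, delegating the rebuild to the new Celery task instead of the
# coverage-provider machinery. The helper name and return value are assumptions.
from palace.manager.celery.tasks.search import search_reindex
from palace.manager.search.external_search import ExternalSearchIndex


def rebuild_search_index(index: ExternalSearchIndex) -> str:
    # Clear the documents behind the write pointer, then queue a full rebuild.
    index.clear_search_documents()
    result = search_reindex.delay()
    return result.id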
diff --git a/src/palace/manager/search/coverage_provider.py b/src/palace/manager/search/coverage_provider.py
index 087cab4fc9..ffe6bf9a87 100644
--- a/src/palace/manager/search/coverage_provider.py
+++ b/src/palace/manager/search/coverage_provider.py
@@ -58,9 +58,8 @@ def run_once_and_update_timestamp(self):
 
     def process_batch(self, works) -> list[Work | CoverageFailure]:
         target: SearchDocumentReceiverType = self.migration or self.receiver
-        failures = target.add_documents(
-            documents=self.search_index_client.create_search_documents_from_works(works)
-        )
+        documents = Work.to_search_documents(self._db, [w.id for w in works])
+        failures = target.add_documents(documents=documents)
 
         # Maintain a dictionary of works so that we can efficiently remove failed works later.
         work_map: dict[int, Work] = {}
diff --git a/src/palace/manager/search/external_search.py b/src/palace/manager/search/external_search.py
index 81e6c78895..e6f66b1cbe 100644
--- a/src/palace/manager/search/external_search.py
+++ b/src/palace/manager/search/external_search.py
@@ -5,7 +5,7 @@
 import re
 import time
 from collections import defaultdict
-from collections.abc import Iterable, Sequence
+from collections.abc import Sequence
 
 from attr import define
 from flask_babel import lazy_gettext as _
@@ -36,13 +36,12 @@
 from palace.manager.core.facets import FacetConstants
 from palace.manager.core.metadata_layer import IdentifierData
 from palace.manager.core.problem_details import INVALID_INPUT
-from palace.manager.search.migrator import (
-    SearchDocumentReceiver,
-    SearchMigrationInProgress,
-    SearchMigrator,
-)
 from palace.manager.search.revision_directory import SearchRevisionDirectory
-from palace.manager.search.service import SearchDocument, SearchService
+from palace.manager.search.service import (
+    SearchDocument,
+    SearchService,
+    SearchServiceFailedDocument,
+)
 from palace.manager.sqlalchemy.model.contributor import Contributor
 from palace.manager.sqlalchemy.model.edition import Edition
 from palace.manager.sqlalchemy.model.identifier import Identifier
@@ -64,53 +63,16 @@ class ExternalSearchIndex(LoggerMixin):
     def __init__(
         self,
         service: SearchService,
-        revision_directory: SearchRevisionDirectory,
-        version: int | None = None,
     ) -> None:
-        """Constructor
-
-        :param revision_directory Override the directory of revisions that will be used. If this isn't provided,
-            the default directory will be used.
-        :param version The specific revision that will be used. If not specified, the highest version in the
-            revision directory will be used.
-        """
+        """Constructor"""
         self._search_service = service
 
-        # Locate the revision of the search index that we're going to use.
-        # This will fail fast if the requested version isn't available.
-        self._revision_directory = revision_directory
-        if version:
-            self._revision = self._revision_directory.find(version)
-        else:
-            self._revision = self._revision_directory.highest()
-
-        # Get references to the read and write pointers.
-        self._search_read_pointer = self._search_service.read_pointer_name()
-        self._search_write_pointer = self._search_service.write_pointer_name()
-
     def search_service(self) -> SearchService:
         """Get the underlying search service."""
         return self._search_service
 
-    def start_migration(self) -> SearchMigrationInProgress | None:
-        """Update to the latest schema, indexing the given works."""
-        migrator = SearchMigrator(
-            revisions=self._revision_directory,
-            service=self._search_service,
-        )
-        return migrator.migrate(
-            base_name=self._search_service.base_revision_name,
-            version=self._revision.version,
-        )
-
-    def start_updating_search_documents(self) -> SearchDocumentReceiver:
-        """Start submitting search documents for whatever is the current write pointer."""
-        return SearchDocumentReceiver(
-            pointer=self._search_write_pointer, service=self._search_service
-        )
-
     def clear_search_documents(self) -> None:
-        self._search_service.index_clear_documents(pointer=self._search_write_pointer)
+        self._search_service.index_clear_documents()
 
     def create_search_doc(self, query_string, filter, pagination, debug):
         if filter and filter.search_type == "json":
@@ -125,7 +87,6 @@ def create_search_doc(self, query_string, filter, pagination, debug):
         if filter is not None and filter.min_score is not None:
             search = search.extra(min_score=filter.min_score)
 
-        fields = None
         if debug:
             # Don't restrict the fields at all -- get everything.
             # This makes it easy to investigate everything about the
@@ -253,54 +214,25 @@ def count_works(self, filter):
         )
         return qu.count()
 
-    def create_search_documents_from_works(
-        self, works: Iterable[Work]
-    ) -> Sequence[SearchDocument]:
-        """Create search documents for all the given works."""
-        if not works:
-            # There's nothing to do. Don't bother making any requests
-            # to the search index.
-            return []
-
-        time1 = time.time()
-        needs_add = []
-        for work in works:
-            needs_add.append(work)
-
-        # Add/update any works that need adding/updating.
-        docs = Work.to_search_documents(needs_add)
-        time2 = time.time()
-
-        self.log.info(
-            "Created %i search documents in %.2f seconds" % (len(docs), time2 - time1)
-        )
-        return docs
-
-    def remove_work(self, work):
+    def remove_work(self, work: Work | int) -> None:
         """Remove the search document for `work` from the search index."""
-        self._search_service.index_remove_document(
-            pointer=self._search_read_pointer, id=work.id
-        )
-
-    def initialize_indices(self) -> bool:
-        """Attempt to initialize the indices and pointers for a first time run"""
-        service = self.search_service()
-        read_pointer = service.read_pointer()
-        if not read_pointer or service.is_pointer_empty(read_pointer):
-            # A read pointer does not exist, or points to the empty index
-            # This means either this is a new deployment or the first time
-            # the new opensearch code was deployed.
-            # In both cases doing a migration to the latest version is safe.
-            migration = self.start_migration()
-            if migration is not None:
-                migration.finish()
-            else:
-                self.log.warning(
-                    "Read pointer was set to empty, but no migration was available."
-                )
-                return False
-
-        return True
+        if isinstance(work, Work):
+            if work.id is None:
+                self.log.warning("Work has no ID, unable to remove. %r", work)
+                return
+            work = work.id
+
+        self._search_service.index_remove_document(doc_id=work)
+
+    def add_document(self, document: SearchDocument) -> None:
+        """Add a document to the search index."""
+        self._search_service.index_submit_document(document=document)
+
+    def add_documents(
+        self, documents: Sequence[SearchDocument]
+    ) -> list[SearchServiceFailedDocument]:
+        """Add multiple documents to the search index."""
+        return self._search_service.index_submit_documents(documents=documents)
 
 
 class SearchBase:
diff --git a/src/palace/manager/search/migrator.py b/src/palace/manager/search/migrator.py
deleted file mode 100644
index a683db86c7..0000000000
--- a/src/palace/manager/search/migrator.py
+++ /dev/null
@@ -1,178 +0,0 @@
-from abc import ABC, abstractmethod
-from collections.abc import Sequence
-
-from palace.manager.core.exceptions import BasePalaceException
-from palace.manager.search.revision import SearchSchemaRevision
-from palace.manager.search.revision_directory import SearchRevisionDirectory
-from palace.manager.search.service import (
-    SearchDocument,
-    SearchService,
-    SearchServiceFailedDocument,
-)
-from palace.manager.util.log import LoggerMixin
-
-
-class SearchMigrationException(BasePalaceException):
-    """The type of exceptions raised by the search migrator."""
-
-    def __init__(self, fatal: bool, message: str):
-        super().__init__(message)
-        self.fatal = fatal
-
-
-class SearchDocumentReceiverType(ABC):
-    """A receiver of search documents."""
-
-    @abstractmethod
-    def add_documents(
-        self, documents: Sequence[SearchDocument]
-    ) -> list[SearchServiceFailedDocument]:
-        """Submit documents to be indexed."""
-
-    @abstractmethod
-    def finish(self) -> None:
-        """Make sure all changes are committed."""
-
-
-class SearchDocumentReceiver(SearchDocumentReceiverType, LoggerMixin):
-    """A receiver of search documents."""
-
-    def __init__(self, pointer: str, service: SearchService):
-        self._pointer = pointer
-        self._service = service
-
-    @property
-    def pointer(self) -> str:
-        """The name of the index that will receive search documents."""
-        return self._pointer
-
-    def add_documents(
-        self, documents: Sequence[SearchDocument]
-    ) -> list[SearchServiceFailedDocument]:
-        """Submit documents to be indexed."""
-        return self._service.index_submit_documents(
-            pointer=self._pointer, documents=documents
-        )
-
-    def finish(self) -> None:
-        """Make sure all changes are committed."""
-        self.log.info("Finishing search documents.")
-        self._service.refresh()
-        self.log.info("Finished search documents.")
-
-
-class SearchMigrationInProgress(SearchDocumentReceiverType, LoggerMixin):
-    """A migration in progress. Documents are being submitted, and the migration must be
-    explicitly finished or cancelled to take effect (or not!)."""
-
-    def __init__(
-        self,
-        base_name: str,
-        revision: SearchSchemaRevision,
-        service: SearchService,
-    ):
-        self._base_name = base_name
-        self._revision = revision
-        self._service = service
-        self._receiver = SearchDocumentReceiver(
-            pointer=self._revision.name_for_index(base_name), service=self._service
-        )
-
-    def add_documents(
-        self, documents: Sequence[SearchDocument]
-    ) -> list[SearchServiceFailedDocument]:
-        """Submit documents to be indexed."""
-        return self._receiver.add_documents(documents)
-
-    def finish(self) -> None:
-        """Finish the migration."""
-        self.log.info(f"Completing migration to {self._revision.version}")
-        # Make sure all changes are committed.
-        self._receiver.finish()
-        # Create the "indexed" alias.
-        self._service.index_set_populated(self._revision)
-        # Update the write pointer to point to the now-populated index.
-        self._service.write_pointer_set(self._revision)
-        # Set the read pointer to point at the now-populated index
-        self._service.read_pointer_set(self._revision)
-        self._service.refresh()
-        self.log.info(f"Completed migration to {self._revision.version}")
-
-    def cancel(self) -> None:
-        """Cancel the migration, leaving the read and write pointers untouched."""
-        self.log.info(f"Cancelling migration to {self._revision.version}")
-        return None
-
-
-class SearchMigrator(LoggerMixin):
-    """A search migrator. This moves a search service to the targeted schema version."""
-
-    def __init__(self, revisions: SearchRevisionDirectory, service: SearchService):
-        self._revisions = revisions
-        self._service = service
-
-    def migrate(self, base_name: str, version: int) -> SearchMigrationInProgress | None:
-        """
-        Migrate to the given version using the given base name (such as 'circulation-works'). The function returns
-        an object that expects to receive batches of search documents used to populate any new index. When all
-        the batches of documents have been sent to the object, callers must call 'finish' to indicate to the search
-        migrator that no more documents are coming. Only at this point will the migrator consider the new index to be
-        "populated".
-
-        :arg base_name: The base name used for indices (such as 'circulation-works').
-        :arg version: The version number to which we are migrating
-
-        :raises SearchMigrationException: On errors, but always leaves the system in a usable state.
-        """
-
-        self.log.info(f"starting migration to {base_name} {version}")
-
-        try:
-            target = self._revisions.available.get(version)
-            if target is None:
-                raise SearchMigrationException(
-                    fatal=True,
-                    message=f"No support is available for schema version {version}",
-                )
-
-            # Does the empty index exist? Create it if not.
-            self._service.create_empty_index()
-
-            # Does the read pointer exist? Point it at the empty index if not.
-            read = self._service.read_pointer()
-            if read is None:
-                self.log.info("Read pointer did not exist.")
-                self._service.read_pointer_set_empty()
-
-            # We're probably going to have to do a migration. We might end up returning
-            # this instance so that users can submit documents for indexing.
-            in_progress = SearchMigrationInProgress(
-                base_name=base_name, revision=target, service=self._service
-            )
-
-            # Does the write pointer exist?
-            write = self._service.write_pointer()
-            if write is None or (not write.version == version):
-                self.log.info(
-                    f"Write pointer does not point to the desired version: {write} != {version}."
-                )
-                # Either the write pointer didn't exist, or it's pointing at a version
-                # other than the one we want. Create a new index for the version we want.
-                self._service.index_create(target)
-                self._service.index_set_mapping(target)
-
-            # The index now definitely exists, but it might not be populated. Populate it if necessary.
-            if not self._service.index_is_populated(target):
-                self.log.info("Write index is not populated.")
-                return in_progress
-
-            # If we didn't need to return the migration, finish it here. This will
-            # update the read and write pointers appropriately.
-            in_progress.finish()
-            return None
-        except SearchMigrationException:
-            raise
-        except Exception as e:
-            raise SearchMigrationException(
-                fatal=True, message=f"Service raised exception: {repr(e)}"
-            ) from e
diff --git a/src/palace/manager/search/service.py b/src/palace/manager/search/service.py
index 6f9aec3b27..82bd648f7f 100644
--- a/src/palace/manager/search/service.py
+++ b/src/palace/manager/search/service.py
@@ -100,38 +100,27 @@ def read_pointer(self) -> str | None:
     def write_pointer(self) -> SearchWritePointer | None:
         """Get the writer pointer, if it exists."""
 
-    @abstractmethod
-    def create_empty_index(self) -> None:
-        """Atomically create the empty index for the given base name."""
-
     @abstractmethod
     def read_pointer_set(self, revision: SearchSchemaRevision) -> None:
         """Atomically set the read pointer to the index for the given revision and base name."""
 
-    @abstractmethod
-    def read_pointer_set_empty(self) -> None:
-        """Atomically set the read pointer to the empty index for the base name."""
-
     @abstractmethod
     def index_create(self, revision: SearchSchemaRevision) -> None:
         """Atomically create an index for the given base name and revision."""
 
-    @abstractmethod
-    def index_is_populated(self, revision: SearchSchemaRevision) -> bool:
-        """Return True if the index for the given base name and revision has been populated."""
-
-    @abstractmethod
-    def index_set_populated(self, revision: SearchSchemaRevision) -> None:
-        """Set an index as populated."""
-
     @abstractmethod
     def index_set_mapping(self, revision: SearchSchemaRevision) -> None:
         """Set the schema mappings for the given index."""
 
+    @abstractmethod
+    def index_submit_document(
+        self, document: dict[str, Any], refresh: bool = False
+    ) -> None:
+        """Submit a single search document to the current write index."""
+
     @abstractmethod
     def index_submit_documents(
         self,
-        pointer: str,
         documents: Sequence[SearchDocument],
     ) -> list[SearchServiceFailedDocument]:
         """Submit search documents to the given index."""
@@ -145,7 +134,7 @@ def refresh(self) -> None:
         """Synchronously refresh the service and wait for changes to be completed."""
 
     @abstractmethod
-    def index_clear_documents(self, pointer: str) -> None:
+    def index_clear_documents(self) -> None:
         """Clear all search documents in the given index."""
 
     @abstractmethod
@@ -157,13 +146,9 @@ def search_multi_client(self, write: bool = False) -> MultiSearch:
         """Return the underlying search client."""
 
     @abstractmethod
-    def index_remove_document(self, pointer: str, id: int) -> None:
+    def index_remove_document(self, doc_id: int) -> None:
         """Remove a specific document from the given index."""
 
-    @abstractmethod
-    def is_pointer_empty(self, pointer: str) -> bool:
-        """Check to see if a pointer points to an empty index"""
-
 
 class SearchServiceOpensearch1(SearchService, LoggerMixin):
     """The real Opensearch 1.x service."""
@@ -197,16 +182,6 @@ def write_pointer(self) -> SearchWritePointer | None:
         except NotFoundError:
             return None
 
-    def create_empty_index(self) -> None:
-        try:
-            index_name = self._empty(self.base_revision_name)
-            self.log.debug(f"creating empty index {index_name}")
-            self._client.indices.create(index=index_name)
-        except RequestError as e:
-            if e.error == "resource_already_exists_exception":
-                return
-            raise e
-
     def read_pointer_set(self, revision: SearchSchemaRevision) -> None:
         alias_name = self.read_pointer_name()
         target_index = revision.name_for_index(self.base_revision_name)
@@ -219,34 +194,6 @@ def read_pointer_set(self, revision: SearchSchemaRevision) -> None:
         self.log.debug(f"setting read pointer {alias_name} to index {target_index}")
         self._client.indices.update_aliases(body=action)
 
-    def index_set_populated(self, revision: SearchSchemaRevision) -> None:
-        alias_name = revision.name_for_indexed_pointer(self.base_revision_name)
-        target_index = revision.name_for_index(self.base_revision_name)
-        action = {
-            "actions": [
-                {"remove": {"index": "*", "alias": alias_name}},
-                {"add": {"index": target_index, "alias": alias_name}},
-            ]
-        }
-        self.log.debug(
-            f"creating 'indexed' flag alias {alias_name} for index {target_index}"
-        )
-        self._client.indices.update_aliases(body=action)
-
-    def read_pointer_set_empty(self) -> None:
-        alias_name = self.read_pointer_name()
-        target_index = self._empty(self.base_revision_name)
-        action = {
-            "actions": [
-                {"remove": {"index": "*", "alias": alias_name}},
-                {"add": {"index": target_index, "alias": alias_name}},
-            ]
-        }
-        self.log.debug(
-            f"setting read pointer {alias_name} to empty index {target_index}"
-        )
-        self._client.indices.update_aliases(body=action)
-
     def index_create(self, revision: SearchSchemaRevision) -> None:
         try:
             index_name = revision.name_for_index(self.base_revision_name)
@@ -260,11 +207,6 @@ def index_create(self, revision: SearchSchemaRevision) -> None:
                 return
             raise e
 
-    def index_is_populated(self, revision: SearchSchemaRevision) -> bool:
-        return self._client.indices.exists_alias(
-            name=revision.name_for_indexed_pointer(self.base_revision_name)
-        )
-
    def index_set_mapping(self, revision: SearchSchemaRevision) -> None:
         data = {"properties": revision.mapping_document().serialize_properties()}
         index_name = revision.name_for_index(self.base_revision_name)
@@ -279,9 +221,20 @@ def _ensure_scripts(self, revision: SearchSchemaRevision) -> None:
             name = revision.script_name(name)
             self._client.put_script(name, script)  # type: ignore [misc] ## Seems the types aren't up to date
 
+    def index_submit_document(
+        self, document: dict[str, Any], refresh: bool = False
+    ) -> None:
+        self._client.index(
+            index=self.write_pointer_name(),
+            body=document,
+            require_alias=True,
+            refresh=refresh,
+        )
+
     def index_submit_documents(
-        self, pointer: str, documents: Sequence[SearchDocument]
+        self, documents: Sequence[SearchDocument]
     ) -> list[SearchServiceFailedDocument]:
+        pointer = self.write_pointer_name()
         self.log.info(f"submitting documents to index {pointer}")
 
         # Specifically override the target in all documents to the target pointer
@@ -317,9 +270,11 @@ def index_submit_documents(
 
         return error_results
 
-    def index_clear_documents(self, pointer: str) -> None:
+    def index_clear_documents(self) -> None:
         self._client.delete_by_query(
-            index=pointer, body={"query": {"match_all": {}}}, wait_for_completion=True
+            index=self.write_pointer_name(),
+            body={"query": {"match_all": {}}},
+            wait_for_completion=True,
         )
 
     def refresh(self) -> None:
@@ -366,12 +321,5 @@ def read_pointer_name(self) -> str:
     def write_pointer_name(self) -> str:
         return f"{self.base_revision_name}-search-write"
 
-    @staticmethod
-    def _empty(base_name: str) -> str:
-        return f"{base_name}-empty"
-
-    def index_remove_document(self, pointer: str, id: int) -> None:
-        self._client.delete(index=pointer, id=id, doc_type="_doc")
-
-    def is_pointer_empty(self, pointer: str) -> bool:
-        return pointer == self._empty(self.base_revision_name)
+    def index_remove_document(self, doc_id: int) -> None:
+        self._client.delete(index=self.write_pointer_name(), id=doc_id, doc_type="_doc")
diff --git a/src/palace/manager/service/search/container.py b/src/palace/manager/service/search/container.py
index 379dcf3e2d..e972adfb27 100644
--- a/src/palace/manager/service/search/container.py
+++ b/src/palace/manager/service/search/container.py
@@ -31,5 +31,4 @@ class Search(DeclarativeContainer):
     index: Provider[ExternalSearchIndex] = providers.Singleton(
         ExternalSearchIndex,
         service=service,
-        revision_directory=revision_directory,
     )
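# Illustrative sketch (not part of the diff): with the pointer parameters removed,
# every write-path call on the service implicitly targets the write pointer. The
# container access mirrors the Search container above; the document payload is a
# made-up example and assumes the index and aliases have already been initialized.
from palace.manager.service.container import container_instance

services = container_instance()
service = services.search.service()

# Each of these now resolves service.write_pointer_name() internally:
service.index_submit_document(document={"title": "Example Work"}, refresh=True)
service.index_clear_documents()
service.index_remove_document(doc_id=1)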
diff --git a/src/palace/manager/sqlalchemy/model/work.py b/src/palace/manager/sqlalchemy/model/work.py
index 5f19627f55..32606ade98 100644
--- a/src/palace/manager/sqlalchemy/model/work.py
+++ b/src/palace/manager/sqlalchemy/model/work.py
@@ -29,7 +29,7 @@
 from sqlalchemy.orm import Mapped, contains_eager, joinedload, relationship
 from sqlalchemy.orm.base import NO_VALUE
 from sqlalchemy.orm.session import Session
-from sqlalchemy.sql.expression import and_, case, join, literal_column, select
+from sqlalchemy.sql.expression import and_, case, literal_column, select
 from sqlalchemy.sql.functions import func
 
 from palace.manager.core.classifier import Classifier, WorkClassifier
@@ -1207,15 +1207,7 @@ def external_index_needs_updating(self):
         This is a more efficient alternative to reindexing immediately, since
         these WorkCoverageRecords are handled in large batches.
         """
-        return self._reset_coverage(WorkCoverageRecord.UPDATE_SEARCH_INDEX_OPERATION)
-
-    def update_external_index(self, client, add_coverage_record=True):
-        """Create a WorkCoverageRecord so that this work's
-        entry in the search index can be modified or deleted.
-        This method is deprecated -- call
-        external_index_needs_updating() instead.
-        """
-        self.external_index_needs_updating()
+        return index_work.delay(self.id)
 
     def needs_full_presentation_recalculation(self):
         """Mark this work as needing to have its presentation completely
@@ -1415,19 +1407,23 @@ def assign_appeals(self, character, language, setting, story, cutoff=0.20):
     OPENSEARCH_TIME_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS"."MS'
 
     @classmethod
-    def to_search_documents(cls, works: list[Self]) -> Sequence[SearchDocument]:
+    def to_search_documents(
+        cls, session: Session, work_ids: Sequence[int | None]
+    ) -> Sequence[SearchDocument]:
         """In app to search documents needed to ease off the burden of complex queries from the DB cluster
         No recursive identifier policy is taken here as using the RecursiveEquivalentsCache implicitly has that set
         """
-        _db = Session.object_session(works[0])
+        if not work_ids:
+            return []
 
-        qu = _db.query(Work).filter(Work.id.in_([w.id for w in works]))
+        qu = session.query(Work).filter(Work.id.in_(work_ids))
         qu = qu.options(
             joinedload(Work.presentation_edition)
             .joinedload(Edition.contributions)
             .joinedload(Contribution.contributor),
+            joinedload(Work.suppressed_for),
             joinedload(Work.work_genres).joinedload(WorkGenre.genre),
             joinedload(Work.custom_list_entries),
         )
@@ -1444,14 +1440,14 @@ def to_search_documents(cls, works: list[Self]) -> Sequence[SearchDocument]:
         ## Add it to another table so it becomes faster to just query the pre-computed table
 
         equivalent_identifiers = (
-            _db.query(RecursiveEquivalencyCache)
+            session.query(RecursiveEquivalencyCache)
             .join(
                 Edition,
                 Edition.primary_identifier_id
                 == RecursiveEquivalencyCache.parent_identifier_id,
             )
             .join(Work, Work.presentation_edition_id == Edition.id)
-            .filter(Work.id.in_(w.id for w in works))
+            .filter(Work.id.in_(work_ids))
             .with_entities(
                 Work.id.label("work_id"),
                 RecursiveEquivalencyCache.identifier_id.label("equivalent_id"),
             )
             .cte("equivalent_cte")
         )
 
-        identifiers_query = (
-            select(
-                [
-                    equivalent_identifiers.c.work_id,
-                    Identifier.identifier,
-                    Identifier.type,
-                ]
-            )
-            .select_from(Identifier)
-            .where(Identifier.id == literal_column("equivalent_cte.equivalent_id"))
+        identifiers_query = select(
+            [
+                equivalent_identifiers.c.work_id,
+                Identifier.identifier,
+                Identifier.type,
+            ]
+        ).join_from(
+            Identifier,
+            equivalent_identifiers,
+            Identifier.id == literal_column("equivalent_cte.equivalent_id"),
         )
 
-        identifiers = list(_db.execute(identifiers_query))
+        identifiers = list(session.execute(identifiers_query))
 
         ## IDENTIFIERS END
 
         ## CLASSIFICATION START
@@ -1515,16 +1511,16 @@ def to_search_documents(cls, works: list[Self]) -> Sequence[SearchDocument]:
                 and_(Subject.type.in_(Subject.TYPES_FOR_SEARCH), term_column != None),
             )
             .group_by(scheme_column, term_column, equivalent_identifiers.c.work_id)
-            .where(
+            .join_from(
+                Classification,
+                equivalent_identifiers,
                 Classification.identifier_id
-                == literal_column("equivalent_cte.equivalent_id")
-            )
-            .select_from(
-                join(Classification, Subject, Classification.subject_id == Subject.id)
+                == literal_column("equivalent_cte.equivalent_id"),
             )
+            .join_from(Classification, Subject, Classification.subject_id == Subject.id)
         )
 
-        all_subjects = list(_db.execute(subjects))
+        all_subjects = list(session.execute(subjects))
 
         ## CLASSIFICATION END
 
@@ -1545,7 +1541,7 @@ def to_search_documents(cls, works: list[Self]) -> Sequence[SearchDocument]:
         return results
 
     @classmethod
-    def search_doc_as_dict(cls, doc: Self):
+    def search_doc_as_dict(cls, doc: Self) -> dict[str, Any]:
         columns = {
             "work": [
                 "fiction",
@@ -1732,9 +1728,10 @@ def target_age_query(self, foreign_work_id_field):
         target_age = select([upper, lower]).where(Work.id == foreign_work_id_field)
         return target_age
 
-    def to_search_document(self):
+    def to_search_document(self) -> dict[str, Any]:
         """Generate a search document for this Work."""
-        return Work.to_search_documents([self])[0]
+        db = Session.object_session(self)
+        return Work.to_search_documents(db, [self.id])[0]
 
     def mark_licensepools_as_superceded(self):
         """Make sure that all but the single best open-access LicensePool for
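# Illustrative sketch (not part of the diff): calling the reworked
# Work.to_search_documents() with an explicit session and plain work ids, as the
# Celery tasks and coverage provider above now do. The helper name is an assumption.
from sqlalchemy.orm import Session

from palace.manager.sqlalchemy.model.work import Work


def documents_for_works(session: Session, work_ids: list[int]) -> list[dict]:
    # Ids are enough; no Work instances need to be loaded by the caller, and an
    # empty id list short-circuits without touching the database.
    return list(Work.to_search_documents(session, work_ids))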
diff --git a/src/palace/manager/sqlalchemy/session.py b/src/palace/manager/sqlalchemy/session.py
index cc415b69ee..f1f7d6504f 100644
--- a/src/palace/manager/sqlalchemy/session.py
+++ b/src/palace/manager/sqlalchemy/session.py
@@ -22,7 +22,7 @@
 from palace.manager.util.log import LoggerMixin
 from palace.manager.util.resources import resources_dir
 
-DEBUG = False
+DEBUG = True
 
 
 def json_encoder(obj: Any) -> Any:
diff --git a/tests/manager/scripts/test_informational.py b/tests/manager/scripts/test_informational.py
index 5bc219f5e1..6cc8147db6 100644
--- a/tests/manager/scripts/test_informational.py
+++ b/tests/manager/scripts/test_informational.py
@@ -18,6 +18,7 @@
 from palace.manager.search.external_search import ExternalSearchIndex
 from palace.manager.sqlalchemy.model.coverage import CoverageRecord
 from palace.manager.sqlalchemy.model.datasource import DataSource
+from palace.manager.sqlalchemy.model.work import Work
 from tests.fixtures.database import DatabaseTransactionFixture
 from tests.fixtures.search import EndToEndSearchFixture
 
@@ -407,7 +408,7 @@ def test_search_engine(
         docs = search.start_migration()
         assert docs is not None
-        docs.add_documents(search.create_search_documents_from_works([work]))
+        docs.add_documents(Work.to_search_documents(db.session, [work.id]))
         docs.finish()
 
         # This search index will always claim there is one result.
diff --git a/tests/manager/search/test_external_search.py b/tests/manager/search/test_external_search.py
index 30002794b5..046b4c9132 100644
--- a/tests/manager/search/test_external_search.py
+++ b/tests/manager/search/test_external_search.py
@@ -4639,7 +4639,7 @@ def test_works_not_presentation_ready_kept_in_index(
         docs = index.start_updating_search_documents()
         failures = docs.add_documents(
-            index.create_search_documents_from_works([w1, w2, w3])
+            Work.to_search_documents(db.session, [w1.id, w2.id, w3.id])
         )
         docs.finish()
 
@@ -4661,7 +4661,7 @@ def test_works_not_presentation_ready_kept_in_index(
         w2.presentation_ready = False
         docs = index.start_updating_search_documents()
         failures = docs.add_documents(
-            index.create_search_documents_from_works([w1, w2, w3])
+            Work.to_search_documents(db.session, [w1.id, w2.id, w3.id])
         )
         docs.finish()
         assert {w1.id, w2.id, w3.id} == set(
@@ -4689,7 +4689,7 @@ def test_search_connection_timeout(
 
         docs = search.external_search.start_updating_search_documents()
         failures = docs.add_documents(
-            search.external_search.create_search_documents_from_works([work])
+            Work.to_search_documents(transaction.session, [work.id])
         )
         assert 1 == len(failures)
         assert work.id == failures[0].id
@@ -4716,7 +4716,7 @@ def test_search_single_document_error(
 
         docs = search.external_search.start_updating_search_documents()
         failures = docs.add_documents(
-            search.external_search.create_search_documents_from_works([work])
+            Work.to_search_documents(transaction.session, [work.id])
         )
         assert 1 == len(failures)
         assert work.id == failures[0].id
@@ -4788,7 +4788,7 @@ def test_to_search_document(self, db: DatabaseTransactionFixture):
 
         db.session.flush()
 
-        search_docs = Work.to_search_documents(works)
+        search_docs = Work.to_search_documents(db.session, [w.id for w in works])
 
         search_doc_work1 = list(
             filter(lambda x: x["work_id"] == work1.id, search_docs)
@@ -4904,18 +4904,16 @@ def test_to_search_documents_with_missing_data(
         # Missing edition relationship
         work: Work = db.work(with_license_pool=True)
         work.presentation_edition_id = None
-        [result] = Work.to_search_documents([work])
+        [result] = Work.to_search_documents(db.session, [work.id])
         assert result["identifiers"] is None
 
         # Missing just some attributes
         work = db.work(with_license_pool=True)
         work.presentation_edition.title = None
         work.target_age = None
-        [result] = Work.to_search_documents([work])
+        [result] = Work.to_search_documents(db.session, [work.id])
         assert result["title"] is None
-        target_age = result["target_age"]
-        assert isinstance(target_age, dict)
-        assert target_age["lower"] is None
+        assert result["target_age"]["lower"] is None
 
     def test_success(
         self,
diff --git a/tests/manager/sqlalchemy/model/test_lane.py b/tests/manager/sqlalchemy/model/test_lane.py
index 0fe2970649..b972bd07c0 100644
--- a/tests/manager/sqlalchemy/model/test_lane.py
+++ b/tests/manager/sqlalchemy/model/test_lane.py
@@ -4033,7 +4033,7 @@ def test_search(
         search_client = end_to_end_search_fixture.external_search_index
         docs = end_to_end_search_fixture.external_search_index.start_migration()
         assert docs is not None
-        docs.add_documents(search_client.create_search_documents_from_works([work]))
+        docs.add_documents(Work.to_search_documents(db.session, [work.id]))
         docs.finish()
 
         pagination = Pagination(offset=0, size=1)
diff --git a/tests/mocks/search.py b/tests/mocks/search.py
index 0284287d9c..d1d34a760b 100644
--- a/tests/mocks/search.py
+++ b/tests/mocks/search.py
@@ -1,7 +1,9 @@
 from __future__ import annotations
 
+from collections import defaultdict
 from collections.abc import Iterable
 from enum import Enum
+from typing import Any
 from unittest.mock import MagicMock
 
 from opensearch_dsl import MultiSearch, Search
@@ -10,7 +12,6 @@
 from palace.manager.search.external_search import ExternalSearchIndex
 from palace.manager.search.revision import SearchSchemaRevision
-from palace.manager.search.revision_directory import SearchRevisionDirectory
 from palace.manager.search.service import (
     SearchService,
     SearchServiceFailedDocument,
@@ -31,16 +32,10 @@ class SearchServiceFailureMode(Enum):
 class SearchServiceFake(SearchService):
     """A search service that doesn't speak to a real service."""
 
-    _documents_by_index: dict[str, list[dict]]
-    _failing: SearchServiceFailureMode
-    _search_client: Search
-    _multi_search_client: MultiSearch
-    _document_submission_attempts: list[dict]
-
     def __init__(self):
         self.base_name = "test_index"
         self._failing = SearchServiceFailureMode.NOT_FAILING
-        self._documents_by_index = {}
+        self._documents_by_index = defaultdict(list)
         self._read_pointer: str | None = None
         self._write_pointer: SearchWritePointer | None = None
         self._search_client = Search(using=MagicMock())
@@ -65,8 +60,6 @@ def set_failing_mode(self, mode: SearchServiceFailureMode):
     def documents_for_index(self, index_name: str) -> list[dict]:
         self._fail_if_necessary()
 
-        if not (index_name in self._documents_by_index):
-            return []
         return self._documents_by_index[index_name]
 
     def documents_all(self) -> list[dict]:
@@ -99,37 +92,28 @@ def write_pointer(self) -> SearchWritePointer | None:
         self._fail_if_necessary()
         return self._write_pointer
 
-    def create_empty_index(self) -> None:
-        self._fail_if_necessary()
-        return None
-
     def read_pointer_set(self, revision: SearchSchemaRevision) -> None:
         self._fail_if_necessary()
         self._read_pointer = f"{revision.name_for_indexed_pointer(self.base_name)}"
 
-    def index_set_populated(self, revision: SearchSchemaRevision) -> None:
-        self._fail_if_necessary()
-
-    def read_pointer_set_empty(self) -> None:
-        self._fail_if_necessary()
-        self._read_pointer = f"{self.base_name}-empty"
-
     def index_create(self, revision: SearchSchemaRevision) -> None:
         self._fail_if_necessary()
         return None
 
-    def index_is_populated(self, revision: SearchSchemaRevision) -> bool:
-        self._fail_if_necessary()
-        return True
-
     def index_set_mapping(self, revision: SearchSchemaRevision) -> None:
         self._fail_if_necessary()
 
+    def index_submit_document(
+        self, document: dict[str, Any], refresh: bool = False
+    ) -> None:
+        self.index_submit_documents([document])
+
     def index_submit_documents(
-        self, pointer: str, documents: Iterable[dict]
+        self, documents: Iterable[dict]
    ) -> list[SearchServiceFailedDocument]:
         self._fail_if_necessary()
 
+        pointer = self.write_pointer_name()
+
         _should_fail = False
         _should_fail = (
             _should_fail
@@ -162,9 +146,6 @@ def index_submit_documents(
 
             return results
 
-        if not (pointer in self._documents_by_index):
-            self._documents_by_index[pointer] = []
-
         for document in documents:
             self._documents_by_index[pointer].append(document)
 
@@ -174,10 +155,10 @@ def write_pointer_set(self, revision: SearchSchemaRevision) -> None:
         self._fail_if_necessary()
         self._write_pointer = SearchWritePointer(self.base_name, revision.version)
 
-    def index_clear_documents(self, pointer: str):
+    def index_clear_documents(self):
         self._fail_if_necessary()
-        if pointer in self._documents_by_index:
-            self._documents_by_index[pointer] = []
+        pointer = self.write_pointer_name()
+        self._documents_by_index[pointer].clear()
 
     def search_client(self, write=False) -> Search:
         return self._search_client.index(
@@ -189,19 +170,16 @@ def search_multi_client(self, write=False) -> MultiSearch:
             self.read_pointer_name() if not write else self.write_pointer_name()
         )
 
-    def index_remove_document(self, pointer: str, id: int):
+    def index_remove_document(self, doc_id: int):
         self._fail_if_necessary()
-        if pointer in self._documents_by_index:
-            items = self._documents_by_index[pointer]
-            to_remove = []
-            for item in items:
-                if item.get("_id") == id:
-                    to_remove.append(item)
-            for item in to_remove:
-                items.remove(item)
-
-    def is_pointer_empty(*args):
-        return False
+        pointer = self.write_pointer_name()
+        items = self._documents_by_index[pointer]
+        to_remove = []
+        for item in items:
+            if item.get("_id") == doc_id:
+                to_remove.append(item)
+        for item in to_remove:
+            items.remove(item)
 
 
 def fake_hits(works: list[Work]):
@@ -223,14 +201,9 @@ class ExternalSearchIndexFake(ExternalSearchIndex):
 
     def __init__(
         self,
-        revision_directory: SearchRevisionDirectory | None = None,
-        version: int | None = None,
     ):
-        revision_directory = revision_directory or SearchRevisionDirectory.create()
         super().__init__(
             service=SearchServiceFake(),
-            revision_directory=revision_directory,
-            version=version,
         )
 
         self._mock_multi_works: list[dict] = []
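# Illustrative sketch (not part of the diff): constructing the simplified fakes in
# a test after this change; no revision directory or version argument is needed.
# The document payload is a made-up example.
from tests.mocks.search import ExternalSearchIndexFake, SearchServiceFake


def test_fake_index_round_trip() -> None:
    index = ExternalSearchIndexFake()
    service = index.search_service()
    assert isinstance(service, SearchServiceFake)

    # Documents are stored under the fake's write pointer and removed by "_id".
    index.add_document({"_id": 1, "title": "Example Work"})
    assert len(service.documents_all()) == 1

    index.remove_work(1)
    assert service.documents_all() == []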