Skip to content

Commit

Permalink
Update search indexing to use celery rather then coverage records (PP…
Browse files Browse the repository at this point in the history
…-1225) (#1849)

* Index search documents via celery
  • Loading branch information
jonathangreen authored May 22, 2024
1 parent 3472332 commit 5be6f9a
Show file tree
Hide file tree
Showing 41 changed files with 1,718 additions and 1,783 deletions.
10 changes: 0 additions & 10 deletions bin/search_index_clear

This file was deleted.

9 changes: 0 additions & 9 deletions bin/search_index_refresh

This file was deleted.

2 changes: 0 additions & 2 deletions docker/services/cron/cron.d/circulation
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ HOME=/var/www/circulation

# These scripts update internal caches.
#
*/30 * * * * root bin/run -d 15 search_index_refresh >> /var/log/cron.log 2>&1
10 0 * * * root bin/run search_index_clear >> /var/log/cron.log 2>&1
0 0 * * * root bin/run update_custom_list_size >> /var/log/cron.log 2>&1
0 2 * * * root bin/run update_lane_size >> /var/log/cron.log 2>&1
*/30 * * * * root bin/run -d 5 equivalent_identifiers_refresh >> /var/log/cron.log 2>&1
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ module = [
"palace.manager.core.selftest",
"palace.manager.feed.*",
"palace.manager.integration.*",
"palace.manager.scripts.initialization",
"palace.manager.scripts.rotate_jwe_key",
"palace.manager.scripts.search",
"palace.manager.search.document",
"palace.manager.search.migrator",
"palace.manager.search.revision",
Expand Down
11 changes: 6 additions & 5 deletions src/palace/manager/api/admin/controller/custom_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,13 @@ def _create_or_update_list(
if membership_change:
# We need to update the search index entries for works that caused a membership change,
# so the upstream counts can be calculated correctly.
documents = self.search_engine.create_search_documents_from_works(
works_to_update_in_search
documents = Work.to_search_documents(
self._db,
[w.id for w in works_to_update_in_search if w.id is not None],
)
index = self.search_engine.start_updating_search_documents()
index.add_documents(documents)
index.finish()
# TODO: Does this need to be done here, or can this be done asynchronously?
self.search_engine.add_documents(documents)
self.search_engine.search_service().refresh()

# If this list was used to populate any lanes, those lanes need to have their counts updated.
for lane in Lane.affected_by_customlist(list):
Expand Down
150 changes: 150 additions & 0 deletions src/palace/manager/celery/tasks/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import Any

from celery import chain, shared_task
from opensearchpy import OpenSearchException
from sqlalchemy import select
from sqlalchemy.orm import Session

from palace.manager.celery.task import Task
from palace.manager.core.exceptions import BasePalaceException
from palace.manager.service.celery.celery import QueueNames
from palace.manager.sqlalchemy.model.work import Work
from palace.manager.util.backoff import exponential_backoff
from palace.manager.util.log import elapsed_time_logging


def get_work_search_documents(
session: Session, batch_size: int, offset: int
) -> Sequence[dict[str, Any]]:
"""
Get a batch of search documents for works that are presentation ready.
"""
works = [
w.id
for w in session.execute(
select(Work.id)
.where(Work.presentation_ready == True)
.order_by(Work.id)
.limit(batch_size)
.offset(offset)
)
]
return Work.to_search_documents(session, works)


class FailedToIndex(BasePalaceException):
...


@shared_task(queue=QueueNames.default, bind=True, max_retries=4)
def search_reindex(task: Task, offset: int = 0, batch_size: int = 500) -> None:
"""
Submit all works that are presentation ready to the search index.
This is done in batches, with the batch size determined by the batch_size parameter. This
task will do a batch, then requeue itself until all works have been indexed.
"""
index = task.services.search.index()

task.log.info(
f"Running search reindex at offset {offset} with batch size {batch_size}."
)

with (
task.session() as session,
elapsed_time_logging(
log_method=task.log.info,
message_prefix="Works queried from database",
skip_start=True,
),
):
documents = get_work_search_documents(session, batch_size, offset)

try:
with elapsed_time_logging(
log_method=task.log.info,
message_prefix="Works added to index",
skip_start=True,
):
failed_documents = index.add_documents(documents=documents)
if failed_documents:
raise FailedToIndex(f"Failed to index {len(failed_documents)} works.")
except (FailedToIndex, OpenSearchException) as e:
wait_time = exponential_backoff(task.request.retries)
task.log.error(f"{e}. Retrying in {wait_time} seconds.")
raise task.retry(countdown=wait_time)

if len(documents) == batch_size:
# This task is complete, but there are more works waiting to be indexed. Requeue ourselves
# to process the next batch.
raise task.replace(
search_reindex.s(offset=offset + batch_size, batch_size=batch_size)
)

task.log.info("Finished search reindex.")


@shared_task(queue=QueueNames.default, bind=True, max_retries=4)
def update_read_pointer(task: Task) -> None:
"""
Update the read pointer to the latest revision.
This is used to indicate that the search index has been updated to a specific version. We
chain this task with search_reindex when doing a migration to ensure that the read pointer is
updated after all works have been indexed. See get_migrate_search_chain.
"""
task.log.info("Updating read pointer.")
service = task.services.search.service()
revision_directory = task.services.search.revision_directory()
revision = revision_directory.highest()
try:
service.read_pointer_set(revision)
except OpenSearchException as e:
wait_time = exponential_backoff(task.request.retries)
task.log.error(
f"Failed to update read pointer: {e}. Retrying in {wait_time} seconds."
)
raise task.retry(countdown=wait_time)
task.log.info(
f"Updated read pointer ({service.base_revision_name} v{revision.version})."
)


@shared_task(queue=QueueNames.default, bind=True, max_retries=4)
def index_work(task: Task, work_id: int) -> None:
"""
Index a single work into the search index.
"""
index = task.services.search.index()
with task.session() as session:
documents = Work.to_search_documents(session, [work_id])

if not documents:
# We were unable to find the work. It could have been deleted or maybe the transaction
# hasn't been committed yet. We'll wait a bit and try again.
wait_time = exponential_backoff(task.request.retries)
task.log.warning(
f"Work {work_id} not found. Unable to index. Retrying in {wait_time} seconds."
)
raise task.retry(countdown=wait_time)

try:
index.add_document(document=documents[0])
except OpenSearchException as e:
wait_time = exponential_backoff(task.request.retries)
task.log.error(
f"Failed to index work {work_id}: {e}. Retrying in {wait_time} seconds."
)
raise task.retry(countdown=wait_time)

task.log.info(f"Indexed work {work_id}.")


def get_migrate_search_chain() -> chain:
"""
Get the chain of tasks to run when migrating the search index to a new schema.
"""
return chain(search_reindex.si(), update_read_pointer.si())
90 changes: 81 additions & 9 deletions src/palace/manager/scripts/initialization.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from argparse import ArgumentParser
from collections.abc import Callable
from pathlib import Path

Expand All @@ -9,6 +10,9 @@
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.orm import Session

from palace.manager.celery.tasks.search import get_migrate_search_chain
from palace.manager.search.revision import SearchSchemaRevision
from palace.manager.search.service import SearchService
from palace.manager.service.container import container_instance
from palace.manager.sqlalchemy.session import SessionManager
from palace.manager.sqlalchemy.util import LOCK_ID_DB_INIT, pg_advisory_lock
Expand Down Expand Up @@ -56,7 +60,7 @@ def migrate_database(self, connection: Connection) -> None:
alembic_conf = self._get_alembic_config(connection, self._config_file)
command.upgrade(alembic_conf, "head")

def initialize_database(self, connection: Connection) -> None:
def initialize_database_schema(self, connection: Connection) -> None:
"""
Initialize the database, creating tables, loading default data and then
stamping the most recent migration as the current state of the DB.
Expand All @@ -71,11 +75,7 @@ def initialize_database(self, connection: Connection) -> None:
alembic_conf = self._get_alembic_config(connection, self._config_file)
command.stamp(alembic_conf, "head")

def initialize_search_indexes(self) -> bool:
search = self._container.search.index()
return search.initialize_indices()

def initialize(self, connection: Connection):
def initialize_database(self, connection: Connection) -> None:
"""Initialize the database if necessary."""
inspector = inspect(connection)
if inspector.has_table("alembic_version"):
Expand All @@ -91,10 +91,73 @@ def initialize(self, connection: Connection):
)
else:
self.log.info("Database schema does not exist. Initializing.")
self.initialize_database(connection)
self.initialize_database_schema(connection)
self.log.info("Initialization complete.")

self.initialize_search_indexes()
@classmethod
def create_search_index(
cls, service: SearchService, revision: SearchSchemaRevision
) -> None:
# Initialize a new search index by creating the index, setting the mapping,
# and setting the read and write pointers.
service.index_create(revision)
service.index_set_mapping(revision)
service.write_pointer_set(revision)

@classmethod
def migrate_search(
cls,
service: SearchService,
revision: SearchSchemaRevision,
) -> None:
# The revision is not the most recent. We need to create a new index.
# and start reindexing our data into it asynchronously. When the reindex
# is complete, we will switch the read pointer to the new index.
cls.logger().info(f"Creating a new index for revision (v{revision.version}).")
cls.create_search_index(service, revision)
task = get_migrate_search_chain().apply_async()
cls.logger().info(
f"Task queued to index data into new search index (Task ID: {task.id})."
)

def initialize_search(self) -> None:
service = self._container.search.service()
revision_directory = self._container.search.revision_directory()
revision = revision_directory.highest()
write_pointer = service.write_pointer()
read_pointer = service.read_pointer()

if write_pointer is None or read_pointer is None:
# Pointers do not exist. This is a fresh index.
self.log.info("Search index does not exist. Creating a new index.")
self.create_search_index(service, revision)
service.read_pointer_set(revision)
elif write_pointer.version < revision.version:
self.log.info(
f"Search index is out-of-date ({service.base_revision_name} v{write_pointer.version})."
)
self.migrate_search(service, revision)
elif read_pointer.version < revision.version:
self.log.info(
f"Search read pointer is out-of-date (v{read_pointer.version}). Latest is v{revision.version}."
f"This likely means that the reindexing task is in progress. If there is no reindexing task "
f"running, you may need to repair the search index."
)
elif (
read_pointer.version > revision.version
or write_pointer.version > revision.version
):
self.log.error(
f"Search index is in an inconsistent state. Read pointer: v{read_pointer.version}, "
f"Write pointer: v{write_pointer.version}, Latest revision: v{revision.version}. "
f"You may be running an old version of the application against a new search index. "
)
return
else:
self.log.info(
f"Search index is up-to-date ({service.base_revision_name} v{revision.version})."
)
self.log.info("Search initialization complete.")

def run(self) -> None:
"""
Expand All @@ -105,9 +168,18 @@ def run(self) -> None:
instance of the script is running at a time. This prevents multiple
instances from trying to initialize the database at the same time.
"""

# This script doesn't take any arguments, but we still call argparse, so that
# we can use the --help option to print out a help message. This avoids the
# surprise of the script actually running when the user just wanted to see the help.
ArgumentParser(
description="Initialize the database and search index for the Palace Manager."
).parse_args()

engine = self._engine_factory()
with engine.begin() as connection:
with pg_advisory_lock(connection, LOCK_ID_DB_INIT):
self.initialize(connection)
self.initialize_database(connection)
self.initialize_search()

engine.dispose()
Loading

0 comments on commit 5be6f9a

Please sign in to comment.