Commit

Start indexing search documents via celery
jonathangreen committed May 14, 2024
1 parent b0518de commit 369d8c0
Showing 17 changed files with 272 additions and 473 deletions.
9 changes: 0 additions & 9 deletions bin/search_index_refresh

This file was deleted.

1 change: 0 additions & 1 deletion docker/services/cron/cron.d/circulation
@@ -11,7 +11,6 @@ HOME=/var/www/circulation
 
 # These scripts update internal caches.
 #
-*/30 * * * * root bin/run -d 15 search_index_refresh >> /var/log/cron.log 2>&1
 10 0 * * * root bin/run search_index_clear >> /var/log/cron.log 2>&1
 0 0 * * * root bin/run update_custom_list_size >> /var/log/cron.log 2>&1
 0 2 * * * root bin/run update_lane_size >> /var/log/cron.log 2>&1
5 changes: 3 additions & 2 deletions src/palace/manager/api/admin/controller/custom_lists.py
@@ -289,8 +289,9 @@ def _create_or_update_list(
         if membership_change:
             # We need to update the search index entries for works that caused a membership change,
             # so the upstream counts can be calculated correctly.
-            documents = self.search_engine.create_search_documents_from_works(
-                works_to_update_in_search
+            documents = Work.to_search_documents(
+                self._db,
+                [w.id for w in works_to_update_in_search],
             )
             index = self.search_engine.start_updating_search_documents()
             index.add_documents(documents)
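Note (not part of the commit): the controller no longer asks the search engine to build documents from Work objects; documents are now built from work IDs by Work.to_search_documents and then handed to the index. A minimal sketch of the new calling pattern follows; the function name update_search_entries and the db, search_engine, and works parameters are illustrative placeholders.

from palace.manager.sqlalchemy.model.work import Work


def update_search_entries(db, search_engine, works):
    # Build search documents from work IDs via the Work model, replacing the
    # removed search_engine.create_search_documents_from_works() helper.
    documents = Work.to_search_documents(db, [w.id for w in works])
    index = search_engine.start_updating_search_documents()
    index.add_documents(documents)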
93 changes: 93 additions & 0 deletions src/palace/manager/celery/tasks/search.py
@@ -0,0 +1,93 @@
from random import randrange

from celery import chain, shared_task
from sqlalchemy import select

from palace.manager.celery.task import Task
from palace.manager.service.celery.celery import QueueNames
from palace.manager.sqlalchemy.model.work import Work
from palace.manager.util.log import elapsed_time_logging


def exponential_backoff(retries: int) -> int:
    return 3**retries + randrange(0, 3)


@shared_task(queue=QueueNames.default, bind=True, max_retries=5)
def search_reindex(task: Task, offset: int = 0, batch_size: int = 500) -> None:
    index = task.services.search.index()

    task.log.info(
        f"Running search reindex at offset {offset} with batch size {batch_size}."
    )

    with (
        task.session() as session,
        elapsed_time_logging(
            log_method=task.log.info,
            message_prefix="Works queried from database",
            skip_start=True,
        ),
    ):
        works = [
            w.id
            for w in session.execute(
                select(Work.id)
                .where(Work.presentation_ready == True)
                .order_by(Work.id)
                .limit(batch_size)
                .offset(offset)
            )
        ]
        documents = Work.to_search_documents(session, works)

    with elapsed_time_logging(
        log_method=task.log.info, message_prefix="Works added to index", skip_start=True
    ):
        failed_documents = index.add_documents(documents=documents)
        if failed_documents:
            wait_time = exponential_backoff(task.request.retries)
            task.log.error(
                f"Failed to index {len(failed_documents)} works. Retrying in {wait_time} seconds."
            )
            raise task.retry(countdown=wait_time)

    if len(works) == batch_size:
        # This task is complete, but there are more works waiting to be indexed. Requeue ourselves
        # to process the next batch.
        raise task.replace(
            search_reindex.s(offset=offset + batch_size, batch_size=batch_size)
        )

    task.log.info("Finished search reindex.")


@shared_task(queue=QueueNames.default, bind=True)
def update_read_pointer(task: Task) -> None:
    task.log.info("Updating read pointer.")
    service = task.services.search.service()
    revision_directory = task.services.search.revision_directory()
    revision = revision_directory.highest()
    service.read_pointer_set(revision)
    task.log.info(
        f"Updated read pointer ({service.base_revision_name()} v{revision.version})."
    )


@shared_task(queue=QueueNames.default, bind=True)
def index_work(task: Task, work_id: int) -> None:
    index = task.services.search.index()
    with task.session() as session:
        [document] = Work.to_search_documents(session, [work_id])
        error = index.add_document(document=document)
        if error:
            wait_time = exponential_backoff(task.request.retries)
            task.log.error(
                f"Failed to index work {work_id}: {error}. Retrying in {wait_time} seconds."
            )
            raise task.retry(countdown=wait_time)

    task.log.info(f"Indexed work {work_id}.")


do_migration = chain(search_reindex.si(), update_read_pointer.si())
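For orientation (not part of the commit): a short sketch of how these tasks would typically be dispatched, assuming a configured Celery broker and worker; the work_id value is a placeholder. It also shows the retry delays produced by exponential_backoff.

from palace.manager.celery.tasks.search import (
    do_migration,
    exponential_backoff,
    index_work,
    search_reindex,
)

# Rebuild the whole index in 500-work batches; the task re-queues itself via
# task.replace() until a batch comes back smaller than batch_size.
search_reindex.delay()

# Rebuild and then advance the read pointer once indexing finishes; this is the
# do_migration chain used by the initialization script below.
do_migration.apply_async()

# Index a single work after it changes (42 is a placeholder work id).
index_work.delay(work_id=42)

# Retry delays grow exponentially with a little jitter:
# retries=0 -> 1-3s, retries=1 -> 3-5s, retries=2 -> 9-11s, ..., retries=5 -> 243-245s.
print([exponential_backoff(n) for n in range(6)])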
59 changes: 51 additions & 8 deletions src/palace/manager/scripts/initialization.py
@@ -9,6 +9,9 @@
 from sqlalchemy.engine import Connection, Engine
 from sqlalchemy.orm import Session
 
+from palace.manager.celery.tasks.search import do_migration
+from palace.manager.search.revision import SearchSchemaRevision
+from palace.manager.search.service import SearchService
 from palace.manager.service.container import container_instance
 from palace.manager.sqlalchemy.session import SessionManager
 from palace.manager.sqlalchemy.util import LOCK_ID_DB_INIT, pg_advisory_lock
@@ -56,7 +59,7 @@ def migrate_database(self, connection: Connection) -> None:
         alembic_conf = self._get_alembic_config(connection, self._config_file)
         command.upgrade(alembic_conf, "head")
 
-    def initialize_database(self, connection: Connection) -> None:
+    def initialize_database_schema(self, connection: Connection) -> None:
         """
         Initialize the database, creating tables, loading default data and then
         stamping the most recent migration as the current state of the DB.
@@ -71,11 +74,7 @@ def initialize_database(self, connection: Connection) -> None:
         alembic_conf = self._get_alembic_config(connection, self._config_file)
         command.stamp(alembic_conf, "head")
 
-    def initialize_search_indexes(self) -> bool:
-        search = self._container.search.index()
-        return search.initialize_indices()
-
-    def initialize(self, connection: Connection):
+    def initialize_database(self, connection: Connection) -> None:
         """Initialize the database if necessary."""
         inspector = inspect(connection)
         if inspector.has_table("alembic_version"):
@@ -91,10 +90,54 @@ def initialize(self, connection: Connection):
             )
         else:
             self.log.info("Database schema does not exist. Initializing.")
-            self.initialize_database(connection)
+            self.initialize_database_schema(connection)
             self.log.info("Initialization complete.")
 
-        self.initialize_search_indexes()
+    def create_search_index(
+        self, service: SearchService, revision: SearchSchemaRevision
+    ) -> None:
+        # Initialize a new search index by creating the index, setting the mapping,
+        # and setting the read and write pointers.
+        service.index_create(revision)
+        service.index_set_mapping(revision)
+        service.write_pointer_set(revision)
+
+        if not service.read_pointer():
+            # A read pointer does not exist. We set it to the most recent.
+            service.read_pointer_set(revision)
+
+    def initialize_search(self) -> None:
+        service = self._container.search.service()
+        revision_directory = self._container.search.revision_directory()
+        revision = revision_directory.highest()
+
+        if not service.write_pointer():
+            # A write pointer does not exist. This is a fresh index.
+            self.log.info("Search index does not exist. Creating a new index.")
+            return self.create_search_index(service, revision)
+
+        # The index already exists. We need to check if the revision is the most recent.
+        write_pointer = service.write_pointer()
+        if write_pointer.version != revision.version:
+            # The revision is not the most recent. We need to create a new index
+            # and start reindexing our data into it asynchronously. When the reindex
+            # is complete, we will switch the read pointer to the new index.
+            self.log.info(
+                f"Search index is out-of-date ({service.base_revision_name()} v{write_pointer.version})."
+                f" Creating a new index for revision {revision.version}."
+            )
+            self.create_search_index(service, revision)
+            task = do_migration.apply_async()
+            self.log.info(f"Reindexing data into new search index. Task ID: {task.id}")
+        else:
+            self.log.info(
+                f"Search index is up-to-date ({service.base_revision_name()} v{revision.version})."
+            )
+
+    def initialize(self, connection: Connection):
+        """Initialize database and search index."""
+        self.initialize_database(connection)
+        self.initialize_search()
 
     def run(self) -> None:
         """
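Not part of the commit: a self-contained toy sketch of the read/write pointer mechanics that initialize_search and the update_read_pointer task rely on. Every name here is hypothetical (FakeSearchService stands in for the real SearchService); only the pointer behaviour mirrors the code above.

from dataclasses import dataclass, field


@dataclass
class FakeRevision:
    version: int


@dataclass
class FakeSearchService:
    # Maps schema version -> the (empty) index created for it.
    indices: dict = field(default_factory=dict)
    _read: FakeRevision | None = None
    _write: FakeRevision | None = None

    def index_create(self, revision):
        self.indices[revision.version] = {}

    def write_pointer(self):
        return self._write

    def write_pointer_set(self, revision):
        self._write = revision

    def read_pointer(self):
        return self._read

    def read_pointer_set(self, revision):
        self._read = revision


service = FakeSearchService()

# Fresh install: a v2 index is created and both pointers point at it.
v2 = FakeRevision(version=2)
service.index_create(v2)
service.write_pointer_set(v2)
if not service.read_pointer():
    service.read_pointer_set(v2)

# Later upgrade to v3: writes move to the new index immediately, reads stay on
# v2 while the celery reindex (do_migration) runs, and the update_read_pointer
# task flips reads over to v3 once indexing is finished.
v3 = FakeRevision(version=3)
if service.write_pointer().version != v3.version:
    service.index_create(v3)
    service.write_pointer_set(v3)
    # ... search_reindex batches run here, writing into the v3 index ...
    service.read_pointer_set(v3)  # done by update_read_pointer in the chain

assert service.read_pointer().version == 3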
7 changes: 5 additions & 2 deletions src/palace/manager/scripts/search.py
@@ -1,17 +1,20 @@
 from palace.manager.core.metadata_layer import TimestampData
+from palace.manager.scripts.base import Script
 from palace.manager.scripts.coverage_provider import RunWorkCoverageProviderScript
 from palace.manager.scripts.timestamp import TimestampScript
 from palace.manager.search.coverage_provider import SearchIndexCoverageProvider
 from palace.manager.search.coverage_remover import RemovesSearchCoverage
 from palace.manager.search.external_search import ExternalSearchIndex
 
 
-class RebuildSearchIndexScript(RunWorkCoverageProviderScript, RemovesSearchCoverage):
+class RebuildSearchIndexScript(Script):
     """Completely delete the search index and recreate it."""
 
+    # TODO: This still needs unfucking
+
     def __init__(self, *args, **kwargs):
         search = kwargs.pop("search_index_client", None)
-        super().__init__(SearchIndexCoverageProvider, *args, **kwargs)
+        super().__init__(*args, **kwargs)
         self.search: ExternalSearchIndex = search or self.services.search.index()
 
     def do_run(self):
5 changes: 2 additions & 3 deletions src/palace/manager/search/coverage_provider.py
@@ -58,9 +58,8 @@ def run_once_and_update_timestamp(self):
 
     def process_batch(self, works) -> list[Work | CoverageFailure]:
         target: SearchDocumentReceiverType = self.migration or self.receiver
-        failures = target.add_documents(
-            documents=self.search_index_client.create_search_documents_from_works(works)
-        )
+        documents = Work.to_search_documents(self._db, [w.id for w in works])
+        failures = target.add_documents(documents=documents)
 
         # Maintain a dictionary of works so that we can efficiently remove failed works later.
         work_map: dict[int, Work] = {}