Update search indexing to use celery rather than coverage records (PP-1225) #1849
Merged

Changes from 8 of 12 commits:
23560ca  Index search documents via celery (jonathangreen)
954da3c  Make both read_pointer and write_pointer return same thing (jonathangreen)
13be038  Move backoff function (jonathangreen)
a5db25d  Add a couple more tests (jonathangreen)
28a13c2  Add some comments (jonathangreen)
e4034b4  Add one more test case. (jonathangreen)
858c4c2  Add argparse to avoid surprises. (jonathangreen)
c20c9db  Rename methods (jonathangreen)
b8076bc  Make sure ID is set. (jonathangreen)
e92d58b  Some changes to retries after some local testing. (jonathangreen)
fce586e  Merge branch 'main' into feature/search-indexing-celery (jonathangreen)
8779d75  Code review feedback (jonathangreen)
Two files were deleted in this PR (their diff content did not load and is not shown).
New file (the `palace.manager.celery.tasks.search` module, 145 added lines):

```python
from __future__ import annotations

from collections.abc import Sequence
from typing import Any

from celery import chain, shared_task
from opensearchpy import OpenSearchException
from sqlalchemy import select
from sqlalchemy.orm import Session

from palace.manager.celery.task import Task
from palace.manager.core.exceptions import BasePalaceException
from palace.manager.service.celery.celery import QueueNames
from palace.manager.sqlalchemy.model.work import Work
from palace.manager.util.backoff import exponential_backoff
from palace.manager.util.log import elapsed_time_logging


def get_work_search_documents(
    session: Session, batch_size: int, offset: int
) -> Sequence[dict[str, Any]]:
    """
    Get a batch of search documents for works that are presentation ready.
    """
    works = [
        w.id
        for w in session.execute(
            select(Work.id)
            .where(Work.presentation_ready == True)
            .order_by(Work.id)
            .limit(batch_size)
            .offset(offset)
        )
    ]
    return Work.to_search_documents(session, works)


class FailedToIndex(BasePalaceException):
    ...


@shared_task(queue=QueueNames.default, bind=True, max_retries=4)
def search_reindex(task: Task, offset: int = 0, batch_size: int = 500) -> None:
    """
    Submit all works that are presentation ready to the search index.

    This is done in batches, with the batch size determined by the batch_size parameter. This
    task will do a batch, then requeue itself until all works have been indexed.
    """
    index = task.services.search.index()

    task.log.info(
        f"Running search reindex at offset {offset} with batch size {batch_size}."
    )

    with (
        task.session() as session,
        elapsed_time_logging(
            log_method=task.log.info,
            message_prefix="Works queried from database",
            skip_start=True,
        ),
    ):
        documents = get_work_search_documents(session, batch_size, offset)

    try:
        with elapsed_time_logging(
            log_method=task.log.info,
            message_prefix="Works added to index",
            skip_start=True,
        ):
            failed_documents = index.add_documents(documents=documents)
        if failed_documents:
            raise FailedToIndex(f"Failed to index {len(failed_documents)} works.")
    except (FailedToIndex, OpenSearchException) as e:
        wait_time = exponential_backoff(task.request.retries)
        task.log.error(f"{e}. Retrying in {wait_time} seconds.")
        raise task.retry(countdown=wait_time)

    if len(documents) == batch_size:
        # This task is complete, but there are more works waiting to be indexed. Requeue ourselves
        # to process the next batch.
        raise task.replace(
            search_reindex.s(offset=offset + batch_size, batch_size=batch_size)
        )

    task.log.info("Finished search reindex.")


@shared_task(queue=QueueNames.default, bind=True, max_retries=4)
def update_read_pointer(task: Task) -> None:
    """
    Update the read pointer to the latest revision.

    This is used to indicate that the search index has been updated to a specific version. We
    chain this task with search_reindex when doing a migration to ensure that the read pointer is
    updated after all works have been indexed. See get_migrate_search_chain.
    """
    task.log.info("Updating read pointer.")
    service = task.services.search.service()
    revision_directory = task.services.search.revision_directory()
    revision = revision_directory.highest()
    try:
        service.read_pointer_set(revision)
    except OpenSearchException as e:
        wait_time = exponential_backoff(task.request.retries)
        task.log.error(
            f"Failed to update read pointer: {e}. Retrying in {wait_time} seconds."
        )
        raise task.retry(countdown=wait_time)
    task.log.info(
        f"Updated read pointer ({service.base_revision_name} v{revision.version})."
    )


@shared_task(queue=QueueNames.default, bind=True, max_retries=4)
def index_work(task: Task, work_id: int) -> None:
    """
    Index a single work into the search index.
    """
    index = task.services.search.index()
    with task.session() as session:
        documents = Work.to_search_documents(session, [work_id])

    if not documents:
        task.log.warning(f"Work {work_id} not found. Unable to index.")
        return

    try:
        index.add_document(document=documents[0])
    except OpenSearchException as e:
        wait_time = exponential_backoff(task.request.retries)
        task.log.error(
            f"Failed to index work {work_id}: {e}. Retrying in {wait_time} seconds."
        )
        raise task.retry(countdown=wait_time)

    task.log.info(f"Indexed work {work_id}.")


def get_migrate_search_chain() -> chain:
    """
    Get the chain of tasks to run when migrating the search index to a new schema.
    """
    return chain(search_reindex.si(), update_read_pointer.si())
```
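All three tasks share one retry pattern: an `OpenSearchException` (or a partial indexing failure) is caught, a wait time is derived from `task.request.retries`, and the task requeues itself with `task.retry(countdown=...)`, giving up after `max_retries=4`. The `exponential_backoff` helper comes from `palace.manager.util.backoff` and is not part of this diff; the sketch below is a hypothetical stand-in for the kind of schedule such a helper produces (the base and the jitter are assumptions, not the real implementation):

```python
import random


def exponential_backoff(retries: int) -> float:
    """Return a wait time in seconds that grows exponentially with each retry.

    Hypothetical sketch, not the palace.manager.util.backoff implementation:
    with a base of 3 this yields roughly 3, 9, 27, and 81 seconds for
    retries 0 through 3, after which max_retries=4 lets the task fail.
    """
    # A little jitter keeps tasks that failed together from retrying together.
    return 3 ** (retries + 1) + random.uniform(0, 1)
```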
Changes to the initialization script:

```diff
@@ -1,5 +1,6 @@
 from __future__ import annotations

+from argparse import ArgumentParser
 from collections.abc import Callable
 from pathlib import Path

@@ -9,6 +10,9 @@
 from sqlalchemy.engine import Connection, Engine
 from sqlalchemy.orm import Session

+from palace.manager.celery.tasks.search import get_migrate_search_chain
+from palace.manager.search.revision import SearchSchemaRevision
+from palace.manager.search.service import SearchService
 from palace.manager.service.container import container_instance
 from palace.manager.sqlalchemy.session import SessionManager
 from palace.manager.sqlalchemy.util import LOCK_ID_DB_INIT, pg_advisory_lock

@@ -56,7 +60,7 @@ def migrate_database(self, connection: Connection) -> None:
         alembic_conf = self._get_alembic_config(connection, self._config_file)
         command.upgrade(alembic_conf, "head")

-    def initialize_database(self, connection: Connection) -> None:
+    def initialize_database_schema(self, connection: Connection) -> None:
         """
         Initialize the database, creating tables, loading default data and then
         stamping the most recent migration as the current state of the DB.

@@ -71,11 +75,7 @@ def initialize_database_schema(self, connection: Connection) -> None:
         alembic_conf = self._get_alembic_config(connection, self._config_file)
         command.stamp(alembic_conf, "head")

-    def initialize_search_indexes(self) -> bool:
-        search = self._container.search.index()
-        return search.initialize_indices()
-
-    def initialize(self, connection: Connection):
+    def initialize_database(self, connection: Connection) -> None:
         """Initialize the database if necessary."""
         inspector = inspect(connection)
         if inspector.has_table("alembic_version"):

@@ -91,10 +91,73 @@ def initialize(self, connection: Connection):
             )
         else:
             self.log.info("Database schema does not exist. Initializing.")
-            self.initialize_database(connection)
+            self.initialize_database_schema(connection)
             self.log.info("Initialization complete.")

-        self.initialize_search_indexes()
+    @classmethod
+    def create_search_index(
+        cls, service: SearchService, revision: SearchSchemaRevision
+    ) -> None:
+        # Initialize a new search index by creating the index, setting the mapping,
+        # and setting the read and write pointers.
+        service.index_create(revision)
+        service.index_set_mapping(revision)
+        service.write_pointer_set(revision)
+
+    @classmethod
+    def migrate_search(
+        cls,
+        service: SearchService,
+        revision: SearchSchemaRevision,
+    ) -> None:
+        # The revision is not the most recent. We need to create a new index
+        # and start reindexing our data into it asynchronously. When the reindex
+        # is complete, we will switch the read pointer to the new index.
+        cls.logger().info(f"Creating a new index for revision (v{revision.version}).")
+        cls.create_search_index(service, revision)
+        task = get_migrate_search_chain().apply_async()
+        cls.logger().info(
+            f"Task queued to indexing data into new search index (Task ID: {task.id})."
+        )
+
+    def initialize_search(self) -> None:
+        service = self._container.search.service()
+        revision_directory = self._container.search.revision_directory()
+        revision = revision_directory.highest()
+        write_pointer = service.write_pointer()
+        read_pointer = service.read_pointer()
+
+        if write_pointer is None or read_pointer is None:
+            # Pointers do not exist. This is a fresh index.
+            self.log.info("Search index does not exist. Creating a new index.")
+            self.create_search_index(service, revision)
+            service.read_pointer_set(revision)
+        elif write_pointer.version < revision.version:
+            self.log.info(
+                f"Search index is out-of-date ({service.base_revision_name} v{write_pointer.version})."
+            )
+            self.migrate_search(service, revision)
+        elif read_pointer.version < revision.version:
+            self.log.info(
+                f"Search read pointer is out-of-date (v{read_pointer.version}). Latest is v{revision.version}. "
+                f"This likely means that the reindexing task is in progress. If there is no reindexing task "
+                f"running, you may need to repair the search index."
+            )
+        elif (
+            read_pointer.version > revision.version
+            or write_pointer.version > revision.version
+        ):
+            self.log.error(
+                f"Search index is in an inconsistent state. Read pointer: v{read_pointer.version}, "
+                f"Write pointer: v{write_pointer.version}, Latest revision: v{revision.version}. "
+                f"You may be running an old version of the application against a new search index."
+            )
+            return
+        else:
+            self.log.info(
+                f"Search index is up-to-date ({service.base_revision_name} v{revision.version})."
+            )
+        self.log.info("Search initialization complete.")

     def run(self) -> None:
         """

@@ -105,9 +168,18 @@ def run(self) -> None:
         instance of the script is running at a time. This prevents multiple
         instances from trying to initialize the database at the same time.
         """
+
+        # This script doesn't take any arguments, but we still call argparse, so that
+        # we can use the --help option to print out a help message. This avoids the
+        # surprise of the script actually running when the user just wanted to see the help.
+        ArgumentParser(
+            description="Initialize the database and search index for the Palace Manager."
+        ).parse_args()
+
         engine = self._engine_factory()
         with engine.begin() as connection:
             with pg_advisory_lock(connection, LOCK_ID_DB_INIT):
-                self.initialize(connection)
+                self.initialize_database(connection)
+                self.initialize_search()

         engine.dispose()
```

Inline review comments:

- On the `"Task queued to indexing data into new search index"` log message: Minor: grammar - suggest either:
- On lines +172 to +174 (the comment explaining the `ArgumentParser` call): 💯
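The migration flow hinges on `get_migrate_search_chain` from the new tasks module: `search_reindex` runs first, and `update_read_pointer` runs only once the full reindex has succeeded, so the read pointer keeps serving the old index until the new one is complete. The `.si()` calls build immutable Celery signatures, which keeps the chain from passing `search_reindex`'s return value into `update_read_pointer`, a task that accepts no arguments. A minimal usage sketch (it assumes a configured Celery app and running worker; the imported names are the ones this PR introduces):

```python
from celery import chain

from palace.manager.celery.tasks.search import (
    get_migrate_search_chain,
    search_reindex,
    update_read_pointer,
)

# Queue the whole migration: batch-reindex every presentation-ready work,
# then advance the read pointer once indexing has finished.
result = get_migrate_search_chain().apply_async()
print(f"Migration chain queued (Task ID: {result.id})")

# The equivalent chain built by hand. With .s() instead of .si(), Celery
# would pass search_reindex's return value (None) as the first argument to
# update_read_pointer, which takes no arguments and would fail.
manual = chain(search_reindex.si(), update_read_pointer.si())
manual.apply_async()
```

Note that `search_reindex` requeues itself with `task.replace(...)` until a batch comes back smaller than `batch_size`; because `replace()` preserves the remainder of the chain, `update_read_pointer` still fires only after the final batch.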
Review comment: This script now has all the responsibility for initializing and migrating our database and search.