Update search indexing to use celery rather than coverage records (PP-1225) #1849
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

@@            Coverage Diff             @@
##             main    #1849      +/-   ##
==========================================
+ Coverage   90.10%   90.11%   +0.01%
==========================================
  Files         325      324       -1
  Lines       39640    39578      -62
  Branches     8591     8595       +4
==========================================
- Hits        35716    35665      -51
+ Misses       2602     2599       -3
+ Partials     1322     1314       -8

☔ View full report in Codecov by Sentry.
@@ -1,5 +1,6 @@
from __future__ import annotations
This script now has all the responsibility for initializing and migrating our database and search.
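A rough sketch of the new shape of the script (a minimal sketch; the method names and bodies here are illustrative assumptions, not the script's actual API):

```python
class InstanceInitializationScript:
    """Illustrative outline only; the real script lives in the manager codebase."""

    def initialize_database(self) -> None:
        # Stand-in for creating tables / running alembic migrations.
        print("initializing database")

    def initialize_search(self) -> None:
        # Stand-in for creating or migrating the search index.
        print("initializing search index")

    def run(self) -> None:
        # One entry point now owns both halves of initialization.
        self.initialize_database()
        self.initialize_search()


if __name__ == "__main__":
    InstanceInitializationScript().run()
```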
# Get references to the read and write pointers.
self._search_read_pointer = self._search_service.read_pointer_name()
self._search_write_pointer = self._search_service.write_pointer_name()
Previously the responsibility for making sure operations were done against the correct search alias was split between this class and the search service. This has now been moved entirely into the search service, so this class doesn't have to know anything about the aliases we are using.
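As a sketch of the division of labor after this change (class shape and method bodies are assumptions for illustration; only `read_pointer_name` and `write_pointer_name` appear in the diff above):

```python
from typing import Any


class SearchServiceSketch:
    """Illustrative only: the service owns alias resolution, so callers
    never see or pass alias names."""

    def __init__(self, client: Any, base_name: str) -> None:
        self._client = client
        self._base = base_name

    def read_pointer_name(self) -> str:
        return f"{self._base}-search-read"

    def write_pointer_name(self) -> str:
        return f"{self._base}-search-write"

    def index_submit_document(self, document: dict) -> None:
        # The write alias is applied here, not by the caller.
        self._client.index(index=self.write_pointer_name(), body=document)
```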
@@ -253,54 +214,25 @@ def count_works(self, filter):
    )
    return qu.count()
def create_search_documents_from_works(
    self, works: Iterable[Work]
) -> Sequence[SearchDocument]:
This function didn't really add anything to Work.to_search_documents other than doing some timing of how long that function took, so it was removed, and callers were updated to call Work.to_search_documents directly.
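The call-site change then looks roughly like this (the exact signature of `Work.to_search_documents` isn't shown in this diff, so the arguments below are assumptions; check the real signature in the model code):

```python
# Before: a thin wrapper that only added timing around the real work.
# documents = search_index.create_search_documents_from_works(works)

# After: call the Work method directly (argument shape assumed for
# illustration).
documents = Work.to_search_documents(session, [work.id for work in works])
```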
"""The 'write' pointer; the pointer that will be used to populate an index with search documents.""" | ||
@dataclass(frozen=True) | ||
class SearchPointer: | ||
"""A search pointer, which is an alias that points to a specific index.""" |
Previously calls to `write_pointer` would return a `SearchWritePointer` and calls to `read_pointer` would return a `str`. This creates a new `SearchPointer` that both methods return.
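A minimal sketch of what that unified type could look like (the field names are assumptions; the diff only shows the class declaration and docstring):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SearchPointer:
    """A search pointer, which is an alias that points to a specific index."""

    alias: str    # the read or write alias name
    index: str    # the concrete index the alias currently targets
    version: int  # schema revision of that index


# Both read_pointer() and write_pointer() now return a SearchPointer,
# where previously one returned a SearchWritePointer and the other a bare str.
pointer = SearchPointer(
    alias="circulation-search-read", index="circulation-search-v5", version=5
)
print(pointer.index)
```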
@abstractmethod
def index_set_populated(self, revision: SearchSchemaRevision) -> None:
    """Set an index as populated."""
We used to have `empty` and `populated` aliases for an index. Now that all our initialization is centralized in the `instance_initialization` script, and the actions in that script are protected by a postgres lock (so we can be sure only one instance is doing the initialization at a time), we don't need these extra aliases.
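For reference, serializing initialization on a postgres lock can be done with an advisory lock; a minimal sketch (the lock key, DSN, and exact calls are assumptions for illustration, not the script's actual code):

```python
from sqlalchemy import create_engine, text

LOCK_KEY = 1000000001  # arbitrary illustrative key

engine = create_engine("postgresql+psycopg2://localhost/circ")  # placeholder DSN
with engine.connect() as conn:
    # Blocks until no other instance holds the lock, so only one
    # instance runs initialization at a time.
    conn.execute(text("SELECT pg_advisory_lock(:key)"), {"key": LOCK_KEY})
    try:
        pass  # run DB migrations and search index setup here
    finally:
        conn.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": LOCK_KEY})
```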
).join_from(
    Identifier,
    equivalent_identifiers,
    Identifier.id == literal_column("equivalent_cte.equivalent_id"),
This query is equivalent to what we had before, but we are using a join condition rather than a where clause to limit the results. This removes a warning that SQLAlchemy was emitting every time we ran this query, because it thought we were doing a join without a condition on it.
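A self-contained sketch of the pattern (the model is simplified to one column; the real query joins against a recursive CTE of equivalent identifiers):

```python
from sqlalchemy import Column, Integer, literal_column, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Identifier(Base):
    __tablename__ = "identifiers"
    id = Column(Integer, primary_key=True)


# Stand-in for the equivalents CTE used in the diff above.
equivalent_identifiers = select(
    Identifier.id.label("equivalent_id")
).cte("equivalent_cte")

# Supplying the ON condition to join_from() (instead of joining bare and
# filtering in WHERE) keeps SQLAlchemy from warning that the join has no
# condition.
query = select(Identifier.id).join_from(
    Identifier,
    equivalent_identifiers,
    Identifier.id == literal_column("equivalent_cte.equivalent_id"),
)
print(query)
```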
)
.select_from(
    join(Classification, Subject, Classification.subject_id == Subject.id)
== literal_column("equivalent_cte.equivalent_id"),
Same thing as the identifiers_query: we use a join condition to stop a SQLAlchemy warning.
@@ -88,7 +88,7 @@ def services_search_fixture() -> ServicesSearchFixture:
     search_container = Search()
     client_mock = create_autospec(boto3.client)
     service_mock = create_autospec(SearchServiceOpensearch1)
-    revision_directory_mock = create_autospec(SearchRevisionDirectory.create)
+    revision_directory_mock = create_autospec(SearchRevisionDirectory)
This spec was wrong: we need a spec for `SearchRevisionDirectory`, not its `create` method. This caused me issues when I tried to actually use the mock.
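The difference is easy to demonstrate with a simplified stand-in for the class (the real SearchRevisionDirectory has more to it; available() here is just for illustration):

```python
from unittest.mock import create_autospec


class SearchRevisionDirectory:
    @classmethod
    def create(cls) -> "SearchRevisionDirectory":
        return cls()

    def available(self) -> dict:
        return {}


# Spec'ing the class mocks the whole interface:
dir_mock = create_autospec(SearchRevisionDirectory)
instance = dir_mock()  # calling the class mock yields an instance mock
instance.available.return_value = {}
assert instance.available() == {}

# Spec'ing only the classmethod mocks that one callable; it has no
# available attribute, which is what breaks when the mock is actually used.
create_mock = create_autospec(SearchRevisionDirectory.create)
# create_mock.available  ->  AttributeError
```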
def test_to_search_document(self, db: DatabaseTransactionFixture):
    """Test the output of the to_search_document method."""
    customlist, editions = db.customlist()
This wasn't really testing the Coverage Provider, so this test was moved into the Work tests.
def test_to_search_documents_with_missing_data(
    self, db: DatabaseTransactionFixture
):
    # Missing edition relationship
This was also moved into the Work tests.
I'm still doing some final local testing on this one, but I think this is ready for review whenever anyone has the cycles to take a look.
This looks great! 🎸🤘🏽
Just a few small comments. I'm gonna go ahead and approve.
cls.create_search_index(service, revision)
task = get_migrate_search_chain().apply_async()
cls.logger().info(
    f"Task queued to indexing data into new search index (Task ID: {task.id})."
Minor: grammar - suggest either:
- "Task queued to index data..." or
- "Task queued for indexing data..."
# This script doesn't take any arguments, but we still call argparse, so that
# we can use the --help option to print out a help message. This avoids the
# surprise of the script actually running when the user just wanted to see the help.
💯
src/palace/manager/util/backoff.py
:param retries: The number of retries that have already been attempted.
:return: The number of seconds to wait before the next retry.
Should we have a limit on the level of backoff?
limit = 6 # e.g.
backoff: int = 3 ** (min(retries, limit) + 1)
In the instances where we use this function right now, this is limited by the max retries of the task, but it does make sense to have a parameter like this. I'll add it in.
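A sketch of the capped version (the real helper lives in src/palace/manager/util/backoff.py; the parameter name and default here are guesses):

```python
def exponential_backoff(retries: int, limit: int = 6) -> int:
    """Return the number of seconds to wait before the next retry.

    :param retries: The number of retries that have already been attempted.
    :param limit: Retry count past which the backoff stops growing.
    :return: The number of seconds to wait before the next retry.
    """
    return 3 ** (min(retries, limit) + 1)


assert exponential_backoff(0) == 3
assert exponential_backoff(2) == 27
assert exponential_backoff(50) == exponential_backoff(6)  # capped at 3 ** 7
```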
assert work_external_indexing.is_queued(first.work)
work_external_indexing.clear()
Minor: Can you combine these two into
assert work_external_indexing.is_queued(first.work, clear=True)
moby_duck = db.work(title="Moby Duck", with_open_access_download=True)
moby_dick = db.work(title="Moby Dick", with_open_access_download=True)
I had to look at this a couple of times because I didn't catch the difference. Moby Duck. 🦆
I think I pulled this from an older test. It is hard to see the diff though. I renamed things a bit to make it easier to see what's happening at a glance.
tests/mocks/search.py
to_remove = []
for item in items:
    if item.get("_id") == doc_id:
        to_remove.append(item)
I see this came in from older code, but could do this with a list comprehension:
to_remove = [item for item in items if item.get("_id") == doc_id]
assert doc["customlists"] is None | ||
|
||
if work.presentation_edition.contributions: | ||
assert len(doc["contributors"]) is len( |
`==` instead of `is` here?
This whole test was just moved in from the coverage provider tests. Not sure why it was using `is`. I updated it.
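For the record, the reason `is` is risky in that assert: CPython caches small integers (-5 through 256), so identity checks on lengths pass by accident until the numbers grow. A minimal demonstration:

```python
small_a, small_b = [0] * 5, [1] * 5
big_a, big_b = [0] * 300, [1] * 300

print(len(small_a) is len(small_b))  # True: 5 is a cached small int
print(len(big_a) is len(big_b))      # False: 300 is created fresh each call
print(len(big_a) == len(big_b))      # True: == compares values, as intended
```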
@jonathangreen Also, I forgot to mention that your pre-review comments were very handy. Thanks!
Description
This PR updates our search indexing to happen via a celery queue instead of the existing work coverage record approach. The existing search index work coverage records are left in the DB and will be removed in a follow-up PR.
All responsibility for search index initialization and migration has been moved into InstanceInitializationScript. This script now handles both the DB and search index initialization. When a migration to a new search index is needed, it now happens via a celery task; previously this was handled in the search coverage monitor.

There was some refactoring of responsibilities between the external search classes and the search client. The search client is now responsible for making sure the correct search alias is used when requests are being made.
Motivation and Context
On our larger CMs, a large amount of the DB load is due to the work coverage records being generated for search. This is mostly due to deadlocks between these records when two processes try to update the same work. Moving search indexing to a queue like this should drastically reduce the DB load and improve our performance.
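A minimal sketch of what queue-based indexing looks like (task name, helpers, and retry policy here are illustrative assumptions, not the PR's actual tasks):

```python
from celery import shared_task


def load_search_document(work_id: int) -> dict:
    # Stand-in for building the document via Work.to_search_documents.
    return {"_id": work_id}


def submit_to_search_index(document: dict) -> None:
    # Stand-in for the search service's index/bulk call.
    print(f"indexing {document['_id']}")


@shared_task(bind=True, max_retries=5)
def index_work(self, work_id: int) -> None:
    """Index a single work, retrying with exponential backoff on failure."""
    try:
        submit_to_search_index(load_search_document(work_id))
    except Exception as exc:
        # Failed works are retried later instead of contending for row
        # locks the way per-work coverage records did.
        raise self.retry(exc=exc, countdown=3 ** (self.request.retries + 1))
```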
How Has This Been Tested?
Checklist