Re-queue messages using a dead-letter-exchange #14
Conversation
…ocument_generator and document_indexer start automatically when the container is created; they stay alive while the container is running
I think the changes here make sense in terms of the goal, but a couple of the specifics might not be working as expected in terms of actually putting something in the dead-letter exchange. Perhaps we could add a test which checks that after a failure there is a message in the dead-letter exchange. (Maybe this was supposed to be part of test_queue_requeue_message?)
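A possible shape for such a test, reusing the fixture names that appear later in this diff (consumer_instance, producer_instance); the publish/consume method names, the channel attribute path, the failing payload, and the dead-letter queue name are assumptions, so this is only a sketch:

def test_failed_message_goes_to_dead_letter_queue(consumer_instance, producer_instance):
    # Publish a message the consumer is expected to fail on (method name and payload assumed).
    producer_instance.publish_messages({"ht_id": "mdp.35112103801405"})

    # Let the consumer process and reject the message (method name assumed).
    consumer_instance.consume_message()

    # A passive declare only inspects the queue; the failed message should now be waiting there.
    frame = consumer_instance.conn.ht_channel.queue_declare(
        queue="retriever_queue_dead_letter_queue", passive=True)
    assert frame.method.message_count == 1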
ht_queue_service/queue_consumer.py (Outdated)

from ht_queue_service.queue_connection import QueueConnection
from ht_utils.ht_logger import get_ht_logger

logger = get_ht_logger(name=__name__)


def reject_message(used_channel, basic_deliver):
    used_channel.basic_reject(delivery_tag=basic_deliver, requeue=False)
Based on your comment above and https://www.rabbitmq.com/docs/nack it looks like if we reject but don't requeue, then the message just gets discarded -- but I thought we wanted it to go to the dead-letter exchange, so would we want requeue=True?
@aelkiss, requeue=True means the message will be re-queued to the original queue. The first 3 paragraphs of that link explain the difference between nack and reject. I will add some tests to show this behavior (requeue=True => message in the dead-letter exchange and in the original queue; requeue=False => message in the dead-letter queue).
For simplicity, in this initial proposal, I decided to discard failed messages from the original queue and route them to a dead-letter exchange to prevent them from being stuck in the queue. When a message is requeued, RabbitMQ tries to place it back in its original position; as a result, our consumer keeps trying to recover the failed message while the list of pending messages could keep growing (a short sketch of the two reject modes follows after the lists below).
In this iteration, if a message fails for some reason, we route it to the dead-letter queue, which lets us:
- keep a record of why the message failed
- extract statistics about how many messages failed
- prevent failed messages from getting stuck in the queue
In the future, we could evaluate alternatives:
- reprocess messages from the dead-letter exchange, or
- configure RabbitMQ to put failed messages at the end of the queue (I did some research on this alternative, but I have not yet found how to do it)
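A minimal sketch of the two reject modes with pika, assuming a queue that was declared with an x-dead-letter-exchange argument, as this PR does for the real queues; the connection parameters and queue name are illustrative only:

import pika

# Sketch only: "work_queue" is assumed to have been declared with
# arguments={"x-dead-letter-exchange": ...}; names here are not the ones in this repository.
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

method, properties, body = channel.basic_get(queue="work_queue", auto_ack=False)

if method is not None:
    # requeue=False: the message leaves work_queue and is routed to its dead-letter exchange.
    channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    # requeue=True would instead put the message back on work_queue, near its original
    # position, so the consumer would keep retrying the same failed message.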
ht_queue_service/queue_producer.py (Outdated)

@@ -22,7 +22,10 @@ def __init__(self, user: str, password: str, host: str, queue_name: str):

     def queue_reconnect(self):
         self.conn = QueueConnection(self.user, self.password, self.host, self.queue_name)
-        self.conn.ht_channel.queue_declare(queue=self.queue_name, durable=True)
+        self.conn.ht_channel.queue_declare(queue=self.queue_name, durable=True, exclusive=False, auto_delete=False,
Could we extract this to a method in QueueConnection? Note we also have something very similar here: https://github.com/hathitrust/ht_indexer/pull/14/files#diff-360c2473a4deaa8abc596346bb14e34506f3e22482f1153d756e50ba82ce09e5R30-R32
Certainly, this is a great example of how to eliminate redundant code. I will apply it.
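One possible shape for the extracted helper, sketched with a trimmed-down QueueConnection; the method name (declare_queue) and the dead-letter exchange argument are assumptions, since the queue_declare call in the diff above is truncated:

import pika


class QueueConnection:
    # Sketch only: trimmed-down version showing where the shared declare could live.

    def __init__(self, user, password, host, queue_name):
        self.queue_name = queue_name
        credentials = pika.PlainCredentials(user, password)
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host, credentials=credentials))
        self.ht_channel = self.connection.channel()

    def declare_queue(self):
        # Assumed helper name; the arguments mirror the (truncated) queue_declare in the diff,
        # and the dead-letter exchange name is an assumption.
        self.ht_channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            exclusive=False,
            auto_delete=False,
            arguments={"x-dead-letter-exchange": f"{self.queue_name}_dead_letter_exchange"},
        )

Both queue_reconnect in the producer and the equivalent setup in the consumer could then call self.conn.declare_queue() instead of repeating the same queue_declare arguments.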
        assert 0 == consumer_instance.get_total_messages()

    def test_queue_requeue_message(self, list_messages, consumer_instance, producer_instance):
It doesn't look like there are any assertions in this test?
I've added it
@aelkiss, I have applied your suggestions to the PR. Please review when you have time. Thanks.
Superseded by #15 (I will still go back and review comments here)
ht_queue_service/queue_consumer.py (Outdated)

-def reject_message(used_channel, basic_deliver):
-    used_channel.basic_reject(delivery_tag=basic_deliver, requeue=False)
+def reject_message(used_channel, basic_deliver, requeue_message=False):
+    used_channel.basic_reject(delivery_tag=basic_deliver, requeue=requeue_message)
It sounds like the issue with this is that it puts the message back on the queue at the same position.
One alternative would be to enqueue a new message that has a parameter like failure_count with an incremented count. We would need to create a producer here as well.
RabbitMQ may have some functionality built in to check how many times a message has failed, and we could check that and put it on the dead-letter queue if it has failed more than the maximum failure count.
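RabbitMQ does record dead-lettering history in the x-death header it adds to dead-lettered messages (each entry carries a count). Below is a sketch of the re-publish-with-a-counter idea in a pika callback; the threshold, the process() placeholder, and the dead-letter exchange name are assumptions:

import pika

MAX_FAILURES = 3  # assumed threshold, not taken from this PR


def process(body):
    # Placeholder for the real document processing; assumed to raise on failure.
    raise ValueError("processing failed")


def on_message(channel, method, properties, body):
    headers = properties.headers or {}
    # RabbitMQ itself records dead-lettering history in the "x-death" header (each entry
    # has a "count" field); here we track retries explicitly with a custom header instead.
    failure_count = headers.get("failure_count", 0)

    try:
        process(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Remove the original copy so it does not stay at the head of the queue...
        channel.basic_ack(delivery_tag=method.delivery_tag)
        if failure_count + 1 < MAX_FAILURES:
            # ...and re-publish it at the end of the same queue with an incremented counter.
            channel.basic_publish(
                exchange="",
                routing_key=method.routing_key,
                body=body,
                properties=pika.BasicProperties(headers={"failure_count": failure_count + 1}),
            )
        else:
            # Too many failures: hand it to the dead-letter exchange (exchange name assumed).
            channel.basic_publish(exchange="dlx", routing_key=method.routing_key, body=body)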
We should also probably have a database that tracks the status of documents (but this would be a future phase of work) - e.g. when it failed and why, or the last time it was successfully indexed.
We could think about whether we need a dead-letter queue at all if we are tracking the status externally in a database table.
This PR delivers the following changes:
The re-queue logic associates a dead-letter exchange and a dead-letter queue (retriever_queue_dead_letter_queue and indexer_queue_dead_letter_queue) with each of the existing queues (queue_retriever and queue_indexer). With this setup, we guarantee that the queue used for processing does not get stuck with failed messages.
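A sketch of how one of these queue/dead-letter pairs can be wired up with pika; only the queue names come from this PR, while the exchange name, routing key, and connection parameters are assumptions:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Dead-letter side: exchange name and routing key are assumptions; the queue name is from this PR.
channel.exchange_declare(exchange="retriever_dead_letter_exchange",
                         exchange_type="direct", durable=True)
channel.queue_declare(queue="retriever_queue_dead_letter_queue", durable=True)
channel.queue_bind(queue="retriever_queue_dead_letter_queue",
                   exchange="retriever_dead_letter_exchange",
                   routing_key="queue_retriever")

# Working queue: messages rejected with requeue=False are re-published to the dead-letter
# exchange with their original routing key, so they end up in the dead-letter queue above.
channel.queue_declare(queue="queue_retriever", durable=True, exclusive=False, auto_delete=False,
                      arguments={"x-dead-letter-exchange": "retriever_dead_letter_exchange"})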
How to test it:
Step 1. Create /sdr1/obj directory
mkdir ../sdr1/obj
Step 2. Retrieve data for testing from the pairtree repository
export HT_SSH_HOST=dev-2.babel.hathitrust.org
scp $HT_SSH_HOST:/sdr1/obj/umn/pairtree_root/31/95/10/01/99/77/04/p/31951001997704p/31951001997704p{.zip,mets.xml} ../sdr1/obj
Step 3. Create the image
docker build -t document_generator .
Step 4. Run the container to retrieve documents
docker compose up document_retriever -d
Open RabbitMQ and Solr to check the success of these tests
Step 5. Run retriever service
docker compose exec document_retriever python document_retriever_service/full_text_search_retriever_service.py --list_documents umn.31951001997704p,mdp.35112103801405 --query_field item
Checking: You should see 2 messages on the retriever_queue
Step 6. Run the container to generate documents
docker compose up document_generator
Checking: You should see the document umn.31951001997704p on indexer_queue and the document mdp.35112103801405 on the retriever_queue_dead_letter_queue. Since you do not have the files associated with mdp.35112103801405, this message will be rejected.
Step 8. Run the container to index documents
docker compose up document_indexer
Checking: The document umn.31951001997704p should be indexed in Solr, and the retriever_queue and indexer_queue should be empty. However, you should find the document mdp.35112103801405 in the retriever_queue_dead_letter_queue.