Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-queue messages using a dead-letter-exchange #14

Closed
wants to merge 6 commits into from

Conversation

liseli
Copy link
Contributor

@liseli liseli commented Apr 19, 2024

This PR delivers the following changes:

  • Not processes messages are re-queue using a dead-letter-exchange
  • document_generator and document_indexer service starts automatically when the containers are up.
  • The services keep alive waiting for new messages from the queue.
  • The README file was updated
  • Some Coding style was also applied

The logic to re-queue messages associates a dead-letter-exchange (retriever_queue_dead_letter_queue and indexer_queue_dead_letter_queue) for each of the existing queues (queue_retriever and queue_indexer). With this logic, we guarantee the queue used for processing is not stuck with failed messages

How to test it:

Step 1. Create /sdr1/obj directory
mkdir ../sdr1/obj

Step 2. Retrieve from pairtree repository data for testing
export HT_SSH_HOST=dev-2.babel.hathitrust.org
scp $HT_SSH_HOST:/sdr1/obj/umn/pairtree_root/31/95/10/01/99/77/04/p/31951001997704p/31951001997704p{.zip,mets.xml} ../sdr1/obj

Step 3. Create the image
docker build -t document_generator .

Step 4. Run the container to retrieve documents

docker compose up document_retriever -d

Open RabbitQM and Solr to check the success of this tests

Step 5. Run retriever service

docker compose exec document_retriever python document_retriever_service/full_text_search_retriever_service.py --list_documents umn.31951001997704p,mdp.35112103801405 --query_field item

Checking: You should see 2 messages on the retriever_queue

Step 6. Run the container to generate documents

docker compose up document_generator

Checking: You should see the document umn.31951001997704p on indexer_queue and the document mdp.35112103801405 on the retriever_queue_dead_letter_queue. As you do not have the files associated with mdp.35112103801405, this message will be rejected.

Step 8. Run the container to index documents
docker compose up document_indexer

Checking: The document umn.31951001997704p should be indexed in Solr and the retriever_queue and indexer_queue are empty. However, you should find the document mdp.35112103801405 in the retriever_queue_dead_letter_queue.

…ocument_generator and document_indexer start automatica when the container is created, they are alive while the container is running
@liseli liseli requested a review from aelkiss April 19, 2024 17:36
Copy link
Member

@aelkiss aelkiss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the changes here make sense in terms of the goal of what to do, but I think a couple of the specifics might not be working as expected in terms of actually putting something in the dead-letter exchange. Perhaps we could add a test which checks that after a failure there is a message in the dead-letter exchange. (Maybe this was supposed to be part of test_queue_requeue_message?)


from ht_queue_service.queue_connection import QueueConnection
from ht_utils.ht_logger import get_ht_logger

logger = get_ht_logger(name=__name__)


def reject_message(used_channel, basic_deliver):
used_channel.basic_reject(delivery_tag=basic_deliver, requeue=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your comment above and https://www.rabbitmq.com/docs/nack it looks like if we reject but don't requeue, then the message just gets discarded -- but I thought we wanted it to go to the dead-letter exchange, so would we want requeue=True?

Copy link
Contributor Author

@liseli liseli Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aelkiss, requeue=True means the message will be re-queued to the original queue. The first 3 paragraphs of this link explain the difference between nack and reject. I will add some tests to show this behavior (requeue=True => message in the dead letter exchange and in the original queue and requeue=False => message in the dead letter queue)

For simplicity, on this initial proposal, I decided to discard the failed messages from the original queue and routed them to a Dead Letter Exchange to prevent them from being stuck in the queue. When a message is requeued, RabbitMQ tries to place it in its original position, as a result, our consumer tries to recover the failed message meanwhile the list of messages could be growing longer.

On this iteration, if for some reason a message fails, we will route the message to the dead letter exchange queue being able to:

  • have a register of why the message failed
  • extract statistics about how many messages failed
  • prevent stuck in the queue

In the future, we could check by alternatives:

  • to reprocess messages on the dead letter exchange or
  • modify RabbitMQ to put failed messages at the end of the queue (I did some research to implement this alternative, but I have not found yet how to do it)

@@ -22,7 +22,10 @@ def __init__(self, user: str, password: str, host: str, queue_name: str):

def queue_reconnect(self):
self.conn = QueueConnection(self.user, self.password, self.host, self.queue_name)
self.conn.ht_channel.queue_declare(queue=self.queue_name, durable=True)
self.conn.ht_channel.queue_declare(queue=self.queue_name, durable=True, exclusive=False, auto_delete=False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we extract this to a method in QueueConnection? Note we also have something very similar here: https://github.com/hathitrust/ht_indexer/pull/14/files#diff-360c2473a4deaa8abc596346bb14e34506f3e22482f1153d756e50ba82ce09e5R30-R32

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly, this is a great example of how to eliminate redundant code. I will apply


assert 0 == consumer_instance.get_total_messages()

def test_queue_requeue_message(self, list_messages, consumer_instance, producer_instance):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like there are any assertions in this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added it

@liseli
Copy link
Contributor Author

liseli commented Apr 30, 2024

@aelkiss, I have applied your suggestions to the PR. Please review when you have time. Thanks

@aelkiss
Copy link
Member

aelkiss commented May 3, 2024

Superceded by #15 (I will still go back and review comments here)

@aelkiss aelkiss closed this May 3, 2024
def reject_message(used_channel, basic_deliver):
used_channel.basic_reject(delivery_tag=basic_deliver, requeue=False)
def reject_message(used_channel, basic_deliver, requeue_message=False):
used_channel.basic_reject(delivery_tag=basic_deliver, requeue=requeue_message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like the issue with this is that it puts the message back on the queue at the same position.
One alternative would be to enqueue a new message that has a parameter like failure_count with an incremented count. We would need to create a producer as well here.
RabbitMQ may have some functionality built-in to check how many times a messages has failed, and we could check that and put it on the dead letter queue if it has failed more than the maximum failure count.

Copy link
Member

@aelkiss aelkiss May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also probably have a database that tracks the status of documents (but this would be a future phase of work) - e.g. when it failed and why, or the last time it was successfully indexed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could think about whether we need a dead letter queue at all if we are tracking the status externally in a database table.

@liseli liseli deleted the DEV-1112-requeue_refreshBrach branch May 20, 2024 15:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants