Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection lost when process time is bigger than hearbeat #650

Open
ericmagalhaes opened this issue Nov 26, 2024 · 4 comments
Open

connection lost when process time is bigger than hearbeat #650

ericmagalhaes opened this issue Nov 26, 2024 · 4 comments

Comments

@ericmagalhaes
Copy link

I would like to check what is being done wrong here, I have few process running and when the process time is longer then the default rabbimq heartbeat the robust connection is disconnected and cannot connect again.

Connection Code

async def connect(self, loop):
        """
        Establish connection with the RabbitMQ

        :return: None
        """
        #logging.info(rabbit.CONNECTING)
        try:
            self.connection = await connect_robust(self.url, loop=loop, client_properties={
                "connection_name":"myconn"
            })
            self.channel = await self.connection.channel(publisher_confirms=False)
            
            #logging.info(rabbit.CONNECTED)
        except Exception as e:
            await self._clear()
            logging.error(e.__dict__)

Processing Code

async def queue_deploy_handler(message: IncomingMessage):
    async with message.process():
        headers = message.headers or {}
        retry_count = (headers.get("x-retry-count", 0) + 1)
        status = 'Fail' if retry_count > 3 else f'Retry #{retry_count}'
        try:
            body = message.body.decode()
            msg = json.loads(body)
            document_id = msg.get("document_id")
            transform_handler(body)
            update_doc_status(document_id, cursor, conn, "Transformed")
            await message.channel.basic_publish(
                exchange=f'ingest-docs.ex',
                body=message.body,           
                routing_key='ingest-docs'
        )
        except Exception as ex:
            update_doc_status(document_id, cursor, conn, status)
            await retry_message(document_id, message, "deploy-ryzedocs", ex)

I'm processing PDF files, like OCR, PDFs with few pages works fine, to workaround I had to change the heatbeat on the connection so I could get more time before disconnected.
Using aio-pika 9.5.0

@mosquito
Copy link
Owner

Do not block the event loop, use loop.run_in_executor for processing your messages. Python can’t send a heartbeat frame.

@ericmagalhaes
Copy link
Author

you mean something like it?

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    //loop.run_until_complete(main(loop)) remove it
    loop.run_in_executor(main(loop)) // add this
    loop.run_forever()

@mosquito
Copy link
Owner

Nope. await loop.run_in_executor(None, transform_handler, body)

@mosquito
Copy link
Owner

mosquito commented Nov 26, 2024

https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code See this article for more information

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants