From de06d511179422d43e6fc55c73fb539be8ade950 Mon Sep 17 00:00:00 2001 From: yeonjy Date: Thu, 4 Apr 2024 01:40:32 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20Producer=EA=B0=80=20RabbitMQ=EC=99=80=20?= =?UTF-8?q?=EC=97=B0=EA=B2=B0=20=EB=81=8A=EA=B8=B0=EB=A9=B4=20=EC=9E=AC?= =?UTF-8?q?=EC=8B=9C=EB=8F=84=ED=95=98=EB=8A=94=20=EB=A1=9C=EC=A7=81=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../news/contentqueue/rabbitmq_producer.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/backend/ai_response_processor/news/contentqueue/rabbitmq_producer.py b/backend/ai_response_processor/news/contentqueue/rabbitmq_producer.py index 0490dbb2..c73a73cf 100644 --- a/backend/ai_response_processor/news/contentqueue/rabbitmq_producer.py +++ b/backend/ai_response_processor/news/contentqueue/rabbitmq_producer.py @@ -1,7 +1,8 @@ import os +import time from dotenv import load_dotenv import json -from pika import BlockingConnection, ConnectionParameters, PlainCredentials, BasicProperties +from pika import BlockingConnection, ConnectionParameters, PlainCredentials, BasicProperties, exceptions from news.schema.message_item import MessageItem @@ -26,16 +27,22 @@ def get_connection_params(): heartbeat=600, blocked_connection_timeout=300) + def send_message(message: MessageItem): - connection = BlockingConnection(get_connection_params()) - channel = connection.channel() - channel.queue_declare(queue=CONFIG['queue_name'], durable=True) - - props = BasicProperties(content_type=CONTENT_TYPE, delivery_mode=1) - serialized_message = json.dumps(message.__dict__) - - channel.basic_publish(exchange=CONFIG['exchange_name'], - routing_key=CONFIG['routing_key'], - body=serialized_message, - properties=props) - connection.close() \ No newline at end of file + try: + connection = BlockingConnection(get_connection_params()) + channel = connection.channel() + channel.queue_declare(queue=CONFIG['queue_name'], durable=True) + + props = BasicProperties(content_type=CONTENT_TYPE, delivery_mode=1) + serialized_message = json.dumps(message.__dict__) + + channel.basic_publish(exchange=CONFIG['exchange_name'], + routing_key=CONFIG['routing_key'], + body=serialized_message, + properties=props) + connection.close() + except (exceptions.AMQPConnectionError, exceptions.StreamLostError) as e: + print("Connection failed, retrying in 5 seconds... Error: {}".format(e)) + time.sleep(5) + send_message(message) \ No newline at end of file