Skip to content

Commit

Permalink
fix: Producer가 RabbitMQ와 연결 끊기면 재시도하는 로직 추가
Browse files Browse the repository at this point in the history
  • Loading branch information
yeonjy committed Apr 3, 2024
1 parent c6efcdf commit de06d51
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import time
from dotenv import load_dotenv
import json
from pika import BlockingConnection, ConnectionParameters, PlainCredentials, BasicProperties
from pika import BlockingConnection, ConnectionParameters, PlainCredentials, BasicProperties, exceptions

from news.schema.message_item import MessageItem

Expand All @@ -26,16 +27,22 @@ def get_connection_params():
heartbeat=600,
blocked_connection_timeout=300)


def send_message(message: MessageItem):
connection = BlockingConnection(get_connection_params())
channel = connection.channel()
channel.queue_declare(queue=CONFIG['queue_name'], durable=True)

props = BasicProperties(content_type=CONTENT_TYPE, delivery_mode=1)
serialized_message = json.dumps(message.__dict__)

channel.basic_publish(exchange=CONFIG['exchange_name'],
routing_key=CONFIG['routing_key'],
body=serialized_message,
properties=props)
connection.close()
try:
connection = BlockingConnection(get_connection_params())
channel = connection.channel()
channel.queue_declare(queue=CONFIG['queue_name'], durable=True)

props = BasicProperties(content_type=CONTENT_TYPE, delivery_mode=1)
serialized_message = json.dumps(message.__dict__)

channel.basic_publish(exchange=CONFIG['exchange_name'],
routing_key=CONFIG['routing_key'],
body=serialized_message,
properties=props)
connection.close()
except (exceptions.AMQPConnectionError, exceptions.StreamLostError) as e:
print("Connection failed, retrying in 5 seconds... Error: {}".format(e))
time.sleep(5)
send_message(message)

0 comments on commit de06d51

Please sign in to comment.