Skip to content

Commit

Permalink
lesser verbose sqlalchemy error
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Dec 12, 2023
1 parent 7dfbf9e commit ef22788
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from sqlalchemy import insert, select, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql.expression import Insert, Select, Update

Expand Down Expand Up @@ -159,7 +160,14 @@ async def process_data(receive_queue: Queue, error_queue: Queue, type: str):
await session.commit()
# Mark the message as processed in the queue
receive_queue.task_done()

except OperationalError as e:
await error_queue.put(message)
# Handle exceptions, log the error, and put the message in the error queue
logger.error({"error": e})
logger.info(f"error_qsize={error_queue.qsize()}, {message=}")
# Mark the message as processed in the queue and continue to the next iteration
receive_queue.task_done()
continue
except Exception as e:
await error_queue.put(message)
# Handle exceptions, log the error, and put the message in the error queue
Expand Down

0 comments on commit ef22788

Please sign in to comment.