Skip to content

Commit

Permalink
type checking message & error Q
Browse files Browse the repository at this point in the history
model_dump(json)
  • Loading branch information
extreme4all committed Jul 21, 2024
1 parent d4f0969 commit 841a5d5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/bulk_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,14 @@ async def insert_data_v3(batch: list[Message], error_queue: Queue):
logger.error({"error": e})
logger.debug(f"Traceback: \n{traceback.format_exc()}")
for message in batch:
await error_queue.put(message.model_dump_json())
await error_queue.put(message.model_dump(mode="json"))

logger.info(f"error_qsize={error_queue.qsize()}, {message=}")
except Exception as e:
logger.error({"error": e})
logger.debug(f"Traceback: \n{traceback.format_exc()}")
for message in batch:
await error_queue.put(message.model_dump_json())
await error_queue.put(message.model_dump(mode="json"))

logger.info(f"error_qsize={error_queue.qsize()}, {message=}")

Expand Down
11 changes: 10 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,16 @@ async def process_data(receive_queue: Queue, error_queue: Queue):

# Get a message from the chosen queue
message = await receive_queue.get()
message = Message(**message)

if not (isinstance(message, dict) or isinstance(message, list)):
logger.debug(f"invalid type: {message=}")
continue

try:
message = Message(**message)
except Exception as e:
logger.error(e)
continue

# TODO fix test data
if settings.ENV != "PRD":
Expand Down

0 comments on commit 841a5d5

Please sign in to comment.