From 8123580b0ab71bb0257758d8af278a55f96a001a Mon Sep 17 00:00:00 2001 From: extreme4all <> Date: Sun, 26 Nov 2023 23:05:57 +0100 Subject: [PATCH] added an error queue to deal with errors --- src/main.py | 89 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 65 insertions(+), 24 deletions(-) diff --git a/src/main.py b/src/main.py index 7930d7d..f059892 100644 --- a/src/main.py +++ b/src/main.py @@ -42,8 +42,13 @@ async def kafka_producer(): return producer -async def receive_messages(consumer: AIOKafkaConsumer, receive_queue: Queue): +async def receive_messages( + consumer: AIOKafkaConsumer, receive_queue: Queue, error_queue: Queue +): async for message in consumer: + if error_queue.qsize() > 100: + await asyncio.sleep(1) + continue value = message.value await receive_queue.put(value) @@ -94,54 +99,85 @@ async def update_player(session: AsyncSession, data: dict): await session.execute(query) -async def process_data(receive_queue: Queue): +def log_speed( + counter: int, start_time: float, receive_queue: Queue +) -> tuple[float, int]: + end_time = time.time() + delta_time = end_time - start_time + speed = counter / delta_time + logger.info( + f"qsize={receive_queue.qsize()}, processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec" + ) + return time.time(), 0 + + +# Define a function to process data from a queue +async def process_data(receive_queue: Queue, error_queue: Queue): + # Initialize counter and start time counter = 0 start_time = time.time() + # Run indefinitely while True: - if receive_queue.empty(): + # Check if both queues are empty + if receive_queue.empty() and error_queue.empty(): + # If there were previous iterations with data, log processing speed if counter > 0: - end_time = time.time() - delta_time = end_time - start_time - speed = counter / delta_time - logger.info(f"qsize={receive_queue.qsize()}, {speed:.2f} it/s") - # reset - start_time = time.time() - counter = 0 - # sleep + start_time, counter = log_speed( + counter=counter, start_time=start_time, receive_queue=receive_queue + ) + # Sleep for 1 second and then continue to the next iteration await asyncio.sleep(1) continue - message: dict = await receive_queue.get() + # Choose the queue to process based on whether the error queue is empty + _receive_queue = receive_queue if error_queue.empty() else error_queue + + # Get a message from the chosen queue + message: dict = await _receive_queue.get() + + # Extract 'hiscores' and 'player' dictionaries from the message highscore: dict = message.get("hiscores") player: dict = message.get("player") + # Check the environment and filter out certain player IDs if not in production if settings.ENV != "PRD": player_id = player.get("id") - if player_id == 0 or player_id > 300: + MIN_PLAYER_ID = 0 + MAX_PLAYER_ID = 300 + if not (MIN_PLAYER_ID < player_id <= MAX_PLAYER_ID): continue + try: + # Acquire an asynchronous database session session: AsyncSession = await get_session() async with session.begin(): + # If 'highscore' dictionary is present, insert it into the database if highscore: await insert_highscore(session=session, data=highscore) + # Update the player information in the database await update_player(session=session, data=player) + # Commit the changes to the database await session.commit() + # Mark the message as processed in the queue + _receive_queue.task_done() + except Exception as e: + # Handle exceptions, log the error, and put the message in the error queue logger.error({"error": e}) logger.debug(f"Traceback: \n{traceback.format_exc()}") - await receive_queue.put(message) - receive_queue.task_done() + await error_queue.put(message) + logger.info(f"error_qsize={error_queue.qsize()}") + # Mark the message as processed in the queue and continue to the next iteration + _receive_queue.task_done() continue + # Log processing speed every 100 iterations if counter % 100 == 0 and counter > 0: - end_time = time.time() - delta_time = end_time - start_time - speed = counter / delta_time - logger.info(f"qsize={receive_queue.qsize()}, {speed:.2f} it/s") - # reset - start_time = time.time() - counter = 0 + start_time, counter = log_speed( + counter=counter, start_time=start_time, receive_queue=receive_queue + ) + # Increment the counter counter += 1 @@ -152,13 +188,18 @@ async def main(): receive_queue = Queue(maxsize=100) send_queue = Queue(maxsize=100) + error_queue = Queue() - asyncio.create_task(receive_messages(consumer, receive_queue)) + asyncio.create_task( + receive_messages( + consumer=consumer, receive_queue=receive_queue, error_queue=error_queue + ) + ) asyncio.create_task( send_messages(topic="scraper", producer=producer, send_queue=send_queue) ) - await process_data(receive_queue) + await process_data(receive_queue, error_queue) if __name__ == "__main__":