From 6f15348512b6837851a68ff2ba23ae937b4ac911 Mon Sep 17 00:00:00 2001 From: extreme4all <> Date: Tue, 12 Dec 2023 13:11:52 +0100 Subject: [PATCH] error handling --- src/main.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main.py b/src/main.py index 8c1060a..70f3090 100644 --- a/src/main.py +++ b/src/main.py @@ -116,6 +116,7 @@ async def process_data(receive_queue: Queue, error_queue: Queue): # Initialize counter and start time counter = 0 start_time = time.time() + was_error = False # Run indefinitely while True: @@ -131,7 +132,9 @@ async def process_data(receive_queue: Queue, error_queue: Queue): continue # Choose the queue to process based on whether the error queue is empty - _receive_queue = receive_queue if error_queue.empty() else error_queue + _receive_queue = ( + receive_queue if error_queue.empty() or was_error else error_queue + ) # Get a message from the chosen queue message: dict = await _receive_queue.get() @@ -164,6 +167,9 @@ async def process_data(receive_queue: Queue, error_queue: Queue): except Exception as e: await error_queue.put(message) + # was error default false => will convert to True + # if previous run was an error than this will flip to False, taking the first error statement + was_error = not was_error # Handle exceptions, log the error, and put the message in the error queue logger.error({"error": e}) logger.debug(f"Traceback: \n{traceback.format_exc()}") @@ -172,6 +178,8 @@ async def process_data(receive_queue: Queue, error_queue: Queue): _receive_queue.task_done() continue + was_error = False + # Log processing speed every 100 iterations if counter % 100 == 0 and counter > 0: start_time, counter = log_speed( @@ -198,8 +206,7 @@ async def main(): asyncio.create_task( send_messages(topic="scraper", producer=producer, send_queue=send_queue) ) - - await process_data(receive_queue, error_queue) + await asyncio.gather(*[process_data(receive_queue, error_queue) for _ in range(2)]) if __name__ == "__main__":