Skip to content

Commit

Permalink
error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Dec 12, 2023
1 parent 78348c5 commit 6f15348
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ async def process_data(receive_queue: Queue, error_queue: Queue):
# Initialize counter and start time
counter = 0
start_time = time.time()
was_error = False

# Run indefinitely
while True:
Expand All @@ -131,7 +132,9 @@ async def process_data(receive_queue: Queue, error_queue: Queue):
continue

# Choose the queue to process based on whether the error queue is empty
_receive_queue = receive_queue if error_queue.empty() else error_queue
_receive_queue = (
receive_queue if error_queue.empty() or was_error else error_queue
)

# Get a message from the chosen queue
message: dict = await _receive_queue.get()
Expand Down Expand Up @@ -164,6 +167,9 @@ async def process_data(receive_queue: Queue, error_queue: Queue):

except Exception as e:
await error_queue.put(message)
# was error default false => will convert to True
# if previous run was an error than this will flip to False, taking the first error statement
was_error = not was_error
# Handle exceptions, log the error, and put the message in the error queue
logger.error({"error": e})
logger.debug(f"Traceback: \n{traceback.format_exc()}")
Expand All @@ -172,6 +178,8 @@ async def process_data(receive_queue: Queue, error_queue: Queue):
_receive_queue.task_done()
continue

was_error = False

# Log processing speed every 100 iterations
if counter % 100 == 0 and counter > 0:
start_time, counter = log_speed(
Expand All @@ -198,8 +206,7 @@ async def main():
asyncio.create_task(
send_messages(topic="scraper", producer=producer, send_queue=send_queue)
)

await process_data(receive_queue, error_queue)
await asyncio.gather(*[process_data(receive_queue, error_queue) for _ in range(2)])


if __name__ == "__main__":
Expand Down

0 comments on commit 6f15348

Please sign in to comment.