Skip to content

Commit

Permalink
added an error queue to deal with errors
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Nov 26, 2023
1 parent ef41820 commit 8123580
Showing 1 changed file with 65 additions and 24 deletions.
89 changes: 65 additions & 24 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ async def kafka_producer():
return producer


async def receive_messages(
    consumer: AIOKafkaConsumer,
    receive_queue: Queue,
    error_queue: Queue,
    *,
    max_error_backlog: int = 100,
    backoff_seconds: float = 1,
):
    """Forward every consumed message value to ``receive_queue``.

    Consumption is paused while the error backlog is too large, so failed
    messages can be retried before new work is accepted.

    Args:
        consumer: Async-iterable message source; each item must expose a
            ``value`` attribute.
        receive_queue: Queue that receives each message's ``value``.
        error_queue: Queue of failed messages; only its size is inspected.
        max_error_backlog: Forwarding pauses while ``error_queue`` holds more
            than this many items.
        backoff_seconds: Delay between backlog checks while paused.
    """
    async for message in consumer:
        # Hold on to the current message until the backlog shrinks. Moving on
        # to the next message instead (``continue``) would silently drop the
        # one that was already pulled from the consumer.
        while error_queue.qsize() > max_error_backlog:
            await asyncio.sleep(backoff_seconds)
        await receive_queue.put(message.value)

Expand Down Expand Up @@ -94,54 +99,85 @@ async def update_player(session: AsyncSession, data: dict):
await session.execute(query)


async def process_data(receive_queue: Queue):
def log_speed(
    counter: int, start_time: float, receive_queue: Queue
) -> tuple[float, int]:
    """Log the throughput of the current measurement window and reset it.

    Args:
        counter: Number of messages processed since ``start_time``.
        start_time: ``time.time()`` timestamp at which the window started.
        receive_queue: Queue whose current size is included in the log line.

    Returns:
        A ``(start_time, counter)`` pair for the next window: a fresh
        ``time.time()`` timestamp and ``0``.
    """
    delta_time = time.time() - start_time
    # The wall clock may report a zero interval (coarse timer) or a negative
    # one (clock adjustment); log 0 msg/sec instead of raising
    # ZeroDivisionError inside the processing loop.
    speed = counter / delta_time if delta_time > 0 else 0.0
    logger.info(
        f"qsize={receive_queue.qsize()}, processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
    )
    return time.time(), 0


async def process_data(receive_queue: Queue, error_queue: Queue):
    """Persist queued messages forever, retrying failed ones first.

    Each message is a dict with an optional ``"hiscores"`` entry and a
    ``"player"`` entry. The highscore (when present) is inserted and the
    player is updated inside one database transaction. A message whose
    processing raises is moved to ``error_queue`` and retried later.

    Args:
        receive_queue: Queue of new messages to process.
        error_queue: Queue of messages that failed; it takes priority over
            ``receive_queue`` whenever it is non-empty.

    This coroutine never returns.
    """
    # Outside production only players with an id in (MIN, MAX] are processed.
    MIN_PLAYER_ID = 0
    MAX_PLAYER_ID = 300

    counter = 0
    start_time = time.time()

    while True:
        # Nothing to do: flush the throughput stats of the finished burst (if
        # any) and poll again in a second.
        if receive_queue.empty() and error_queue.empty():
            if counter > 0:
                start_time, counter = log_speed(
                    counter=counter, start_time=start_time, receive_queue=receive_queue
                )
            await asyncio.sleep(1)
            continue

        # Failed messages are retried before any new message is taken.
        # NOTE(review): a message that fails on every attempt is re-queued
        # without limit and, because error_queue has priority, keeps
        # receive_queue from being drained — consider a retry cap.
        _receive_queue = receive_queue if error_queue.empty() else error_queue
        message: dict = await _receive_queue.get()

        highscore: dict = message.get("hiscores")
        player: dict = message.get("player")

        if settings.ENV != "PRD":
            # A message without a player/id cannot pass the range filter;
            # skip it instead of letting the lookup or comparison raise and
            # terminate this loop.
            player_id = player.get("id") if player else None
            if player_id is None or not (MIN_PLAYER_ID < player_id <= MAX_PLAYER_ID):
                # The message was fetched, so it must still be marked done.
                _receive_queue.task_done()
                continue

        try:
            session: AsyncSession = await get_session()
            async with session.begin():
                if highscore:
                    await insert_highscore(session=session, data=highscore)
                await update_player(session=session, data=player)
                # NOTE(review): if this is SQLAlchemy's AsyncSession, leaving
                # the begin() block already commits — confirm whether this
                # explicit commit is still needed.
                await session.commit()
        except Exception as e:
            # Keep the worker alive: log the failure and park the message for
            # a later retry.
            logger.error({"error": e})
            logger.debug(f"Traceback: \n{traceback.format_exc()}")
            await error_queue.put(message)
            logger.info(f"error_qsize={error_queue.qsize()}")
        else:
            # Report throughput once per 100 successfully processed messages.
            if counter % 100 == 0 and counter > 0:
                start_time, counter = log_speed(
                    counter=counter, start_time=start_time, receive_queue=receive_queue
                )
            counter += 1
        finally:
            # Exactly one task_done() per fetched message, success or failure.
            _receive_queue.task_done()


Expand All @@ -152,13 +188,18 @@ async def main():

receive_queue = Queue(maxsize=100)
send_queue = Queue(maxsize=100)
error_queue = Queue()

asyncio.create_task(receive_messages(consumer, receive_queue))
asyncio.create_task(
receive_messages(
consumer=consumer, receive_queue=receive_queue, error_queue=error_queue
)
)
asyncio.create_task(
send_messages(topic="scraper", producer=producer, send_queue=send_queue)
)

await process_data(receive_queue)
await process_data(receive_queue, error_queue)


if __name__ == "__main__":
Expand Down

0 comments on commit 8123580

Please sign in to comment.