Commit

improved logging
extreme4all committed Jan 12, 2024
1 parent 18bfaea commit b325734
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/main.py
@@ -102,14 +102,27 @@ async def update_player(session: AsyncSession, data: dict):
 
 
 def log_speed(
-    counter: int, start_time: float, receive_queue: Queue
+    counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15
 ) -> tuple[float, int]:
-    end_time = time.time()
-    delta_time = end_time - start_time
+    # Calculate the time elapsed since the function started
+    delta_time = time.time() - start_time
+
+    # Check if the specified interval has not elapsed yet
+    if delta_time < interval:
+        # Return the original start time and the current counter value
+        return start_time, counter
+
+    # Calculate the processing speed (messages per second)
     speed = counter / delta_time
-    logger.info(
-        f"qsize={receive_queue.qsize()}, processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
+
+    # Log the processing speed and relevant information
+    log_message = (
+        f"{topic=}, qsize={_queue.qsize()}, "
+        f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
     )
+    logger.info(log_message)
+
+    # Return the current time and reset the counter to zero
     return time.time(), 0
 
 
@@ -121,14 +134,15 @@ async def process_data(receive_queue: Queue, error_queue: Queue):
 
     # Run indefinitely
     while True:
-        # Check if both queues are empty
+        start_time, counter = log_speed(
+            counter=counter,
+            start_time=start_time,
+            _queue=receive_queue,
+            topic="scraper",
+        )
 
+        # Check if queue is empty
         if receive_queue.empty():
-            # If there were previous iterations with data, log processing speed
-            if counter > 0:
-                start_time, counter = log_speed(
-                    counter=counter, start_time=start_time, receive_queue=receive_queue
-                )
             # Sleep for 1 second and then continue to the next iteration
             await asyncio.sleep(1)
             continue

@@ -178,11 +192,6 @@ async def process_data(receive_queue: Queue, error_queue: Queue):
             receive_queue.task_done()
             continue
 
-        # Log processing speed every 100 iterations
-        if counter % 100 == 0 and counter > 0:
-            start_time, counter = log_speed(
-                counter=counter, start_time=start_time, receive_queue=receive_queue
-            )
         # Increment the counter
         counter += 1
 
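For context, a minimal, self-contained sketch of the throttled-logging pattern this commit introduces: log_speed is now cheap to call on every loop iteration, because it returns early until interval seconds have elapsed, then logs the message rate and resets the measurement window. The consume and main helpers, the demo queue, and the 2-second interval below are illustrative assumptions, not code from src/main.py.

import asyncio
import logging
import time
from asyncio import Queue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def log_speed(
    counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15
) -> tuple[float, int]:
    # Time elapsed since the current measurement window started
    delta_time = time.time() - start_time
    if delta_time < interval:
        # Window still open: keep the original start time and counter
        return start_time, counter
    speed = counter / delta_time
    logger.info(
        f"{topic=}, qsize={_queue.qsize()}, "
        f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
    )
    # Start a new window: current time, counter reset to zero
    return time.time(), 0


async def consume(queue: Queue):
    # Hypothetical consumer loop mirroring the shape of process_data
    start_time = time.time()
    counter = 0
    while True:
        # Called on every iteration; actually logs at most once per interval
        start_time, counter = log_speed(
            counter=counter,
            start_time=start_time,
            _queue=queue,
            topic="demo",
            interval=2,
        )
        if queue.empty():
            await asyncio.sleep(0.1)
            continue
        await queue.get()
        await asyncio.sleep(0.01)  # simulate per-message work
        queue.task_done()
        counter += 1


async def main():
    queue: Queue = Queue()
    for i in range(500):
        queue.put_nowait(i)
    consumer = asyncio.create_task(consume(queue))
    await queue.join()
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass


if __name__ == "__main__":
    asyncio.run(main())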
