Skip to content

Commit

Permalink
dl/verifier: batch messages for verification
Browse files Browse the repository at this point in the history
Two main optimizations here

- Batch enough messages so each SQL query spans bigger range of offsets
  (reduces # of queries)
- Avoids updating fetch positions too often, since the loop sweeps
  through every partition, debouncing it until enough messages are
  batched.
  • Loading branch information
bharathv committed Dec 31, 2024
1 parent 56efe82 commit 9bc73dd
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions tests/rptest/tests/datalake/datalake_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def __init__(self, redpanda: RedpandaService, topic: str,
# number of messages buffered in memory
self._msg_semaphore = threading.Semaphore(5000)
self._num_msgs_pending_verification = 0
# Signalled when enough messages are batched so query
# thread can perform verification. Larger batches results
# in fewer SQL queries and hence faster verification
self._msgs_batched = threading.Condition()
self._query_batch_size = 1000
self._query_batch_wait_timeout_s = 3
self._executor = ThreadPoolExecutor(max_workers=2)
self._rpk = RpkTool(self.redpanda)
# errors found during verification
Expand Down Expand Up @@ -114,6 +120,9 @@ def _consumer_thread(self):
with self._lock:
self._num_msgs_pending_verification += 1
self._consumed_messages[msg.partition()].append(msg)
if self._num_msgs_pending_verification >= self._query_batch_size:
with self._msgs_batched:
self._msgs_batched.notify()
self._max_consumed_offsets[msg.partition()] = max(
self._max_consumed_offsets.get(msg.partition(), -1),
msg.offset())
Expand Down Expand Up @@ -167,6 +176,10 @@ def _query_thread(self):
self.logger.info("Starting query thread")
while not self._stop.is_set():
try:
with self._msgs_batched:
# Wait for enough data to be batched or a timeout.
self._msgs_batched.wait(
timeout=self._query_batch_wait_timeout_s)
partitions = self.update_and_get_fetch_positions()

for partition, next_consume_offset in partitions.items():
Expand Down

0 comments on commit 9bc73dd

Please sign in to comment.