From 3362b9f3ab0f632d0bf5cc376060bb9adedb1f6c Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 31 Dec 2024 09:00:16 -0800 Subject: [PATCH 1/3] dl/verifier: update fetch positions while querying This fixes a subtle deadlock where positions are not updated before the memory fills up and query thread cannot make progress without correct fetch positions. Removed timer based logic that periodically updates positions, it will be further refined in the next commits. --- .../tests/datalake/datalake_verifier.py | 85 ++++++++----------- 1 file changed, 36 insertions(+), 49 deletions(-) diff --git a/tests/rptest/tests/datalake/datalake_verifier.py b/tests/rptest/tests/datalake/datalake_verifier.py index 5ae1676394fca..886cd7e51c03e 100644 --- a/tests/rptest/tests/datalake/datalake_verifier.py +++ b/tests/rptest/tests/datalake/datalake_verifier.py @@ -55,9 +55,9 @@ def __init__(self, redpanda: RedpandaService, topic: str, # > max_consumed_offset + 1 if there are gaps from non consumable # batches like aborted data batches / control batches self._next_positions = defaultdict(lambda: -1) - - self._query: QueryEngineBase = query_engine self._cg = f"verifier-group-{random.randint(0, 1000000)}" + self._consumer: Consumer = self.create_consumer() + self._query: QueryEngineBase = query_engine self._lock = threading.Lock() self._stop = threading.Event() # number of messages buffered in memory @@ -81,41 +81,30 @@ def create_consumer(self): return c + def update_and_get_fetch_positions(self): + with self._lock: + partitions = [ + TopicPartition(topic=self.topic, partition=p) + for p in self._consumed_messages.keys() + ] + positions = self._consumer.position(partitions) + for p in positions: + if p.error is not None: + self.logger.warning( + f"Erorr querying position for partition {p.partition}") + else: + self.logger.debug( + f"next position for {p.partition} is {p.offset}") + self._next_positions[p.partition] = p.offset + return self._next_positions.copy() + def 
_consumer_thread(self): self.logger.info("Starting consumer thread") - consumer = self.create_consumer() - - last_position_update = time() - - def maybe_update_positions(): - nonlocal last_position_update - if time() < last_position_update + 3: - # periodically sweep through all partitions - # and update positions - return - with self._lock: - partitions = [ - TopicPartition(topic=self.topic, partition=p) - for p in self._consumed_messages.keys() - ] - positions = consumer.position(partitions) - for p in positions: - if p.error is not None: - self.logger.warning( - f"Erorr querying position for partition {p.partition}" - ) - else: - self.logger.debug( - f"next position for {p.partition} is {p.offset}") - self._next_positions[p.partition] = p.offset - last_position_update = time() - while not self._stop.is_set(): self._msg_semaphore.acquire() if self._stop.is_set(): break - msg = consumer.poll(1.0) - maybe_update_positions() + msg = self._consumer.poll(1.0) if msg is None: continue if msg.error(): @@ -135,8 +124,6 @@ def maybe_update_positions(): if len(self._errors) > 0: return - consumer.close() - def _get_query(self, partition, last_queried_offset, max_consumed_offset): return f"\ SELECT redpanda.offset FROM redpanda.{self._query.escape_identifier(self.topic)} \ @@ -179,15 +166,11 @@ def _verify_next_message(self, partition, iceberg_offset): self._consumed_messages[partition].pop(0) self._msg_semaphore.release() - def _get_partition_offsets_list(self): - with self._lock: - return self._next_positions.copy() - def _query_thread(self): self.logger.info("Starting query thread") while not self._stop.is_set(): try: - partitions = self._get_partition_offsets_list() + partitions = self.update_and_get_fetch_positions() for partition, next_consume_offset in partitions.items(): last_queried_offset = self._max_queried_offsets[ @@ -276,14 +259,18 @@ def wait(self, progress_timeout_sec=30): self.stop() def stop(self): - self._stop.set() - self._msg_semaphore.release() - 
self._executor.shutdown(wait=False) - assert len( - self._errors - ) == 0, f"Topic {self.topic} validation errors: {self._errors}" - - self.logger.debug(f"consumed offsets: {self._max_consumed_offsets}") - self.logger.debug(f"queried offsets: {self._max_queried_offsets}") - - assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table" + try: + self._stop.set() + self._msg_semaphore.release() + self._executor.shutdown(wait=False) + assert len( + self._errors + ) == 0, f"Topic {self.topic} validation errors: {self._errors}" + + self.logger.debug( + f"consumed offsets: {self._max_consumed_offsets}") + self.logger.debug(f"queried offsets: {self._max_queried_offsets}") + + assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table" + finally: + self._consumer.close() From 56efe8293d928e0fdc97bfd1d25e4e87ff0a7a0e Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 31 Dec 2024 09:02:14 -0800 Subject: [PATCH 2/3] dl/verifier: track count of messages pending verification --- tests/rptest/tests/datalake/datalake_verifier.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/rptest/tests/datalake/datalake_verifier.py b/tests/rptest/tests/datalake/datalake_verifier.py index 886cd7e51c03e..172e1423bf7f4 100644 --- a/tests/rptest/tests/datalake/datalake_verifier.py +++ b/tests/rptest/tests/datalake/datalake_verifier.py @@ -62,7 +62,7 @@ def __init__(self, redpanda: RedpandaService, topic: str, self._stop = threading.Event() # number of messages buffered in memory self._msg_semaphore = threading.Semaphore(5000) - self._total_msgs_cnt = 0 + self._num_msgs_pending_verification = 0 self._executor = ThreadPoolExecutor(max_workers=2) self._rpk = RpkTool(self.redpanda) # errors found during verification @@ -112,11 +112,7 @@ def _consumer_thread(self): continue with self._lock: - self._total_msgs_cnt += 1 - if 
self._total_msgs_cnt % 100 == 0: - self.logger.debug( - f"Consumed message partition: {msg.partition()} at offset {msg.offset()}" - ) + self._num_msgs_pending_verification += 1 self._consumed_messages[msg.partition()].append(msg) self._max_consumed_offsets[msg.partition()] = max( self._max_consumed_offsets.get(msg.partition(), -1), @@ -164,6 +160,7 @@ def _verify_next_message(self, partition, iceberg_offset): ) return self._consumed_messages[partition].pop(0) + self._num_msgs_pending_verification -= 1 self._msg_semaphore.release() def _query_thread(self): From 9bc73dd97f8e24714c3a2f30ca1527dec5d644a7 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 31 Dec 2024 09:05:20 -0800 Subject: [PATCH 3/3] dl/verifier: batch messages for verification Two main optimizations here - Batch enough messages so each SQL query spans bigger range of offsets (reduces # of queries) - Avoids updating fetch positions too often, since the loop sweeps through every partition, debouncing it until enough messages are batched. --- tests/rptest/tests/datalake/datalake_verifier.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/rptest/tests/datalake/datalake_verifier.py b/tests/rptest/tests/datalake/datalake_verifier.py index 172e1423bf7f4..32b5be0a83bd5 100644 --- a/tests/rptest/tests/datalake/datalake_verifier.py +++ b/tests/rptest/tests/datalake/datalake_verifier.py @@ -63,6 +63,12 @@ def __init__(self, redpanda: RedpandaService, topic: str, # number of messages buffered in memory self._msg_semaphore = threading.Semaphore(5000) self._num_msgs_pending_verification = 0 + # Signalled when enough messages are batched so query + # thread can perform verification. 
Larger batches result
+        # in fewer SQL queries and hence faster verification
+        self._msgs_batched = threading.Condition()
+        self._query_batch_size = 1000
+        self._query_batch_wait_timeout_s = 3
         self._executor = ThreadPoolExecutor(max_workers=2)
         self._rpk = RpkTool(self.redpanda)
         # errors found during verification
@@ -114,6 +120,9 @@ def _consumer_thread(self):
             with self._lock:
                 self._num_msgs_pending_verification += 1
                 self._consumed_messages[msg.partition()].append(msg)
+                if self._num_msgs_pending_verification >= self._query_batch_size:
+                    with self._msgs_batched:
+                        self._msgs_batched.notify()
                 self._max_consumed_offsets[msg.partition()] = max(
                     self._max_consumed_offsets.get(msg.partition(), -1),
                     msg.offset())
@@ -167,6 +176,10 @@ def _query_thread(self):
         self.logger.info("Starting query thread")
         while not self._stop.is_set():
             try:
+                with self._msgs_batched:
+                    # Wait for enough data to be batched or a timeout.
+                    self._msgs_batched.wait(
+                        timeout=self._query_batch_wait_timeout_s)
                 partitions = self.update_and_get_fetch_positions()

                 for partition, next_consume_offset in partitions.items():