Skip to content

Commit

Permalink
tests/datalake: deflake partition_movement_test
Browse files Browse the repository at this point in the history
- Fixes a deadlock with the way fetch positions are updated. Currently
  they are updated periodically (every 3s) since the original assumption
  was its an expensive RPC. This was causing a deadlock in some cases
  where data was consumed but fetch positions were not updated (since
  time threshold was not hit) but query thread cannot make progress
  because it relies on fetch positions and would never drain the
  semaphore which is effectively a deadlock. Turns out the method is not
  expensive in most cases and returns an offset locally cached in the
  consumer, removed the timer based based logic.

- Switched to a batched consumer from a poll(timeout) as the latter just
  fetches a single message resulting in a lot of tiny SQL verification
  queries. A batched consume effectively batches a bunch of messages
  resulting in range queries over larger offset ranges and also further
  reduces the # of calls to update fetch positions.
  • Loading branch information
bharathv committed Dec 28, 2024
1 parent f28f9fa commit 82d0fa6
Showing 1 changed file with 22 additions and 27 deletions.
49 changes: 22 additions & 27 deletions tests/rptest/tests/datalake/datalake_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,7 @@ def _consumer_thread(self):
self.logger.info("Starting consumer thread")
consumer = self.create_consumer()

last_position_update = time()

def maybe_update_positions():
nonlocal last_position_update
if time() < last_position_update + 3:
# periodically sweep through all partitions
# and update positions
return
def update_fetch_positions():
with self._lock:
partitions = [
TopicPartition(topic=self.topic, partition=p)
Expand All @@ -108,32 +101,31 @@ def maybe_update_positions():
self.logger.debug(
f"next position for {p.partition} is {p.offset}")
self._next_positions[p.partition] = p.offset
last_position_update = time()

while not self._stop.is_set():
self._msg_semaphore.acquire()
if self._stop.is_set():
break
msg = consumer.poll(1.0)
maybe_update_positions()
if msg is None:
msgs = consumer.consume(num_messages=1000, timeout=1.0)
update_fetch_positions()
if not msgs:
continue
if msg.error():
self.logger.error(f"Consumer error: {msg.error()}")
continue

with self._lock:
self._total_msgs_cnt += 1
if self._total_msgs_cnt % 100 == 0:
self.logger.debug(
f"Consumed message partition: {msg.partition()} at offset {msg.offset()}"
)
self._consumed_messages[msg.partition()].append(msg)
self._max_consumed_offsets[msg.partition()] = max(
self._max_consumed_offsets.get(msg.partition(), -1),
msg.offset())
if len(self._errors) > 0:
return
for msg in msgs:
if msg.error():
self.logger.error(f"Consumer error: {msg.error()}")
continue
self._total_msgs_cnt += 1
if self._total_msgs_cnt % 100 == 0:
self.logger.debug(
f"Consumed message partition: {msg.partition()} at offset {msg.offset()}"
)
self._consumed_messages[msg.partition()].append(msg)
self._max_consumed_offsets[msg.partition()] = max(
self._max_consumed_offsets.get(msg.partition(), -1),
msg.offset())
if len(self._errors) > 0:
return

consumer.close()

Expand Down Expand Up @@ -196,6 +188,9 @@ def _query_thread(self):
max_consumed = next_consume_offset - 1
# no new messages consumed, skip query
if max_consumed <= last_queried_offset:
self.logger.debug(
f"No new messages to query for partition: {partition}, max consumed offset: {max_consumed}, last queried offset {last_queried_offset}"
)
continue

query = self._get_query(partition, last_queried_offset,
Expand Down

0 comments on commit 82d0fa6

Please sign in to comment.