diff --git a/tests/rptest/tests/datalake/datalake_verifier.py b/tests/rptest/tests/datalake/datalake_verifier.py index 5ae1676394fc..90797bcd9657 100644 --- a/tests/rptest/tests/datalake/datalake_verifier.py +++ b/tests/rptest/tests/datalake/datalake_verifier.py @@ -85,14 +85,7 @@ def _consumer_thread(self): self.logger.info("Starting consumer thread") consumer = self.create_consumer() - last_position_update = time() - - def maybe_update_positions(): - nonlocal last_position_update - if time() < last_position_update + 3: - # periodically sweep through all partitions - # and update positions - return + def update_fetch_positions(): with self._lock: partitions = [ TopicPartition(topic=self.topic, partition=p) @@ -108,32 +101,31 @@ def maybe_update_positions(): self.logger.debug( f"next position for {p.partition} is {p.offset}") self._next_positions[p.partition] = p.offset - last_position_update = time() while not self._stop.is_set(): self._msg_semaphore.acquire() if self._stop.is_set(): break - msg = consumer.poll(1.0) - maybe_update_positions() - if msg is None: + msgs = consumer.consume(num_messages=1000, timeout=1.0) + update_fetch_positions() + if not msgs: continue - if msg.error(): - self.logger.error(f"Consumer error: {msg.error()}") - continue - with self._lock: - self._total_msgs_cnt += 1 - if self._total_msgs_cnt % 100 == 0: - self.logger.debug( - f"Consumed message partition: {msg.partition()} at offset {msg.offset()}" - ) - self._consumed_messages[msg.partition()].append(msg) - self._max_consumed_offsets[msg.partition()] = max( - self._max_consumed_offsets.get(msg.partition(), -1), - msg.offset()) - if len(self._errors) > 0: - return + for msg in msgs: + if msg.error(): + self.logger.error(f"Consumer error: {msg.error()}") + continue + self._total_msgs_cnt += 1 + if self._total_msgs_cnt % 100 == 0: + self.logger.debug( + f"Consumed message partition: {msg.partition()} at offset {msg.offset()}" + ) + self._consumed_messages[msg.partition()].append(msg) + self._max_consumed_offsets[msg.partition()] = max( + self._max_consumed_offsets.get(msg.partition(), -1), + msg.offset()) + if len(self._errors) > 0: + return consumer.close() @@ -196,6 +188,9 @@ def _query_thread(self): max_consumed = next_consume_offset - 1 # no new messages consumed, skip query if max_consumed <= last_queried_offset: + self.logger.debug( + f"No new messages to query for partition: {partition}, max consumed offset: {max_consumed}, last queried offset {last_queried_offset}" + ) continue query = self._get_query(partition, last_queried_offset,