Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/datalake: deflake partition_movement_test #24661

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 52 additions & 55 deletions tests/rptest/tests/datalake/datalake_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,20 @@ def __init__(self, redpanda: RedpandaService, topic: str,
# > max_consumed_offset + 1 if there are gaps from non consumable
# batches like aborted data batches / control batches
self._next_positions = defaultdict(lambda: -1)

self._query: QueryEngineBase = query_engine
self._cg = f"verifier-group-{random.randint(0, 1000000)}"
self._consumer: Consumer = self.create_consumer()
self._query: QueryEngineBase = query_engine
self._lock = threading.Lock()
self._stop = threading.Event()
# number of messages buffered in memory
self._msg_semaphore = threading.Semaphore(5000)
self._total_msgs_cnt = 0
self._num_msgs_pending_verification = 0
# Signalled when enough messages are batched so query
# thread can perform verification. Larger batches results
# in fewer SQL queries and hence faster verification
self._msgs_batched = threading.Condition()
self._query_batch_size = 1000
self._query_batch_wait_timeout_s = 3
self._executor = ThreadPoolExecutor(max_workers=2)
self._rpk = RpkTool(self.redpanda)
# errors found during verification
Expand All @@ -81,62 +87,48 @@ def create_consumer(self):

return c

def update_and_get_fetch_positions(self):
    """Refresh the cached next-fetch positions for every partition seen so far.

    Queries the consumer for its current position on each partition that has
    had at least one message consumed, stores the successful results in
    ``self._next_positions``, and returns a snapshot of that mapping.

    Returns:
        dict: copy of partition -> next offset to be fetched.
    """
    with self._lock:
        partitions = [
            TopicPartition(topic=self.topic, partition=p)
            for p in self._consumed_messages.keys()
        ]
        positions = self._consumer.position(partitions)
        for p in positions:
            if p.error is not None:
                # Position can be transiently unavailable (e.g. during a
                # rebalance); keep the previously cached offset for this
                # partition and just log the failure.
                # (fixed typo in the original message: "Erorr" -> "Error")
                self.logger.warning(
                    f"Error querying position for partition {p.partition}")
            else:
                self.logger.debug(
                    f"next position for {p.partition} is {p.offset}")
                self._next_positions[p.partition] = p.offset
        return self._next_positions.copy()

def _consumer_thread(self):
self.logger.info("Starting consumer thread")
consumer = self.create_consumer()

last_position_update = time()

def maybe_update_positions():
nonlocal last_position_update
if time() < last_position_update + 3:
# periodically sweep through all partitions
# and update positions
return
with self._lock:
partitions = [
TopicPartition(topic=self.topic, partition=p)
for p in self._consumed_messages.keys()
]
positions = consumer.position(partitions)
for p in positions:
if p.error is not None:
self.logger.warning(
f"Erorr querying position for partition {p.partition}"
)
else:
self.logger.debug(
f"next position for {p.partition} is {p.offset}")
self._next_positions[p.partition] = p.offset
last_position_update = time()

while not self._stop.is_set():
self._msg_semaphore.acquire()
if self._stop.is_set():
break
msg = consumer.poll(1.0)
maybe_update_positions()
msg = self._consumer.poll(1.0)
if msg is None:
continue
if msg.error():
self.logger.error(f"Consumer error: {msg.error()}")
continue

with self._lock:
self._total_msgs_cnt += 1
if self._total_msgs_cnt % 100 == 0:
self.logger.debug(
f"Consumed message partition: {msg.partition()} at offset {msg.offset()}"
)
self._num_msgs_pending_verification += 1
self._consumed_messages[msg.partition()].append(msg)
if self._num_msgs_pending_verification >= self._query_batch_size:
with self._msgs_batched:
self._msgs_batched.notify()
self._max_consumed_offsets[msg.partition()] = max(
self._max_consumed_offsets.get(msg.partition(), -1),
msg.offset())
if len(self._errors) > 0:
return

consumer.close()

def _get_query(self, partition, last_queried_offset, max_consumed_offset):
return f"\
SELECT redpanda.offset FROM redpanda.{self._query.escape_identifier(self.topic)} \
Expand Down Expand Up @@ -177,17 +169,18 @@ def _verify_next_message(self, partition, iceberg_offset):
)
return
self._consumed_messages[partition].pop(0)
self._num_msgs_pending_verification -= 1
self._msg_semaphore.release()

def _get_partition_offsets_list(self):
with self._lock:
return self._next_positions.copy()

def _query_thread(self):
self.logger.info("Starting query thread")
while not self._stop.is_set():
try:
partitions = self._get_partition_offsets_list()
with self._msgs_batched:
# Wait for enough data to be batched or a timeout.
self._msgs_batched.wait(
timeout=self._query_batch_wait_timeout_s)
partitions = self.update_and_get_fetch_positions()

for partition, next_consume_offset in partitions.items():
last_queried_offset = self._max_queried_offsets[
Expand Down Expand Up @@ -276,14 +269,18 @@ def wait(self, progress_timeout_sec=30):
self.stop()

def stop(self):
self._stop.set()
self._msg_semaphore.release()
self._executor.shutdown(wait=False)
assert len(
self._errors
) == 0, f"Topic {self.topic} validation errors: {self._errors}"

self.logger.debug(f"consumed offsets: {self._max_consumed_offsets}")
self.logger.debug(f"queried offsets: {self._max_queried_offsets}")

assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table"
try:
self._stop.set()
self._msg_semaphore.release()
self._executor.shutdown(wait=False)
assert len(
self._errors
) == 0, f"Topic {self.topic} validation errors: {self._errors}"

self.logger.debug(
f"consumed offsets: {self._max_consumed_offsets}")
self.logger.debug(f"queried offsets: {self._max_queried_offsets}")

assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table"
finally:
self._consumer.close()
Loading