Skip to content

Commit

Permalink
Merge pull request #24031 from bashtanov/migrations-test-fixes
Browse files Browse the repository at this point in the history
Migrations test fixes
  • Loading branch information
bashtanov authored Nov 14, 2024
2 parents 4cf4a20 + f302a49 commit d6e55e0
Showing 1 changed file with 101 additions and 86 deletions.
187 changes: 101 additions & 86 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,15 @@ def poll_hard(consumer, timeout):
self.logger.debug(f"second msg={format_message(msg)}")
assert msg is None

def log_topics(self, topic_names):
for t in topic_names:
try:
topic_desc = self.client().describe_topic(t)
except ck.KafkaException as e:
self.logger.warn(f"failed to describe topic {t}: {e}")
else:
self.logger.info(f"topic {t} is {topic_desc}")

def check_migrations(self, migration_id, exp_topics_cnt,
exp_migrations_cnt):
""" make sure that, when the migration appears,
Expand All @@ -505,7 +514,9 @@ def check():
backoff_sec=1,
err_msg=f"Failed waiting for migration")

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
@cluster(num_nodes=3,
log_allow_list=MIGRATION_LOG_ALLOW_LIST +
["Labeled topic manifest download resulted in listing error"])
def test_mount_inexistent(self):
topic = TopicSpec(partition_count=3)

Expand Down Expand Up @@ -586,10 +597,8 @@ def test_creating_and_listing_migrations(self):
in_migration_id = self.create_and_wait(in_migration)
self.check_migrations(in_migration_id, len(inbound_topics), 2)

for t in inbound_topics:
self.logger.info(
f"inbound topic: {self.client().describe_topic(t.source_topic_reference.topic)}"
)
self.log_topics(t.source_topic_reference.topic
for t in inbound_topics)

self.execute_data_migration_action_flaky(in_migration_id,
MigrationAction.prepare)
Expand All @@ -607,10 +616,7 @@ def test_creating_and_listing_migrations(self):
['cut_over', 'finished'])
self.wait_for_migration_states(in_migration_id, ['finished'])

for t in topics:
self.logger.info(
f'topic {t.name} is {self.client().describe_topic(t.name)}'
)
self.log_topics(t.name for t in topics)

# todo: fix rp_storage_tool to use overridden topic names
self.redpanda.si_settings.set_expected_damage(
Expand Down Expand Up @@ -709,10 +715,7 @@ def test_higher_level_migration_api(self):
self.wait_partitions_appear(inbound_topics_spec)
self.logger.info('waiting for migration to be deleted')
self.wait_migration_disappear(in_migration_id)
for t in topics:
self.logger.info(
f'topic {t.name} is {self.client().describe_topic(t.name)}'
)
self.log_topics(t.name for t in topics)
migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")

Expand Down Expand Up @@ -782,8 +785,11 @@ def start_consumer(self, topic):
return consumer

def _do_validate_topic_operation(self, topic: str, op_name: str,
expected_to_pass: bool,
expected_to_pass: bool | None,
operation: Callable[[str], typing.Any]):
if expected_to_pass is None:
return

self.logger.info(
f"Validating execution of {op_name} against topic: {topic}")
success = True
Expand All @@ -798,51 +804,48 @@ def _do_validate_topic_operation(self, topic: str, op_name: str,
assert expected_to_pass == success, f"Operation {op_name} outcome is not " \
f"expected. Expected to pass: {expected_to_pass}, succeeded: {success}"

def validate_topic_access(
self,
topic: str,
metadata_locked: bool,
read_blocked: bool,
produce_blocked: bool,
test_add_partitions: bool = True,
assert_topic_present: bool = True,
):
def validate_topic_access(self, topic: str, expect_present: bool | None,
expect_metadata_changeable: bool | None,
expect_readable: bool | None,
expect_writable: bool | None):
rpk = RpkTool(self.redpanda)
if assert_topic_present:
assert topic in rpk.list_topics(), \

if expect_present is not None:
assert expect_present == (topic in rpk.list_topics()), \
f"validated topic {topic} must be present"

if test_add_partitions:
self._do_validate_topic_operation(
topic=topic,
op_name="add_partitions",
expected_to_pass=not metadata_locked,
operation=lambda topic: rpk.add_partitions(topic, 33))
self._do_validate_topic_operation(
topic=topic,
op_name="add_partitions",
expected_to_pass=expect_metadata_changeable,
operation=lambda topic: rpk.add_partitions(topic, 33))

def _alter_cfg(topic):
rpk.alter_topic_config(topic, TopicSpec.PROPERTY_FLUSH_MS, 2000)
rpk.delete_topic_config(topic, TopicSpec.PROPERTY_FLUSH_MS)

self._do_validate_topic_operation(topic=topic,
op_name="alter_topic_configuration",
expected_to_pass=not metadata_locked,
operation=_alter_cfg)
self._do_validate_topic_operation(
topic=topic,
op_name="alter_topic_configuration",
expected_to_pass=expect_metadata_changeable,
operation=_alter_cfg)

self._do_validate_topic_operation(
topic=topic,
op_name="read",
expected_to_pass=not read_blocked,
expected_to_pass=expect_readable,
operation=lambda topic: rpk.consume(topic=topic, n=1, offset=0))

# check if topic is writable only if it is expected to be blocked not to disturb the verifiers.
if produce_blocked:
# check if topic is readable
self._do_validate_topic_operation(
topic=topic,
op_name="produce",
expected_to_pass=not produce_blocked,
operation=lambda topic: rpk.produce(
topic=topic, key="test-key", msg='test-msg'))
if expect_writable:
expect_writable = None

self._do_validate_topic_operation(
topic=topic,
op_name="produce",
expected_to_pass=expect_writable,
operation=lambda topic: rpk.produce(
topic=topic, key="test-key", msg='test-msg'))

def consume_and_validate(self, topic_name, expected_records):
consumer = self.start_consumer(topic=topic_name)
Expand Down Expand Up @@ -883,11 +886,10 @@ def cancel_outbound(self, migration_id, topic_name, producer):
producer.stop_if_running()
self.consume_and_validate(topic_name, producer.acked_records)
self.validate_topic_access(topic=topic_name,
metadata_locked=False,
read_blocked=False,
produce_blocked=False,
assert_topic_present=False,
test_add_partitions=True)
expect_present=True,
expect_metadata_changeable=True,
expect_readable=True,
expect_writable=True)

def cancel_inbound(self, migration_id, topic_name):
self.cancel(migration_id, topic_name)
Expand Down Expand Up @@ -932,16 +934,18 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
workload_topic.name, producer)

self.validate_topic_access(topic=workload_topic.name,
metadata_locked=True,
read_blocked=False,
produce_blocked=False)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=True,
expect_writable=True)

self.wait_for_migration_states(out_migration_id, ['prepared'])

self.validate_topic_access(topic=workload_topic.name,
metadata_locked=True,
read_blocked=False,
produce_blocked=False)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=True,
expect_writable=True)
if cancellation == CancellationStage('out', 'prepared'):
return self.cancel_outbound(out_migration_id,
workload_topic.name, producer)
Expand All @@ -955,15 +959,17 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
workload_topic.name, producer)

self.validate_topic_access(topic=workload_topic.name,
metadata_locked=True,
read_blocked=False,
produce_blocked=True)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=True,
expect_writable=None)

self.wait_for_migration_states(out_migration_id, ['executed'])
self.validate_topic_access(topic=workload_topic.name,
metadata_locked=True,
read_blocked=False,
produce_blocked=True)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=True,
expect_writable=False)
if cancellation == CancellationStage('out', 'executed'):
return self.cancel_outbound(out_migration_id,
workload_topic.name, producer)
Expand All @@ -972,10 +978,10 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
MigrationAction.finish)

self.validate_topic_access(topic=workload_topic.name,
metadata_locked=True,
read_blocked=True,
produce_blocked=True,
assert_topic_present=False)
expect_present=None,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)

self.wait_for_migration_states(out_migration_id,
['cut_over', 'finished'])
Expand All @@ -986,12 +992,16 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,

self.wait_for_migration_states(out_migration_id, ['finished'])
self.validate_topic_access(topic=workload_topic.name,
metadata_locked=True,
read_blocked=True,
produce_blocked=True,
assert_topic_present=False)

expect_present=False,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)
self.admin.delete_data_migration(out_migration_id)
self.validate_topic_access(topic=workload_topic.name,
expect_present=False,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)

# attach topic back
inbound_topic_name = "aliased-workload-topic" if params.use_alias else workload_topic.name
Expand Down Expand Up @@ -1030,17 +1040,18 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
continue

self.validate_topic_access(topic=inbound_topic_name,
metadata_locked=True,
read_blocked=True,
produce_blocked=True,
assert_topic_present=False)
expect_present=None,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)

self.wait_for_migration_states(in_migration_id, ['prepared'])

self.validate_topic_access(topic=inbound_topic_name,
metadata_locked=True,
read_blocked=True,
produce_blocked=True)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)

if cancellation == CancellationStage('in', 'prepared'):
cancellation = None
Expand All @@ -1063,16 +1074,19 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
continue

self.validate_topic_access(topic=inbound_topic_name,
metadata_locked=True,
read_blocked=True,
produce_blocked=True)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)

self.wait_for_migration_states(in_migration_id, ['executed'])

self.validate_topic_access(topic=inbound_topic_name,
metadata_locked=True,
read_blocked=True,
produce_blocked=True)
expect_present=True,
expect_metadata_changeable=False,
expect_readable=False,
expect_writable=False)

if cancellation == CancellationStage('in', 'executed'):
cancellation = None
self.cancel_inbound(in_migration_id, inbound_topic_name)
Expand All @@ -1087,9 +1101,10 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
self.consume_and_validate(inbound_topic_name,
producer.acked_records)
self.validate_topic_access(topic=inbound_topic_name,
metadata_locked=False,
read_blocked=False,
produce_blocked=False)
expect_present=True,
expect_metadata_changeable=True,
expect_readable=True,
expect_writable=True)
remounted = True

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
Expand Down

0 comments on commit d6e55e0

Please sign in to comment.