remotePartitionFiles(final TopicIdPartition topicIdPartition);
+
+ abstract boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId);
+
+ /**
+ * Flag when a step has failed so next steps fail-fast.
+ *
+ * Needed to allow running {@code SingleBrokerTest#stopKafka} after all tests.
+ */
+ public static class FailFastExtension implements TestWatcher, ExecutionCondition {
+ private boolean failed;
+
+ @Override
+ public void testFailed(final ExtensionContext context, final Throwable cause) {
+ LOG.error("Test failed: " + context.getDisplayName());
+ failed = true;
+ }
+
+ @Override
+ public ConditionEvaluationResult evaluateExecutionCondition(final ExtensionContext extensionContext) {
+ if (failed) {
+ return ConditionEvaluationResult.disabled("Already failed");
+ } else {
+ return ConditionEvaluationResult.enabled("Continue testing");
+ }
+ }
+ }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java
new file mode 100644
index 000000000..232099762
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+
+public class RemoteLogMetadataDeserializer implements Deserializer {
+ private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+ @Override
+ public RemoteLogMetadata deserialize(final String topic, final byte[] data) {
+ return serde.deserialize(data);
+ }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java
new file mode 100644
index 000000000..76ed8cff3
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
+
+public class RemoteLogMetadataTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteLogMetadataTracker.class);
+ private static final String REMOTE_LOG_METADATA_TOPIC = "__remote_log_metadata";
+
+ private final KafkaConsumer consumer;
+ private List partitions;
+
+ private final Map> remoteSegments = new HashMap<>();
+ private final Map remoteSegmentStates = new HashMap<>();
+
+ public RemoteLogMetadataTracker(final String bootstrapServers) {
+ consumer = new KafkaConsumer<>(Map.of(
+ "bootstrap.servers", bootstrapServers,
+ "auto.offset.reset", "earliest"
+ ), new ByteArrayDeserializer(), new RemoteLogMetadataDeserializer());
+ }
+
+ public Map> remoteSegments() {
+ final Map> result = new HashMap<>();
+ for (final Map.Entry> entry : remoteSegments.entrySet()) {
+ final List list = entry.getValue().stream()
+ .sorted(Comparator.comparing(RemoteSegment::startOffset))
+ .collect(Collectors.toList());
+ result.put(entry.getKey(), list);
+ }
+ return result;
+ }
+
+ /**
+ * Initializes the tracker.
+ *
+ * It expects at least one record to be present in __remote_log_metadata
+ * and that all remote segments are in {@code COPY_SEGMENT_FINISHED} state.
+ */
+ public void initialize(final List expectedPartitions) {
+ partitions = consumer.partitionsFor(REMOTE_LOG_METADATA_TOPIC).stream()
+ .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+ .collect(Collectors.toList());
+
+ await().atMost(Duration.ofSeconds(60))
+ .pollInterval(Duration.ofMillis(100))
+ .until(() ->
+ consumer.endOffsets(partitions).values().stream()
+ .mapToLong(Long::longValue)
+ .sum() > 0);
+
+ // supply segment states where copy has not finished
+ final Supplier