diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index bf8cb437b..0c06bad3e 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -74,8 +74,15 @@ jobs:
           java-version: 11
           distribution: temurin
 
+      - name: Read version from properties file
+        id: read-version
+        run: |
+          version=$(grep -oP 'version=\K[^[:space:]]+' gradle.properties)
+          echo "Version found: $version"
+          echo "VERSION=$version" >> $GITHUB_ENV
+
       - name: Build with Gradle
-        run: ./gradlew build
+        run: make build/distributions/tiered-storage-for-apache-kafka-${{ env.VERSION }}.tgz
 
       - name: Perform CodeQL Analysis
         uses: github/codeql-action/analyze@v2
diff --git a/.github/workflows/main_push_and_pull_request_workflow.yml b/.github/workflows/main_push_and_pull_request_workflow.yml
index 13731a70b..c80d83261 100644
--- a/.github/workflows/main_push_and_pull_request_workflow.yml
+++ b/.github/workflows/main_push_and_pull_request_workflow.yml
@@ -25,12 +25,46 @@ jobs:
           distribution: temurin
 
       - name: Build with Gradle
-        run: ./gradlew build distTar -x integrationTest
+        run: make build
 
       - name: Run integration tests
-        run: ./gradlew integrationTest
+        run: make integration_test
+
+      - name: Upload Build Artifacts
+        uses: actions/upload-artifact@v3
+        with:
+          name: build-${{ matrix.java-version }}
+          path: build/distributions/*.tgz
 
+  e2e_test:
+    strategy:
+      matrix:
+        java-version: [ 11, 17 ]
+        runs-on: [ ubuntu-latest ]
+    name: E2E tests on ${{ matrix.runs-on }} with jdk ${{ matrix.java-version }}
+    runs-on: ${{ matrix.runs-on }}
+    needs:
+      - build
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Download Build Artifacts
+        uses: actions/download-artifact@v3
+        with:
+          name: build-${{ matrix.java-version }}
+          path: build/distributions
+
+      - name: Display structure of downloaded files
+        run: ls -R
+        working-directory: build/distributions
 
       - name: Build Docker image
         run: make docker_image
 
+      - name: Run E2E tests
+        timeout-minutes: 30
+        run: make e2e_test
+
       # TODO: publish docker image
diff --git a/Makefile b/Makefile
index c03cf080e..97aee2646 100644
--- a/Makefile
+++ b/Makefile
@@ -13,19 +13,30 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ##
-VERSION=0.0.1-SNAPSHOT
-IMAGE_TAG=aivenoy/kafka:3.3-2022-10-06-tiered-storage-1-ts-2
+VERSION := $(shell grep -oP 'version=\K[^[:space:]]+' gradle.properties)
+IMAGE_NAME := aivenoy/kafka-with-ts-plugin
+IMAGE_VERSION := latest
+IMAGE_TAG := $(IMAGE_NAME):$(IMAGE_VERSION)
+
+.PHONY: clean checkstyle build integration_test e2e_test docker_image docker_push
 
-.PHONY: clean
 clean:
 	./gradlew clean
 
+checkstyle:
+	./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest
+
+build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
+
 build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
-	./gradlew build distTar -x integrationTest
+	./gradlew build distTar -x integrationTest -x e2e:test
 
 integration_test: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
 	./gradlew integrationTest
 
+e2e_test: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
+	./gradlew e2e:test
+
 .PHONY: docker_image
 docker_image: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
 	docker build . \
diff --git a/build.gradle b/build.gradle
index 53750de04..a556fe360 100644
--- a/build.gradle
+++ b/build.gradle
@@ -103,7 +103,7 @@ subprojects {
         slf4jVersion = "1.7.36"
 
         // Don't bump this version without need, as this is the min supported version for the plugin.
-        kafkaVersion = "3.0.0"
+        kafkaVersion = "3.3.2"
 
         assertJVersion = "3.24.2"
 
@@ -111,6 +111,10 @@ subprojects {
 
         jacksonVersion = "2.15.2"
 
+        awaitilityVersion = "4.2.0"
+
+        awsSdkVersion = "1.12.520"
+
         testcontainersVersion = "1.18.3"
     }
 
@@ -133,6 +137,8 @@ subprojects {
         testImplementation "org.mockito:mockito-core:$mockitoVersion"
         testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion"
 
+        testImplementation "org.awaitility:awaitility:$awaitilityVersion"
+
         testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
     }
 
@@ -253,3 +259,8 @@ tasks.register('validateDependencies') {
         assert !conflictsFound : "Dependency conflicts found!"
     }
 }
+
+// TODO fix GCP dependency issues
+//tasks.named("check") {
+//    dependsOn(tasks.named("validateDependencies"))
+//}
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index fad9b92d1..e2e0a11e6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -29,7 +29,10 @@
+
+
+
diff --git a/core/build.gradle b/core/build.gradle
index 7757c71e1..d711652eb 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -27,8 +27,6 @@ ext {
     caffeineVersion = "3.1.7"
 
     zstdVersion = "1.5.5-5"
-
-    awaitilityVersion = "4.2.0"
 }
 
 dependencies {
@@ -51,6 +49,5 @@ dependencies {
     testImplementation(project(":storage:filesystem"))
     testImplementation "com.github.luben:zstd-jni:$zstdVersion"
-    testImplementation "org.awaitility:awaitility:$awaitilityVersion"
 
     integrationTestImplementation sourceSets.test.output
 }
diff --git a/demo/compose-local-fs.yml b/demo/compose-local-fs.yml
index c54b85d2a..8231de71d 100644
--- a/demo/compose-local-fs.yml
+++ b/demo/compose-local-fs.yml
@@ -23,7 +23,7 @@ services:
       ZOOKEEPER_CLIENT_PORT: 2181
 
   kafka:
-    image: "aivenoy/kafka:3.3-2022-10-06-tiered-storage-1-ts-2"
+    image: "aivenoy/kafka-with-ts-plugin"
     container_name: "kafka-ts"
     depends_on:
       - zookeeper
diff --git a/demo/compose-s3-aws.yml b/demo/compose-s3-aws.yml
index 5a142a562..94afdd302 100644
--- a/demo/compose-s3-aws.yml
+++ b/demo/compose-s3-aws.yml
@@ -23,7 +23,7 @@ services:
      ZOOKEEPER_CLIENT_PORT: 2181
 
   kafka:
-    image: "aivenoy/kafka:3.3-2022-10-06-tiered-storage-1-ts-2"
+    image: "aivenoy/kafka-with-ts-plugin"
     container_name: "kafka-ts"
     depends_on:
       - zookeeper
diff --git a/demo/compose-s3-minio.yml b/demo/compose-s3-minio.yml
index 86707c761..e78a432d9 100644
--- a/demo/compose-s3-minio.yml
+++ b/demo/compose-s3-minio.yml
@@ -23,7 +23,7 @@ services:
       ZOOKEEPER_CLIENT_PORT: 2181
 
   kafka:
-    image: "aivenoy/kafka:3.3-2022-10-06-tiered-storage-1-ts-2"
+    image: "aivenoy/kafka-with-ts-plugin"
     container_name: "kafka-ts"
     depends_on:
       - zookeeper
diff --git a/e2e/README.md b/e2e/README.md
new file mode 100644
index 000000000..18088da90
--- /dev/null
+++ b/e2e/README.md
@@ -0,0 +1,8 @@
+# End-to-end tests for Kafka tiered storage
+
+## Usage
+
+Docker is needed for running the tests.
+
+1. Build the image with < TBD >.
+2. `./gradlew test`
diff --git a/e2e/build.gradle b/e2e/build.gradle
new file mode 100644
index 000000000..6c60357e7
--- /dev/null
+++ b/e2e/build.gradle
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+    testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-storage-api:$kafkaVersion"
+
+    testImplementation "commons-io:commons-io:$apacheCommonsIOVersion"
+    testImplementation "com.amazonaws:aws-java-sdk-s3:$awsSdkVersion"
+
+    testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
+    testImplementation "org.testcontainers:kafka:$testcontainersVersion"
+
+    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
+    testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
+}
+
+tasks.named('test') {
+    // Use junit platform for unit tests.
+    useJUnitPlatform()
+    testLogging {
+        events 'passed', 'skipped', 'failed'
+        showStandardStreams = true
+        showExceptions = true
+        showStackTraces = true
+        showCauses = true
+        exceptionFormat "full"
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java
new file mode 100644
index 000000000..25d846aa1
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class LocalSystemSingleBrokerTest extends SingleBrokerTest { + static final String TS_DATA_SUBDIR_HOST = "ts-data"; + static final String TS_DATA_DIR_CONTAINER = "/home/appuser/kafka-tiered-storage"; + + static Path tieredDataDir; + + @BeforeAll + static void init() throws Exception { + setupKafka(kafka -> { + tieredDataDir = baseDir.resolve(TS_DATA_SUBDIR_HOST); + tieredDataDir.toFile().mkdirs(); + tieredDataDir.toFile().setWritable(true, false); + + kafka + .withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", + "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_ROOT", TS_DATA_DIR_CONTAINER) + .withFileSystemBind(tieredDataDir.toString(), TS_DATA_DIR_CONTAINER); + }); + } + + @AfterAll + static void cleanup() { + stopKafka(); + cleanupStorage(); + } + + @Override + boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + try (final var files = Files.list(tieredDataDir)) { + return files.noneMatch(path -> path.getFileName().startsWith(prefix)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + final Path dir = tieredDataDir.resolve( + String.format("%s-%s/%s", + topicIdPartition.topic(), + topicIdPartition.topicId().toString(), + topicIdPartition.partition())); + try (final var paths = Files.list(dir)) { + return paths.map(Path::getFileName) + .map(Path::toString) + .sorted() + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java new file mode 100644 index 000000000..a9036ccda --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java @@ -0,0 +1,150 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.Testcontainers; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy; +import org.testcontainers.utility.DockerImageName; + +public class S3MinioSingleBrokerTest extends SingleBrokerTest { + + static final int MINIO_PORT = 9000; + static final GenericContainer MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio")) + .withCommand("server", "/data", "--console-address", ":9090") + .withExposedPorts(MINIO_PORT) + .withNetwork(NETWORK) + .withNetworkAliases("minio"); + static final String ACCESS_KEY_ID = "minioadmin"; + static final String SECRET_ACCESS_KEY = "minioadmin"; + static final String REGION = "us-east-1"; + static final String BUCKET = "test-bucket"; + + static AmazonS3 s3Client; + + @BeforeAll + static void init() throws Exception { + MINIO.start(); + + final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT); + + createBucket(minioServerUrl); + + initializeS3Client(); + + setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", + "io.aiven.kafka.tieredstorage.storage.s3.S3Storage") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", minioServerUrl) + .dependsOn(MINIO)); + } + + private static void initializeS3Client() { + final Integer mappedPort = MINIO.getFirstMappedPort(); + Testcontainers.exposeHostPorts(mappedPort); + s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + "http://localhost:" + mappedPort, + REGION + ) + ) + .withCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY) + ) + ) + .withPathStyleAccessEnabled(true) + .build(); + + s3Client.listBuckets() + .forEach(bucket -> LOG.info("S3 bucket: " + bucket.getName())); + } + + private static void createBucket(final String minioServerUrl) { + final String cmd = + "/usr/bin/mc config host add local " + + minioServerUrl + " " + ACCESS_KEY_ID + " " + SECRET_ACCESS_KEY + " --api s3v4 &&" + + "/usr/bin/mc mb local/test-bucket;\n"; + + final GenericContainer mcContainer = new GenericContainer<>("minio/mc") + .withNetwork(NETWORK) + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCreateContainerCmdModifier(containerCommand -> containerCommand + .withTty(true) + .withEntrypoint("/bin/sh", "-c", cmd)); + mcContainer.start(); + } + + + 
@AfterAll + static void cleanup() { + stopKafka(); + + MINIO.stop(); + + cleanupStorage(); + } + + @Override + boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + + final var summaries = s3Client.listObjectsV2(BUCKET, prefix).getObjectSummaries(); + return summaries.isEmpty(); + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(BUCKET); + final List summaries = new ArrayList<>(); + ListObjectsV2Result result; + while ((result = s3Client.listObjectsV2(request)).isTruncated()) { + summaries.addAll(result.getObjectSummaries()); + request = request.withContinuationToken(result.getNextContinuationToken()); + } + summaries.addAll(result.getObjectSummaries()); + + return summaries.stream() + .map(S3ObjectSummary::getKey) + .map(k -> k.substring(k.lastIndexOf('/') + 1)) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java new file mode 100644 index 000000000..3491ca057 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java @@ -0,0 +1,617 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import io.aiven.kafka.tieredstorage.e2e.internal.RemoteLogMetadataTracker; +import io.aiven.kafka.tieredstorage.e2e.internal.RemoteSegment; + +import com.github.dockerjava.api.model.Ulimit; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import static java.util.stream.Collectors.groupingBy; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +@Testcontainers +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@ExtendWith(SingleBrokerTest.FailFastExtension.class) +abstract class SingleBrokerTest { + + static final Logger LOG = LoggerFactory.getLogger(SingleBrokerTest.class); + + static final String KAFKA_DATA_SUBDIR_HOST = "data"; + static final String KAFKA_DATA_DIR_CONTAINER = "/var/lib/kafka/data"; + + static final String TOPIC_0 = "topic0"; + static final String TOPIC_1 = "topic1"; + static final TopicPartition TP_0_0 = new TopicPartition(TOPIC_0, 0); + static final TopicPartition TP_0_1 = new TopicPartition(TOPIC_0, 1); + static final TopicPartition TP_1_0 = new TopicPartition(TOPIC_1, 0); + static final List TOPIC_PARTITIONS = List.of(TP_0_0, TP_0_1, TP_1_0); + + static final int CHUNK_SIZE = 1024; // TODO 
something more reasonable? + static final int SEGMENT_SIZE_T0 = 256 * CHUNK_SIZE + CHUNK_SIZE / 2; + static final int SEGMENT_SIZE_T1 = 123 * CHUNK_SIZE + 123; + + static final int VALUE_SIZE_MIN = CHUNK_SIZE / 4 - 3; + static final int VALUE_SIZE_MAX = CHUNK_SIZE * 2 + 5; + + static final long RECORDS_TO_PRODUCE = 10_000; + + static final Duration TOTAL_RETENTION = Duration.ofHours(1); + static final Duration LOCAL_RETENTION = Duration.ofSeconds(5); + static final Network NETWORK = Network.newNetwork(); + + static Path baseDir; + // Can't use @TempDir, because it's initialized too late. + static Path localDataDir; + + static KafkaContainer kafka; + + static AdminClient adminClient; + + static RemoteLogMetadataTracker remoteLogMetadataTracker; + + static TopicIdPartition t0p0; + static TopicIdPartition t0p1; + static TopicIdPartition t1p0; + + static void setupKafka(final Consumer tsPluginSetup) throws Exception { + + try { + baseDir = Files.createTempDirectory("junit"); + localDataDir = baseDir.resolve(KAFKA_DATA_SUBDIR_HOST); + localDataDir.toFile().mkdirs(); + localDataDir.toFile().setWritable(true, false); + } catch (final IOException e) { + throw new RuntimeException(e); + } + + kafka = new KafkaContainer(DockerImageName.parse("aivenoy/kafka-with-ts-plugin") + .asCompatibleSubstituteFor("confluentinc/cp-kafka") + ) + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false") + .withEnv("KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS", "10000") + // enable tiered storage + .withEnv("KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE", "true") + .withEnv("KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS", "5000") + // remote metadata manager + .withEnv("KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME", + "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager") + .withEnv("KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR", "1") + .withEnv("KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME", "BROKER") + // remote storage manager + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", "/tiered-storage-for-apache-kafka/*") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME", "io.aiven.kafka.tieredstorage.RemoteStorageManager") + .withEnv("KAFKA_RSM_CONFIG_CHUNK_SIZE", Integer.toString(CHUNK_SIZE)) + .withEnv("KAFKA_RSM_CONFIG_CHUNK_CACHE_CLASS", + "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache") + .withEnv("KAFKA_RSM_CONFIG_CHUNK_CACHE_SIZE", "-1") + // other tweaks + .withEnv("KAFKA_OPTS", "") // disable JMX exporter + .withEnv("KAFKA_LOG4J_LOGGERS", "io.aiven.kafka.tieredstorage=DEBUG," + + "org.apache.kafka.server.log.remote.metadata.storage=DEBUG," + + "state.change.logger=INFO") + .withCreateContainerCmdModifier( + cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L)))) + .withFileSystemBind(localDataDir.toString(), KAFKA_DATA_DIR_CONTAINER) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka"))) + .withNetwork(NETWORK); + + tsPluginSetup.accept(kafka); + + kafka.start(); + + adminClient = AdminClient.create(Map.of( + "bootstrap.servers", kafka.getBootstrapServers() + )); + + remoteLogMetadataTracker = new RemoteLogMetadataTracker(kafka.getBootstrapServers()); + + createTopics(); + } + + static void stopKafka() { + if (adminClient != null) { + adminClient.close(); + } + + kafka.stop(); + } + + static void cleanupStorage() { + if (baseDir != null) { + // TODO: failing silently atm. 
As Delete Topics test is disabled, and topic directories are not removed + // they cannot be deleted as they have different user/group. see #366 + FileUtils.deleteQuietly(baseDir.toFile()); + } + } + + static void createTopics() throws Exception { + final NewTopic newTopic0 = new NewTopic(TOPIC_0, 2, (short) 1) + .configs(Map.of( + "remote.storage.enable", "true", + "segment.bytes", Integer.toString(SEGMENT_SIZE_T0), + "retention.ms", Long.toString(TOTAL_RETENTION.toMillis()), + "local.retention.ms", Long.toString(LOCAL_RETENTION.toMillis()) + )); + final NewTopic newTopic1 = new NewTopic(TOPIC_1, 1, (short) 1) + .configs(Map.of( + "remote.storage.enable", "true", + "segment.bytes", Integer.toString(SEGMENT_SIZE_T1), + "retention.ms", Long.toString(TOTAL_RETENTION.toMillis()), + "local.retention.ms", Long.toString(LOCAL_RETENTION.toMillis()) + )); + final var topicsToCreate = List.of(newTopic0, newTopic1); + adminClient.createTopics(topicsToCreate) + .all().get(30, TimeUnit.SECONDS); + + adminClient.describeTopics(List.of(TOPIC_0, TOPIC_1)) + .allTopicNames().get(30, TimeUnit.SECONDS) + .values().forEach(td -> { + if (td.name().equals(TOPIC_0)) { + t0p0 = new TopicIdPartition(td.topicId(), TP_0_0); + t0p1 = new TopicIdPartition(td.topicId(), TP_0_1); + } else if (td.name().equals(TOPIC_1)) { + t1p0 = new TopicIdPartition(td.topicId(), TP_1_0); + } else { + fail("Unknown topic %s", td); + } + }); + + LOG.info("Topics {} created successfully", topicsToCreate); + } + + @Test + @Order(1) + void remoteCopy() throws Exception { + fillTopics(); + + remoteLogMetadataTracker.initialize(List.of(t0p0, t0p1, t1p0)); + + // Check remote segments are present. + final var allRemoteSegments = remoteLogMetadataTracker.remoteSegments(); + + for (final Map.Entry> entry : allRemoteSegments.entrySet()) { + final Map> segmentFiles = remotePartitionFiles(entry.getKey()).stream() + .collect(groupingBy(SingleBrokerTest::extractSegmentId)); + + for (final RemoteSegment remoteLogSegment : entry.getValue()) { + final String key = remoteLogSegment.remoteLogSegmentId().id().toString(); + assertThat(segmentFiles).containsKey(key); + assertThat(segmentFiles.get(key).stream() + .map(SingleBrokerTest::extractSuffix)) + .containsExactlyInAnyOrder( + "index", "leader-epoch-checkpoint", "log", "rsm-manifest", "snapshot", "timeindex"); + } + } + + // Check that at least local segments are fully deleted for following test to read from remote tier + for (final TopicIdPartition tp : List.of(t0p0, t0p1, t1p0)) { + await().atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofMillis(100)) + .until(() -> localLogFiles(tp.topicPartition()).size() == 1); + + final var localLogs = localLogFiles(tp.topicPartition()); + LOG.info("Local logs for {} [{}]: \n{}", tp, localLogs.size(), localLogs); + final var remoteSegments = allRemoteSegments.get(tp); + LOG.info("Remote logs for {} [{}]: \n{}", tp, remoteSegments.size(), remoteSegments); + } + } + + void fillTopics() throws Exception { + try (final var producer = new KafkaProducer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "linger.ms", Long.toString(Duration.ofSeconds(1).toMillis()), + "batch.size", Integer.toString(1_000_000) + ), new ByteArraySerializer(), new ByteArraySerializer())) { + + for (final TopicPartition topicPartition : TOPIC_PARTITIONS) { + long offset = 0; + while (offset < RECORDS_TO_PRODUCE) { + final int batchSize = batchSize(offset); + final ArrayList> sendFutures = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize; i++) { + final var 
record = createProducerRecord( + topicPartition.topic(), + topicPartition.partition(), + offset++ + ); + final Future sendFuture = producer.send(record); + sendFutures.add(sendFuture); + } + producer.flush(); + for (final Future f : sendFutures) { + f.get(30, TimeUnit.SECONDS); + } + } + + LOG.info("{} records produced to {}", RECORDS_TO_PRODUCE, topicPartition); + } + } + } + + private static List localLogFiles(final TopicPartition tp) { + final Path dir = localDataDir.resolve(String.format("%s-%d", tp.topic(), tp.partition())); + try (final var paths = Files.list(dir)) { + return paths + .map(Path::toFile) + .sorted(Comparator.comparing(File::getName)) + .filter(f -> f.getName().endsWith(".log")) + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Test + @Order(2) + void remoteRead() { + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "fetch.max.bytes", "1" + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + + // Check the beginning and end offsets. + final Map startOffsets = consumer.beginningOffsets(TOPIC_PARTITIONS); + assertThat(startOffsets).containsAllEntriesOf( + Map.of( + TP_0_0, 0L, + TP_0_1, 0L, + TP_1_0, 0L)); + final Map endOffsets = consumer.endOffsets(TOPIC_PARTITIONS); + assertThat(endOffsets).containsAllEntriesOf( + Map.of( + TP_0_0, RECORDS_TO_PRODUCE + 1, + TP_0_1, RECORDS_TO_PRODUCE + 1, + TP_1_0, RECORDS_TO_PRODUCE + 1)); + + LOG.info("start and end offsets are as expected"); + + // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client + + // Read by record. + LOG.info("Starting validation per record"); + + for (final TopicPartition tp : TOPIC_PARTITIONS) { + consumer.assign(List.of(tp)); + LOG.info("Checking records from partition {}", tp); + for (long offset = 0; offset < RECORDS_TO_PRODUCE; offset++) { + consumer.seek(tp, offset); + final var record = consumer.poll(Duration.ofSeconds(5)).records(tp).get(0); + final var expectedRecord = createProducerRecord(tp.topic(), tp.partition(), offset); + checkRecord(record, offset, expectedRecord); + if (offset % 500 == 0) { + LOG.info("{} of {} records checked", offset, RECORDS_TO_PRODUCE); + } + } + LOG.info("Records from partition {} checked", tp); + } + + LOG.info("Validation per record completed"); + + // Read by batches. + LOG.info("Starting validation per batch"); + + for (final TopicPartition tp : TOPIC_PARTITIONS) { + consumer.assign(List.of(tp)); + long offset = 0; + while (offset < RECORDS_TO_PRODUCE) { + consumer.seek(tp, offset); + final var records = consumer.poll(Duration.ofSeconds(1)).records(tp); + assertThat(records).hasSize(batchSize(offset)); + for (final var record : records) { + final var expectedRecord = createProducerRecord(tp.topic(), tp.partition(), offset); + checkRecord(record, offset, expectedRecord); + offset += 1; + } + } + } + + LOG.info("Validation per batch completed"); + } + + // Read over batch borders. + LOG.info("Starting validation over batch borders"); + + final ArrayList batchBorders = new ArrayList<>(); + // Skip the first and last batches because we can't read "over" their left and right border. 
+ for (long offset = 1; offset < RECORDS_TO_PRODUCE - 1; ) { + batchBorders.add(offset); + final int batchSize = batchSize(offset); + offset += batchSize; + } + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "fetch.max.bytes", Integer.toString(VALUE_SIZE_MAX * 50) + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + for (final TopicPartition tp : TOPIC_PARTITIONS) { + consumer.assign(List.of(tp)); + for (final long batchBorder : batchBorders) { + final var offset = batchBorder - 1; + consumer.seek(tp, offset); + List> records = consumer.poll(Duration.ofSeconds(1)).records(tp); + checkRecord( + records.get(0), + offset, + createProducerRecord(tp.topic(), tp.partition(), offset)); + if (records.size() > 1) { + checkRecord( + records.get(1), + batchBorder, + createProducerRecord(tp.topic(), tp.partition(), batchBorder)); + } else { + // It's possible when the batch is the last in the segment: + // the broker won't return records over a segment border. + records = consumer.poll(Duration.ofSeconds(1)).records(tp); + checkRecord( + records.get(0), + batchBorder, + createProducerRecord(tp.topic(), tp.partition(), batchBorder)); + } + } + } + } + + LOG.info("Validation over batch borders completed"); + } + + @Test + @Disabled("https://github.com/aiven/kafka/issues/20") + @Order(3) + void remoteManualDelete() throws Exception { + final long newStartOffset = RECORDS_TO_PRODUCE / 2; + + final List remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments() + .get(t0p0); + final List segmentsToBeDeleted = remoteSegmentsBefore.stream() + .filter(rs -> rs.endOffset() < newStartOffset) + .collect(Collectors.toList()); + + adminClient.deleteRecords(Map.of(TP_0_0, RecordsToDelete.beforeOffset(newStartOffset))) + .all().get(5, TimeUnit.SECONDS); + + remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted); + + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "auto.offset.reset", "earliest" + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + + // Check the beginning and end offsets. + final Map startOffsets = consumer.beginningOffsets(List.of(TP_0_0)); + assertThat(startOffsets).containsEntry(TP_0_0, newStartOffset); + final Map endOffsets = consumer.endOffsets(List.of(TP_0_0)); + assertThat(endOffsets).containsEntry(TP_0_0, RECORDS_TO_PRODUCE + 1); + // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client + + // TODO check segments deleted on the remote + + // Check what we can now consume. + consumer.assign(List.of(TP_0_0)); + consumer.seek(TP_0_0, 0); + final ConsumerRecord record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0); + assertThat(record.offset()).isEqualTo(newStartOffset); + } + } + + @Test + @Order(4) + void remoteCleanupDueToRetention() throws Exception { + // Collect all remote segments, as after changing retention, all should be deleted. 
+ final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments(); + final var segmentsToBeDeleted = Stream.concat( + remoteSegmentsBefore.get(t0p0).stream(), + remoteSegmentsBefore.get(t0p1).stream() + ).collect(Collectors.toList()); + + LOG.info("Forcing cleanup by setting bytes retention to 1"); + + final var alterConfigs = List.of(new AlterConfigOp( + new ConfigEntry("retention.bytes", "1"), AlterConfigOp.OpType.SET)); + adminClient.incrementalAlterConfigs(Map.of( + new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_0), alterConfigs + )).all().get(5, TimeUnit.SECONDS); + + LOG.info("Starting cleanup validation"); + + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "auto.offset.reset", "earliest" + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + + // Get earliest offset available locally + final long newStartOffset = localLogFiles(TP_0_0).stream() + .mapToLong(f -> Long.parseLong(f.getName().replace(".log", ""))) + .max() + .getAsLong(); + // and wait til expected earliest offset is in place + await() + .pollInterval(Duration.ofSeconds(5)) + .atMost(Duration.ofSeconds(30)) + .until(() -> { + final var beginningOffset = consumer.beginningOffsets(List.of(TP_0_0)).get(TP_0_0); + LOG.info("Beginning offset found {}, expecting {}", beginningOffset, newStartOffset); + return beginningOffset.equals(newStartOffset); + }); + + final Map endOffsets = consumer.endOffsets(List.of(TP_0_0)); + assertThat(endOffsets).containsEntry(TP_0_0, RECORDS_TO_PRODUCE + 1); + + // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client + + consumer.assign(List.of(TP_0_0)); + consumer.seek(TP_0_0, 0); + + final ConsumerRecord record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0); + assertThat(record.offset()).isEqualTo(newStartOffset); + + remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted); + await() + .between(Duration.ofSeconds(1), Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> assertNoTopicDataOnTierStorage(t0p0.topic(), t0p0.topicId())); + } + + LOG.info("Cleanup validation completed"); + } + + @Test + @Disabled("https://github.com/aiven/kafka/issues/35") + @Order(5) + void topicDelete() throws Exception { + LOG.info("Starting topic delete test"); + + final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments(); + final var segmentsToBeDeleted = remoteSegmentsBefore.get(t1p0); + + adminClient.deleteTopics(List.of(TP_1_0.topic())) + .all().get(30, TimeUnit.SECONDS); + + remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted); + await() + .between(Duration.ofSeconds(1), Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> assertNoTopicDataOnTierStorage(t1p0.topic(), t1p0.topicId())); + + LOG.info("Topic delete test completed"); + } + + private static String extractSegmentId(final String fileName) { + return fileName.substring(21, fileName.lastIndexOf('.')); + } + + private static String extractSuffix(final String fileName) { + return fileName.substring(fileName.lastIndexOf('.') + 1); + } + + /** + * Variable batch size based on the offset received + */ + private static int batchSize(final long offset) { + return (int) offset % 10 + 1; + } + + private static ProducerRecord createProducerRecord(final String topic, + final int partition, + final long offset) { + final int seed = (int) ((offset + partition) % 10); + + final int keySize = seed * 2 + 1; + final var key = 
ByteBuffer.allocate(keySize);
+        final var keyPattern = (topic + "-" + partition + "-" + offset).getBytes();
+        while (key.remaining() >= keyPattern.length) {
+            key.put(keyPattern);
+        }
+        key.put(keyPattern, 0, key.remaining());
+        assertThat(key.hasRemaining()).isFalse();
+
+        final int valueSize = VALUE_SIZE_MIN + (VALUE_SIZE_MAX - VALUE_SIZE_MIN) / 10 * seed;
+        final var value = ByteBuffer.allocate(valueSize);
+
+        return new ProducerRecord<>(topic, partition, key.array(), value.array());
+    }
+
+    private void checkRecord(final ConsumerRecord<byte[], byte[]> actual,
+                             final long offset,
+                             final ProducerRecord<byte[], byte[]> expected) {
+        assertThat(actual.offset()).isEqualTo(offset);
+        assertThat(actual.key()).isEqualTo(expected.key());
+        assertThat(actual.value()).isEqualTo(expected.value());
+    }
+
+    abstract List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition);
+
+    abstract boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId);
+
+    /**
+     * Flag when a step has failed so next steps fail-fast.
+     *
+     * <p>Needed to allow running {@code SingleBrokerTest#stopKafka} after all tests.
+     */
+    public static class FailFastExtension implements TestWatcher, ExecutionCondition {
+        private boolean failed;
+
+        @Override
+        public void testFailed(final ExtensionContext context, final Throwable cause) {
+            LOG.error("Test failed: " + context.getDisplayName());
+            failed = true;
+        }
+
+        @Override
+        public ConditionEvaluationResult evaluateExecutionCondition(final ExtensionContext extensionContext) {
+            if (failed) {
+                return ConditionEvaluationResult.disabled("Already failed");
+            } else {
+                return ConditionEvaluationResult.enabled("Continue testing");
+            }
+        }
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java
new file mode 100644
index 000000000..232099762
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+
+public class RemoteLogMetadataDeserializer implements Deserializer<RemoteLogMetadata> {
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    @Override
+    public RemoteLogMetadata deserialize(final String topic, final byte[] data) {
+        return serde.deserialize(data);
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java
new file mode 100644
index 000000000..76ed8cff3
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
+
+public class RemoteLogMetadataTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteLogMetadataTracker.class);
+    private static final String REMOTE_LOG_METADATA_TOPIC = "__remote_log_metadata";
+
+    private final KafkaConsumer<byte[], RemoteLogMetadata> consumer;
+    private List<TopicPartition> partitions;
+
+    private final Map<TopicIdPartition, Set<RemoteSegment>> remoteSegments = new HashMap<>();
+    private final Map<RemoteLogSegmentId, RemoteLogSegmentState> remoteSegmentStates = new HashMap<>();
+
+    public RemoteLogMetadataTracker(final String bootstrapServers) {
+        consumer = new KafkaConsumer<>(Map.of(
+            "bootstrap.servers", bootstrapServers,
+            "auto.offset.reset", "earliest"
+        ), new ByteArrayDeserializer(), new RemoteLogMetadataDeserializer());
+    }
+
+    public Map<TopicIdPartition, List<RemoteSegment>> remoteSegments() {
+        final Map<TopicIdPartition, List<RemoteSegment>> result = new HashMap<>();
+        for (final Map.Entry<TopicIdPartition, Set<RemoteSegment>> entry : remoteSegments.entrySet()) {
+            final List<RemoteSegment> list = entry.getValue().stream()
+                .sorted(Comparator.comparing(RemoteSegment::startOffset))
+                .collect(Collectors.toList());
+            result.put(entry.getKey(), list);
+        }
+        return result;
+    }
+
+    /**
+     * Initializes the tracker.
+     *
+     * <p>It expects at least one record to be present in __remote_log_metadata
+     * and that all remote segments are in {@code COPY_SEGMENT_FINISHED} state.
+     */
+    public void initialize(final List<TopicIdPartition> expectedPartitions) {
+        partitions = consumer.partitionsFor(REMOTE_LOG_METADATA_TOPIC).stream()
+            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+            .collect(Collectors.toList());
+
+        await().atMost(Duration.ofSeconds(60))
+            .pollInterval(Duration.ofMillis(100))
+            .until(() ->
+                consumer.endOffsets(partitions).values().stream()
+                    .mapToLong(Long::longValue)
+                    .sum() > 0);
+
+        // Supplies the segment states whose copying has not finished yet.
+        final Supplier<Map<RemoteLogSegmentId, RemoteLogSegmentState>> segmentsStillCopying = () ->
+            remoteSegmentStates.entrySet().stream()
+                .filter(s -> s.getValue() != RemoteLogSegmentState.COPY_SEGMENT_FINISHED)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        LOG.info("Initializing remote segments");
+
+        final var timeout = Duration.ofMinutes(1);
+        final var startAt = System.currentTimeMillis();
+
+        final var metadataRecords = new LinkedHashMap<Map.Entry<TopicPartition, Long>, RemoteLogMetadata>();
+        boolean isReady = false;
+        while (!isReady) {
+            consumer.poll(Duration.ofSeconds(5)).forEach(r -> {
+                final RemoteLogMetadata metadata = r.value();
+                assertThat(metadata)
+                    .withFailMessage("Unexpected metadata type: %s", metadata)
+                    .isInstanceOfAny(RemoteLogSegmentMetadata.class, RemoteLogSegmentMetadataUpdate.class);
+
+                remoteSegments.putIfAbsent(metadata.topicIdPartition(), new HashSet<>());
+                metadataRecords.put(Map.entry(new TopicPartition(r.topic(), r.partition()), r.offset()), metadata);
+                if (metadata instanceof RemoteLogSegmentMetadata) {
+                    final var segmentMetadata = (RemoteLogSegmentMetadata) metadata;
+                    remoteSegmentStates.put(segmentMetadata.remoteLogSegmentId(), segmentMetadata.state());
+                    remoteSegments.get(metadata.topicIdPartition()).add(
+                        new RemoteSegment(
+                            segmentMetadata.remoteLogSegmentId(),
+                            segmentMetadata.startOffset(),
+                            segmentMetadata.endOffset()
+                        ));
+                } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
+                    final var update = (RemoteLogSegmentMetadataUpdate) metadata;
+                    remoteSegmentStates.put(update.remoteLogSegmentId(), update.state());
+
+                    // Sanity check: if we see an update, the original record should be already taken into account.
+                    assertThat(remoteSegments.get(metadata.topicIdPartition()))
+                        .map(RemoteSegment::remoteLogSegmentId)
+                        .contains(update.remoteLogSegmentId());
+                }
+            });
+
+            isReady = segmentsStillCopying.get().isEmpty() // all copies have finished
+                && expectedPartitions.stream().allMatch(remoteSegments::containsKey); // AND all expected partitions seen
+
+            // Check for timeout.
+            final var running = System.currentTimeMillis() - startAt;
+            if (running > timeout.toMillis()) {
+                LOG.warn("Timeout waiting for segments copy finished events to arrive after {} running", running);
+                break;
+            }
+        }
+
+        if (!isReady) { // The loop above exited because of the timeout.
+            final var notCopied = segmentsStillCopying.get();
+            fail("Failed to receive copy metadata records for %s out of %s segments."
+                + "%nSegments missing: %n%s"
+                + "%nMetadata events received: %n%s",
+                notCopied.size(), remoteSegments.size(),
+                notCopied.entrySet().stream()
+                    .map(e -> e.getKey().toString()
+                        + " => "
+                        + e.getValue())
+                    .collect(Collectors.joining("\n")),
+                metadataRecords.entrySet().stream()
+                    .map(e -> e.getKey().getKey()
+                        + "-" + e.getKey().getValue() + ":" + e.getValue() + " => "
+                        + e.getValue())
+                    .collect(Collectors.joining("\n"))
+            );
+        }
+
+        assertThat(remoteSegments.keySet()).containsExactlyInAnyOrderElementsOf(expectedPartitions);
+
+        LOG.info("Remote Log Metadata Tracker initialized");
+    }
+
+    public void waitUntilSegmentsAreDeleted(final List<RemoteSegment> segmentsToBeDeleted) {
+        final Supplier<Map<RemoteSegment, RemoteLogSegmentState>> segmentsNotDeleted =
+            () -> segmentsToBeDeleted.stream()
+                .map(rs -> Map.entry(rs, remoteSegmentStates.get(rs.remoteLogSegmentId())))
+                .filter(rs -> rs.getValue() != RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        LOG.info("Starting validation of remote segments deleted");
+
+        final var timeout = Duration.ofMinutes(1);
+        final var startAt = System.currentTimeMillis();
+
+        final var metadataRecords = new LinkedHashMap<Map.Entry<TopicPartition, Long>, RemoteLogMetadata>();
+
+        boolean isReady = false;
+        while (!isReady) {
+            consumer.poll(Duration.ofSeconds(5)).forEach(r -> {
+                final RemoteLogMetadata metadata = r.value();
+                metadataRecords.put(Map.entry(new TopicPartition(r.topic(), r.partition()), r.offset()), metadata);
+                if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
+                    final var metadataUpdate = (RemoteLogSegmentMetadataUpdate) metadata;
+                    remoteSegmentStates.put(metadataUpdate.remoteLogSegmentId(), metadataUpdate.state());
+                    if (metadataUpdate.state() == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED) {
+                        remoteSegments.get(metadata.topicIdPartition())
+                            .removeIf(
+                                segment -> segment.remoteLogSegmentId().equals(metadataUpdate.remoteLogSegmentId()));
+                    }
+                }
+            });
+
+            isReady = segmentsNotDeleted.get().isEmpty();
+
+            final var running = System.currentTimeMillis() - startAt;
+            if (running > timeout.toMillis()) {
+                LOG.warn("Timeout waiting for segments delete finished events to arrive after {} running", running);
+                break;
+            }
+        }
+
+        if (!segmentsNotDeleted.get().isEmpty()) {
+            final var notDeleted = segmentsNotDeleted.get();
+            fail("Failed to receive delete metadata records for %s out of %s segments."
+                + "%nSegments missing: %n%s"
+                + "%nMetadata events received: %n%s",
+                notDeleted.size(), segmentsToBeDeleted.size(),
+                notDeleted.entrySet().stream()
+                    .map(e -> e.getKey().remoteLogSegmentId().toString()
+                        + "[" + e.getKey().startOffset() + "-" + e.getKey().endOffset() + "] => "
+                        + e.getValue())
+                    .collect(Collectors.joining("\n")),
+                metadataRecords.entrySet().stream()
+                    .map(e -> e.getKey().getKey()
+                        + "-" + e.getKey().getValue() + ":" + e.getValue() + " => "
+                        + e.getValue())
+                    .collect(Collectors.joining("\n"))
+            );
+        }
+
+        LOG.info("Remote segments deleted validation complete");
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java
new file mode 100644
index 000000000..aa00b0eae
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e.internal; + +import java.util.Objects; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; + +public final class RemoteSegment { + private final RemoteLogSegmentId remoteLogSegmentId; + private final long startOffset; + private final long endOffset; + + RemoteSegment(final RemoteLogSegmentId remoteLogSegmentId, + final long startOffset, + final long endOffset) { + this.remoteLogSegmentId = remoteLogSegmentId; + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + public RemoteLogSegmentId remoteLogSegmentId() { + return remoteLogSegmentId; + } + + public long startOffset() { + return startOffset; + } + + public long endOffset() { + return endOffset; + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + final var that = (RemoteSegment) obj; + return Objects.equals(this.remoteLogSegmentId, that.remoteLogSegmentId) + && this.startOffset == that.startOffset + && this.endOffset == that.endOffset; + } + + @Override + public int hashCode() { + return Objects.hash(remoteLogSegmentId, startOffset, endOffset); + } + + @Override + public String toString() { + return "RemoteSegment[" + + "remoteLogSegmentId=" + remoteLogSegmentId + ", " + + "startOffset=" + startOffset + ", " + + "endOffset=" + endOffset + ']'; + } + +} diff --git a/e2e/src/test/resources/log4j.properties b/e2e/src/test/resources/log4j.properties new file mode 100644 index 000000000..9fed516a4 --- /dev/null +++ b/e2e/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +## +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+##
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=WARN
+
+log4j.logger.org.testcontainers=INFO
+log4j.logger.tc=INFO
+log4j.logger.com.github.dockerjava=WARN
+log4j.logger.com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire=OFF
diff --git a/settings.gradle b/settings.gradle
index 841bc9f67..10c928539 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,3 +21,4 @@ include 'storage:core'
 include 'storage:filesystem'
 include 'storage:gcs'
 include 'storage:s3'
+include 'e2e'
diff --git a/storage/s3/build.gradle b/storage/s3/build.gradle
index 0db62f656..50001e516 100644
--- a/storage/s3/build.gradle
+++ b/storage/s3/build.gradle
@@ -16,12 +16,6 @@
 
 archivesBaseName = "storage-s3"
 
-ext {
-    // Keep empty lines between versions to avoid conflicts on mass update (e.g. Dependabot).
-
-    awsSdkVersion = "1.12.520"
-}
-
 dependencies {
     implementation project(":storage:core")