Upgrade to Kafka 3.5 #9

Merged: 11 commits, Jan 17, 2024

Changes from all commits

2 changes: 2 additions & 0 deletions .github/workflows/build-and-publish.yaml
@@ -9,6 +9,8 @@ jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/[email protected]
with:
java-version: 17
secrets:
sonar-token: ${{ secrets.SONARCLOUD_TOKEN }}
sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }}
1 change: 1 addition & 0 deletions .github/workflows/release.yaml
@@ -18,6 +18,7 @@ jobs:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/[email protected]
with:
java-version: 17
release-type: "${{ inputs.release-type }}"
secrets:
github-email: "${{ secrets.GH_EMAIL }}"
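Both hunks make the same change: the reusable Java Gradle workflows from bakdata/ci-templates now receive an explicit java-version input. With the YAML nesting restored (the flat diff view above drops indentation), the build-and-publish call site reads roughly like this; anything not visible in the diff, such as further secrets, is omitted:

jobs:
  build-and-publish:
    name: Java Gradle
    uses: bakdata/ci-templates/.github/workflows/[email protected]
    with:
      java-version: 17        # new input: run the build on Java 17
    secrets:
      sonar-token: ${{ secrets.SONARCLOUD_TOKEN }}
      sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }}

The release workflow gets the same java-version: 17 line added to its existing with: block.
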
11 changes: 5 additions & 6 deletions brute-force-connect/build.gradle.kts
@@ -23,19 +23,18 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-json-schema-serde", version = confluentVersion)

testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = "2.1.8") {
exclude(group = "ch.qos.logback")
exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j")
}
val fluentKafkaVersion = "2.5.1"
val testContainersVersion: String by project
testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion)
val fluentKafkaVersion = "2.11.3"
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)

testImplementation(group = "com.bakdata.kafka", name = "large-message-serde", version = largeMessageVersion)
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaVersion) {
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.5.0") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -28,11 +28,11 @@
import static net.mguenther.kafka.junit.Wait.delay;
import static org.assertj.core.api.Assertions.assertThat;

import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
@@ -55,11 +55,47 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class BruteForceConverterIntegrationTest {
@RegisterExtension
static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent()
.withSecureConnection(false).build();

private static final DockerImageName LOCAL_STACK_IMAGE = DockerImageName.parse("localstack/localstack")
.withTag("1.3.1");
@Container
private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
.withServices(Service.S3);

static S3Client getS3Client() {
return S3Client.builder()
.endpointOverride(getEndpointOverride())
.credentialsProvider(StaticCredentialsProvider.create(getCredentials()))
.region(getRegion())
.build();
}

private static Region getRegion() {
return Region.of(LOCAL_STACK_CONTAINER.getRegion());
}

private static AwsBasicCredentials getCredentials() {
return AwsBasicCredentials.create(
LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey()
);
}

private static URI getEndpointOverride() {
return LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3);
}
private static final String BUCKET_NAME = "testbucket";
private static final String TOPIC = "input";
@RegisterExtension
@@ -69,12 +105,13 @@ class BruteForceConverterIntegrationTest {

private static Properties createS3BackedProperties() {
final Properties properties = new Properties();
properties.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort());
properties.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1");
properties.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo");
properties.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar");
properties.put(AbstractLargeMessageConfig.S3_ENABLE_PATH_STYLE_ACCESS_CONFIG, true);
properties.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
final AwsBasicCredentials credentials = getCredentials();
properties.setProperty(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG,
getEndpointOverride().toString());
properties.setProperty(AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id());
properties.setProperty(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId());
properties.setProperty(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey());
properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
return properties;
}

@@ -85,7 +122,9 @@ private static String withValuePrefix(final Object config) {
@BeforeEach
void setUp() throws IOException {
this.outputFile = Files.createTempFile("test", "temp");
S3_MOCK.createS3Client().createBucket(BUCKET_NAME);
getS3Client().createBucket(CreateBucketRequest.builder()
.bucket(BUCKET_NAME)
.build());
this.kafkaCluster = this.createCluster();
this.kafkaCluster.start();
}
@@ -130,13 +169,13 @@ private EmbeddedKafkaCluster createCluster() {

private Properties config() {
final Properties properties = new Properties();
properties.put(ConnectorConfig.NAME_CONFIG, "test");
properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
properties.put(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
properties.setProperty(ConnectorConfig.NAME_CONFIG, "test");
properties.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
properties.setProperty(SinkConnector.TOPICS_CONFIG, TOPIC);
properties.setProperty(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
properties.setProperty(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.setProperty(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
properties.setProperty(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
this.schemaRegistry.getUrl());
createS3BackedProperties().forEach((key, value) -> properties.put(withValuePrefix(key), value));
return properties;
@@ -154,7 +193,7 @@ private Properties createBackedStringProducerProperties(final boolean shouldBack
private Properties createBaseProducerProperties() {
final Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
return properties;
}

@@ -167,7 +206,7 @@ private Properties createStringProducerProperties() {
private Properties createAvroProducerProperties() {
final Properties properties = this.createBaseProducerProperties();
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
properties.setProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
return properties;
}
}
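The net effect of this test migration: the Adobe S3Mock JUnit 5 extension is gone, and S3 is now provided by a LocalStack Testcontainer accessed through the AWS SDK v2. Pulled together from the hunks above, the pattern looks roughly like this (a condensed sketch; the wrapper class name is made up and the surrounding Kafka Connect test logic is omitted):

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class LocalStackS3Sketch {
    // The LocalStack container takes over the role of the removed S3MockExtension.
    @Container
    private static final LocalStackContainer LOCAL_STACK_CONTAINER =
            new LocalStackContainer(DockerImageName.parse("localstack/localstack").withTag("1.3.1"))
                    .withServices(Service.S3);

    // AWS SDK v2 client pointed at the container's S3 endpoint and test credentials.
    static S3Client getS3Client() {
        return S3Client.builder()
                .endpointOverride(LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
                        LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey())))
                .region(Region.of(LOCAL_STACK_CONTAINER.getRegion()))
                .build();
    }

    // Buckets are created through the SDK v2 request builder instead of S3_MOCK.createS3Client().
    static void createBucket(final String bucketName) {
        getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
    }
}

The same container and client helpers are mirrored in the unit test class changed below.
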
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -30,7 +30,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
@@ -49,6 +48,7 @@
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
import io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -76,21 +76,57 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class BruteForceConverterTest {
@RegisterExtension
static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent().withSecureConnection(false).build();
private static final String TOPIC = "topic";
final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock(List.of(

private static final DockerImageName LOCAL_STACK_IMAGE = DockerImageName.parse("localstack/localstack")
.withTag("1.3.1");
@Container
private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
.withServices(Service.S3);
private final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock(List.of(
new AvroSchemaProvider(),
new JsonSchemaProvider(),
new ProtobufSchemaProvider()
));

static S3Client getS3Client() {
return S3Client.builder()
.endpointOverride(getEndpointOverride())
.credentialsProvider(StaticCredentialsProvider.create(getCredentials()))
.region(getRegion())
.build();
}

private static Region getRegion() {
return Region.of(LOCAL_STACK_CONTAINER.getRegion());
}

private static AwsBasicCredentials getCredentials() {
return AwsBasicCredentials.create(
LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey()
);
}
private static final String TOPIC = "topic";

private static URI getEndpointOverride() {
return LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3);
}

static Stream<Arguments> generateGenericAvroSerializers() {
return generateSerializers(new GenericAvroSerde());
}
@@ -125,7 +161,7 @@ private static DynamicMessage generateDynamicMessage() throws DescriptorValidati
final DynamicSchema dynamicSchema = DynamicSchema.newBuilder()
.setName("file")
.addMessageDefinition(MessageDefinition.newBuilder("Test")
.addField("", "string", "testId", 1, null, null, null)
.addField(null, "string", "testId", 1, null, null)
.build())
.build();
final Descriptor test = dynamicSchema.getMessageDescriptor("Test");
@@ -154,12 +190,12 @@ private static <T> SerializerFactory<T> createLargeMessageSerializer(final Serde
}

private static Map<String, Object> getS3EndpointConfig() {
final AwsBasicCredentials credentials = getCredentials();
return Map.of(
AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort(),
AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1",
AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo",
AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar",
AbstractLargeMessageConfig.S3_ENABLE_PATH_STYLE_ACCESS_CONFIG, true
AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, getEndpointOverride().toString(),
AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id(),
AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId(),
AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()
);
}

@@ -287,14 +323,14 @@ void shouldConvertByteKeys(final SerializerFactory<byte[]> factory) {
}

@ParameterizedTest
@MethodSource("generateProtobufSerializers")
void shouldConvertJsonKeys(final SerializerFactory<DynamicMessage> factory) throws DescriptorValidationException {
final DynamicMessage value = generateDynamicMessage();
@MethodSource("generateJsonSerializers")
void shouldConvertJsonKeys(final SerializerFactory<JsonTestRecord> factory) {
final JsonTestRecord value = new JsonTestRecord("test");
final Map<String, Object> config = Map.of(
BruteForceConverterConfig.CONVERTER_CONFIG,
List.of(AvroConverter.class.getName(), ProtobufConverter.class.getName())
List.of(AvroConverter.class.getName(), JsonSchemaConverter.class.getName())
);
this.testValueConversion(factory, new KafkaProtobufSerializer<>(), value, config, new ProtobufConverter());
this.testKeyConversion(factory, new KafkaJsonSchemaSerializer<>(), value, config, new JsonSchemaConverter());
}

@ParameterizedTest
Expand Down Expand Up @@ -359,7 +395,9 @@ private <T> void testConversion(final SerializerFactory<T> factory, final Serial
final T value, final Map<String, Object> originals, final Converter expectedConverter,
final boolean isKey) {
final String bucket = "bucket";
S3_MOCK.createS3Client().createBucket(bucket);
getS3Client().createBucket(CreateBucketRequest.builder()
.bucket(bucket)
.build());
final Map<String, Object> config = new HashMap<>(originals);
config.put(SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
config.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, "s3://" + bucket + "/");
11 changes: 5 additions & 6 deletions brute-force-serde/build.gradle.kts
@@ -4,7 +4,7 @@ import com.google.protobuf.gradle.protoc
description = "Kafka SerDe that deserializes messages of an unknown serialization format"

plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
id("com.google.protobuf") version "0.8.18"
}

@@ -26,13 +26,12 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-json-schema-serde", version = confluentVersion)

testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = "2.1.8") {
exclude(group = "ch.qos.logback")
exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j")
}
val testContainersVersion: String by project
testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion)
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")

val fluentKafkaVersion = "2.5.1"
val fluentKafkaVersion = "2.11.3"
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "fluent-kafka-streams-tests-junit5",