diff --git a/defaults.yaml b/atm-fraud/defaults.yaml
similarity index 97%
rename from defaults.yaml
rename to atm-fraud/defaults.yaml
index 57d616f..bbae6a0 100644
--- a/defaults.yaml
+++ b/atm-fraud/defaults.yaml
@@ -15,7 +15,6 @@ streams-bootstrap-v2:
         enabled: false
     replicaCount: 1
-    debug: true
 
 producer-app-v2:
   to:
diff --git a/requirements.txt b/requirements.txt
index d0a01b4..7d3c260 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-kpops~=8.0
+kpops~=8.1.4
diff --git a/word-count/code/build.gradle.kts b/word-count/code/build.gradle.kts
index e6c7071..0d26c0c 100644
--- a/word-count/code/build.gradle.kts
+++ b/word-count/code/build.gradle.kts
@@ -2,8 +2,8 @@ description = "Word count pipeline with Kafka Streams"
 plugins {
     java
     idea
-    id("io.freefair.lombok") version "6.6.1"
-    id("com.google.cloud.tools.jib") version "3.3.1"
+    id("io.freefair.lombok") version "8.11"
+    id("com.google.cloud.tools.jib") version "3.4.4"
 }
 
 group = "com.bakdata.kpops.examples"
@@ -14,9 +14,10 @@ repositories {
 }
 
-configure<JavaPluginExtension> {
-    sourceCompatibility = JavaVersion.VERSION_17
-    targetCompatibility = JavaVersion.VERSION_17
+java {
+    toolchain {
+        languageVersion = JavaLanguageVersion.of(21)
+    }
 }
 
 tasks {
@@ -32,28 +33,34 @@ tasks {
 }
 
 dependencies {
-    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap", version = "2.8.0")
-    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = "2.19.0")
-    implementation(group = "com.google.guava", name = "guava", version = "31.1-jre")
+    val streamsBootstrapVersion = "3.1.0"
+    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = streamsBootstrapVersion)
+    val log4jVersion = "2.24.2"
+    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
+    val guavaVersion = "33.3.1-jre"
+    implementation(group = "com.google.guava", name = "guava", version = guavaVersion)
 
-    val junitVersion: String by project
+    val junitVersion = "5.11.3"
     testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
-    testImplementation(group = "org.assertj", name = "assertj-core", version = "3.24.2")
-    val fluentKafkaVersion = "2.8.1"
+    val assertJVersion = "3.26.3"
+    testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
+
+    testImplementation(group = "com.bakdata.kafka", name = "streams-bootstrap-test", version = streamsBootstrapVersion)
+    val fluentKafkaVersion = "2.14.0"
     testImplementation(
         group = "com.bakdata.fluent-kafka-streams-tests",
         name = "fluent-kafka-streams-tests-junit5",
         version = fluentKafkaVersion
     )
-    val kafkaVersion: String by project
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaVersion) {
+    val kafkaJunitVersion = "3.6.0"
+    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
     }
 }
 
 jib {
     from {
-        image = "eclipse-temurin:17.0.6_10-jre"
+        image = "eclipse-temurin:21.0.5_11-jre"
     }
 }
diff --git a/word-count/code/gradle.properties b/word-count/code/gradle.properties
index 81c7d76..6d3fb86 100644
--- a/word-count/code/gradle.properties
+++ b/word-count/code/gradle.properties
@@ -1,5 +1,3 @@
-version=1.0.0-SNAPSHOT
+version=2.0.0-SNAPSHOT
 org.gradle.caching=true
 org.gradle.parallel=true
-junitVersion=5.9.2
-kafkaVersion=3.3.0
diff --git a/word-count/code/gradle/wrapper/gradle-wrapper.properties b/word-count/code/gradle/wrapper/gradle-wrapper.properties
index f398c33..e2847c8 100644
--- a/word-count/code/gradle/wrapper/gradle-wrapper.properties
+++ b/word-count/code/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,7 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
 networkTimeout=10000
+validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/word-count/code/gradlew b/word-count/code/gradlew
index 65dcd68..1aa94a4 100755
--- a/word-count/code/gradlew
+++ b/word-count/code/gradlew
@@ -83,10 +83,8 @@ done
 # This is normally unused
 # shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
-
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD=maximum
@@ -133,10 +131,13 @@ location of your Java installation."
     fi
 else
     JAVACMD=java
-    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+    if ! command -v java >/dev/null 2>&1
+    then
+        die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
 
 Please set the JAVA_HOME variable in your environment to match the
 location of your Java installation."
+    fi
 fi
 
 # Increase the maximum file descriptors if we can.
@@ -144,7 +145,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
     case $MAX_FD in #(
       max*)
         # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         MAX_FD=$( ulimit -H -n ) ||
             warn "Could not query maximum file descriptor limit"
     esac
@@ -152,7 +153,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
       '' | soft) :;; #(
       *)
         # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         ulimit -n "$MAX_FD" ||
             warn "Could not set maximum file descriptor limit to $MAX_FD"
     esac
@@ -197,11 +198,15 @@ if "$cygwin" || "$msys" ; then
     done
 fi
 
-# Collect all arguments for the java command;
-#   * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
-#     shell script including quotes and variable substitutions, so put them in
-#     double quotes to make sure that they get re-expanded; and
-#   * put everything else in single quotes, so that it's not re-expanded.
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS can contain fragments of
+#     shell script including quotes and variable substitutions, so put them in
+#     double quotes to make sure that they get re-expanded; and
+#   * put everything else in single quotes, so that it's not re-expanded.
 
 set -- \
         "-Dorg.gradle.appname=$APP_BASE_NAME" \
diff --git a/word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java b/word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java
index 77745f7..d5e2705 100644
--- a/word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java
+++ b/word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java
@@ -1,52 +1,53 @@
 package com.bakdata.kpops.examples;
 
-import com.bakdata.kafka.KafkaProducerApplication;
+import com.bakdata.kafka.ProducerApp;
+import com.bakdata.kafka.ProducerBuilder;
+import com.bakdata.kafka.ProducerRunnable;
+import com.bakdata.kafka.SerializerConfig;
+import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import com.google.common.io.Resources;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
 import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static com.bakdata.kafka.KafkaApplication.startApplication;
+
 @Setter
-public class SentenceProducer extends KafkaProducerApplication {
+public class SentenceProducer implements ProducerApp {
     static final String FILE_NAME = "kpops.txt";
 
     public static void main(final String[] args) {
-        startApplication(new SentenceProducer(), args);
+        startApplication(
+                new SimpleKafkaProducerApplication<>(SentenceProducer::new),
+                args
+        );
     }
 
     @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        return kafkaProperties;
-    }
-
-    @Override
-    protected void runApplication() {
-        try (final KafkaProducer<String, String> producer = this.createProducer()) {
-            final URL url = Resources.getResource(FILE_NAME);
-            final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8);
-
-            for (final String textLine : textLines) {
-                this.publish(producer, textLine);
+    public ProducerRunnable buildRunnable(final ProducerBuilder producerBuilder) {
+        return () -> {
+            try (final Producer<String, String> producer = producerBuilder.createProducer()) {
+                final URL url = Resources.getResource(FILE_NAME);
+                final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8);
+                final String outputTopic = producerBuilder.getTopics().getOutputTopic();
+                for (final String textLine : textLines) {
+                    producer.send(new ProducerRecord<>(outputTopic, null, textLine));
+                }
+                producer.flush();
+            } catch (final IOException e) {
+                throw new RuntimeException("Error occurred while reading the .txt file.", e);
             }
-            producer.flush();
-        } catch (final IOException e) {
-            throw new RuntimeException("Error occurred while reading the .txt file.", e);
-        }
+        };
     }
 
-    private void publish(final Producer<String, String> producer, final String line) {
-        producer.send(new ProducerRecord<>(this.getOutputTopic(), null, line));
+    @Override
+    public SerializerConfig defaultSerializationConfig() {
+        return new SerializerConfig(StringSerializer.class, StringSerializer.class);
     }
 }
diff --git a/word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java b/word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java
index 3a64d09..d02d1f6 100644
--- a/word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java
+++ b/word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java
@@ -1,26 +1,31 @@
 package com.bakdata.kpops.examples;
 
-import com.bakdata.kafka.KafkaStreamsApplication;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import com.bakdata.kafka.KafkaApplication;
+import com.bakdata.kafka.SerdeConfig;
+import com.bakdata.kafka.SimpleKafkaStreamsApplication;
+import com.bakdata.kafka.StreamsApp;
+import com.bakdata.kafka.StreamsTopicConfig;
+import com.bakdata.kafka.TopologyBuilder;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 
-public class WordCountApplication extends KafkaStreamsApplication {
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+public class WordCountApplication implements StreamsApp {
     private static final Pattern COMPILE = Pattern.compile("\\W+");
 
     public static void main(final String[] args) {
-        startApplication(new WordCountApplication(), args);
+        KafkaApplication.startApplication(
+                new SimpleKafkaStreamsApplication<>(WordCountApplication::new),
+                args
+        );
     }
 
     @Override
-    public void buildTopology(final StreamsBuilder builder) {
-        final KStream<String, String> textLines = builder.stream(this.getInputTopics());
+    public void buildTopology(final TopologyBuilder builder) {
+        final KStream<String, String> textLines = builder.streamInput();
         final KTable<String, String> wordCounts = textLines
             .flatMapValues(value -> Arrays.asList(COMPILE.split(value.toLowerCase())))
             .groupBy((key, value) -> value)
@@ -28,19 +33,17 @@ public void buildTopology(final StreamsBuilder builder) {
             // The redis sink connection lacks a Long converter and instead relies on a string converter.
             .mapValues(Object::toString);
 
-        wordCounts.toStream().to(this.getOutputTopic());
+        wordCounts.toStream()
+                .to(builder.getTopics().getOutputTopic());
     }
 
     @Override
-    public String getUniqueAppId() {
-        return String.format("word-count-app-%s", this.getOutputTopic());
+    public String getUniqueAppId(final StreamsTopicConfig topics) {
+        return String.format("word-count-app-%s", topics.getOutputTopic());
     }
 
     @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
-        kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
-        return kafkaProperties;
+    public SerdeConfig defaultSerializationConfig() {
+        return new SerdeConfig(StringSerde.class, StringSerde.class);
     }
 }
diff --git a/word-count/code/src/test/java/com/bakdata/kpops/examples/SentenceProducerIntegrationTest.java b/word-count/code/src/test/java/com/bakdata/kpops/examples/SentenceProducerIntegrationTest.java
index c316b4c..78044e3 100644
--- a/word-count/code/src/test/java/com/bakdata/kpops/examples/SentenceProducerIntegrationTest.java
+++ b/word-count/code/src/test/java/com/bakdata/kpops/examples/SentenceProducerIntegrationTest.java
@@ -1,12 +1,7 @@
 package com.bakdata.kpops.examples;
 
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
-import static net.mguenther.kafka.junit.Wait.delay;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.bakdata.kafka.KafkaProducerApplication;
+import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
 import net.mguenther.kafka.junit.KeyValue;
 import net.mguenther.kafka.junit.ReadKeyValues;
@@ -17,11 +12,19 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
+import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
+import static net.mguenther.kafka.junit.Wait.delay;
+import static org.assertj.core.api.Assertions.assertThat;
+
 class SentenceProducerIntegrationTest {
     private static final int TIMEOUT_SECONDS = 10;
     private static final String OUTPUT_TOPIC = "word-count-raw-data";
     private final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig());
-    private SentenceProducer sentenceProducer;
+    private KafkaProducerApplication sentenceProducer;
 
     @BeforeEach
     void setup() {
@@ -37,8 +40,8 @@ void teardown() {
     @Test
     void shouldRunApp() throws InterruptedException {
         this.kafkaCluster.createTopic(TopicConfig.withName(OUTPUT_TOPIC).useDefaults());
-
         this.sentenceProducer.run();
+
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
         assertThat(
@@ -59,11 +62,12 @@ void shouldRunApp() throws InterruptedException {
             });
     }
 
-    private SentenceProducer setupApp() {
-        final SentenceProducer producerApp = new SentenceProducer();
-        producerApp.setBrokers(this.kafkaCluster.getBrokerList());
+    private KafkaProducerApplication setupApp() {
+        final SimpleKafkaProducerApplication producerApp
+                = new SimpleKafkaProducerApplication<>(SentenceProducer::new);
+        producerApp.setBootstrapServers(this.kafkaCluster.getBrokerList());
         producerApp.setOutputTopic(OUTPUT_TOPIC);
-        producerApp.setStreamsConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));
+        producerApp.setKafkaConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));
         return producerApp;
     }
 }
diff --git a/word-count/code/src/test/java/com/bakdata/kpops/examples/WordCountApplicationIntegrationTest.java b/word-count/code/src/test/java/com/bakdata/kpops/examples/WordCountApplicationIntegrationTest.java
index f19b9d6..67dcd12 100644
--- a/word-count/code/src/test/java/com/bakdata/kpops/examples/WordCountApplicationIntegrationTest.java
+++ b/word-count/code/src/test/java/com/bakdata/kpops/examples/WordCountApplicationIntegrationTest.java
@@ -1,29 +1,42 @@
 package com.bakdata.kpops.examples;
 
 import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension;
+import com.bakdata.kafka.AppConfiguration;
+import com.bakdata.kafka.ConfiguredStreamsApp;
+import com.bakdata.kafka.StreamsTopicConfig;
+import com.bakdata.kafka.TestTopologyFactory;
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
-import java.io.IOException;
-import java.net.URL;
-import java.util.List;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+
 class WordCountApplicationIntegrationTest {
     public static final String INPUT_TOPIC = "word-count-raw-data";
     private static final String OUTPUT_TOPIC = "word-count-topic";
-    private final WordCountApplication app = createApp();
+
+    private final ConfiguredStreamsApp<WordCountApplication> app = configureStreamsApp();
 
     @RegisterExtension
     final TestTopologyExtension<String, String> testTopology =
-            new TestTopologyExtension<>(this.app::createTopology, this.app.getKafkaProperties());
+            TestTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app);
+
+    private static ConfiguredStreamsApp<WordCountApplication> configureStreamsApp() {
+
+        final StreamsTopicConfig topicConfig = StreamsTopicConfig.builder()
+                .inputTopics(List.of(INPUT_TOPIC))
+                .outputTopic(OUTPUT_TOPIC)
+                .build();
+
+        final AppConfiguration<StreamsTopicConfig> appConfig = new AppConfiguration<>(topicConfig);
 
-    private static WordCountApplication createApp() {
         final WordCountApplication app = new WordCountApplication();
-        app.setInputTopics(List.of(INPUT_TOPIC));
-        app.setOutputTopic(OUTPUT_TOPIC);
-        return app;
+
+        return new ConfiguredStreamsApp<>(app, appConfig);
     }
 
     @AfterEach
@@ -40,7 +53,7 @@ void shouldCountWordsCorrectly() throws IOException {
             this.testTopology.input().add(null, line);
         }
 
-        this.testTopology.streamOutput(this.app.getOutputTopic())
+        this.testTopology.streamOutput(this.app.getTopics().getOutputTopic())
             .expectNextRecord()
             .hasKey("kpops")
             .hasValue("1")
diff --git a/word-count/defaults.yaml b/word-count/defaults.yaml
new file mode 100644
index 0000000..dd1dac4
--- /dev/null
+++ b/word-count/defaults.yaml
@@ -0,0 +1,29 @@
+kubernetes-app:
+  namespace: ${NAMESPACE}
+
+streams-bootstrap:
+  version: "3.1.0"
+  values:
+    labels:
+      pipeline: ${pipeline.name}
+    kafka:
+      bootstrapServers: ${config.kafka_brokers}
+    prometheus:
+      jmx:
+        enabled: false
+    replicaCount: 1
+
+producer-app:
+  to:
+    topics:
+      ${output_topic_name}:
+        partitions_count: 3
+
+streams-app:
+  to:
+    topics:
+      ${error_topic_name}:
+        type: error
+        partitions_count: 1
+      ${output_topic_name}:
+        partitions_count: 3
diff --git a/word-count/pipeline.yaml b/word-count/pipeline.yaml
index 38f614a..481b509 100644
--- a/word-count/pipeline.yaml
+++ b/word-count/pipeline.yaml
@@ -1,19 +1,21 @@
-- type: producer-app-v2
+- type: producer-app
   name: data-producer
   values:
     image: bakdata/kpops-demo-sentence-producer
+    imageTag: "2.0.0"
 
-- type: streams-app-v2
-  name: word-counter
+- type: streams-app
+  name: word-count-app
+  values:
+    image: bakdata/kpops-demo-word-count-app
+    imageTag: "2.0.0"
+    replicaCount: 1
   to:
     topics:
       ${output_topic_name}:
         type: output
         configs:
           cleanup.policy: compact
-  values:
-    image: bakdata/kpops-demo-word-count-app
-    replicaCount: 1
 
 - type: kafka-sink-connector
   name: redis-sink-connector