Update word-count to streams bootstrap 3 #28

Merged · 5 commits · Dec 11, 2024

Changes from all commits

1 change: 0 additions & 1 deletion defaults.yaml → atm-fraud/defaults.yaml
@@ -15,7 +15,6 @@ streams-bootstrap-v2:
   enabled: false

   replicaCount: 1
-  debug: true

 producer-app-v2:
   to:

2 changes: 1 addition & 1 deletion requirements.txt
@@ -1 +1 @@
-kpops~=8.0
+kpops~=8.1.4

35 changes: 21 additions & 14 deletions word-count/code/build.gradle.kts
@@ -2,8 +2,8 @@ description = "Word count pipeline with Kafka Streams"
 plugins {
     java
     idea
-    id("io.freefair.lombok") version "6.6.1"
-    id("com.google.cloud.tools.jib") version "3.3.1"
+    id("io.freefair.lombok") version "8.11"
+    id("com.google.cloud.tools.jib") version "3.4.4"
 }

 group = "com.bakdata.kpops.examples"
@@ -14,9 +14,10 @@ repositories {
 }


-configure<JavaPluginExtension> {
-    sourceCompatibility = JavaVersion.VERSION_17
-    targetCompatibility = JavaVersion.VERSION_17
+java {
+    toolchain {
+        languageVersion = JavaLanguageVersion.of(21)
+    }
 }

 tasks {
@@ -32,28 +33,34 @@ tasks {
 }

 dependencies {
-    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap", version = "2.8.0")
-    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = "2.19.0")
-    implementation(group = "com.google.guava", name = "guava", version = "31.1-jre")
+    val streamsBootstrapVersion = "3.1.0"
+    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = streamsBootstrapVersion)
+    val log4jVersion = "2.24.2"
+    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
+    val guavaVersion = "33.3.1-jre"
+    implementation(group = "com.google.guava", name = "guava", version = guavaVersion)

-    val junitVersion: String by project
+    val junitVersion = "5.11.3"
     testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
-    testImplementation(group = "org.assertj", name = "assertj-core", version = "3.24.2")
-    val fluentKafkaVersion = "2.8.1"
+    val assertJVersion = "3.26.3"
+    testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
+
+    testImplementation(group = "com.bakdata.kafka", name = "streams-bootstrap-test", version = streamsBootstrapVersion)
+    val fluentKafkaVersion = "2.14.0"
     testImplementation(
         group = "com.bakdata.fluent-kafka-streams-tests",
         name = "fluent-kafka-streams-tests-junit5",
         version = fluentKafkaVersion
     )
-    val kafkaVersion: String by project
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaVersion) {
+    val kafkaJunitVersion = "3.6.0"
+    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
     }
 }

 jib {
     from {
-        image = "eclipse-temurin:17.0.6_10-jre"
+        image = "eclipse-temurin:21.0.5_11-jre"
     }
 }

4 changes: 1 addition & 3 deletions word-count/code/gradle.properties
@@ -1,5 +1,3 @@
-version=1.0.0-SNAPSHOT
+version=2.0.0-SNAPSHOT
 org.gradle.caching=true
 org.gradle.parallel=true
-junitVersion=5.9.2
-kafkaVersion=3.3.0

3 changes: 2 additions & 1 deletion word-count/code/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,7 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
 networkTimeout=10000
+validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists

29 changes: 17 additions & 12 deletions word-count/code/gradlew
@@ -83,10 +83,8 @@ done
 # This is normally unused
 # shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
-
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit

 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD=maximum
@@ -133,26 +131,29 @@ location of your Java installation."
     fi
 else
     JAVACMD=java
-    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+    if ! command -v java >/dev/null 2>&1
+    then
+        die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.

 Please set the JAVA_HOME variable in your environment to match the
 location of your Java installation."
+    fi
 fi

 # Increase the maximum file descriptors if we can.
 if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
     case $MAX_FD in #(
       max*)
         # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         MAX_FD=$( ulimit -H -n ) ||
             warn "Could not query maximum file descriptor limit"
     esac
     case $MAX_FD in #(
       '' | soft) :;; #(
       *)
         # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         ulimit -n "$MAX_FD" ||
             warn "Could not set maximum file descriptor limit to $MAX_FD"
     esac
@@ -197,11 +198,15 @@ if "$cygwin" || "$msys" ; then
     done
 fi

-# Collect all arguments for the java command;
-# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
-#   shell script including quotes and variable substitutions, so put them in
-#   double quotes to make sure that they get re-expanded; and
-# * put everything else in single quotes, so that it's not re-expanded.
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+#     and any embedded shellness will be escaped.
+#   * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+#     treated as '${Hostname}' itself on the command line.

 set -- \
     "-Dorg.gradle.appname=$APP_BASE_NAME" \

word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java
@@ -1,52 +1,53 @@
 package com.bakdata.kpops.examples;

-import com.bakdata.kafka.KafkaProducerApplication;
+import com.bakdata.kafka.ProducerApp;
+import com.bakdata.kafka.ProducerBuilder;
+import com.bakdata.kafka.ProducerRunnable;
+import com.bakdata.kafka.SerializerConfig;
+import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import com.google.common.io.Resources;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static com.bakdata.kafka.KafkaApplication.startApplication;

-@Setter
-public class SentenceProducer extends KafkaProducerApplication {
+public class SentenceProducer implements ProducerApp {
     static final String FILE_NAME = "kpops.txt";

     public static void main(final String[] args) {
-        startApplication(new SentenceProducer(), args);
+        startApplication(
+                new SimpleKafkaProducerApplication<>(SentenceProducer::new),
+                args
+        );
     }

     @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        return kafkaProperties;
-    }
-
-    @Override
-    protected void runApplication() {
-        try (final KafkaProducer<String, String> producer = this.createProducer()) {
-            final URL url = Resources.getResource(FILE_NAME);
-            final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8);
-
-            for (final String textLine : textLines) {
-                this.publish(producer, textLine);
+    public ProducerRunnable buildRunnable(final ProducerBuilder producerBuilder) {
+        return () -> {
+            try (final Producer<String, String> producer = producerBuilder.createProducer()) {
+                final URL url = Resources.getResource(FILE_NAME);
+                final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8);
+                final String outputTopic = producerBuilder.getTopics().getOutputTopic();
+                for (final String textLine : textLines) {
+                    producer.send(new ProducerRecord<>(outputTopic, null, textLine));
+                }
+                producer.flush();
+            } catch (final IOException e) {
+                throw new RuntimeException("Error occurred while reading the .txt file.", e);
             }
-            producer.flush();
-        } catch (final IOException e) {
-            throw new RuntimeException("Error occurred while reading the .txt file.", e);
-        }
+        };
     }

-    private void publish(final Producer<? super String, ? super String> producer, final String line) {
-        producer.send(new ProducerRecord<>(this.getOutputTopic(), null, line));
+    @Override
+    public SerializerConfig defaultSerializationConfig() {
+        return new SerializerConfig(StringSerializer.class, StringSerializer.class);
     }
 }
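
Note: this file shows the core of the streams-bootstrap 2 → 3 producer migration. The app logic becomes a plain `ProducerApp` implementation, while lifecycle and configuration move to the `SimpleKafkaProducerApplication` runner. A minimal launch sketch using only calls that appear elsewhere in this PR (the broker address and topic name are illustrative, not part of the change):

```java
package com.bakdata.kpops.examples;

import com.bakdata.kafka.SimpleKafkaProducerApplication;

public final class RunSentenceProducer {
    public static void main(final String[] args) {
        // Wrap the ProducerApp in the runner, as the PR's main() does,
        // but configure it programmatically instead of via CLI flags.
        final SimpleKafkaProducerApplication<SentenceProducer> app =
                new SimpleKafkaProducerApplication<>(SentenceProducer::new);
        app.setBootstrapServers("localhost:9092"); // assumption: local broker
        app.setOutputTopic("word-count-raw-data");
        app.run();
    }
}
```
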
word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java
@@ -1,46 +1,49 @@
 package com.bakdata.kpops.examples;

-import com.bakdata.kafka.KafkaStreamsApplication;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import com.bakdata.kafka.KafkaApplication;
+import com.bakdata.kafka.SerdeConfig;
+import com.bakdata.kafka.SimpleKafkaStreamsApplication;
+import com.bakdata.kafka.StreamsApp;
+import com.bakdata.kafka.StreamsTopicConfig;
+import com.bakdata.kafka.TopologyBuilder;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;

-public class WordCountApplication extends KafkaStreamsApplication {
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+public class WordCountApplication implements StreamsApp {
     private static final Pattern COMPILE = Pattern.compile("\\W+");

     public static void main(final String[] args) {
-        startApplication(new WordCountApplication(), args);
+        KafkaApplication.startApplication(
+                new SimpleKafkaStreamsApplication<>(WordCountApplication::new),
+                args
+        );
     }

     @Override
-    public void buildTopology(final StreamsBuilder builder) {
-        final KStream<String, String> textLines = builder.stream(this.getInputTopics());
+    public void buildTopology(final TopologyBuilder builder) {
+        final KStream<String, String> textLines = builder.streamInput();
         final KTable<String, String> wordCounts = textLines
                 .flatMapValues(value -> Arrays.asList(COMPILE.split(value.toLowerCase())))
                 .groupBy((key, value) -> value)
                 .count()
                 // The redis sink connection lacks a Long converter and instead relies on a string converter.
                 .mapValues(Object::toString);

-        wordCounts.toStream().to(this.getOutputTopic());
+        wordCounts.toStream()
+                .to(builder.getTopics().getOutputTopic());
     }

     @Override
-    public String getUniqueAppId() {
-        return String.format("word-count-app-%s", this.getOutputTopic());
+    public String getUniqueAppId(final StreamsTopicConfig topics) {
+        return String.format("word-count-app-%s", topics.getOutputTopic());
     }

     @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
-        kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
-        return kafkaProperties;
+    public SerdeConfig defaultSerializationConfig() {
+        return new SerdeConfig(StringSerde.class, StringSerde.class);
     }
 }
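
Note: the migration touches only the framework wiring; the word-count logic (split on non-word characters, lowercase, group by word, count, stringify) is unchanged. For orientation, here is an equivalent standalone topology exercised with plain Kafka Streams' `TopologyTestDriver`, no broker required. This sketch is not part of the PR, and the topic names are illustrative:

```java
package com.bakdata.kpops.examples;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public final class WordCountTopologySketch {
    private static final Pattern COMPILE = Pattern.compile("\\W+");

    // Same shape as buildTopology() above, expressed with a plain StreamsBuilder.
    static Topology topology(final String input, final String output) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream(input)
                .flatMapValues(value -> Arrays.asList(COMPILE.split(value.toLowerCase())))
                .groupBy((key, word) -> word)
                .count()
                .mapValues(Object::toString)
                .toStream()
                .to(output);
        return builder.build();
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology("in", "out"), props)) {
            final TestInputTopic<String, String> in =
                    driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> out =
                    driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput(null, "KPOps counts words and KPOps counts again");
            // readKeyValuesToMap keeps the latest count per word.
            out.readKeyValuesToMap().forEach((word, count) -> System.out.printf("%s=%s%n", word, count));
        }
    }
}
```
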
word-count/code/src/test/java/com/bakdata/kpops/examples/SentenceProducerIntegrationTest.java
@@ -1,12 +1,7 @@
 package com.bakdata.kpops.examples;

-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
-import static net.mguenther.kafka.junit.Wait.delay;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.bakdata.kafka.KafkaProducerApplication;
+import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
 import net.mguenther.kafka.junit.KeyValue;
 import net.mguenther.kafka.junit.ReadKeyValues;
@@ -17,11 +12,19 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
+import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
+import static net.mguenther.kafka.junit.Wait.delay;
+import static org.assertj.core.api.Assertions.assertThat;
+
 class SentenceProducerIntegrationTest {
     private static final int TIMEOUT_SECONDS = 10;
     private static final String OUTPUT_TOPIC = "word-count-raw-data";
     private final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig());
-    private SentenceProducer sentenceProducer;
+    private KafkaProducerApplication<SentenceProducer> sentenceProducer;

     @BeforeEach
     void setup() {
@@ -37,8 +40,8 @@ void teardown() {
     @Test
     void shouldRunApp() throws InterruptedException {
         this.kafkaCluster.createTopic(TopicConfig.withName(OUTPUT_TOPIC).useDefaults());
-
         this.sentenceProducer.run();
+
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);

         assertThat(
@@ -59,11 +62,12 @@ void shouldRunApp() throws InterruptedException {
         });
     }

-    private SentenceProducer setupApp() {
-        final SentenceProducer producerApp = new SentenceProducer();
-        producerApp.setBrokers(this.kafkaCluster.getBrokerList());
+    private KafkaProducerApplication<SentenceProducer> setupApp() {
+        final SimpleKafkaProducerApplication<SentenceProducer> producerApp
+                = new SimpleKafkaProducerApplication<>(SentenceProducer::new);
+        producerApp.setBootstrapServers(this.kafkaCluster.getBrokerList());
         producerApp.setOutputTopic(OUTPUT_TOPIC);
-        producerApp.setStreamsConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));
+        producerApp.setKafkaConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));
         return producerApp;
     }
 }
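
Note: besides the renames visible here (`setBrokers` → `setBootstrapServers`, `setStreamsConfig` → `setKafkaConfig`), the embedded-cluster pattern is unchanged. The kafka-junit types imported above can also read the produced sentences back for ad-hoc inspection. A sketch, not part of the PR, assuming kafka-junit's standard `start`/`readValues`/`stop` calls:

```java
package com.bakdata.kpops.examples;

import java.util.List;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.TopicConfig;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;

final class ReadBackSketch {
    public static void main(final String[] args) throws InterruptedException {
        final EmbeddedKafkaCluster cluster = provisionWith(defaultClusterConfig());
        cluster.start();
        cluster.createTopic(TopicConfig.withName("word-count-raw-data").useDefaults());
        // ... run the producer here, as setupApp()/shouldRunApp() do above ...
        final List<String> sentences = cluster.readValues(ReadKeyValues.from("word-count-raw-data"));
        sentences.forEach(System.out::println);
        cluster.stop();
    }
}
```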