Update word-count to streams bootstrap 3 (#28)
raminqaf authored Dec 11, 2024
1 parent 5ea4366 commit 3fef629
Showing 12 changed files with 174 additions and 112 deletions.
1 change: 0 additions & 1 deletion defaults.yaml → atm-fraud/defaults.yaml
@@ -15,7 +15,6 @@ streams-bootstrap-v2:
       enabled: false
 
   replicaCount: 1
-  debug: true
 
 producer-app-v2:
   to:
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1 +1 @@
-kpops~=8.0
+kpops~=8.1.4
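The version floor matters here: the word-count pipeline now needs a KPOps release that understands streams-bootstrap v3 components, which is what the 8.1 line adds; the *-v2 component types (streams-bootstrap-v2, producer-app-v2) remain in atm-fraud/defaults.yaml above for the pipeline that stays on version 2.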
35 changes: 21 additions & 14 deletions word-count/code/build.gradle.kts
@@ -2,8 +2,8 @@ description = "Word count pipeline with Kafka Streams"
 plugins {
     java
     idea
-    id("io.freefair.lombok") version "6.6.1"
-    id("com.google.cloud.tools.jib") version "3.3.1"
+    id("io.freefair.lombok") version "8.11"
+    id("com.google.cloud.tools.jib") version "3.4.4"
 }
 
 group = "com.bakdata.kpops.examples"
@@ -14,9 +14,10 @@ repositories {
 }
 
 
-configure<JavaPluginExtension> {
-    sourceCompatibility = JavaVersion.VERSION_17
-    targetCompatibility = JavaVersion.VERSION_17
+java {
+    toolchain {
+        languageVersion = JavaLanguageVersion.of(21)
+    }
 }
 
 tasks {
@@ -32,28 +33,34 @@
 }
 
 dependencies {
-    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap", version = "2.8.0")
-    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = "2.19.0")
-    implementation(group = "com.google.guava", name = "guava", version = "31.1-jre")
+    val streamsBootstrapVersion = "3.1.0"
+    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = streamsBootstrapVersion)
+    val log4jVersion = "2.24.2"
+    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
+    val guavaVersion = "33.3.1-jre"
+    implementation(group = "com.google.guava", name = "guava", version = guavaVersion)
 
-    val junitVersion: String by project
+    val junitVersion = "5.11.3"
     testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
-    testImplementation(group = "org.assertj", name = "assertj-core", version = "3.24.2")
-    val fluentKafkaVersion = "2.8.1"
+    val assertJVersion = "3.26.3"
+    testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
+
+    testImplementation(group = "com.bakdata.kafka", name = "streams-bootstrap-test", version = streamsBootstrapVersion)
+    val fluentKafkaVersion = "2.14.0"
     testImplementation(
         group = "com.bakdata.fluent-kafka-streams-tests",
         name = "fluent-kafka-streams-tests-junit5",
        version = fluentKafkaVersion
     )
-    val kafkaVersion: String by project
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaVersion) {
+    val kafkaJunitVersion = "3.6.0"
+    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
     }
 }
 
 jib {
     from {
-        image = "eclipse-temurin:17.0.6_10-jre"
+        image = "eclipse-temurin:21.0.5_11-jre"
     }
 }
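Two things are worth noting in this build file. First, streams-bootstrap 3 splits what used to be the single streams-bootstrap artifact into separate modules: the runtime dependency becomes streams-bootstrap-cli and the test utilities come from streams-bootstrap-test, both pinned through the shared streamsBootstrapVersion variable. Second, the build moves from source/target compatibility 17 to a Java 21 toolchain, with the jib base image bumped to a Java 21 JRE to match.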
4 changes: 1 addition & 3 deletions word-count/code/gradle.properties
@@ -1,5 +1,3 @@
-version=1.0.0-SNAPSHOT
+version=2.0.0-SNAPSHOT
 org.gradle.caching=true
 org.gradle.parallel=true
-junitVersion=5.9.2
-kafkaVersion=3.3.0
3 changes: 2 additions & 1 deletion word-count/code/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,7 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
 networkTimeout=10000
+validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
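The wrapper bump goes hand in hand with the toolchain change above: Gradle 7.6 cannot run builds on Java 21 (toolchain support for 21 arrived in Gradle 8.5), so the wrapper moves to 8.11.1 and picks up the validateDistributionUrl safeguard that newer wrapper versions generate.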
29 changes: 17 additions & 12 deletions word-count/code/gradlew
@@ -83,10 +83,8 @@ done
 # This is normally unused
 # shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
-
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD=maximum
@@ -133,26 +131,29 @@ location of your Java installation."
     fi
 else
     JAVACMD=java
-    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+    if ! command -v java >/dev/null 2>&1
+    then
+        die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
 
 Please set the JAVA_HOME variable in your environment to match the
 location of your Java installation."
+    fi
 fi
 
 # Increase the maximum file descriptors if we can.
 if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
     case $MAX_FD in #(
       max*)
         # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         MAX_FD=$( ulimit -H -n ) ||
             warn "Could not query maximum file descriptor limit"
     esac
     case $MAX_FD in #(
         '' | soft) :;; #(
         *)
             # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
-            # shellcheck disable=SC3045
+            # shellcheck disable=SC2039,SC3045
             ulimit -n "$MAX_FD" ||
                 warn "Could not set maximum file descriptor limit to $MAX_FD"
     esac
@@ -197,11 +198,15 @@ if "$cygwin" || "$msys" ; then
     done
 fi
 
-# Collect all arguments for the java command;
-# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
-#   shell script including quotes and variable substitutions, so put them in
-#   double quotes to make sure that they get re-expanded; and
-# * put everything else in single quotes, so that it's not re-expanded.
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+#     and any embedded shellness will be escaped.
+#   * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+#     treated as '${Hostname}' itself on the command line.
 
 set -- \
         "-Dorg.gradle.appname=$APP_BASE_NAME" \
word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java
@@ -1,52 +1,53 @@
 package com.bakdata.kpops.examples;
 
-import com.bakdata.kafka.KafkaProducerApplication;
+import com.bakdata.kafka.ProducerApp;
+import com.bakdata.kafka.ProducerBuilder;
+import com.bakdata.kafka.ProducerRunnable;
+import com.bakdata.kafka.SerializerConfig;
+import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import com.google.common.io.Resources;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
-@Setter
-public class SentenceProducer extends KafkaProducerApplication {
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static com.bakdata.kafka.KafkaApplication.startApplication;
+
+public class SentenceProducer implements ProducerApp {
     static final String FILE_NAME = "kpops.txt";
 
     public static void main(final String[] args) {
-        startApplication(new SentenceProducer(), args);
+        startApplication(
+                new SimpleKafkaProducerApplication<>(SentenceProducer::new),
+                args
+        );
     }
 
     @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        return kafkaProperties;
-    }
-
-    @Override
-    protected void runApplication() {
-        try (final KafkaProducer<String, String> producer = this.createProducer()) {
-            final URL url = Resources.getResource(FILE_NAME);
-            final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8);
-
-            for (final String textLine : textLines) {
-                this.publish(producer, textLine);
-            }
-            producer.flush();
-        } catch (final IOException e) {
-            throw new RuntimeException("Error occurred while reading the .txt file.", e);
-        }
-    }
-
-    private void publish(final Producer<? super String, ? super String> producer, final String line) {
-        producer.send(new ProducerRecord<>(this.getOutputTopic(), null, line));
+    public ProducerRunnable buildRunnable(final ProducerBuilder producerBuilder) {
+        return () -> {
+            try (final Producer<String, String> producer = producerBuilder.createProducer()) {
+                final URL url = Resources.getResource(FILE_NAME);
+                final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8);
+                final String outputTopic = producerBuilder.getTopics().getOutputTopic();
+                for (final String textLine : textLines) {
+                    producer.send(new ProducerRecord<>(outputTopic, null, textLine));
+                }
+                producer.flush();
+            } catch (final IOException e) {
+                throw new RuntimeException("Error occurred while reading the .txt file.", e);
            }
+        };
+    }
+
+    @Override
+    public SerializerConfig defaultSerializationConfig() {
+        return new SerializerConfig(StringSerializer.class, StringSerializer.class);
     }
 }
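The producer migration above is the core of the streams-bootstrap 2 to 3 change: application logic moves out of an inheritance hierarchy (KafkaProducerApplication with overridden createKafkaProperties and runApplication) into a plain ProducerApp that returns its work as a ProducerRunnable, with serializers declared once in defaultSerializationConfig. A minimal sketch of the same contract, using only API calls that appear in the diff above; the GreetingProducer class and its message are made up for illustration:

package com.bakdata.kpops.examples;

import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical minimal ProducerApp following the same contract as SentenceProducer.
public class GreetingProducer implements ProducerApp {

    @Override
    public ProducerRunnable buildRunnable(final ProducerBuilder producerBuilder) {
        // The builder hands us a fully configured producer and the resolved topic config.
        return () -> {
            try (final Producer<String, String> producer = producerBuilder.createProducer()) {
                final String outputTopic = producerBuilder.getTopics().getOutputTopic();
                producer.send(new ProducerRecord<>(outputTopic, null, "hello, kpops"));
                producer.flush();
            }
        };
    }

    @Override
    public SerializerConfig defaultSerializationConfig() {
        // Replaces the createKafkaProperties() override from streams-bootstrap 2.
        return new SerializerConfig(StringSerializer.class, StringSerializer.class);
    }
}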
word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java
@@ -1,46 +1,49 @@
 package com.bakdata.kpops.examples;
 
-import com.bakdata.kafka.KafkaStreamsApplication;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import com.bakdata.kafka.KafkaApplication;
+import com.bakdata.kafka.SerdeConfig;
+import com.bakdata.kafka.SimpleKafkaStreamsApplication;
+import com.bakdata.kafka.StreamsApp;
+import com.bakdata.kafka.StreamsTopicConfig;
+import com.bakdata.kafka.TopologyBuilder;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 
-public class WordCountApplication extends KafkaStreamsApplication {
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+public class WordCountApplication implements StreamsApp {
     private static final Pattern COMPILE = Pattern.compile("\\W+");
 
     public static void main(final String[] args) {
-        startApplication(new WordCountApplication(), args);
+        KafkaApplication.startApplication(
+                new SimpleKafkaStreamsApplication<>(WordCountApplication::new),
+                args
+        );
     }
 
     @Override
-    public void buildTopology(final StreamsBuilder builder) {
-        final KStream<String, String> textLines = builder.stream(this.getInputTopics());
+    public void buildTopology(final TopologyBuilder builder) {
+        final KStream<String, String> textLines = builder.streamInput();
         final KTable<String, String> wordCounts = textLines
                 .flatMapValues(value -> Arrays.asList(COMPILE.split(value.toLowerCase())))
                 .groupBy((key, value) -> value)
                 .count()
                 // The redis sink connection lacks a Long converter and instead relies on a string converter.
                 .mapValues(Object::toString);
 
-        wordCounts.toStream().to(this.getOutputTopic());
+        wordCounts.toStream()
+                .to(builder.getTopics().getOutputTopic());
     }
 
     @Override
-    public String getUniqueAppId() {
-        return String.format("word-count-app-%s", this.getOutputTopic());
+    public String getUniqueAppId(final StreamsTopicConfig topics) {
+        return String.format("word-count-app-%s", topics.getOutputTopic());
     }
 
     @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
-        kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
-        return kafkaProperties;
+    public SerdeConfig defaultSerializationConfig() {
+        return new SerdeConfig(StringSerde.class, StringSerde.class);
     }
 }
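The same wrapper pattern applies on the streams side: WordCountApplication is now a plain StreamsApp, and SimpleKafkaStreamsApplication turns it into a runnable CLI application. For local experiments it can also be configured programmatically, mirroring what the integration test below does for the producer. A sketch under assumptions: the streams-side setters (setInputTopics and friends) are inferred by analogy with the producer setters shown in the test, and the broker address and output topic name are placeholders:

package com.bakdata.kpops.examples;

import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;

import java.util.List;

// Hypothetical local runner; not part of the commit.
public final class LocalWordCountRunner {

    public static void main(final String[] args) {
        final KafkaStreamsApplication<WordCountApplication> app =
                new SimpleKafkaStreamsApplication<>(WordCountApplication::new);
        app.setBootstrapServers("localhost:9092");           // assumed local broker
        app.setInputTopics(List.of("word-count-raw-data"));  // topic filled by SentenceProducer
        app.setOutputTopic("word-count-counts");             // placeholder output topic
        app.run();  // runs the topology until shutdown
    }
}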
word-count/code/src/test/java/com/bakdata/kpops/examples/SentenceProducerIntegrationTest.java
@@ -1,12 +1,7 @@
 package com.bakdata.kpops.examples;
 
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
-import static net.mguenther.kafka.junit.Wait.delay;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.bakdata.kafka.KafkaProducerApplication;
+import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
 import net.mguenther.kafka.junit.KeyValue;
 import net.mguenther.kafka.junit.ReadKeyValues;
@@ -17,11 +12,19 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
+import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
+import static net.mguenther.kafka.junit.Wait.delay;
+import static org.assertj.core.api.Assertions.assertThat;
+
 class SentenceProducerIntegrationTest {
     private static final int TIMEOUT_SECONDS = 10;
     private static final String OUTPUT_TOPIC = "word-count-raw-data";
     private final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig());
-    private SentenceProducer sentenceProducer;
+    private KafkaProducerApplication<SentenceProducer> sentenceProducer;
 
     @BeforeEach
     void setup() {
@@ -37,8 +40,8 @@ void teardown() {
     @Test
     void shouldRunApp() throws InterruptedException {
         this.kafkaCluster.createTopic(TopicConfig.withName(OUTPUT_TOPIC).useDefaults());
-
         this.sentenceProducer.run();
+
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
         assertThat(
@@ -59,11 +62,12 @@ void shouldRunApp() throws InterruptedException {
                 });
     }
 
-    private SentenceProducer setupApp() {
-        final SentenceProducer producerApp = new SentenceProducer();
-        producerApp.setBrokers(this.kafkaCluster.getBrokerList());
+    private KafkaProducerApplication<SentenceProducer> setupApp() {
+        final SimpleKafkaProducerApplication<SentenceProducer> producerApp
+                = new SimpleKafkaProducerApplication<>(SentenceProducer::new);
+        producerApp.setBootstrapServers(this.kafkaCluster.getBrokerList());
         producerApp.setOutputTopic(OUTPUT_TOPIC);
-        producerApp.setStreamsConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));
+        producerApp.setKafkaConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));
        return producerApp;
     }
 }
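The test changes follow directly from the new split between app logic and application wrapper: since SentenceProducer no longer carries its own configuration setters, the test holds a KafkaProducerApplication<SentenceProducer> wrapper and configures that instead, with setBrokers renamed to setBootstrapServers and setStreamsConfig generalized to setKafkaConfig.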