From 9d4aea9888a4b6b023d00bc3244168fb2c817424 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+JakobEdding@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:22:07 +0200 Subject: [PATCH] Add method to convert output topic to List (#93) --- README.md | 28 ++++++++- .../junit4/WordCountTest.java | 62 ++++++++++++++++++- .../junit5/WordCountTest.java | 62 ++++++++++++++++++- .../BaseOutput.java | 25 +++++++- .../Expectation.java | 2 +- .../TestOutput.java | 13 +++- .../WordCountTest.java | 62 ++++++++++++++++++- 7 files changed, 243 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 3d206b4..e2525ad 100644 --- a/README.md +++ b/README.md @@ -101,9 +101,8 @@ To get the output, `TestTopology` provides two methods: `.streamOutput()` and `. They behave just like the input with regard to the number of output topics. Using the stream version simulates Kafka's stream-semantics, meaning that a key can be present many times in an output stream, whereas the table-semantics only output the newest value of each key. -To check the output records, you can call `.expectNextRecord()` to indicate that the output should not be empty. -You can then inspect the record with `.hasKey(K key)` and `.hasValue(V value)`. -Both are optional, but highly recommended so that your output is always valid. +To check the output records, you can call `.expectNextRecord()` and then chain `.hasKey(K key)`, `.hasKeySatisfying(Consumer requirements)`, `.hasValue(V value)` or `.hasValueSatisfying(Consumer requirements)` to this call. +Note that calling `.expectNextRecord()` by itself without chaining at least one of the `.has*` methods will not check for the existence of a next record! Once you expect no further records, call `.expectNoMoreRecord()` to indicate the end of the output stream. @@ -143,6 +142,29 @@ void shouldReturnCorrectIteratorTable() { } ``` +Alternatively, you can convert the output to `List` for use with your assertion framework. Here is an example of this with [AssertJ](http://joel-costigliola.github.io/assertj/). + +```java +@Test +void shouldConvertStreamOutputToList(){ + this.testTopology.input() + .add("cat") + .add("dog") + .add("bird"); + + final List>outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("cat", "dog", "bird"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 1L); +} +``` + #### More Examples You can find many more tests diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java index e14eab6..8732451 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -244,4 +244,64 @@ public void shouldBeDoneAfterSingleWord() { public void shouldDoNothingOnEmptyInput() { this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty(); } + + @Test + public void shouldConvertStreamOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub", "bla"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 2L); + } + + @Test + public void shouldConvertTableOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(2L, 1L); + } + + @Test + public void shouldConvertEmptyStreamOutputToEmptyList() { + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } + + @Test + public void shouldConvertEmptyTableOutputToEmptyList() { + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } } diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java index 4fa9d1b..c12b88a 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -244,4 +244,64 @@ void shouldBeDoneAfterSingleWord() { void shouldDoNothingOnEmptyInput() { this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty(); } + + @Test + void shouldConvertStreamOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub", "bla"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 2L); + } + + @Test + void shouldConvertTableOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(2L, 1L); + } + + @Test + void shouldConvertEmptyStreamOutputToEmptyList() { + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } + + @Test + void shouldConvertEmptyTableOutputToEmptyList() { + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } } diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java index 2f4c513..9e73cf0 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,6 +24,8 @@ package com.bakdata.fluent_kafka_streams_tests; +import java.util.ArrayList; +import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.TestOutputTopic; @@ -74,7 +76,10 @@ public TestOutput withValueSerde(final Serde valueSerde) { } /** - * Reads the next record as creates an {@link Expectation} for it.
+ * Reads the next record and creates an {@link Expectation} for it.
+ * + * Note that calling this method by itself without chaining at least one of the {@code has*()} methods will not + * check for the existence of a next record!
* * @return An {@link Expectation} containing the next record from the output.
*/ @@ -114,6 +119,22 @@ public TestOutput asStream() { return new StreamOutput<>(this.testDriver, this.topic, this.keySerde, this.valueSerde); } + /** + * Convert the output to a {@link java.util.List}. In case the current instance of this class is a + * {@link StreamOutput}, the output will be converted to List with {@link org.apache.kafka.streams.kstream.KStream} + * semantics (each key multiple times). In case the current instance of this class is a {@link TableOutput}, the + * output will be converted to List with {@link org.apache.kafka.streams.kstream.KTable} semantics (each key only + * once). + * + * @return A {@link java.util.List} representing the output + */ + @Override + public List> toList() { + final List> list = new ArrayList<>(); + this.iterator().forEachRemaining(list::add); + return list; + } + // ================== // Non-public methods // ================== diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java index c299e35..d10b017 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java @@ -113,7 +113,7 @@ public Expectation and() { } /** - *

Reads the next record as creates an {@code Expectation} for it.

+ *

Reads the next record and creates an {@code Expectation} for it.

*

This is logically equivalent to {@link TestOutput#expectNextRecord()}.

*

This methods main purpose is to allow chaining:

*
{@code
diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java
index 22ae948..8d18e69 100644
--- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java
+++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2023 bakdata GmbH
+ * Copyright (c) 2024 bakdata GmbH
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,6 +24,7 @@
 
 package com.bakdata.fluent_kafka_streams_tests;
 
+import java.util.List;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serde;
 
@@ -117,7 +118,7 @@ default  TestOutput withValueType(final Class valueType) {
     ProducerRecord readOneRecord();
 
     /**
-     * Reads the next record as creates an {@link Expectation} for it.
+     * Reads the next record and creates an {@link Expectation} for it.
      *
      * @return An {@link Expectation} containing the next record from the output.
      */
@@ -144,8 +145,16 @@ default  TestOutput withValueType(final Class valueType) {
      * 

This is the default, there should usually be no need to call this method.

*

Note: once the first value of the stream has been read or the iterator has be called, you cannot switch * between the output types any more.

+ * * @return Current output with {@link org.apache.kafka.streams.kstream.KStream} semantics */ TestOutput asStream(); + + /** + * Convert the output to a {@link java.util.List}. + * + * @return A {@link java.util.List} representing the output + */ + List> toList(); } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java index 1b35b18..159d502 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -387,4 +387,64 @@ void shouldBeDoneAfterSingleWord() { void shouldDoNothingOnEmptyInput() { this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty(); } + + @Test + void shouldConvertStreamOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub", "bla"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 2L); + } + + @Test + void shouldConvertTableOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(2L, 1L); + } + + @Test + void shouldConvertEmptyStreamOutputToEmptyList() { + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } + + @Test + void shouldConvertEmptyTableOutputToEmptyList() { + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } }