diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java index c2502e31a488e..cce48cd0925ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java @@ -87,16 +87,4 @@ public static void checkSupplier(final Supplier supplier) { " %s#get() must return a new object each time it is called.", supplierClass, supplierClass)); } } - - /** - * @throws IllegalArgumentException if the same instance is obtained each time - */ - @SuppressWarnings("deprecation") - public static void checkSupplier(final org.apache.kafka.streams.kstream.ValueTransformerSupplier supplier) { - if (supplier.get() == supplier.get()) { - final String supplierClass = supplier.getClass().getName(); - throw new IllegalArgumentException(String.format("%s generates single reference." + - " %s#get() must return a new object each time it is called.", supplierClass, supplierClass)); - } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 5c8c5ef092d93..59ba928299e98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -431,8 +431,7 @@ KStream mapValues(final ValueMapperWithKey KStream flatMap(final KeyValueMapper>> mapper); @@ -481,8 +480,7 @@ KStream mapValues(final ValueMapperWithKey KStream flatMap(final KeyValueMapper>> mapper, final Named named); @@ -523,8 +521,7 @@ KStream flatMap(final KeyValueMapper KStream flatMapValues(final ValueMapper> mapper); @@ -565,8 +562,7 @@ KStream flatMap(final KeyValueMapper KStream flatMapValues(final ValueMapper> mapper, final Named named); @@ -612,8 +608,7 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper); @@ -660,8 +655,7 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); @@ -2972,491 +2966,6 @@ KStream leftJoin(final GlobalKTable globalTable, final KeyValueMapper keySelector, final ValueJoinerWithKey valueJoiner, final Named named); - /** - * Transform the value of each input record into zero or more new values (with possibly a new - * type) and emit for each new value a record with the same key of the input record and the value. - * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input - * record value and computes zero or more new values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper) mapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()} - * the processing progress can be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerSupplier implements ValueTransformerSupplier {
-     *     // supply transformer
-     *     ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder<?>> stores() {
-     *         StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
-     * }
- *
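Migration note on the second strategy: the replacement API that the deprecation tags point to, KStream#processValues(FixedKeyProcessorSupplier, ...), supports the same wiring, because FixedKeyProcessorSupplier also extends ConnectedStoreProvider. The following is only a sketch and not part of this patch; the class name MyFixedKeyProcessorSupplier and the String types are illustrative, while the store name is taken from the example above.

import java.util.Collections;
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

class MyFixedKeyProcessorSupplier implements FixedKeyProcessorSupplier<String, String, String> {

    @Override
    public FixedKeyProcessor<String, String, String> get() {
        // must return a new processor instance on every call
        return new ContextualFixedKeyProcessor<String, String, String>() {
            @Override
            public void process(final FixedKeyRecord<String, String> record) {
                // stateful logic would fetch "myValueTransformState" via context().getStateStore(...)
                context().forward(record.withValue(record.value().toUpperCase()));
            }
        };
    }

    // stores returned here are added to the topology and connected to the processor automatically,
    // so no store names need to be passed to processValues()
    @Override
    public Set<StoreBuilder<?>> stores() {
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}

Such a supplier would then be used as inputStream.processValues(new MyFixedKeyProcessorSupplier()), with no explicit store names.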

- * With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any - * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object) - * transform()}. - * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty - * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to - * emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformer implements ValueTransformer {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     Iterable<NewValueType> transform(V value) {
-     *         // can access this.state
-     *         List<NewValueType> result = new ArrayList<>();
-     *         for (int i = 0; i < 3; i++) {
-     *             result.add(new NewValueType(value));
-     *         }
-     *         return result; // values
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
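For the transformer body itself, a rough counterpart of the removed MyValueTransformer example under the replacement processValues()/FixedKeyProcessor API could look as follows. This is a sketch only, not part of the patch: the class name MyValueProcessor, the String types, and the three derived values per record are illustrative; the store name matches the example above and must be connected for getStateStore() to work.

import java.time.Duration;
import java.util.List;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;

class MyValueProcessor extends ContextualFixedKeyProcessor<String, String, String> {

    private KeyValueStore<String, String> state;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        super.init(context);
        // the store must be connected, either via stateStoreNames or via ConnectedStoreProvider#stores()
        state = context.getStateStore("myValueTransformState");
        // punctuate each second, can access state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { });
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // can access the state store, e.g. remember the last value seen per key
        state.put(record.key(), record.value());
        // instead of returning an Iterable, each new value is forwarded explicitly;
        // the key cannot be changed, so no downstream repartitioning is needed
        for (final String suffix : List.of("-0", "-1", "-2")) {
            context().forward(record.withValue(record.value() + suffix));
        }
    }
}

It could then be attached with inputStream.processValues(MyValueProcessor::new, "myValueTransformState") once the store has been added to the builder.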
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code flatTransformValues()}. - *

- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object - * and returning the same object reference in {@link ValueTransformer} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of - * different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead. - */ - @Deprecated - KStream flatTransformValues(final ValueTransformerSupplier> valueTransformerSupplier, - final String... stateStoreNames); - - /** - * Transform the value of each input record into zero or more new values (with possibly a new - * type) and emit for each new value a record with the same key of the input record and the value. - * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input - * record value and computes zero or more new values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper) mapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()} - * the processing progress can be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerSupplier implements ValueTransformerSupplier {
-     *     // supply transformer
-     *     ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder<?>> stores() {
-     *         StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any - * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object) - * transform()}. - * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty - * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to - * emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformer implements ValueTransformer {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     Iterable<NewValueType> transform(V value) {
-     *         // can access this.state
-     *         List<NewValueType> result = new ArrayList<>();
-     *         for (int i = 0; i < 3; i++) {
-     *             result.add(new NewValueType(value));
-     *         }
-     *         return result; // values
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code flatTransformValues()}. - *

- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object - * and returning the same object reference in {@link ValueTransformer} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param named a {@link Named} config used to name the processor in the topology - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of - * different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead. - */ - @Deprecated - KStream flatTransformValues(final ValueTransformerSupplier> valueTransformerSupplier, - final Named named, - final String... stateStoreNames); - - /** - * Transform the value of each input record into zero or more new values (with possibly a new - * type) and emit for each new value a record with the same key of the input record and the value. - * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to - * each input record value and computes zero or more new values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapperWithKey) flatMapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can - * be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
-     *     public ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
-     *     // supply transformer
-     *     ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder<?>> stores() {
-     *         StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any - * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object) - * transform()}. - * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()} - * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries - * to emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformerWithKey implements ValueTransformerWithKey {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     Iterable<NewValueType> transform(K readOnlyKey, V value) {
-     *         // can access this.state and use read-only key
-     *         List<NewValueType> result = new ArrayList<>();
-     *         for (int i = 0; i < 3; i++) {
-     *             result.add(new NewValueType(readOnlyKey));
-     *         }
-     *         return result; // values
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
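The same replacement applies to the removed ValueTransformerWithKey example above: in a FixedKeyProcessor the key is available read-only from the record, and only the value of the forwarded record can change, so co-location by key is preserved. A minimal sketch with illustrative String types and the hypothetical name MyKeyAwareValueProcessor, shown only for comparison:

import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

class MyKeyAwareValueProcessor extends ContextualFixedKeyProcessor<String, String, String> {

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // record.key() is read-only; forwarding can only replace the value
        for (int i = 0; i < 3; i++) {
            context().forward(record.withValue(record.key() + "-" + i));
        }
    }
}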
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code flatTransformValues()}. - *

- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object - * and returning the same object reference in {@link ValueTransformerWithKey} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of - * different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead. - */ - @Deprecated - KStream flatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerSupplier, - final String... stateStoreNames); - - /** - * Transform the value of each input record into zero or more new values (with possibly a new - * type) and emit for each new value a record with the same key of the input record and the value. - * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to - * each input record value and computes zero or more new values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapperWithKey) flatMapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can - * be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
-     *     public ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
-     *     // supply transformer
-     *     ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder<?>> stores() {
-     *         StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any - * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object) - * transform()}. - * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()} - * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries - * to emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformerWithKey implements ValueTransformerWithKey {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     Iterable<NewValueType> transform(K readOnlyKey, V value) {
-     *         // can access this.state and use read-only key
-     *         List<NewValueType> result = new ArrayList<>();
-     *         for (int i = 0; i < 3; i++) {
-     *             result.add(new NewValueType(readOnlyKey));
-     *         }
-     *         return result; // values
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code flatTransformValues()}. - *

- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object - * and returning the same object reference in {@link ValueTransformerWithKey} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param named a {@link Named} config used to name the processor in the topology - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of - * different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead. - */ - @Deprecated - KStream flatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerSupplier, - final Named named, - final String... stateStoreNames); /** * Process all records in this stream, one record at a time, by applying a diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 2483cbbbe1673..91a93d23a07e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -17,17 +17,12 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.state.StoreBuilder; import java.util.Collection; import java.util.HashSet; @@ -109,40 +104,6 @@ static ValueMapperWithKey withKey(final ValueMapper return (readOnlyKey, value) -> valueMapper.apply(value); } - @SuppressWarnings("deprecation") - static ValueTransformerWithKeySupplier toValueTransformerWithKeySupplier( - final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - ApiUtils.checkSupplier(valueTransformerSupplier); - return new ValueTransformerWithKeySupplier() { - @Override - public ValueTransformerWithKey get() { - final org.apache.kafka.streams.kstream.ValueTransformer 
valueTransformer = valueTransformerSupplier.get(); - return new ValueTransformerWithKey() { - @Override - public void init(final ProcessorContext context) { - valueTransformer.init(context); - } - - @Override - public VR transform(final K readOnlyKey, final V value) { - return valueTransformer.transform(value); - } - - @Override - public void close() { - valueTransformer.close(); - } - }; - } - - @Override - public Set> stores() { - return valueTransformerSupplier.stores(); - } - }; - } - static ValueJoinerWithKey toValueJoinerWithKey(final ValueJoiner valueJoiner) { Objects.requireNonNull(valueJoiner, "joiner can't be null"); return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java deleted file mode 100644 index 5ce059990630a..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.api.ContextualProcessor; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.state.StoreBuilder; - -import java.util.Set; - -public class KStreamFlatTransform implements ProcessorSupplier { - - @SuppressWarnings("deprecation") - private final org.apache.kafka.streams.kstream.TransformerSupplier>> transformerSupplier; - - @SuppressWarnings("deprecation") - public KStreamFlatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier>> transformerSupplier) { - this.transformerSupplier = transformerSupplier; - } - - @Override - public Processor get() { - return new KStreamFlatTransformProcessor<>(transformerSupplier.get()); - } - - @Override - public Set> stores() { - return transformerSupplier.stores(); - } - - public static class KStreamFlatTransformProcessor extends ContextualProcessor { - - @SuppressWarnings("deprecation") - private final org.apache.kafka.streams.kstream.Transformer>> transformer; - - @SuppressWarnings("deprecation") - public KStreamFlatTransformProcessor(final org.apache.kafka.streams.kstream.Transformer>> transformer) { - this.transformer = transformer; - } - - @Override - public void init(final ProcessorContext context) { - super.init(context); - transformer.init((InternalProcessorContext) context); - } - - @Override - public void process(final Record record) { - final Iterable> pairs = transformer.transform(record.key(), record.value()); - if (pairs != null) { - for (final KeyValue pair : pairs) { - context().forward(record.withKey(pair.key).withValue(pair.value)); - } - } - } - - @Override - public void close() { - transformer.close(); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java deleted file mode 100644 index 5469c668dfee2..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.processor.api.ContextualProcessor; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.state.StoreBuilder; - -import java.util.Set; - -public class KStreamFlatTransformValues implements ProcessorSupplier { - - private final ValueTransformerWithKeySupplier> valueTransformerSupplier; - - public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerWithKeySupplier) { - this.valueTransformerSupplier = valueTransformerWithKeySupplier; - } - - @Override - public Processor get() { - return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get()); - } - - @Override - public Set> stores() { - return valueTransformerSupplier.stores(); - } - - public static class KStreamFlatTransformValuesProcessor extends ContextualProcessor { - - private final ValueTransformerWithKey> valueTransformer; - - KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey> valueTransformer) { - this.valueTransformer = valueTransformer; - } - - @Override - public void init(final ProcessorContext context) { - super.init(context); - valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext) context)); - } - - @Override - public void process(final Record record) { - final Iterable transformedValues = valueTransformer.transform(record.key(), record.value()); - if (transformedValues != null) { - for (final VOut transformedValue : transformedValues) { - context().forward(record.withValue(transformedValue)); - } - } - } - - @Override - public void close() { - super.close(); - valueTransformer.close(); - } - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 820c31f29e43e..ab27cfc1ea19d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -41,7 +41,6 @@ import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode.BaseRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; @@ -121,8 +120,6 @@ public class KStreamImpl extends AbstractStream implements KStream KStream doStreamTableJoin(final KTable table, builder); } - @Override - @Deprecated - public KStream flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier> valueTransformerSupplier, - final String... 
stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return doFlatTransformValues( - toValueTransformerWithKeySupplier(valueTransformerSupplier), - NamedInternal.empty(), - stateStoreNames); - } - - @Override - @Deprecated - public KStream flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier> valueTransformerSupplier, - final Named named, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return doFlatTransformValues( - toValueTransformerWithKeySupplier(valueTransformerSupplier), - named, - stateStoreNames); - } - - @Override - @Deprecated - public KStream flatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return doFlatTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames); - } - - @Override - @Deprecated - public KStream flatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerSupplier, - final Named named, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames); - } - - private KStream doFlatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerWithKeySupplier, - final Named named, - final String... stateStoreNames) { - Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array"); - for (final String stateStoreName : stateStoreNames) { - Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name"); - } - ApiUtils.checkSupplier(valueTransformerWithKeySupplier); - - final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME); - final StatefulProcessorNode transformNode = new StatefulProcessorNode<>( - name, - new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name), - stateStoreNames); - transformNode.setValueChangingOperation(true); - - builder.addGraphNode(graphNode, transformNode); - - // cannot inherit value serde - return new KStreamImpl<>( - name, - keySerde, - null, - subTopologySourceNodes, - repartitionRequired, - transformNode, - builder); - } - @Override @Deprecated public void process(final org.apache.kafka.streams.processor.ProcessorSupplier processorSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java index 108a7d7233bf3..ad3a834257d1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java @@ -19,7 +19,6 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Named; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; @@ -92,11 +91,8 @@ * @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...) 
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...) * @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...) - * @see KStream#processValues(FixedKeyProcessorSupplier, String...) - * @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...) - * @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...) - * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...) - * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, Named, String...) + * @see KStream#processValues(FixedKeyProcessorSupplier, String...) + * @see KStream#processValues(FixedKeyProcessorSupplier, Named, String...) */ public interface ConnectedStoreProvider { diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 0c9031afbdfc6..056721fa8af1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -61,12 +61,11 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder; +import org.apache.kafka.test.MockApiFixedKeyProcessorSupplier; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockPredicate; import org.apache.kafka.test.MockValueJoiner; -import org.apache.kafka.test.NoopValueTransformer; -import org.apache.kafka.test.NoopValueTransformerWithKey; import org.apache.kafka.test.StreamsTestUtils; import org.hamcrest.CoreMatchers; @@ -1315,29 +1314,21 @@ public void shouldUseSpecifiedNameForProcessOperation() { } @Test - public void shouldUseSpecifiedNameForPrintOperation() { - builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor")); - builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); - assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor"); - } + public void shouldUseSpecifiedNameForProcessValuesOperation() { + builder.stream(STREAM_TOPIC) + .processValues(new MockApiFixedKeyProcessorSupplier<>(), Named.as("test-fixed-key-processor")); - @Test - @SuppressWarnings("deprecation") - public void shouldUseSpecifiedNameForFlatTransformValueOperation() { - builder.stream(STREAM_TOPIC).flatTransformValues(() -> new NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME)); builder.build(); final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); - assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME); + assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-fixed-key-processor"); } @Test - @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) - public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() { - builder.stream(STREAM_TOPIC).flatTransformValues(() -> new NoopValueTransformerWithKey(), Named.as(STREAM_OPERATION_NAME)); + public void shouldUseSpecifiedNameForPrintOperation() { + builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor")); builder.build(); final 
ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); - assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME); + assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor"); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index ca4fd756cbc1b..01e833f1b976b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.test.MockApiProcessorSupplier; -import org.apache.kafka.test.NoopValueTransformer; import org.apache.kafka.test.NoopValueTransformerWithKey; import org.junit.jupiter.api.Test; @@ -51,21 +50,6 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class AbstractStreamTest { - @SuppressWarnings("deprecation") - @Test - public void testToInternalValueTransformerSupplierSuppliesNewTransformers() { - final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier = - mock(org.apache.kafka.streams.kstream.ValueTransformerSupplier.class); - when(valueTransformerSupplier.get()) - .thenReturn(new NoopValueTransformer<>()) - .thenReturn(new NoopValueTransformer<>()); - final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = - AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier); - valueTransformerWithKeySupplier.get(); - valueTransformerWithKeySupplier.get(); - valueTransformerWithKeySupplier.get(); - } - @Test public void testToInternalValueTransformerWithKeySupplierSuppliesNewTransformers() { final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java deleted file mode 100644 index 5335128aa459d..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentMatchers; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; - -import java.util.Arrays; -import java.util.Collections; - -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) -@SuppressWarnings("deprecation") -public class KStreamFlatTransformTest { - - private Number inputKey; - private Number inputValue; - - @Mock - private org.apache.kafka.streams.kstream.Transformer>> transformer; - @Mock - private InternalProcessorContext context; - private InOrder inOrder; - - private KStreamFlatTransformProcessor processor; - - @BeforeEach - public void setUp() { - inputKey = 1; - inputValue = 10; - inOrder = inOrder(context); - processor = new KStreamFlatTransformProcessor<>(transformer); - } - - @Test - public void shouldInitialiseFlatTransformProcessor() { - processor.init(context); - - verify(transformer).init(context); - } - - @Test - public void shouldTransformInputRecordToMultipleOutputRecords() { - final Iterable> outputRecords = Arrays.asList( - KeyValue.pair(2, 20), - KeyValue.pair(3, 30), - KeyValue.pair(4, 40)); - - processor.init(context); - - when(transformer.transform(inputKey, inputValue)).thenReturn(outputRecords); - - processor.process(new Record<>(inputKey, inputValue, 0L)); - - for (final KeyValue outputRecord : outputRecords) { - inOrder.verify(context).forward(new Record<>(outputRecord.key, outputRecord.value, 0L)); - } - } - - @Test - public void shouldAllowEmptyListAsResultOfTransform() { - processor.init(context); - - when(transformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList()); - - processor.process(new Record<>(inputKey, inputValue, 0L)); - - inOrder.verify(context, never()).forward(ArgumentMatchers.>any()); - } - - @Test - public void shouldAllowNullAsResultOfTransform() { - processor.init(context); - - when(transformer.transform(inputKey, inputValue)).thenReturn(null); - - processor.process(new Record<>(inputKey, inputValue, 0L)); - - inOrder.verify(context, never()).forward(ArgumentMatchers.>any()); - } - - @Test - public void shouldCloseFlatTransformProcessor() { - processor.close(); - - verify(transformer).close(); - } - - @Test - public void shouldGetFlatTransformProcessor() { - @SuppressWarnings("unchecked") - final org.apache.kafka.streams.kstream.TransformerSupplier>> transformerSupplier = - mock(org.apache.kafka.streams.kstream.TransformerSupplier.class); - final KStreamFlatTransform processorSupplier = - new KStreamFlatTransform<>(transformerSupplier); - - when(transformerSupplier.get()).thenReturn(transformer); - - final Processor 
processor = processorSupplier.get();
-
-        assertInstanceOf(KStreamFlatTransformProcessor.class, processor);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
deleted file mode 100644
index 50a636f349db3..0000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class KStreamFlatTransformValuesTest {
-
-    private Integer inputKey;
-    private Integer inputValue;
-
-    @Mock
-    private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
-    @Mock
-    private InternalProcessorContext<Integer, String> context;
-    private InOrder inOrder;
-
-    private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
-
-    @BeforeEach
-    public void setUp() {
-        inputKey = 1;
-        inputValue = 10;
-        inOrder = inOrder(context);
-        processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
-    }
-
-    @Test
-    public void shouldInitializeFlatTransformValuesProcessor() {
-        processor.init(context);
-
-        verify(valueTransformer).init(ArgumentMatchers.isA(ForwardingDisabledProcessorContext.class));
-    }
-
-    @Test
-    public void shouldTransformInputRecordToMultipleOutputValues() {
-        final Iterable<String> outputValues = Arrays.asList(
-            "Hello",
-            "Blue",
-            "Planet");
-
-        processor.init(context);
-
-        when(valueTransformer.transform(inputKey, inputValue)).thenReturn(outputValues);
-
-        processor.process(new Record<>(inputKey, inputValue, 0L));
-
-        for (final String outputValue : outputValues) {
-            inOrder.verify(context).forward(new Record<>(inputKey, outputValue, 0L));
-        }
-    }
-
-    @Test
-    public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
-        processor.init(context);
-
-        when(valueTransformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
-
-        processor.process(new Record<>(inputKey, inputValue, 0L));
-
-        inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
-    }
-
-    @Test
-    public void shouldEmitNoRecordIfTransformReturnsNull() {
-        processor.init(context);
-
-        when(valueTransformer.transform(inputKey, inputValue)).thenReturn(null);
-
-        processor.process(new Record<>(inputKey, inputValue, 0L));
-
-        inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
-    }
-
-    @Test
-    public void shouldCloseFlatTransformValuesProcessor() {
-        processor.close();
-
-        verify(valueTransformer).close();
-    }
-
-    @Test
-    public void shouldGetFlatTransformValuesProcessor() {
-        @SuppressWarnings("unchecked")
-        final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier =
-            mock(ValueTransformerWithKeySupplier.class);
-        final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier =
-            new KStreamFlatTransformValues<>(valueTransformerSupplier);
-
-        when(valueTransformerSupplier.get()).thenReturn(valueTransformer);
-
-        final Processor<Integer, Integer, Integer, String> processor = processorSupplier.get();
-
-        assertInstanceOf(KStreamFlatTransformValuesProcessor.class, processor);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index b78696f259ade..8d2a280be4e8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -48,8 +48,6 @@
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
@@ -113,33 +111,6 @@ public class KStreamImplTest {
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
     private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
     private final MockApiFixedKeyProcessorSupplier<String, String, Void> fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
-    @SuppressWarnings("deprecation")
-    private final org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>> flatValueTransformerSupplier =
-        () -> new org.apache.kafka.streams.kstream.ValueTransformer<String, Iterable<String>>() {
-            @Override
-            public void init(final ProcessorContext context) {}
-
-            @Override
-            public Iterable<String> transform(final String value) {
-                return Collections.singleton(value);
-            }
-
-            @Override
-            public void close() {}
-        };
-    private final ValueTransformerWithKeySupplier<String, String, Iterable<String>> flatValueTransformerWithKeySupplier =
-        () -> new ValueTransformerWithKey<String, String, Iterable<String>>() {
-            @Override
-            public void init(final ProcessorContext context) {}
-
-            @Override
-            public Iterable<String> transform(final String key, final String value) {
-                return Collections.singleton(value);
-            }
-
-            @Override
-            public void close() {}
-        };
     private StreamsBuilder builder;
     private KStream<String, String> testStream;
 
@@ -1619,230 +1590,6 @@ public void shouldNotAllowBadProcessSupplierOnProcessValuesWithNamedAndStores()
         assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>>) null));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues((ValueTransformerWithKeySupplier<String, String, Iterable<String>>) null));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                (org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>>) null,
-                "stateStore"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                (ValueTransformerWithKeySupplier<String, String, Iterable<String>>) null,
-                "stateStore"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                (org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>>) null,
-                Named.as("flatValueTransformer")));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                (ValueTransformerWithKeySupplier<String, String, Iterable<String>>) null,
-                Named.as("flatValueWithKeyTransformer")));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithNamedAndStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                (org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>>) null,
-                Named.as("flatValueTransformer"),
-                "stateStore"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithNamedAndStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                (ValueTransformerWithKeySupplier<String, String, Iterable<String>>) null,
-                Named.as("flatValueWitKeyTransformer"),
-                "stateStore"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueSupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerSupplier,
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueWithKeySupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerWithKeySupplier,
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueSupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerSupplier,
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueWithKeySupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerWithKeySupplier,
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueSupplierAndNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerSupplier,
-                Named.as("flatValueTransformer"),
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueWithKeySupplierAndNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerWithKeySupplier,
-                Named.as("flatValueWitKeyTransformer"),
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueSupplierAndNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerSupplier,
-                Named.as("flatValueTransformer"),
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueWithKeySupplierAndNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerWithKeySupplier,
-                Named.as("flatValueWitKeyTransformer"),
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerSupplier,
-                (Named) null));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueWithKeySupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerWithKeySupplier,
-                (Named) null));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplierAndStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerSupplier,
-                (Named) null,
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueWithKeySupplierAndStore() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransformValues(
-                flatValueTransformerWithKeySupplier,
-                (Named) null,
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
     @Test
     public void shouldNotAllowNullProcessSupplierOnProcess() {
         final NullPointerException exception = assertThrows(
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 80d43fc2315b2..9db92525ced22 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -17,14 +17,7 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{
-  GlobalKTable,
-  JoinWindows,
-  KStream => KStreamJ,
-  Printed,
-  ValueTransformerSupplier,
-  ValueTransformerWithKeySupplier
-}
+import org.apache.kafka.streams.kstream.{GlobalKTable, JoinWindows, KStream => KStreamJ, Printed}
 import org.apache.kafka.streams.processor.TopicNameExtractor
 import org.apache.kafka.streams.processor.api.{FixedKeyProcessorSupplier, ProcessorSupplier}
 import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
@@ -35,9 +28,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
   MapperFromFunction,
   PredicateFromFunction,
   ValueMapperFromFunction,
-  ValueMapperWithKeyFromFunction,
-  ValueTransformerSupplierAsJava,
-  ValueTransformerSupplierWithKeyAsJava
+  ValueMapperWithKeyFromFunction
 }
 
 import scala.jdk.CollectionConverters._
@@ -492,98 +483,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def toTable(named: Named, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     new KTable(inner.toTable(named, materialized))
 
-  /**
-   * Transform the value of each input record into zero or more records (with possible new type) in the
-   * output stream.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via `addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
-   * @param stateStoreNames the names of the state stores used by the processor
-   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
-  def flatTransformValues[VR](
-    valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
-
-  /**
-   * Transform the value of each input record into zero or more records (with possible new type) in the
-   * output stream.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via `addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
-   * @param named a [[Named]] config used to name the processor in the topology
-   * @param stateStoreNames the names of the state stores used by the processor
-   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
-  def flatTransformValues[VR](
-    valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
-    named: Named,
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
-
-  /**
-   * Transform the value of each input record into zero or more records (with possible new type) in the
-   * output stream.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via `addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
-   * @param stateStoreNames the names of the state stores used by the processor
-   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, String*) instead.")
-  def flatTransformValues[VR](
-    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
-
-  /**
-   * Transform the value of each input record into zero or more records (with possible new type) in the
-   * output stream.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via `addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
-   * @param named a [[Named]] config used to name the processor in the topology
-   * @param stateStoreNames the names of the state stores used by the processor
-   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
-  def flatTransformValues[VR](
-    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
-    named: Named,
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
-
   /**
    * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
    * `processorSupplier`).
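Migration note: the deprecation messages above point to processValues(FixedKeyProcessorSupplier, Named, String*). The snippet below is a minimal sketch, assuming a String-keyed, String-valued stream, of how a zero-or-more-values transformation such as the removed flatTransformValues overloads can be expressed with the FixedKeyProcessor API exercised by the updated Scala test that follows; the splitIntoWords name and the whitespace-splitting logic are illustrative only, not part of this change.

    import org.apache.kafka.streams.processor.api
    import org.apache.kafka.streams.processor.api.FixedKeyRecord
    import org.apache.kafka.streams.scala.kstream.KStream

    // Forwards zero or more values per input record while keeping the input key unchanged.
    def splitIntoWords(stream: KStream[String, String]): KStream[String, String] = {
      val supplier: api.FixedKeyProcessorSupplier[String, String, String] = () =>
        new api.FixedKeyProcessor[String, String, String] {
          private var context: api.FixedKeyProcessorContext[String, String] = _

          override def init(context: api.FixedKeyProcessorContext[String, String]): Unit =
            this.context = context

          override def process(record: FixedKeyRecord[String, String]): Unit =
            // One forward() call per output value; forwarding nothing drops the record.
            record.value().split("\\s+").foreach(word => context.forward(record.withValue(word)))
        }
      // State stores could be connected by name here, e.g. stream.processValues(supplier, "storeName").
      stream.processValues(supplier)
    }
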
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 88fe8e8980d15..6a0b6c1b0e988 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -18,17 +18,9 @@ package org.apache.kafka.streams.scala.kstream
 
 import java.time.Duration.ofSeconds
 import java.time.{Duration, Instant}
-import org.apache.kafka.streams.kstream.{
-  JoinWindows,
-  Named,
-  ValueTransformer,
-  ValueTransformerSupplier,
-  ValueTransformerWithKey,
-  ValueTransformerWithKeySupplier
-}
+import org.apache.kafka.streams.kstream.{JoinWindows, Named}
 import org.apache.kafka.streams.processor.api
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.api.{Processor, ProcessorSupplier}
+import org.apache.kafka.streams.processor.api.{FixedKeyRecord, Processor, ProcessorSupplier}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.serialization.Serdes._
 import org.apache.kafka.streams.scala.StreamsBuilder
@@ -40,7 +32,6 @@ import org.junit.jupiter.api.Test
 
 import java.util
 import java.util.Collections
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 class KStreamTest extends TestDriver {
@@ -287,64 +278,29 @@ class KStreamTest extends TestDriver {
     testDriver.close()
   }
 
-  @nowarn
-  @Test
-  def testCorrectlyFlatTransformValuesInRecords(): Unit = {
-    class TestTransformer extends ValueTransformer[String, Iterable[String]] {
-      override def init(context: ProcessorContext): Unit = {}
-
-      override def transform(value: String): Iterable[String] =
-        Array(s"$value-transformed")
-
-      override def close(): Unit = {}
-    }
-    val builder = new StreamsBuilder()
-    val sourceTopic = "source"
-    val sinkTopic = "sink"
-
-    val stream = builder.stream[String, String](sourceTopic)
-    stream
-      .flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] {
-        def get(): ValueTransformer[String, Iterable[String]] =
-          new TestTransformer
-      })
-      .to(sinkTopic)
-
-    val now = Instant.now()
-    val testDriver = createTestDriver(builder, now)
-    val testInput = testDriver.createInput[String, String](sourceTopic)
-    val testOutput = testDriver.createOutput[String, String](sinkTopic)
-
-    testInput.pipeInput("1", "value", now)
-
-    assertEquals("value-transformed", testOutput.readValue)
-
-    assertTrue(testOutput.isEmpty)
-
-    testDriver.close()
-  }
-
-  @nowarn
   @Test
-  def testCorrectlyFlatTransformValuesInRecordsWithKey(): Unit = {
-    class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
-      override def init(context: ProcessorContext): Unit = {}
-
-      override def transform(key: String, value: String): Iterable[String] =
-        Array(s"$value-transformed-$key")
+  def testProcessValuesCorrectlyRecords(): Unit = {
+    val processorSupplier: api.FixedKeyProcessorSupplier[String, String, String] =
+      () =>
+        new api.FixedKeyProcessor[String, String, String] {
+          private var context: api.FixedKeyProcessorContext[String, String] = _
+
+          override def init(context: api.FixedKeyProcessorContext[String, String]): Unit =
+            this.context = context
+
+          override def process(record: FixedKeyRecord[String, String]): Unit = {
+            val processedValue = s"${record.value()}-processed"
+            context.forward(record.withValue(processedValue))
+          }
+        }
 
-      override def close(): Unit = {}
-    }
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
     val sinkTopic = "sink"
 
     val stream = builder.stream[String, String](sourceTopic)
     stream
-      .flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] {
-        def get(): ValueTransformerWithKey[String, String, Iterable[String]] =
-          new TestTransformer
-      })
+      .processValues(processorSupplier)
      .to(sinkTopic)
 
     val now = Instant.now()
@@ -354,7 +310,9 @@ class KStreamTest extends TestDriver {
 
     testInput.pipeInput("1", "value", now)
 
-    assertEquals("value-transformed-1", testOutput.readValue)
+    val result = testOutput.readKeyValue()
+    assertEquals("value-processed", result.value)
+    assertEquals("1", result.key)
 
     assertTrue(testOutput.isEmpty)