Improvement: Add missing TypeInformation
lciolecki committed Sep 26, 2024
1 parent 0e7a583 commit a9932f6
Showing 1 changed file with 9 additions and 2 deletions.
@@ -13,6 +13,7 @@ import pl.touk.nussknacker.engine.api.validation.ValidationMode
 import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
 import pl.touk.nussknacker.engine.flink.api.exception.{ExceptionHandler, WithExceptionHandler}
 import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink}
+import pl.touk.nussknacker.engine.flink.typeinformation.KeyedValueType
 import pl.touk.nussknacker.engine.flink.util.keyed
 import pl.touk.nussknacker.engine.flink.util.keyed.KeyedValueMapper
 import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema
@@ -40,11 +41,17 @@ class FlinkKafkaUniversalSink(
   override def registerSink(
       dataStream: DataStream[ValueWithContext[Value]],
       flinkNodeContext: FlinkCustomNodeContext
-  ): DataStreamSink[_] =
+  ): DataStreamSink[_] = {
+    val avroRecordFunctionType = KeyedValueType.info(
+      flinkNodeContext.typeInformationDetection.forType[AnyRef](key.returnType),
+      flinkNodeContext.typeInformationDetection.forType[AnyRef](value.returnType)
+    )
+
     dataStream
-      .map(new EncodeAvroRecordFunction(flinkNodeContext))
+      .map(new EncodeAvroRecordFunction(flinkNodeContext), avroRecordFunctionType)
       .filter(_.value != null)
       .addSink(toFlinkFunction)
+  }

   def prepareValue(
       ds: DataStream[Context],
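For context, the change builds the TypeInformation for the EncodeAvroRecordFunction output from the key and value return types (via KeyedValueType.info) and passes it to map explicitly, so the operator's output type no longer depends on Flink's runtime type extraction. Below is a minimal, self-contained sketch of that general Flink pattern, assuming the Java DataStream API; all names in it (ExplicitTypeInfoSketch, outputType, the String-to-length mapper) are illustrative and not part of this repository.

// Illustrative sketch only (not from this repository): supplying an explicit
// TypeInformation for a map operator's output instead of relying on type
// extraction, which can otherwise fall back to a generic (Kryo) serializer.
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object ExplicitTypeInfoSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Explicit TypeInformation for the operator's output type.
    val outputType: TypeInformation[Integer] = Types.INT

    env
      .fromElements("a", "bb", "ccc")
      // Same two-argument map(...) overload as in the diff above: the second
      // argument fixes the output TypeInformation up front.
      .map(
        new MapFunction[String, Integer] {
          override def map(value: String): Integer = Integer.valueOf(value.length)
        },
        outputType
      )
      .print()

    env.execute("explicit-type-information-sketch")
  }
}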
