diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/aggregates.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/aggregates.scala index bdb37f86bee..6f09a3cf5c1 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/aggregates.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/aggregates.scala @@ -346,6 +346,12 @@ object aggregates { _.computeStoredType(_) ) + def expand(key: String, aggregator: Aggregator): MapAggregator = { + val expandedMap: Map[String, Aggregator] = (scalaFields + (key -> aggregator)) + + new MapAggregator(expandedMap.asJava) + } + private def computeTypeByFields( input: TypingResult, objType: TypedClass,