From d0f0d3eeafbe2500f37ead5eb79c4dacff2dc378 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Tue, 17 Dec 2024 11:26:05 +0100 Subject: [PATCH] Use `RichFunction.open(OpenContext)` instead of `RichFunction.open(Configuration)` (#7347) --- docs/MigrationGuide.md | 2 ++ .../engine/flink/api/datastream/StatefulFunction.scala | 5 ++--- .../flink/api/exception/WithExceptionHandler.scala | 5 ++--- .../api/process/FlinkContextInitializingFunction.scala | 5 ++--- .../api/process/FlinkLazyParameterFunctionHelper.scala | 7 +++---- .../engine/flink/api/process/RichLifecycleFunction.scala | 5 ++--- .../engine/flink/api/state/EvictableState.scala | 9 ++++----- .../util/function/CoProcessFunctionInterceptor.scala | 7 +++---- .../flink/util/function/ProcessFunctionInterceptor.scala | 8 +++----- ...ExtraWindowWhenNoDataTumblingAggregatorFunction.scala | 5 ++--- .../aggregate/EmitWhenEventLeftAggregatorFunction.scala | 7 +++---- .../transformer/aggregate/EnrichingWithKeyFunction.scala | 5 ++--- .../aggregate/OnEventTriggerWindowOperator.scala | 5 ++--- .../engine/flink/util/transformer/DelayTransformer.scala | 4 ++-- .../util/transformer/PreviousValueTransformer.scala | 7 +++---- .../engine/flink/table/aggregate/TableAggregation.scala | 5 ++--- .../nussknacker/engine/process/ProcessPartFunction.scala | 9 ++++----- .../process/registrar/AsyncInterpretationFunction.scala | 6 +++--- .../engine/process/registrar/SourceMetricsFunction.scala | 4 ++-- .../engine/kafka/source/flink/FlinkKafkaSource.scala | 7 +++---- .../sample/transformer/ConstantStateTransformer.scala | 6 +++--- .../sink/flink/FlinkKafkaUniversalSink.scala | 7 +++---- 22 files changed, 57 insertions(+), 73 deletions(-) diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 46857f82e8b..9137f2400d3 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -26,6 +26,8 @@ To see the biggest differences please consult the [changelog](Changelog.md). * `def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, String]` - allows to define custom tooltips for actions, if not defined the default is still used * modified method: * `def statusActions(processStatus: ProcessStatus): List[ScenarioActionName]` - changed argument, to include information about latest and deployed versions +* [#7347](https://github.com/TouK/nussknacker/pull/7347) All calls to `org.apache.flink.api.common.functions.RichFunction.open(Configuration)`, + which is deprecated, were replaced with calls to `org.apache.flink.api.common.functions.RichFunction.open(OpenContext)` ## In version 1.18.0 diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/StatefulFunction.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/StatefulFunction.scala index c67806ebdb5..0f545377e01 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/StatefulFunction.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/StatefulFunction.scala @@ -18,10 +18,9 @@ package pl.touk.nussknacker.engine.flink.api.datastream import org.apache.flink.annotation.Public -import org.apache.flink.api.common.functions.RichFunction +import org.apache.flink.api.common.functions.{OpenContext, RichFunction} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.configuration.Configuration /** * Trait implementing the functionality necessary to apply stateful functions in RichFunctions @@ -44,7 +43,7 @@ trait StatefulFunction[I, O, S] extends RichFunction { o } - override def open(c: Configuration) = { + override def open(openContext: OpenContext): Unit = { val info = new ValueStateDescriptor[S]("state", stateSerializer) state = getRuntimeContext().getState(info) } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/exception/WithExceptionHandler.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/exception/WithExceptionHandler.scala index 77a506f3e3a..89de86c9e55 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/exception/WithExceptionHandler.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/exception/WithExceptionHandler.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.exception -import org.apache.flink.api.common.functions.{RichFunction, RuntimeContext} -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RichFunction, RuntimeContext} /** * Helper for using exception handler. @@ -15,7 +14,7 @@ trait WithExceptionHandler { protected var exceptionHandler: ExceptionHandler = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { exceptionHandler = exceptionHandlerPreparer(getRuntimeContext) } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkContextInitializingFunction.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkContextInitializingFunction.scala index b4eaadee252..51e1d741f31 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkContextInitializingFunction.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkContextInitializingFunction.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.process -import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RichMapFunction, RuntimeContext} import pl.touk.nussknacker.engine.api.Context import pl.touk.nussknacker.engine.api.process.{ContextInitializer, ContextInitializingFunction} import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext @@ -14,7 +13,7 @@ class FlinkContextInitializingFunction[Raw]( private var initializingStrategy: ContextInitializingFunction[Raw] = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { val contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId) initializingStrategy = contextInitializer.initContext(contextIdGenerator) } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkLazyParameterFunctionHelper.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkLazyParameterFunctionHelper.scala index 3399986c49a..de801f542c0 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkLazyParameterFunctionHelper.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkLazyParameterFunctionHelper.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.process import org.apache.flink.api.common.functions._ -import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector import pl.touk.nussknacker.engine.api.LazyParameter.Evaluate import pl.touk.nussknacker.engine.api._ @@ -89,8 +88,8 @@ trait OneParamLazyParameterFunction[T <: AnyRef] extends LazyParameterInterprete protected def evaluateParameter(ctx: Context): T = _evaluateParameter(ctx) - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) _evaluateParameter = toEvaluateFunctionConverter.toEvaluateFunction(parameter) } @@ -117,7 +116,7 @@ trait LazyParameterInterpreterFunction { self: RichFunction => } // TODO: how can we make sure this will invoke super.open(...) (can't do it directly...) - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { toEvaluateFunctionConverter = lazyParameterHelper.createInterpreter(getRuntimeContext) exceptionHandler = lazyParameterHelper.exceptionHandler(getRuntimeContext) } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/RichLifecycleFunction.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/RichLifecycleFunction.scala index cb73532c81e..81d692d2d32 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/RichLifecycleFunction.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/RichLifecycleFunction.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.process -import org.apache.flink.api.common.functions.{AbstractRichFunction, MapFunction, RuntimeContext} -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{AbstractRichFunction, MapFunction, OpenContext, RuntimeContext} import pl.touk.nussknacker.engine.api.Lifecycle import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext @@ -11,7 +10,7 @@ abstract class RichLifecycleFunction extends AbstractRichFunction { protected val convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { lifecycle.open(convertToEngineRuntimeContext(getRuntimeContext)) } diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/state/EvictableState.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/state/EvictableState.scala index 45d17502995..0068f849483 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/state/EvictableState.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/state/EvictableState.scala @@ -1,8 +1,7 @@ package pl.touk.nussknacker.engine.flink.api.state -import org.apache.flink.api.common.functions.RichFunction +import org.apache.flink.api.common.functions.{OpenContext, RichFunction} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimerService import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.co.CoProcessFunction @@ -18,8 +17,8 @@ abstract class EvictableStateFunction[In, Out, StateType] extends KeyedProcessFu protected def stateDescriptor: ValueStateDescriptor[StateType] - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) lastEventTimeForKey = getRuntimeContext.getState[java.lang.Long]( new ValueStateDescriptor[java.lang.Long]("timers", classOf[java.lang.Long]) ) @@ -91,7 +90,7 @@ trait LatelyEvictableStateFunctionMixin[StateType] extends RichFunction with Sta @transient protected var state: ValueState[StateType] = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { latestEvictionTimeForKey = getRuntimeContext.getState[java.lang.Long]( new ValueStateDescriptor[java.lang.Long]("timers", classOf[java.lang.Long]) ) diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/CoProcessFunctionInterceptor.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/CoProcessFunctionInterceptor.scala index bd38add02ed..8a0b58d5561 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/CoProcessFunctionInterceptor.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/CoProcessFunctionInterceptor.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.flink.util.function -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext} import org.apache.flink.streaming.api.functions.co.CoProcessFunction import org.apache.flink.util.Collector @@ -12,8 +11,8 @@ import org.apache.flink.util.Collector abstract class CoProcessFunctionInterceptor[IN1, IN2, OUT](underlying: CoProcessFunction[IN1, IN2, OUT]) extends CoProcessFunction[IN1, IN2, OUT] { - override def open(parameters: Configuration): Unit = { - underlying.open(parameters) + override def open(openContext: OpenContext): Unit = { + underlying.open(openContext) } override def setRuntimeContext(ctx: RuntimeContext): Unit = { diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/ProcessFunctionInterceptor.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/ProcessFunctionInterceptor.scala index bc4096fde18..41e7b1aef94 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/ProcessFunctionInterceptor.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/function/ProcessFunctionInterceptor.scala @@ -1,16 +1,14 @@ package pl.touk.nussknacker.engine.flink.util.function -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext} import org.apache.flink.streaming.api.functions.KeyedProcessFunction -import org.apache.flink.streaming.api.functions.co.CoProcessFunction import org.apache.flink.util.Collector abstract class ProcessFunctionInterceptor[IN, OUT](underlying: KeyedProcessFunction[String, IN, OUT]) extends KeyedProcessFunction[String, IN, OUT] { - override def open(parameters: Configuration): Unit = { - underlying.open(parameters) + override def open(openContext: OpenContext): Unit = { + underlying.open(openContext) } override def setRuntimeContext(ctx: RuntimeContext): Unit = { diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitExtraWindowWhenNoDataTumblingAggregatorFunction.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitExtraWindowWhenNoDataTumblingAggregatorFunction.scala index c6fd168e6df..dc5fcaf0c2d 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitExtraWindowWhenNoDataTumblingAggregatorFunction.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitExtraWindowWhenNoDataTumblingAggregatorFunction.scala @@ -1,9 +1,8 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate -import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext} import org.apache.flink.api.common.state.ValueState import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimerService import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector @@ -47,7 +46,7 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K, V]]( @transient private var contextIdGenerator: ContextIdGenerator = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { state = getRuntimeContext.getState(stateDescriptor) contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.id) } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala index 8955783b8cb..d7605bfb92e 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EmitWhenEventLeftAggregatorFunction.scala @@ -1,8 +1,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate -import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimerService import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector @@ -42,8 +41,8 @@ class EmitWhenEventLeftAggregatorFunction[MapT[K, V]]( type FlinkOnTimerCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#OnTimerContext - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.id) } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EnrichingWithKeyFunction.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EnrichingWithKeyFunction.scala index cdc45efbe7f..97824a1ccef 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EnrichingWithKeyFunction.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/EnrichingWithKeyFunction.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext} import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector @@ -18,7 +17,7 @@ class EnrichingWithKeyFunction(convertToEngineRuntimeContext: RuntimeContext => @transient private var contextIdGenerator: ContextIdGenerator = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId) } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala index d9026604ef2..1ca336bcdff 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala @@ -1,9 +1,8 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import com.github.ghik.silencer.silent -import org.apache.flink.api.common.functions.{AggregateFunction, RuntimeContext} +import org.apache.flink.api.common.functions.{AggregateFunction, OpenContext, RuntimeContext} import org.apache.flink.api.common.state.AggregatingStateDescriptor -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.{KeyedStream, SingleOutputStreamOperator} import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner @@ -102,7 +101,7 @@ private class ValueEmittingWindowFunction( @transient private var contextIdGenerator: ContextIdGenerator = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId) } diff --git a/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala b/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala index 2e9cd878090..820c3bb8678 100644 --- a/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala +++ b/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala @@ -1,9 +1,9 @@ package pl.touk.nussknacker.engine.flink.util.transformer +import org.apache.flink.api.common.functions.OpenContext import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ListTypeInfo -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector @@ -75,7 +75,7 @@ class DelayFunction(nodeCtx: FlinkCustomNodeContext, delay: Duration) @transient private var state: MapState[Long, java.util.List[api.Context]] = _ - override def open(config: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { state = getRuntimeContext.getMapState(descriptor) } diff --git a/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/PreviousValueTransformer.scala b/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/PreviousValueTransformer.scala index ad14927bf49..88f9d144bc2 100644 --- a/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/PreviousValueTransformer.scala +++ b/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/PreviousValueTransformer.scala @@ -1,9 +1,8 @@ package pl.touk.nussknacker.engine.flink.util.transformer -import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.functions.{OpenContext, RichFlatMapFunction} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.util.Collector import pl.touk.nussknacker.engine.api._ @@ -46,8 +45,8 @@ case object PreviousValueTransformer extends CustomStreamTransformer with Explic private[this] var state: ValueState[Value] = _ - override def open(c: Configuration): Unit = { - super.open(c) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) val info = new ValueStateDescriptor[Value]("state", typeInformation) state = getRuntimeContext.getState(info) } diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregation.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregation.scala index 8b21893cf9f..8c0eb79a15b 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregation.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregation.scala @@ -1,8 +1,7 @@ package pl.touk.nussknacker.engine.flink.table.aggregate -import org.apache.flink.api.common.functions.{FlatMapFunction, RuntimeContext} +import org.apache.flink.api.common.functions.{FlatMapFunction, OpenContext, RuntimeContext} import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.Expressions.{$, call} @@ -116,7 +115,7 @@ class TableAggregation( @transient private var contextIdGenerator: ContextIdGenerator = _ - override def open(configuration: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.toString) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ProcessPartFunction.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ProcessPartFunction.scala index 5270feaaccb..37cf43d7524 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ProcessPartFunction.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ProcessPartFunction.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.process -import org.apache.flink.api.common.functions.RichFunction -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RichFunction} import pl.touk.nussknacker.engine.graph.node.NodeData import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler @@ -22,8 +21,8 @@ trait ProcessPartFunction extends ExceptionHandlerFunction { } } - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) compilerData.open(getRuntimeContext, nodesUsed) } @@ -46,7 +45,7 @@ trait ExceptionHandlerFunction extends RichFunction { } } - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { exceptionHandler = compilerData.prepareExceptionHandler(getRuntimeContext) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/AsyncInterpretationFunction.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/AsyncInterpretationFunction.scala index 75f887c6c18..6901b7ccb1f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/AsyncInterpretationFunction.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/AsyncInterpretationFunction.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.process.registrar import cats.effect.IO import cats.effect.unsafe.IORuntime import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.OpenContext import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction} import pl.touk.nussknacker.engine.InterpretationResult import pl.touk.nussknacker.engine.Interpreter.FutureShape @@ -35,8 +35,8 @@ private[registrar] class AsyncInterpretationFunction( private var serviceExecutionContext: ServiceExecutionContext = _ - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) getRuntimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent( "closeAsyncExecutionContext", diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala index 5174ba94be6..3a0cd4d9427 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.process.registrar -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.OpenContext import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector import pl.touk.nussknacker.engine.api.process.ComponentUseCase @@ -12,7 +12,7 @@ private[registrar] class SourceMetricsFunction[T](sourceId: String, componentUse @transient private var metrics: OneSourceMetrics = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { metrics = new OneSourceMetrics(sourceId) val metricsProvider = createMetricsProvider(componentUseCase, getRuntimeContext) metrics.registerOwnMetrics(metricsProvider) diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala index 3deb771269b..ee856a4e369 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala @@ -3,8 +3,7 @@ package pl.touk.nussknacker.engine.kafka.source.flink import cats.data.NonEmptyList import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext} import org.apache.flink.streaming.api.datastream.DataStreamSource import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -166,9 +165,9 @@ class FlinkKafkaConsumerHandlingExceptions[T]( protected var exceptionPurposeContextIdGenerator: ContextIdGenerator = _ - override def open(parameters: Configuration): Unit = { + override def open(openContext: OpenContext): Unit = { patchRestoredState() - super.open(parameters) + super.open(openContext) exceptionHandler = exceptionHandlerPreparer(getRuntimeContext) exceptionPurposeContextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.id) deserializationSchema.setExceptionHandlingData(exceptionHandler, exceptionPurposeContextIdGenerator, nodeId) diff --git a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/transformer/ConstantStateTransformer.scala b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/transformer/ConstantStateTransformer.scala index a2f782e5bde..2b6de4ae659 100644 --- a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/transformer/ConstantStateTransformer.scala +++ b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/transformer/ConstantStateTransformer.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.management.sample.transformer -import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.functions.{OpenContext, RichMapFunction} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration @@ -22,8 +22,8 @@ case class ConstantStateTransformer[T: TypeInformation](defaultValue: T) extends var constantState: ValueState[T] = _ - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) val descriptor = new ValueStateDescriptor[T]("constantState", implicitly[TypeInformation[T]]) constantState = getRuntimeContext.getState(descriptor) } diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala index 4ef5e35d98a..7c3531147a5 100644 --- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala @@ -2,9 +2,8 @@ package pl.touk.nussknacker.engine.schemedkafka.sink.flink import com.typesafe.scalalogging.LazyLogging import io.confluent.kafka.schemaregistry.ParsedSchema -import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} +import org.apache.flink.api.common.functions.{OpenContext, RichMapFunction, RuntimeContext} import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.configuration.Configuration import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} import org.apache.flink.streaming.api.functions.sink.SinkFunction @@ -81,8 +80,8 @@ class FlinkKafkaUniversalSink( @transient private var encodeRecord: Any => AnyRef = _ - override def open(parameters: Configuration): Unit = { - super.open(parameters) + override def open(openContext: OpenContext): Unit = { + super.open(openContext) encodeRecord = schemaSupportDispatcher .forSchemaType(schema.getParsedSchema.schemaType()) .formValueEncoder(schema.getParsedSchema, validationMode)