Skip to content

Commit

Permalink
Use RichFunction.open(OpenContext) instead of `RichFunction.open(Co…
Browse files Browse the repository at this point in the history
…nfiguration)` (#7347)
  • Loading branch information
piotrp authored Dec 17, 2024
1 parent 7aa25fb commit d0f0d3e
Show file tree
Hide file tree
Showing 22 changed files with 57 additions and 73 deletions.
2 changes: 2 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* `def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, String]` - allows to define custom tooltips for actions, if not defined the default is still used
* modified method:
* `def statusActions(processStatus: ProcessStatus): List[ScenarioActionName]` - changed argument, to include information about latest and deployed versions
* [#7347](https://github.com/TouK/nussknacker/pull/7347) All calls to `org.apache.flink.api.common.functions.RichFunction.open(Configuration)`,
which is deprecated, were replaced with calls to `org.apache.flink.api.common.functions.RichFunction.open(OpenContext)`

## In version 1.18.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package pl.touk.nussknacker.engine.flink.api.datastream

import org.apache.flink.annotation.Public
import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.functions.{OpenContext, RichFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration

/**
* Trait implementing the functionality necessary to apply stateful functions in RichFunctions
Expand All @@ -44,7 +43,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
o
}

override def open(c: Configuration) = {
override def open(openContext: OpenContext): Unit = {
val info = new ValueStateDescriptor[S]("state", stateSerializer)
state = getRuntimeContext().getState(info)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.api.exception

import org.apache.flink.api.common.functions.{RichFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{OpenContext, RichFunction, RuntimeContext}

/**
* Helper for using exception handler.
Expand All @@ -15,7 +14,7 @@ trait WithExceptionHandler {

protected var exceptionHandler: ExceptionHandler = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
exceptionHandler = exceptionHandlerPreparer(getRuntimeContext)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.api.process

import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{OpenContext, RichMapFunction, RuntimeContext}
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.process.{ContextInitializer, ContextInitializingFunction}
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
Expand All @@ -14,7 +13,7 @@ class FlinkContextInitializingFunction[Raw](

private var initializingStrategy: ContextInitializingFunction[Raw] = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
val contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId)
initializingStrategy = contextInitializer.initContext(contextIdGenerator)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.api.process

import org.apache.flink.api.common.functions._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.api.LazyParameter.Evaluate
import pl.touk.nussknacker.engine.api._
Expand Down Expand Up @@ -89,8 +88,8 @@ trait OneParamLazyParameterFunction[T <: AnyRef] extends LazyParameterInterprete
protected def evaluateParameter(ctx: Context): T =
_evaluateParameter(ctx)

override def open(parameters: Configuration): Unit = {
super.open(parameters)
override def open(openContext: OpenContext): Unit = {
super.open(openContext)
_evaluateParameter = toEvaluateFunctionConverter.toEvaluateFunction(parameter)
}

Expand All @@ -117,7 +116,7 @@ trait LazyParameterInterpreterFunction { self: RichFunction =>
}

// TODO: how can we make sure this will invoke super.open(...) (can't do it directly...)
override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
toEvaluateFunctionConverter = lazyParameterHelper.createInterpreter(getRuntimeContext)
exceptionHandler = lazyParameterHelper.exceptionHandler(getRuntimeContext)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.api.process

import org.apache.flink.api.common.functions.{AbstractRichFunction, MapFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{AbstractRichFunction, MapFunction, OpenContext, RuntimeContext}
import pl.touk.nussknacker.engine.api.Lifecycle
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext

Expand All @@ -11,7 +10,7 @@ abstract class RichLifecycleFunction extends AbstractRichFunction {

protected val convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
lifecycle.open(convertToEngineRuntimeContext(getRuntimeContext))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package pl.touk.nussknacker.engine.flink.api.state

import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.functions.{OpenContext, RichFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimerService
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
Expand All @@ -18,8 +17,8 @@ abstract class EvictableStateFunction[In, Out, StateType] extends KeyedProcessFu

protected def stateDescriptor: ValueStateDescriptor[StateType]

override def open(parameters: Configuration): Unit = {
super.open(parameters)
override def open(openContext: OpenContext): Unit = {
super.open(openContext)
lastEventTimeForKey = getRuntimeContext.getState[java.lang.Long](
new ValueStateDescriptor[java.lang.Long]("timers", classOf[java.lang.Long])
)
Expand Down Expand Up @@ -91,7 +90,7 @@ trait LatelyEvictableStateFunctionMixin[StateType] extends RichFunction with Sta
@transient
protected var state: ValueState[StateType] = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
latestEvictionTimeForKey = getRuntimeContext.getState[java.lang.Long](
new ValueStateDescriptor[java.lang.Long]("timers", classOf[java.lang.Long])
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.function

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

Expand All @@ -12,8 +11,8 @@ import org.apache.flink.util.Collector
abstract class CoProcessFunctionInterceptor[IN1, IN2, OUT](underlying: CoProcessFunction[IN1, IN2, OUT])
extends CoProcessFunction[IN1, IN2, OUT] {

override def open(parameters: Configuration): Unit = {
underlying.open(parameters)
override def open(openContext: OpenContext): Unit = {
underlying.open(openContext)
}

override def setRuntimeContext(ctx: RuntimeContext): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package pl.touk.nussknacker.engine.flink.util.function

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

abstract class ProcessFunctionInterceptor[IN, OUT](underlying: KeyedProcessFunction[String, IN, OUT])
extends KeyedProcessFunction[String, IN, OUT] {

override def open(parameters: Configuration): Unit = {
underlying.open(parameters)
override def open(openContext: OpenContext): Unit = {
underlying.open(openContext)
}

override def setRuntimeContext(ctx: RuntimeContext): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext}
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimerService
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
Expand Down Expand Up @@ -47,7 +46,7 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K, V]](
@transient
private var contextIdGenerator: ContextIdGenerator = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
state = getRuntimeContext.getState(stateDescriptor)
contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.id)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimerService
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
Expand Down Expand Up @@ -42,8 +41,8 @@ class EmitWhenEventLeftAggregatorFunction[MapT[K, V]](
type FlinkOnTimerCtx =
KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#OnTimerContext

override def open(parameters: Configuration): Unit = {
super.open(parameters)
override def open(openContext: OpenContext): Unit = {
super.open(openContext)
contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.id)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{OpenContext, RuntimeContext}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
Expand All @@ -18,7 +17,7 @@ class EnrichingWithKeyFunction(convertToEngineRuntimeContext: RuntimeContext =>
@transient
private var contextIdGenerator: ContextIdGenerator = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.{AggregateFunction, RuntimeContext}
import org.apache.flink.api.common.functions.{AggregateFunction, OpenContext, RuntimeContext}
import org.apache.flink.api.common.state.AggregatingStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{KeyedStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
Expand Down Expand Up @@ -102,7 +101,7 @@ private class ValueEmittingWindowFunction(
@transient
private var contextIdGenerator: ContextIdGenerator = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package pl.touk.nussknacker.engine.flink.util.transformer

import org.apache.flink.api.common.functions.OpenContext
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
Expand Down Expand Up @@ -75,7 +75,7 @@ class DelayFunction(nodeCtx: FlinkCustomNodeContext, delay: Duration)

@transient private var state: MapState[Long, java.util.List[api.Context]] = _

override def open(config: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
state = getRuntimeContext.getMapState(descriptor)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package pl.touk.nussknacker.engine.flink.util.transformer

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.functions.{OpenContext, RichFlatMapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.api._
Expand Down Expand Up @@ -46,8 +45,8 @@ case object PreviousValueTransformer extends CustomStreamTransformer with Explic

private[this] var state: ValueState[Value] = _

override def open(c: Configuration): Unit = {
super.open(c)
override def open(openContext: OpenContext): Unit = {
super.open(openContext)
val info = new ValueStateDescriptor[Value]("state", typeInformation)
state = getRuntimeContext.getState(info)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package pl.touk.nussknacker.engine.flink.table.aggregate

import org.apache.flink.api.common.functions.{FlatMapFunction, RuntimeContext}
import org.apache.flink.api.common.functions.{FlatMapFunction, OpenContext, RuntimeContext}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.Expressions.{$, call}
Expand Down Expand Up @@ -116,7 +115,7 @@ class TableAggregation(
@transient
private var contextIdGenerator: ContextIdGenerator = _

override def open(configuration: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId.toString)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.process

import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.{OpenContext, RichFunction}
import pl.touk.nussknacker.engine.graph.node.NodeData
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData
import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler
Expand All @@ -22,8 +21,8 @@ trait ProcessPartFunction extends ExceptionHandlerFunction {
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
override def open(openContext: OpenContext): Unit = {
super.open(openContext)
compilerData.open(getRuntimeContext, nodesUsed)
}

Expand All @@ -46,7 +45,7 @@ trait ExceptionHandlerFunction extends RichFunction {
}
}

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
exceptionHandler = compilerData.prepareExceptionHandler(getRuntimeContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.process.registrar
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.OpenContext
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}
import pl.touk.nussknacker.engine.InterpretationResult
import pl.touk.nussknacker.engine.Interpreter.FutureShape
Expand Down Expand Up @@ -35,8 +35,8 @@ private[registrar] class AsyncInterpretationFunction(

private var serviceExecutionContext: ServiceExecutionContext = _

override def open(parameters: Configuration): Unit = {
super.open(parameters)
override def open(openContext: OpenContext): Unit = {
super.open(openContext)

getRuntimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(
"closeAsyncExecutionContext",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.touk.nussknacker.engine.process.registrar

import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.functions.OpenContext
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
Expand All @@ -12,7 +12,7 @@ private[registrar] class SourceMetricsFunction[T](sourceId: String, componentUse

@transient private var metrics: OneSourceMetrics = _

override def open(parameters: Configuration): Unit = {
override def open(openContext: OpenContext): Unit = {
metrics = new OneSourceMetrics(sourceId)
val metricsProvider = createMetricsProvider(componentUseCase, getRuntimeContext)
metrics.registerOwnMetrics(metricsProvider)
Expand Down
Loading

0 comments on commit d0f0d3e

Please sign in to comment.