diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala index fabab3aec76..ddd7fbc38e2 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala @@ -1,10 +1,9 @@ package pl.touk.nussknacker.engine.flink.api import com.typesafe.config.Config -import io.circe.{Decoder, Encoder} +import io.circe.Encoder import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ -import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy @@ -73,12 +72,8 @@ object NkGlobalParameters { NkGlobalParameters(buildInfo, processVersion, configGlobalParameters, namespaceTags, additionalInformation) } - def setInContext(ec: ExecutionConfig, globalParameters: NkGlobalParameters): Unit = { - ec.setGlobalJobParameters(globalParameters) - } - - def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] = - NkGlobalParametersToMapEncoder.decode(ec.getGlobalJobParameters.toMap.asScala.toMap) + def fromMap(jobParameters: java.util.Map[String, String]): Option[NkGlobalParameters] = + NkGlobalParametersToMapEncoder.decode(jobParameters.asScala.toMap) private object NkGlobalParametersToMapEncoder { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index d614af52f67..34c9140ac1a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -53,8 +53,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { - NkGlobalParameters.setInContext( - config, + config.setGlobalJobParameters( NkGlobalParameters.create( buildInfo, jobData.processVersion, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala index ee33a6bca26..984d34a80df 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.engine.process.compiler -import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.RuntimeContext import pl.touk.nussknacker.engine.api.JobData import pl.touk.nussknacker.engine.api.process.ComponentUseCase @@ -15,9 +14,10 @@ case class FlinkEngineRuntimeContextImpl( metricsProvider: MetricsProviderForScenario ) extends FlinkEngineRuntimeContext { - @silent("deprecated") override def contextIdGenerator(nodeId: String): ContextIdGenerator = - new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask) + new IncContextIdGenerator( + jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getTaskInfo.getIndexOfThisSubtask + ) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala index 7fbd65f16f5..ccc9708f381 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala @@ -3,7 +3,6 @@ package pl.touk.nussknacker.engine.process.compiler import cats.data.NonEmptyList import com.codahale.metrics import com.codahale.metrics.SlidingTimeWindowReservoir -import com.github.ghik.silencer.silent import org.apache.flink import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper @@ -53,9 +52,8 @@ class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends Ba ??? // Shouldn't be needed because Flink jobs are recreated "from scratch" and no cleanup of metrics during cancel is needed } - @silent("deprecated") private def groupsWithName(nameParts: NonEmptyList[String], tags: Map[String, String]): (MetricGroup, String) = { - val namespaceTags = extractTags(NkGlobalParameters.readFromContext(runtimeContext.getExecutionConfig)) + val namespaceTags = extractTags(NkGlobalParameters.fromMap(runtimeContext.getGlobalJobParameters)) tagMode(nameParts, tags ++ namespaceTags) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala index ef22dc586d6..408428d2453 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala @@ -128,7 +128,7 @@ class FlinkProcessRegistrar( resultCollector: ResultCollector, deploymentData: DeploymentData ): Unit = { - val globalParameters = NkGlobalParameters.readFromContext(env.getConfig) + val globalParameters = NkGlobalParameters.fromMap(env.getConfig.getGlobalJobParameters.toMap) def nodeContext( nodeComponentId: NodeComponentInfo, diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala index 263c9abe958..0f29f74707e 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala @@ -136,7 +136,7 @@ class DelayedFlinkKafkaConsumer[T]( runtimeContext.getProcessingTimeService, runtimeContext.getExecutionConfig.getAutoWatermarkInterval, runtimeContext.getUserCodeClassLoader, - runtimeContext.getTaskNameWithSubtasks, + runtimeContext.getTaskInfo.getTaskNameWithSubtasks, runtimeContext.getMetricGroup, consumerMetricGroup, deserializer, diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala index c36d050f7da..7ad5e7528a1 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala @@ -41,20 +41,18 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers { ) List(globalParamsWithAllOptionalValues, globalParamsWithNoOptionalValues).foreach { params => - val ec = new ExecutionConfig() - ec.setGlobalJobParameters(params) - val globalParamsFromEc = NkGlobalParameters.readFromContext(ec).get - - params.buildInfo shouldBe globalParamsFromEc.buildInfo - params.processVersion shouldBe globalParamsFromEc.processVersion - params.configParameters shouldBe globalParamsFromEc.configParameters - params.namespaceParameters shouldBe globalParamsFromEc.namespaceParameters - params.additionalInformation shouldBe globalParamsFromEc.additionalInformation + val decodedParams = NkGlobalParameters.fromMap(params.toMap).get + + decodedParams.buildInfo shouldBe params.buildInfo + decodedParams.processVersion shouldBe params.processVersion + decodedParams.configParameters shouldBe params.configParameters + decodedParams.namespaceParameters shouldBe params.namespaceParameters + decodedParams.additionalInformation shouldBe params.additionalInformation } } test("returns None when context doesnt have required parameters") { - NkGlobalParameters.readFromContext(new ExecutionConfig()) shouldBe None + NkGlobalParameters.fromMap(new ExecutionConfig.GlobalJobParameters().toMap) shouldBe None } }