Skip to content

Commit

Permalink
Merge branch 'staging' into integer-dictionary-support-in-fragment-in…
Browse files Browse the repository at this point in the history
…puts
  • Loading branch information
mateuszkp96 committed Dec 17, 2024
2 parents a715669 + 7feab59 commit f1d0a7f
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package pl.touk.nussknacker.engine.flink.api

import com.typesafe.config.Config
import io.circe.{Decoder, Encoder}
import io.circe.Encoder
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
Expand Down Expand Up @@ -73,12 +72,8 @@ object NkGlobalParameters {
NkGlobalParameters(buildInfo, processVersion, configGlobalParameters, namespaceTags, additionalInformation)
}

def setInContext(ec: ExecutionConfig, globalParameters: NkGlobalParameters): Unit = {
ec.setGlobalJobParameters(globalParameters)
}

def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] =
NkGlobalParametersToMapEncoder.decode(ec.getGlobalJobParameters.toMap.asScala.toMap)
def fromMap(jobParameters: java.util.Map[String, String]): Option[NkGlobalParameters] =
NkGlobalParametersToMapEncoder.decode(jobParameters.asScala.toMap)

private object NkGlobalParametersToMapEncoder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ object ExecutionConfigPreparer extends LazyLogging {
override def prepareExecutionConfig(
config: ExecutionConfig
)(jobData: JobData, deploymentData: DeploymentData): Unit = {
NkGlobalParameters.setInContext(
config,
config.setGlobalJobParameters(
NkGlobalParameters.create(
buildInfo,
jobData.processVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.touk.nussknacker.engine.process.compiler

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.RuntimeContext
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
Expand All @@ -15,9 +14,10 @@ case class FlinkEngineRuntimeContextImpl(
metricsProvider: MetricsProviderForScenario
) extends FlinkEngineRuntimeContext {

@silent("deprecated")
override def contextIdGenerator(nodeId: String): ContextIdGenerator =
new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask)
new IncContextIdGenerator(
jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getTaskInfo.getIndexOfThisSubtask
)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pl.touk.nussknacker.engine.process.compiler
import cats.data.NonEmptyList
import com.codahale.metrics
import com.codahale.metrics.SlidingTimeWindowReservoir
import com.github.ghik.silencer.silent
import org.apache.flink
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
Expand Down Expand Up @@ -53,9 +52,8 @@ class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends Ba
??? // Shouldn't be needed because Flink jobs are recreated "from scratch" and no cleanup of metrics during cancel is needed
}

@silent("deprecated")
private def groupsWithName(nameParts: NonEmptyList[String], tags: Map[String, String]): (MetricGroup, String) = {
val namespaceTags = extractTags(NkGlobalParameters.readFromContext(runtimeContext.getExecutionConfig))
val namespaceTags = extractTags(NkGlobalParameters.fromMap(runtimeContext.getGlobalJobParameters))
tagMode(nameParts, tags ++ namespaceTags)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class FlinkProcessRegistrar(
resultCollector: ResultCollector,
deploymentData: DeploymentData
): Unit = {
val globalParameters = NkGlobalParameters.readFromContext(env.getConfig)
val globalParameters = NkGlobalParameters.fromMap(env.getConfig.getGlobalJobParameters.toMap)

def nodeContext(
nodeComponentId: NodeComponentInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class DelayedFlinkKafkaConsumer[T](
runtimeContext.getProcessingTimeService,
runtimeContext.getExecutionConfig.getAutoWatermarkInterval,
runtimeContext.getUserCodeClassLoader,
runtimeContext.getTaskNameWithSubtasks,
runtimeContext.getTaskInfo.getTaskNameWithSubtasks,
runtimeContext.getMetricGroup,
consumerMetricGroup,
deserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,18 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {
)

List(globalParamsWithAllOptionalValues, globalParamsWithNoOptionalValues).foreach { params =>
val ec = new ExecutionConfig()
ec.setGlobalJobParameters(params)
val globalParamsFromEc = NkGlobalParameters.readFromContext(ec).get

params.buildInfo shouldBe globalParamsFromEc.buildInfo
params.processVersion shouldBe globalParamsFromEc.processVersion
params.configParameters shouldBe globalParamsFromEc.configParameters
params.namespaceParameters shouldBe globalParamsFromEc.namespaceParameters
params.additionalInformation shouldBe globalParamsFromEc.additionalInformation
val decodedParams = NkGlobalParameters.fromMap(params.toMap).get

decodedParams.buildInfo shouldBe params.buildInfo
decodedParams.processVersion shouldBe params.processVersion
decodedParams.configParameters shouldBe params.configParameters
decodedParams.namespaceParameters shouldBe params.namespaceParameters
decodedParams.additionalInformation shouldBe params.additionalInformation
}
}

test("returns None when context doesnt have required parameters") {
NkGlobalParameters.readFromContext(new ExecutionConfig()) shouldBe None
NkGlobalParameters.fromMap(new ExecutionConfig.GlobalJobParameters().toMap) shouldBe None
}

}

0 comments on commit f1d0a7f

Please sign in to comment.