Skip to content

Commit

Permalink
[Fix] Passing Flink Job Global Params (#7324)
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Dec 13, 2024
1 parent 3bf7aff commit 073432c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
3 changes: 3 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
* [#7184](https://github.com/TouK/nussknacker/pull/7184) Improve Nu Designer API notifications endpoint, to include events related to currently displayed scenario
* [#7323](https://github.com/TouK/nussknacker/pull/7323) Improve Periodic DeploymentManager db queries

### 1.18.2 (Not released)
* [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params

### 1.18.1 (9 December 2024)

* [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import scala.jdk.CollectionConverters._
//Also, those configuration properties will be exposed via Flink REST API/webconsole
case class NkGlobalParameters(
buildInfo: String,
deploymentId: String, // TODO: Pass here DeploymentId?
processVersion: ProcessVersion,
configParameters: Option[ConfigGlobalParameters],
namespaceParameters: Option[NamespaceMetricsTags],
Expand Down Expand Up @@ -64,13 +65,21 @@ object NkGlobalParameters {

def create(
buildInfo: String,
deploymentId: String, // TODO: Pass here DeploymentId?
processVersion: ProcessVersion,
modelConfig: Config,
namespaceTags: Option[NamespaceMetricsTags],
additionalInformation: Map[String, String]
): NkGlobalParameters = {
val configGlobalParameters = modelConfig.getAs[ConfigGlobalParameters]("globalParameters")
NkGlobalParameters(buildInfo, processVersion, configGlobalParameters, namespaceTags, additionalInformation)
NkGlobalParameters(
buildInfo,
deploymentId,
processVersion,
configGlobalParameters,
namespaceTags,
additionalInformation
)
}

def setInContext(ec: ExecutionConfig, globalParameters: NkGlobalParameters): Unit = {
Expand All @@ -84,11 +93,12 @@ object NkGlobalParameters {

def encode(parameters: NkGlobalParameters): Map[String, String] = {
def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.map { case (key, value) => s"$prefix$key" -> value }
map.map { case (key, value) => s"$prefix.$key" -> value }
}

val baseProperties = Map[String, String](
"buildInfo" -> parameters.buildInfo,
"deploymentId" -> parameters.deploymentId,
"versionId" -> parameters.processVersion.versionId.value.toString,
"processId" -> parameters.processVersion.processId.value.toString,
"modelVersion" -> parameters.processVersion.modelVersion.map(_.toString).orNull,
Expand All @@ -100,9 +110,11 @@ object NkGlobalParameters {
val configMap = parameters.configParameters
.map(ConfigGlobalParametersToMapEncoder.encode)
.getOrElse(Map.empty)

val namespaceTagsMap = parameters.namespaceParameters
.map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix))
.getOrElse(Map.empty)

val additionalInformationMap =
encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix)

Expand All @@ -112,8 +124,8 @@ object NkGlobalParameters {
def decode(map: Map[String, String]): Option[NkGlobalParameters] = {
def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.view
.filter { case (key, _) => key.startsWith(prefix) }
.map { case (key, value) => key.stripPrefix(prefix) -> value }
.filter { case (key, _) => key.startsWith(s"$prefix.") }
.map { case (key, value) => key.stripPrefix(s"$prefix.") -> value }
.toMap
}

Expand All @@ -139,7 +151,15 @@ object NkGlobalParameters {
for {
processVersion <- processVersionOpt
buildInfo <- buildInfoOpt
} yield NkGlobalParameters(buildInfo, processVersion, configParameters, namespaceTags, additionalInformation)
deploymentId <- map.get("deploymentId")
} yield NkGlobalParameters(
buildInfo,
deploymentId,
processVersion,
configParameters,
namespaceTags,
additionalInformation
)
}

private object ConfigGlobalParametersToMapEncoder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,19 @@ object ExecutionConfigPreparer extends LazyLogging {
config,
NkGlobalParameters.create(
buildInfo,
deploymentData.deploymentId.value,
jobData.processVersion,
modelConfig,
namespaceTags = NamespaceMetricsTags(jobData.metaData.name.value, namingStrategy),
prepareMap(jobData.processVersion, deploymentData)
prepareMap(deploymentData)
)
)
}

private def prepareMap(processVersion: ProcessVersion, deploymentData: DeploymentData) = {

val baseProperties = Map[String, String](
"buildInfo" -> buildInfo,
"versionId" -> processVersion.versionId.value.toString,
"processId" -> processVersion.processId.value.toString,
"labels" -> Encoder[List[String]].apply(processVersion.labels).noSpaces,
"modelVersion" -> processVersion.modelVersion.map(_.toString).orNull,
"user" -> processVersion.user,
"deploymentId" -> deploymentData.deploymentId.value
)
val scenarioProperties = deploymentData.additionalDeploymentData.map { case (k, v) =>
private def prepareMap(deploymentData: DeploymentData) =
deploymentData.additionalDeploymentData.map { case (k, v) =>
s"deployment.properties.$k" -> v
}
baseProperties ++ scenarioProperties
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {
test("global parameters set and read from context are equal") {
val globalParamsWithAllOptionalValues = NkGlobalParameters(
buildInfo = "aBuildInfo",
deploymentId = "1",
processVersion = ProcessVersion(
VersionId.initialVersionId,
ProcessName("aProcessName"),
Expand All @@ -27,6 +28,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {

val globalParamsWithNoOptionalValues = NkGlobalParameters(
buildInfo = "aBuildInfo",
deploymentId = "1",
processVersion = ProcessVersion(
VersionId.initialVersionId,
ProcessName("aProcessName"),
Expand All @@ -46,6 +48,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {
val globalParamsFromEc = NkGlobalParameters.readFromContext(ec).get

params.buildInfo shouldBe globalParamsFromEc.buildInfo
params.deploymentId shouldBe globalParamsFromEc.deploymentId
params.processVersion shouldBe globalParamsFromEc.processVersion
params.configParameters shouldBe globalParamsFromEc.configParameters
params.namespaceParameters shouldBe globalParamsFromEc.namespaceParameters
Expand Down

0 comments on commit 073432c

Please sign in to comment.