Skip to content

Commit

Permalink
Fix deployments for scenarios with dict editors after model reload
Browse files Browse the repository at this point in the history
  • Loading branch information
Elmacioro committed Nov 5, 2024
1 parent acce220 commit bd2219a
Show file tree
Hide file tree
Showing 23 changed files with 155 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package pl.touk.nussknacker.engine.api.parameter

import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder}
import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder}

final case class ParameterName(value: String) {
def withBranchId(branchId: String): ParameterName = ParameterName(s"$value for branch $branchId")
}

object ParameterName {
implicit val encoder: Encoder[ParameterName] = deriveUnwrappedEncoder
implicit val decoder: Decoder[ParameterName] = deriveUnwrappedDecoder

implicit val keyEncoder: KeyEncoder[ParameterName] = KeyEncoder.encodeKeyString.contramap(_.value)
implicit val keyDecoder: KeyDecoder[ParameterName] = KeyDecoder.decodeKeyString.map(ParameterName(_))
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.api.component

import io.circe.generic.JsonCodec
import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue
import pl.touk.nussknacker.engine.api.parameter.{
ParameterName,
Expand All @@ -24,6 +25,7 @@ object AdditionalUIConfigProvider {
val empty = new DefaultAdditionalUIConfigProvider(Map.empty, Map.empty)
}

@JsonCodec
case class ComponentAdditionalConfig(
parameterConfigs: Map[ParameterName, ParameterAdditionalUIConfig],
icon: Option[String] = None,
Expand All @@ -32,6 +34,7 @@ case class ComponentAdditionalConfig(
disabled: Boolean = false
)

@JsonCodec
case class ParameterAdditionalUIConfig(
required: Boolean,
initialValue: Option[FixedExpressionValue],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pl.touk.nussknacker.engine.api.component

import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder}
import io.circe.{Decoder, Encoder}
import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder}

// TODO This class is used as a work around for the problem that the components are duplicated across processing types.
// We plan to get rid of this. After that, we could replace usages of this class by usage of ComponentId
Expand All @@ -14,6 +14,10 @@ object DesignerWideComponentId {
implicit val encoder: Encoder[DesignerWideComponentId] = deriveUnwrappedEncoder
implicit val decoder: Decoder[DesignerWideComponentId] = deriveUnwrappedDecoder

implicit val keyEncoder: KeyEncoder[DesignerWideComponentId] = KeyEncoder.encodeKeyString.contramap(_.value)
implicit val keyDecoder: KeyDecoder[DesignerWideComponentId] =
KeyDecoder.decodeKeyString.map(DesignerWideComponentId(_))

def apply(value: String): DesignerWideComponentId = new DesignerWideComponentId(value.toLowerCase)

def forBuiltInComponent(componentId: ComponentId): DesignerWideComponentId = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pl.touk.nussknacker.ui.config

import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.security.api.LoggedUser

class AdditionalDictComponentConfigsProvider(
private val additionalConfigsProvider: ProcessingTypeDataProvider[
Map[DesignerWideComponentId, ComponentAdditionalConfig],
_
]
) {

// This function filters additional configs provided by AdditionalUIConfigProvider
// to include only component and parameter configs with Dictionary editors.
// This is done to reduce data sent to Flink as only configs regarding this editor are required to be known during execution.
def getAdditionalConfigsWithDictParametersEditors(
processingType: ProcessingType
)(implicit user: LoggedUser): Map[DesignerWideComponentId, ComponentAdditionalConfig] = {
additionalConfigsProvider
.forProcessingType(processingType)
.getOrElse(Map.empty)
.map { case (componentId, componentAdditionalConfig) =>
val parametersWithDictEditors = componentAdditionalConfig.parameterConfigs.filter {
case (_, additionalUiConfig) =>
additionalUiConfig.valueEditor match {
case Some(_: ValueInputWithDictEditor) => true
case _ => false
}
}
componentId -> componentAdditionalConfig.copy(parameterConfigs = parametersWithDictEditors)
}
.filter(_._2.parameterConfigs.nonEmpty)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class DefaultProcessingTypeDeployedScenariosProvider(
DeploymentId.fromActionId(lastDeployAction.id),
deployingUser,
Map.empty,
NodesDeploymentData.empty
NodesDeploymentData.empty,
Map.empty
)
val deployedScenarioDataTry =
scenarioResolver.resolveScenario(details.json).map { resolvedScenario =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser}
import pl.touk.nussknacker.ui.config.AdditionalDictComponentConfigsProvider
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess}
import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser}
import pl.touk.nussknacker.ui.process.ProcessStateProvider
Expand Down Expand Up @@ -55,6 +56,7 @@ class DeploymentService(
processChangeListener: ProcessChangeListener,
scenarioStateTimeout: Option[FiniteDuration],
deploymentCommentSettings: Option[DeploymentCommentSettings],
additionalDictComponentConfigsProvider: AdditionalDictComponentConfigsProvider,
clock: Clock = Clock.systemUTC()
)(implicit system: ActorSystem)
extends ActionService
Expand Down Expand Up @@ -324,7 +326,10 @@ class DeploymentService(
DeploymentId.fromActionId(actionId),
user.toManagerUser,
additionalDeploymentData,
nodesDeploymentData
nodesDeploymentData,
additionalDictComponentConfigsProvider.getAdditionalConfigsWithDictParametersEditors(
processDetails.processingType
)
)
} yield DeployedScenarioData(processDetails.toEngineProcessVersion, deploymentData, resolvedCanonicalProcess)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.restmodel.validation.ValidationResults.ValidationErrors
import pl.touk.nussknacker.security.Permission
import pl.touk.nussknacker.security.Permission.Permission
import pl.touk.nussknacker.ui.config.AdditionalDictComponentConfigsProvider
import pl.touk.nussknacker.ui.db.entity.ProcessVersionEntityData
import pl.touk.nussknacker.ui.process.ScenarioMetadata
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
Expand Down Expand Up @@ -42,7 +43,8 @@ class DeploymentService(
deploymentRepository: DeploymentRepository,
dmDispatcher: DeploymentManagerDispatcher,
dbioRunner: DBIOActionRunner,
clock: Clock
clock: Clock,
additionalDictComponentConfigsProvider: AdditionalDictComponentConfigsProvider
)(implicit ec: ExecutionContext)
extends LazyLogging {

Expand Down Expand Up @@ -156,7 +158,10 @@ class DeploymentService(
LegacyDeploymentId(""),
user.toManagerUser,
Map.empty,
NodesDeploymentData.empty
NodesDeploymentData.empty,
additionalDictComponentConfigsProvider.getAdditionalConfigsWithDictParametersEditors(
scenarioMetadata.processingType
)(user)
)
for {
result <- EitherT[Future, RunDeploymentError, Unit](
Expand Down Expand Up @@ -189,7 +194,10 @@ class DeploymentService(
toLegacyDeploymentId(command.id),
command.user.toManagerUser,
additionalDeploymentData = Map.empty,
command.nodesDeploymentData
command.nodesDeploymentData,
additionalDictComponentConfigsProvider.getAdditionalConfigsWithDictParametersEditors(
scenarioMetadata.processingType
)(command.user)
)
dmDispatcher
.deploymentManagerUnsafe(scenarioMetadata.processingType)(command.user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import pl.touk.nussknacker.processCounts.{CountsReporter, CountsReporterCreator}
import pl.touk.nussknacker.ui.api._
import pl.touk.nussknacker.ui.config.scenariotoolbar.CategoriesScenarioToolbarsConfigParser
import pl.touk.nussknacker.ui.config.{
AdditionalDictComponentConfigsProvider,
AttachmentsConfig,
ComponentLinksConfigExtractor,
FeatureTogglesConfig,
Expand Down Expand Up @@ -232,6 +233,13 @@ class AkkaHttpBasedRouteProvider(
futureProcessRepository
)

val additionalDictComponentConfigsProvider = {
val additionalConfigsProvider = processingTypeDataProvider.mapValues { processingTypeData =>
processingTypeData.designerModelData.modelData.additionalConfigsFromProvider
}
new AdditionalDictComponentConfigsProvider(additionalConfigsProvider)
}

val legacyDeploymentService = new LegacyDeploymentService(
dmDispatcher,
processRepository,
Expand All @@ -241,7 +249,8 @@ class AkkaHttpBasedRouteProvider(
scenarioResolver,
processChangeListener,
featureTogglesConfig.scenarioStateTimeout,
featureTogglesConfig.deploymentCommentSettings
featureTogglesConfig.deploymentCommentSettings,
additionalDictComponentConfigsProvider
)
legacyDeploymentService.invalidateInProgressActions()

Expand Down Expand Up @@ -438,7 +447,8 @@ class AkkaHttpBasedRouteProvider(
deploymentRepository,
dmDispatcher,
dbioRunner,
Clock.systemDefaultZone()
Clock.systemDefaultZone(),
additionalDictComponentConfigsProvider
)
val activityService =
new ActivityService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.test.utils.scalas.AkkaHttpExtensions.toRequestEntity
import pl.touk.nussknacker.ui.LoadableConfigBasedNussknackerConfig
import pl.touk.nussknacker.ui.api._
import pl.touk.nussknacker.ui.config.FeatureTogglesConfig
import pl.touk.nussknacker.ui.config.{AdditionalDictComponentConfigsProvider, FeatureTogglesConfig}
import pl.touk.nussknacker.ui.config.scenariotoolbar.CategoriesScenarioToolbarsConfigParser
import pl.touk.nussknacker.ui.process.ProcessService.{CreateScenarioCommand, UpdateScenarioCommand}
import pl.touk.nussknacker.ui.process._
Expand Down Expand Up @@ -110,6 +110,10 @@ trait NuResourcesTest
futureFetchingScenarioRepository
)

protected val additionalDictComponentConfigsProvider = new AdditionalDictComponentConfigsProvider(
mapProcessingTypeDataProvider()
)

protected val deploymentService: DeploymentService =
new DeploymentService(
dmDispatcher,
Expand All @@ -120,7 +124,8 @@ trait NuResourcesTest
scenarioResolverByProcessingType,
processChangeListener,
None,
deploymentCommentSettings
deploymentCommentSettings,
additionalDictComponentConfigsProvider
)

protected val processingTypeConfig: ProcessingTypeConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId}
import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting
import pl.touk.nussknacker.test.utils.domain.TestFactory.mapProcessingTypeDataProvider
import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, PatientScalaFutures}
import pl.touk.nussknacker.ui.config.AdditionalDictComponentConfigsProvider
import pl.touk.nussknacker.ui.listener.ProcessChangeListener
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions._
import pl.touk.nussknacker.ui.process.deployment.{
Expand Down Expand Up @@ -195,6 +197,7 @@ class NotificationServiceTest
mock[ProcessChangeListener],
None,
None,
new AdditionalDictComponentConfigsProvider(mapProcessingTypeDataProvider()),
clock
) {
override protected def validateBeforeDeploy(
Expand All @@ -216,7 +219,8 @@ class NotificationServiceTest
DeploymentId.fromActionId(actionId),
user.toManagerUser,
additionalDeploymentData,
nodesDeploymentData
nodesDeploymentData,
Map.empty
),
processDetails.json
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, NuScalaTestAssertions, PatientScalaFutures}
import pl.touk.nussknacker.ui.api.DeploymentCommentSettings
import pl.touk.nussknacker.ui.config.AdditionalDictComponentConfigsProvider
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionSuccess}
import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun
Expand Down Expand Up @@ -114,7 +115,10 @@ class DeploymentServiceSpec
TestFactory.scenarioResolverByProcessingType,
listener,
scenarioStateTimeout,
deploymentCommentSettings
deploymentCommentSettings,
new AdditionalDictComponentConfigsProvider(
mapProcessingTypeDataProvider()
)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting
import pl.touk.nussknacker.test.base.it.WithClock
import pl.touk.nussknacker.test.config.WithSimplifiedDesignerConfig.TestProcessingType.Streaming
import pl.touk.nussknacker.test.utils.domain.TestFactory.mapProcessingTypeDataProvider
import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, PatientScalaFutures}
import pl.touk.nussknacker.ui.config.AdditionalDictComponentConfigsProvider
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
Expand Down Expand Up @@ -53,7 +55,8 @@ class DeploymentServiceTest
TestFactory.newFutureFetchingScenarioRepository(testDbRef)
),
dbioRunner,
clock
clock,
new AdditionalDictComponentConfigsProvider(mapProcessingTypeDataProvider())
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.process.compiler

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun
import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.dict.EngineDictRegistry
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator, ProcessObjectDependencies}
Expand Down Expand Up @@ -36,6 +36,7 @@ class FlinkProcessCompilerDataFactory(
modelConfig: Config,
namingStrategy: NamingStrategy,
componentUseCase: ComponentUseCase,
configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig]
) extends Serializable {

import net.ceedubs.ficus.Ficus._
Expand All @@ -47,6 +48,7 @@ class FlinkProcessCompilerDataFactory(
modelData.modelConfig,
modelData.namingStrategy,
componentUseCase = ComponentUseCase.EngineRuntime,
modelData.additionalConfigsFromProvider
)

def prepareCompilerData(
Expand Down Expand Up @@ -124,7 +126,7 @@ class FlinkProcessCompilerDataFactory(
userCodeClassLoader,
modelDependencies,
id => DesignerWideComponentId(id.toString),
Map.empty
configsFromProviderWithDictionaryEditor
)
)
val dictRegistry = dictRegistryFactory.createEngineDictRegistry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.process.compiler
import com.typesafe.config.Config
import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun
import pl.touk.nussknacker.engine.api.{NodeId, Params}
import pl.touk.nussknacker.engine.api.component.ComponentType
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, ComponentType, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.context.ContextTransformation
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator}
Expand All @@ -29,13 +29,15 @@ abstract class StubbedFlinkProcessCompilerDataFactory(
extractModelDefinition: ExtractDefinitionFun,
modelConfig: Config,
namingStrategy: NamingStrategy,
componentUseCase: ComponentUseCase
componentUseCase: ComponentUseCase,
configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig]
) extends FlinkProcessCompilerDataFactory(
creator,
extractModelDefinition,
modelConfig,
namingStrategy,
componentUseCase,
configsFromProviderWithDictionaryEditor
) {

override protected def adjustDefinitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ object TestFlinkProcessCompilerDataFactory {
modelData.extractModelDefinitionFun,
modelData.modelConfig,
modelData.namingStrategy,
ComponentUseCase.TestRuntime
ComponentUseCase.TestRuntime,
modelData.additionalConfigsFromProvider
) {

override protected def adjustListeners(
Expand Down
Loading

0 comments on commit bd2219a

Please sign in to comment.