From 6c5f6e356e90f497b7be391216bb107bc42c8bfd Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz Date: Wed, 6 Nov 2024 13:55:45 +0100 Subject: [PATCH 1/5] Fix deployments for scenarios with dict editors after model reload --- .../engine/api/parameter/ParameterName.scala | 11 +++ .../AdditionalUIConfigProvider.scala | 3 + .../component/DesignerWideComponentId.scala | 6 +- .../DeploymentManagerDependencies.scala | 2 + ...cessingTypeDeployedScenariosProvider.scala | 3 +- .../deployment/DeploymentService.scala | 16 ++- .../newdeployment/DeploymentService.scala | 24 ++++- .../server/AkkaHttpBasedRouteProvider.scala | 17 +++- .../test/base/it/NuResourcesTest.scala | 5 +- .../test/utils/domain/TestFactory.scala | 6 +- .../NotificationServiceTest.scala | 4 +- .../deployment/DeploymentServiceSpec.scala | 3 +- .../newdeployment/DeploymentServiceTest.scala | 3 +- .../FlinkProcessCompilerDataFactory.scala | 7 +- ...ubbedFlinkProcessCompilerDataFactory.scala | 6 +- .../TestFlinkProcessCompilerDataFactory.scala | 3 +- ...ationFlinkProcessCompilerDataFactory.scala | 3 +- .../process/runner/FlinkProcessMain.scala | 4 +- .../engine/process/runner/FlinkTestMain.scala | 9 +- .../process/runner/FlinkTestMainSpec.scala | 68 ++++++++++++- ...itionalDictComponentConfigsExtractor.scala | 28 ++++++ .../periodic/PeriodicDeploymentManager.scala | 3 +- .../periodic/PeriodicProcessService.scala | 14 ++- ...nalDictComponentConfigsExtractorTest.scala | 99 +++++++++++++++++++ .../PeriodicDeploymentManagerTest.scala | 3 +- ...eriodicProcessServiceIntegrationTest.scala | 3 +- .../periodic/PeriodicProcessServiceTest.scala | 3 +- .../management/FlinkRestManagerSpec.scala | 3 +- .../engine/deployment/DeploymentData.scala | 14 ++- .../touk/nussknacker/engine/ModelData.scala | 16 ++- ...ompilerDataFactoryWithTestComponents.scala | 7 +- 31 files changed, 348 insertions(+), 48 deletions(-) create mode 100644 engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala create mode 100644 engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala diff --git a/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala b/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala index 72076f2fb67..532d9453763 100644 --- a/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala +++ b/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala @@ -1,5 +1,16 @@ package pl.touk.nussknacker.engine.api.parameter +import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder} +import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder} + final case class ParameterName(value: String) { def withBranchId(branchId: String): ParameterName = ParameterName(s"$value for branch $branchId") } + +object ParameterName { + implicit val encoder: Encoder[ParameterName] = deriveUnwrappedEncoder + implicit val decoder: Decoder[ParameterName] = deriveUnwrappedDecoder + + implicit val keyEncoder: KeyEncoder[ParameterName] = KeyEncoder.encodeKeyString.contramap(_.value) + implicit val keyDecoder: KeyDecoder[ParameterName] = KeyDecoder.decodeKeyString.map(ParameterName(_)) +} diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala index 7be6b67a99f..6e38258c60f 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.api.component +import io.circe.generic.JsonCodec import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue import pl.touk.nussknacker.engine.api.parameter.{ ParameterName, @@ -24,6 +25,7 @@ object AdditionalUIConfigProvider { val empty = new DefaultAdditionalUIConfigProvider(Map.empty, Map.empty) } +@JsonCodec case class ComponentAdditionalConfig( parameterConfigs: Map[ParameterName, ParameterAdditionalUIConfig], icon: Option[String] = None, @@ -32,6 +34,7 @@ case class ComponentAdditionalConfig( disabled: Boolean = false ) +@JsonCodec case class ParameterAdditionalUIConfig( required: Boolean, initialValue: Option[FixedExpressionValue], diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala index d6a35fdd628..99b0ddcbec4 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.api.component import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder} -import io.circe.{Decoder, Encoder} +import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder} // TODO This class is used as a work around for the problem that the components are duplicated across processing types. // We plan to get rid of this. After that, we could replace usages of this class by usage of ComponentId @@ -14,6 +14,10 @@ object DesignerWideComponentId { implicit val encoder: Encoder[DesignerWideComponentId] = deriveUnwrappedEncoder implicit val decoder: Decoder[DesignerWideComponentId] = deriveUnwrappedDecoder + implicit val keyEncoder: KeyEncoder[DesignerWideComponentId] = KeyEncoder.encodeKeyString.contramap(_.value) + implicit val keyDecoder: KeyDecoder[DesignerWideComponentId] = + KeyDecoder.decodeKeyString.map(DesignerWideComponentId(_)) + def apply(value: String): DesignerWideComponentId = new DesignerWideComponentId(value.toLowerCase) def forBuiltInComponent(componentId: ComponentId): DesignerWideComponentId = { diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala index 27f402a0384..bf8fed6e669 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.deployment.{ ProcessingTypeDeployedScenariosProvider, ScenarioActivityManager } +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} import sttp.client3.SttpBackend import scala.concurrent.{ExecutionContext, Future} @@ -17,6 +18,7 @@ case class DeploymentManagerDependencies( executionContext: ExecutionContext, actorSystem: ActorSystem, sttpBackend: SttpBackend[Future, Any], + configsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty ) { implicit def implicitExecutionContext: ExecutionContext = executionContext implicit def implicitActorSystem: ActorSystem = actorSystem diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala index 838787de980..378d0d3d415 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala @@ -51,7 +51,8 @@ class DefaultProcessingTypeDeployedScenariosProvider( DeploymentId.fromActionId(lastDeployAction.id), deployingUser, Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + Map.empty ) val deployedScenarioDataTry = scenarioResolver.resolveScenario(details.json).map { resolvedScenario => diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index 5ae7f7bab7b..815c46b6384 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -8,7 +8,11 @@ import cats.syntax.functor._ import com.typesafe.scalalogging.LazyLogging import db.util.DBIOActionInstances._ import pl.touk.nussknacker.engine.api.Comment -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus @@ -16,6 +20,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ +import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser} import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess} @@ -55,6 +60,10 @@ class DeploymentService( processChangeListener: ProcessChangeListener, scenarioStateTimeout: Option[FiniteDuration], deploymentCommentSettings: Option[DeploymentCommentSettings], + additionalComponentConfigs: ProcessingTypeDataProvider[ + Map[DesignerWideComponentId, ComponentAdditionalConfig], + _ + ], clock: Clock = Clock.systemUTC() )(implicit system: ActorSystem) extends ActionService @@ -324,7 +333,10 @@ class DeploymentService( DeploymentId.fromActionId(actionId), user.toManagerUser, additionalDeploymentData, - nodesDeploymentData + nodesDeploymentData, + AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( + additionalComponentConfigs.forProcessingType(processDetails.processingType).getOrElse(Map.empty) + ) ) } yield DeployedScenarioData(processDetails.toEngineProcessVersion, deploymentData, resolvedCanonicalProcess) } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala index 55a48256f50..6621e5cac88 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala @@ -4,11 +4,16 @@ import cats.Applicative import cats.data.{EitherT, NonEmptyList} import com.typesafe.scalalogging.LazyLogging import db.util.DBIOActionInstances._ -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{ProcessVersion => RuntimeVersionData} import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId => LegacyDeploymentId} +import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor import pl.touk.nussknacker.engine.newdeployment.DeploymentId import pl.touk.nussknacker.restmodel.validation.ValidationResults.ValidationErrors import pl.touk.nussknacker.security.Permission @@ -19,6 +24,7 @@ import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps import pl.touk.nussknacker.ui.process.newdeployment.DeploymentEntityFactory.{DeploymentEntityData, WithModifiedAt} import pl.touk.nussknacker.ui.process.newdeployment.DeploymentService._ +import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioMetadataRepository} import pl.touk.nussknacker.ui.process.version.ScenarioGraphVersionService import pl.touk.nussknacker.ui.security.api.LoggedUser @@ -42,7 +48,11 @@ class DeploymentService( deploymentRepository: DeploymentRepository, dmDispatcher: DeploymentManagerDispatcher, dbioRunner: DBIOActionRunner, - clock: Clock + clock: Clock, + additionalComponentConfigs: ProcessingTypeDataProvider[ + Map[DesignerWideComponentId, ComponentAdditionalConfig], + _ + ], )(implicit ec: ExecutionContext) extends LazyLogging { @@ -156,7 +166,10 @@ class DeploymentService( LegacyDeploymentId(""), user.toManagerUser, Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( + additionalComponentConfigs.forProcessingType(scenarioMetadata.processingType)(user).getOrElse(Map.empty) + ) ) for { result <- EitherT[Future, RunDeploymentError, Unit]( @@ -189,7 +202,10 @@ class DeploymentService( toLegacyDeploymentId(command.id), command.user.toManagerUser, additionalDeploymentData = Map.empty, - command.nodesDeploymentData + command.nodesDeploymentData, + AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( + additionalComponentConfigs.forProcessingType(scenarioMetadata.processingType)(command.user).getOrElse(Map.empty) + ) ) dmDispatcher .deploymentManagerUnsafe(scenarioMetadata.processingType)(command.user) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala index de65f02d053..6b9a0ce5e3f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala @@ -15,6 +15,7 @@ import pl.touk.nussknacker.engine.compile.ProcessValidator import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.definition.test.ModelDataTestInfoProvider import pl.touk.nussknacker.engine.dict.ProcessDictSubstitutor +import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One} import pl.touk.nussknacker.engine.{ConfigWithUnresolvedVersion, DeploymentManagerDependencies, ModelDependencies} @@ -232,6 +233,10 @@ class AkkaHttpBasedRouteProvider( futureProcessRepository ) + val additionalComponentConfigs = processingTypeDataProvider.mapValues { processingTypeData => + processingTypeData.designerModelData.modelData.additionalConfigsFromProvider + } + val legacyDeploymentService = new LegacyDeploymentService( dmDispatcher, processRepository, @@ -241,7 +246,8 @@ class AkkaHttpBasedRouteProvider( scenarioResolver, processChangeListener, featureTogglesConfig.scenarioStateTimeout, - featureTogglesConfig.deploymentCommentSettings + featureTogglesConfig.deploymentCommentSettings, + additionalComponentConfigs ) legacyDeploymentService.invalidateInProgressActions() @@ -438,7 +444,8 @@ class AkkaHttpBasedRouteProvider( deploymentRepository, dmDispatcher, dbioRunner, - Clock.systemDefaultZone() + Clock.systemDefaultZone(), + additionalComponentConfigs ) val activityService = new ActivityService( @@ -707,6 +714,7 @@ class AkkaHttpBasedRouteProvider( featureTogglesConfig.componentDefinitionExtractionMode ), getDeploymentManagerDependencies( + additionalUIConfigProvider, actionServiceProvider, scenarioActivityRepository, dbioActionRunner, @@ -722,12 +730,14 @@ class AkkaHttpBasedRouteProvider( } private def getDeploymentManagerDependencies( + additionalUIConfigProvider: AdditionalUIConfigProvider, actionServiceProvider: Supplier[ActionService], scenarioActivityRepository: ScenarioActivityRepository, dbioActionRunner: DBIOActionRunner, sttpBackend: SttpBackend[Future, Any], processingType: ProcessingType )(implicit executionContext: ExecutionContext) = { + val additionalConfigsFromProvider = additionalUIConfigProvider.getAllForProcessingType(processingType) DeploymentManagerDependencies( DefaultProcessingTypeDeployedScenariosProvider(dbRef, processingType), new DefaultProcessingTypeActionService( @@ -740,7 +750,8 @@ class AkkaHttpBasedRouteProvider( ), system.dispatcher, system, - sttpBackend + sttpBackend, + additionalConfigsFromProvider ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala index 4e7e42fc74d..cde22f36654 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala @@ -18,13 +18,11 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.{Assertion, BeforeAndAfterEach, OptionValues, Suite} import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.CirceUtil.humanReadablePrinter -import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.graph.ScenarioGraph import pl.touk.nussknacker.engine.api.process.VersionId.initialVersionId import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.definition.test.{ModelDataTestInfoProvider, TestInfoProvider} import pl.touk.nussknacker.restmodel.CustomActionRequest import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails @@ -120,7 +118,8 @@ trait NuResourcesTest scenarioResolverByProcessingType, processChangeListener, None, - deploymentCommentSettings + deploymentCommentSettings, + mapProcessingTypeDataProvider() ) protected val processingTypeConfig: ProcessingTypeConfig = diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala index 6cb0419e829..c95746e8536 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala @@ -6,7 +6,7 @@ import cats.effect.unsafe.IORuntime import cats.instances.future._ import com.typesafe.config.ConfigFactory import db.util.DBIOActionInstances._ -import pl.touk.nussknacker.engine.api.component.{DesignerWideComponentId, ProcessingMode} +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId, ProcessingMode} import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue import pl.touk.nussknacker.engine.api.deployment.{ NoOpScenarioActivityManager, @@ -120,6 +120,10 @@ object TestFactory { Streaming.stringify -> new ScenarioResolver(sampleResolver, Streaming.stringify) ) + def additionalComponentConfigsByProcessingType + : ProcessingTypeDataProvider[Map[DesignerWideComponentId, ComponentAdditionalConfig], _] = + mapProcessingTypeDataProvider() + val modelDependencies: ModelDependencies = ModelDependencies( TestAdditionalUIConfigProvider.componentAdditionalConfigMap, diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index 931bcdf91c3..0433d1314bd 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -195,6 +195,7 @@ class NotificationServiceTest mock[ProcessChangeListener], None, None, + TestFactory.additionalComponentConfigsByProcessingType, clock ) { override protected def validateBeforeDeploy( @@ -216,7 +217,8 @@ class NotificationServiceTest DeploymentId.fromActionId(actionId), user.toManagerUser, additionalDeploymentData, - nodesDeploymentData + nodesDeploymentData, + Map.empty ), processDetails.json ) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala index 88c0bb1cab4..fe926036f87 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala @@ -114,7 +114,8 @@ class DeploymentServiceSpec TestFactory.scenarioResolverByProcessingType, listener, scenarioStateTimeout, - deploymentCommentSettings + deploymentCommentSettings, + additionalComponentConfigsByProcessingType ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala index e8259434b44..5b4b8d8bae9 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala @@ -53,7 +53,8 @@ class DeploymentServiceTest TestFactory.newFutureFetchingScenarioRepository(testDbRef) ), dbioRunner, - clock + clock, + TestFactory.additionalComponentConfigsByProcessingType ) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala index f2b0214f758..dd56b848c8f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.process.compiler import com.typesafe.config.Config import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun -import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} import pl.touk.nussknacker.engine.api.dict.EngineDictRegistry import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator, ProcessObjectDependencies} @@ -36,6 +36,7 @@ class FlinkProcessCompilerDataFactory( modelConfig: Config, namingStrategy: NamingStrategy, componentUseCase: ComponentUseCase, + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ) extends Serializable { import net.ceedubs.ficus.Ficus._ @@ -47,6 +48,7 @@ class FlinkProcessCompilerDataFactory( modelData.modelConfig, modelData.namingStrategy, componentUseCase = ComponentUseCase.EngineRuntime, + modelData.additionalConfigsFromProvider ) def prepareCompilerData( @@ -119,12 +121,11 @@ class FlinkProcessCompilerDataFactory( ): (ModelDefinitionWithClasses, EngineDictRegistry) = { val dictRegistryFactory = loadDictRegistry(userCodeClassLoader) val modelDefinitionWithTypes = ModelDefinitionWithClasses( - // additionalConfigsFromProvider aren't provided, as it's not needed to run the process on flink extractModelDefinition( userCodeClassLoader, modelDependencies, id => DesignerWideComponentId(id.toString), - Map.empty + configsFromProviderWithDictionaryEditor ) ) val dictRegistry = dictRegistryFactory.createEngineDictRegistry( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala index 04e6f402438..6cd907bc57c 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.process.compiler import com.typesafe.config.Config import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun import pl.touk.nussknacker.engine.api.{NodeId, Params} -import pl.touk.nussknacker.engine.api.component.ComponentType +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, ComponentType, DesignerWideComponentId} import pl.touk.nussknacker.engine.api.context.ContextTransformation import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator} @@ -29,13 +29,15 @@ abstract class StubbedFlinkProcessCompilerDataFactory( extractModelDefinition: ExtractDefinitionFun, modelConfig: Config, namingStrategy: NamingStrategy, - componentUseCase: ComponentUseCase + componentUseCase: ComponentUseCase, + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ) extends FlinkProcessCompilerDataFactory( creator, extractModelDefinition, modelConfig, namingStrategy, componentUseCase, + configsFromProviderWithDictionaryEditor ) { override protected def adjustDefinitions( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala index fa82e64d0b8..c2697567e2a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala @@ -35,7 +35,8 @@ object TestFlinkProcessCompilerDataFactory { modelData.extractModelDefinitionFun, modelData.modelConfig, modelData.namingStrategy, - ComponentUseCase.TestRuntime + ComponentUseCase.TestRuntime, + modelData.additionalConfigsFromProvider ) { override protected def adjustListeners( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala index ae29cccc21a..5f5b4e50151 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala @@ -17,7 +17,8 @@ object VerificationFlinkProcessCompilerDataFactory { modelData.extractModelDefinitionFun, modelData.modelConfig, modelData.namingStrategy, - componentUseCase = ComponentUseCase.Validation + componentUseCase = ComponentUseCase.Validation, + modelData.additionalConfigsFromProvider ) { override protected def adjustListeners( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala index f2ade445799..0dd6ba65b47 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala @@ -27,8 +27,8 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { s"Model version ${processVersion.modelVersion}. Deploying user [id=${deploymentData.user.id}, name=${deploymentData.user.name}]" ) val config: Config = readConfigFromArgs(args) - val modelData = ModelData.duringFlinkExecution(config) - val env = getExecutionEnvironment + val modelData = ModelData.duringFlinkExecution(config, deploymentData.configsFromProviderWithDictionaryEditor) + val env = getExecutionEnvironment runProcess( env, modelData, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index bdbce7f3d83..c7cc037196c 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -32,7 +32,14 @@ object FlinkTestMain extends FlinkRunner { val processVersion = ProcessVersion.empty.copy(processName = ProcessName("snapshot version") ) // testing process may be unreleased, so it has no version - new FlinkTestMain(modelData, process, scenarioTestData, processVersion, DeploymentData.empty, configuration).runTest + new FlinkTestMain( + modelData, + process, + scenarioTestData, + processVersion, + DeploymentData.empty.copy(configsFromProviderWithDictionaryEditor = modelData.additionalConfigsFromProvider), + configuration + ).runTest } } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala index dea96b429cc..8d378cb6a97 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala @@ -21,6 +21,13 @@ import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.ThreadUtils import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + ParameterAdditionalUIConfig +} +import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ValueInputWithDictEditor} +import pl.touk.nussknacker.engine.graph.expression.Expression import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global @@ -654,6 +661,62 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime)) ) } + + "should throw exception when parameter was modified by AdditionalUiConfigProvider with dict editor and flink wasn't provided with additional config" in { + val process = + ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .processor( + "eager1", + "collectingEager", + "static" -> Expression.dictKeyWithLabel("'s'", Some("s")), + "dynamic" -> "#input.id".spel + ) + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val run = Future { + runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + } + val dictEditorException = intercept[IllegalStateException](Await.result(run, 10 seconds)) + dictEditorException.getMessage shouldBe "DictKeyWithLabel expression can only be used with DictParameterEditor, got Some(DualParameterEditor(StringParameterEditor,RAW))" + } + + "should run correctly when parameter was modified by AdditionalUiConfigProvider with dict editor and flink was provided with additional config" in { + val modifiedComponentName = "collectingEager" + val modifiedParameterName = "static" + val process = + ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .processor( + "eager1", + modifiedComponentName, + modifiedParameterName -> Expression.dictKeyWithLabel("'s'", Some("s")), + "dynamic" -> "#input.id".spel + ) + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val results = runFlinkTest( + process, + ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), + useIOMonadInInterpreter, + additionalConfigsFromProvider = Map( + DesignerWideComponentId("service-" + modifiedComponentName) -> ComponentAdditionalConfig( + parameterConfigs = Map( + ParameterName(modifiedParameterName) -> ParameterAdditionalUIConfig( + required = false, + initialValue = None, + hintText = None, + valueEditor = Some(ValueInputWithDictEditor("someDictId", allowOtherValue = false)), + valueCompileTimeValidation = None + ) + ) + ) + ) + ) + results.exceptions should have length 0 + } } private def createTestRecord( @@ -667,13 +730,14 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor process: CanonicalProcess, scenarioTestData: ScenarioTestData, useIOMonadInInterpreter: Boolean, - enrichDefaultConfig: Config => Config = identity + enrichDefaultConfig: Config => Config = identity, + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty ): TestResults[_] = { val config = enrichDefaultConfig(ConfigFactory.load("application.conf")) .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) // We need to set context loader to avoid forking in sbt - val modelData = ModelData.duringFlinkExecution(config) + val modelData = ModelData.duringFlinkExecution(config, additionalConfigsFromProvider) ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.configuration()) } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala new file mode 100644 index 00000000000..2fa5b7b78e1 --- /dev/null +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala @@ -0,0 +1,28 @@ +package pl.touk.nussknacker.engine.management.periodic + +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} +import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor + +object AdditionalDictComponentConfigsExtractor { + + // This function filters additional configs provided by AdditionalUIConfigProvider + // to include only component and parameter configs with Dictionary editors. + // This is done to reduce data sent to Flink as only configs regarding this editor are required to be known during execution. + def getAdditionalConfigsWithDictParametersEditors( + additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] + ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = { + additionalComponentConfigs + .map { case (componentId, componentAdditionalConfig) => + val parametersWithDictEditors = componentAdditionalConfig.parameterConfigs.filter { + case (_, additionalUiConfig) => + additionalUiConfig.valueEditor match { + case Some(_: ValueInputWithDictEditor) => true + case _ => false + } + } + componentId -> componentAdditionalConfig.copy(parameterConfigs = parametersWithDictEditors) + } + .filter(_._2.parameterConfigs.nonEmpty) + } + +} diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index 02e47fe9eaa..dae199ecf46 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -58,7 +58,8 @@ object PeriodicDeploymentManager { periodicBatchConfig.executionConfig, processConfigEnricher, clock, - dependencies.actionService + dependencies.actionService, + dependencies.configsFromProvider ) // These actors have to be created with retries because they can initially fail to create due to taken names, diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index ebec77566a5..a2bc79a9fda 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -3,7 +3,11 @@ package pl.touk.nussknacker.engine.management.periodic import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus @@ -40,7 +44,8 @@ class PeriodicProcessService( executionConfig: PeriodicExecutionConfig, processConfigEnricher: ProcessConfigEnricher, clock: Clock, - actionService: ProcessingTypeActionService + actionService: ProcessingTypeActionService, + configsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] )(implicit ec: ExecutionContext) extends LazyLogging { @@ -401,7 +406,10 @@ class PeriodicProcessService( DeploymentData.systemUser, additionalDeploymentDataProvider.prepareAdditionalData(deployment), // TODO: in the future we could allow users to specify nodes data during schedule requesting - NodesDeploymentData.empty + NodesDeploymentData.empty, + AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( + configsFromProvider + ) ) val deploymentWithJarData = deployment.periodicProcess.deploymentData val deploymentAction = for { diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala new file mode 100644 index 00000000000..dc653309720 --- /dev/null +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala @@ -0,0 +1,99 @@ +package pl.touk.nussknacker.engine.management.periodic + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + ComponentGroupName, + DesignerWideComponentId, + ParameterAdditionalUIConfig +} +import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue +import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ValueInputWithDictEditor} +import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractorTest.{ + componentConfigWithDictionaryEditorInParameter, + componentConfigWithOnlyDictEditorParameters, + componentConfigWithoutDictionaryEditorInParameter +} + +class AdditionalDictComponentConfigsExtractorTest extends AnyFunSuite with Matchers { + + test("should filter only components and parameters with dictionary editors") { + val additionalConfig = Map( + DesignerWideComponentId("componentA") -> componentConfigWithDictionaryEditorInParameter, + DesignerWideComponentId("componentB") -> componentConfigWithoutDictionaryEditorInParameter, + ) + val filteredResult = + AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors(additionalConfig) + + filteredResult shouldBe Map( + DesignerWideComponentId("componentA") -> componentConfigWithOnlyDictEditorParameters + ) + } + +} + +object AdditionalDictComponentConfigsExtractorTest { + + private val parameterAWithDictEditor = ( + ParameterName("parameterA"), + ParameterAdditionalUIConfig( + required = true, + initialValue = Some(FixedExpressionValue("'someInitialValueExpression'", "someInitialValueLabel")), + hintText = None, + valueEditor = Some(ValueInputWithDictEditor("someDictA", allowOtherValue = true)), + valueCompileTimeValidation = None + ) + ) + + private val parameterBWithDictEditor = ( + ParameterName("parameterB"), + ParameterAdditionalUIConfig( + required = false, + initialValue = None, + hintText = Some("someHint"), + valueEditor = Some(ValueInputWithDictEditor("someDictB", allowOtherValue = false)), + valueCompileTimeValidation = None + ) + ) + + private val parameterWithoutDictEditor = ( + ParameterName("parameterC"), + ParameterAdditionalUIConfig( + required = true, + initialValue = None, + hintText = None, + valueEditor = None, + valueCompileTimeValidation = None + ) + ) + + private val componentConfigWithDictionaryEditorInParameter = ComponentAdditionalConfig( + parameterConfigs = Map( + parameterAWithDictEditor, + parameterBWithDictEditor, + parameterWithoutDictEditor + ), + icon = Some("someIcon"), + docsUrl = Some("someDocUrl"), + componentGroup = Some(ComponentGroupName("Service")) + ) + + private val componentConfigWithoutDictionaryEditorInParameter = ComponentAdditionalConfig( + parameterConfigs = Map(parameterWithoutDictEditor), + icon = Some("someOtherIcon"), + docsUrl = Some("someOtherDocUrl"), + componentGroup = Some(ComponentGroupName("Service")) + ) + + private val componentConfigWithOnlyDictEditorParameters = ComponentAdditionalConfig( + parameterConfigs = Map( + parameterAWithDictEditor, + parameterBWithDictEditor + ), + icon = Some("someIcon"), + docsUrl = Some("someDocUrl"), + componentGroup = Some(ComponentGroupName("Service")) + ) + +} diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index 8c0401c3339..37f365c1d4d 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -74,7 +74,8 @@ class PeriodicDeploymentManagerTest executionConfig = executionConfig, processConfigEnricher = ProcessConfigEnricher.identity, clock = Clock.systemDefaultZone(), - new ProcessingTypeActionServiceStub + new ProcessingTypeActionServiceStub, + Map.empty ) val periodicDeploymentManager = new PeriodicDeploymentManager( diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index 8b6bcd8a564..0fd79cd3c06 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -145,7 +145,8 @@ class PeriodicProcessServiceIntegrationTest executionConfig = executionConfig, processConfigEnricher = ProcessConfigEnricher.identity, clock = fixedClock(currentTime), - new ProcessingTypeActionServiceStub + new ProcessingTypeActionServiceStub, + Map.empty ) } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala index 89c41213d2f..7da7f65421a 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala @@ -120,7 +120,8 @@ class PeriodicProcessServiceTest }, Clock.systemDefaultZone(), - actionService + actionService, + Map.empty ) } diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index ea47b28fd3f..3d206c6d4fb 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -58,7 +58,8 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu DeploymentId(""), User("user1", "User 1"), Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + Map.empty ) private val returnedJobId = "jobId" diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala index 09fc28ab902..6185d601a68 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala @@ -1,13 +1,18 @@ package pl.touk.nussknacker.engine.deployment import io.circe.generic.JsonCodec -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} @JsonCodec case class DeploymentData( deploymentId: DeploymentId, user: User, additionalDeploymentData: Map[String, String], - nodesData: NodesDeploymentData + nodesData: NodesDeploymentData, + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ) object DeploymentData { @@ -15,14 +20,15 @@ object DeploymentData { val systemUser: User = User("system", "system") val empty: DeploymentData = - DeploymentData(DeploymentId(""), systemUser, Map.empty, NodesDeploymentData.empty) + DeploymentData(DeploymentId(""), systemUser, Map.empty, NodesDeploymentData.empty, Map.empty) def withDeploymentId(deploymentIdString: String) = DeploymentData( DeploymentId(deploymentIdString), systemUser, Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + Map.empty ) } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala index 3bee3bdfb54..53dc8f62f4a 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala @@ -58,8 +58,11 @@ object ModelData extends LazyLogging { // Also a classloader is correct so we don't need to build the new one // This tiny method is Flink specific so probably the interpreter module is not the best one // but it is very convenient to keep in near normal, duringExecution method - def duringFlinkExecution(inputConfig: Config): ModelData = { - duringExecution(inputConfig, ModelClassLoader.empty, resolveConfigs = false) + def duringFlinkExecution( + inputConfig: Config, + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] + ): ModelData = { + duringExecution(inputConfig, ModelClassLoader.empty, resolveConfigs = false, additionalConfigsFromProvider) } // On the runtime side, we get only model config, not the whole processing type config, @@ -67,7 +70,12 @@ object ModelData extends LazyLogging { // But it is not a big deal, because scenario was already validated before deploy, so we already check that // we don't use not allowed components for a given category // and that the scenario doesn't violate validators introduced by additionalConfigsFromProvider - def duringExecution(inputConfig: Config, modelClassLoader: ModelClassLoader, resolveConfigs: Boolean): ModelData = { + def duringExecution( + inputConfig: Config, + modelClassLoader: ModelClassLoader, + resolveConfigs: Boolean, + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty + ): ModelData = { def resolveInputConfigDuringExecution(modelConfigLoader: ModelConfigLoader): InputConfigDuringExecution = { if (resolveConfigs) { modelConfigLoader.resolveInputConfigDuringExecution( @@ -83,7 +91,7 @@ object ModelData extends LazyLogging { modelClassLoader = modelClassLoader, category = None, determineDesignerWideId = id => DesignerWideComponentId(id.toString), - additionalConfigsFromProvider = Map.empty, + additionalConfigsFromProvider = additionalConfigsFromProvider, shouldIncludeConfigCreator = _ => true, shouldIncludeComponentProvider = _ => true, componentDefinitionExtractionMode = ComponentDefinitionExtractionMode.FinalDefinition diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala index f5dd8d0c133..15fcd878c7b 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala @@ -5,7 +5,7 @@ import io.circe.Json import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun import pl.touk.nussknacker.engine.api._ -import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.definition.component.ComponentDefinitionWithImplementation @@ -36,7 +36,8 @@ object FlinkProcessCompilerDataFactoryWithTestComponents { modelData.namingStrategy, componentUseCase, testExtensionsHolder, - resultsCollectingListener + resultsCollectingListener, + modelData.additionalConfigsFromProvider ) def apply( @@ -47,6 +48,7 @@ object FlinkProcessCompilerDataFactoryWithTestComponents { componentUseCase: ComponentUseCase, testExtensionsHolder: TestExtensionsHolder, resultsCollectingListener: ResultsCollectingListener[Any], + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ): FlinkProcessCompilerDataFactory = { new FlinkProcessCompilerDataFactory( creator, @@ -54,6 +56,7 @@ object FlinkProcessCompilerDataFactoryWithTestComponents { modelConfig, namingStrategy, componentUseCase, + configsFromProviderWithDictionaryEditor ) { override protected def adjustDefinitions( From f2c0d21ba61cad4f33096c6aaafb6412d7f5e5c4 Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz Date: Tue, 12 Nov 2024 16:20:09 +0100 Subject: [PATCH 2/5] Validate dict editor is not provided by AdditionalUIConfigProvider for Lite --- .../process/runner/FlinkProcessMain.scala | 4 ++-- .../engine/process/runner/FlinkTestMain.scala | 2 +- .../embedded/EmbeddedDeploymentManager.scala | 20 +++++++++++++++++++ .../engine/deployment/DeploymentData.scala | 2 +- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala index 0dd6ba65b47..a5bae81e80b 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala @@ -27,8 +27,8 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { s"Model version ${processVersion.modelVersion}. Deploying user [id=${deploymentData.user.id}, name=${deploymentData.user.name}]" ) val config: Config = readConfigFromArgs(args) - val modelData = ModelData.duringFlinkExecution(config, deploymentData.configsFromProviderWithDictionaryEditor) - val env = getExecutionEnvironment + val modelData = ModelData.duringFlinkExecution(config, deploymentData.additionalConfigsFromProvider) + val env = getExecutionEnvironment runProcess( env, modelData, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index c7cc037196c..78da3969f8b 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -37,7 +37,7 @@ object FlinkTestMain extends FlinkRunner { process, scenarioTestData, processVersion, - DeploymentData.empty.copy(configsFromProviderWithDictionaryEditor = modelData.additionalConfigsFromProvider), + DeploymentData.empty.copy(additionalConfigsFromProvider = modelData.additionalConfigsFromProvider), configuration ).runTest } diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 178f5ab5cbf..80de9d12d58 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus +import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId} @@ -98,6 +99,7 @@ class EmbeddedDeploymentManager( case DMRunDeploymentCommand(processVersion, deploymentData, canonicalProcess, updateStrategy) => Future { ensureReplaceDeploymentUpdateStrategy(updateStrategy) + ensureAdditionalConfigsDoNotContainDictionaryEditors(deploymentData) deployScenarioClosingOldIfNeeded( processVersion, deploymentData, @@ -121,6 +123,24 @@ class EmbeddedDeploymentManager( } } + // We make sure that we don't let deploy a scenario when any parameter editor was modified to dictionary one by AdditionalUIConfigProvider + // as that would result in failure during compilation before execution + private def ensureAdditionalConfigsDoNotContainDictionaryEditors(deploymentData: DeploymentData): Unit = { + val configsWithDictEditors = deploymentData.additionalConfigsFromProvider.filter( + _._2.parameterConfigs.exists { case (_, parameterConfig) => + parameterConfig.valueEditor match { + case Some(_: ValueInputWithDictEditor) => true + case _ => false + } + } + ) + if (configsWithDictEditors.nonEmpty) { + throw new IllegalArgumentException( + "Parameter editor modification to ValueInputWithDictEditor by AdditionalUIConfigProvider is not supported for Lite engine" + ) + } + } + private def deployScenarioClosingOldIfNeeded( processVersion: ProcessVersion, deploymentData: DeploymentData, diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala index 6185d601a68..09d9053003e 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala @@ -12,7 +12,7 @@ import pl.touk.nussknacker.engine.api.component.{ user: User, additionalDeploymentData: Map[String, String], nodesData: NodesDeploymentData, - configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] ) object DeploymentData { From d6b8a2051605970b4577e99455efce470318c79a Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz Date: Fri, 15 Nov 2024 14:07:37 +0100 Subject: [PATCH 3/5] review comments --- ...lComponentConfigsForRuntimeExtractor.scala | 32 +++++++++++++ ...ponentConfigsForRuntimeExtractorTest.scala | 10 ++--- ...cessingTypeDeployedScenariosProvider.scala | 4 +- .../deployment/DeploymentService.scala | 8 ++-- .../newdeployment/DeploymentService.scala | 45 ++++++++++++------- .../server/AkkaHttpBasedRouteProvider.scala | 1 - .../NotificationServiceTest.scala | 9 +++- .../process/runner/FlinkProcessMain.scala | 4 +- .../engine/process/runner/FlinkTestMain.scala | 6 ++- .../process/runner/FlinkTestMainSpec.scala | 7 ++- ...itionalDictComponentConfigsExtractor.scala | 28 ------------ .../periodic/PeriodicProcessService.scala | 7 +-- .../management/FlinkRestManagerSpec.scala | 10 ++++- .../embedded/EmbeddedDeploymentManager.scala | 13 +++--- .../RunnableScenarioInterpreterFactory.scala | 4 +- .../deployment/AdditionalModelConfigs.scala | 15 +++++++ .../engine/deployment/DeploymentData.scala | 18 ++++---- .../touk/nussknacker/engine/ModelData.scala | 27 ++++++----- 18 files changed, 152 insertions(+), 96 deletions(-) create mode 100644 designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala rename engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala => designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala (88%) delete mode 100644 engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala create mode 100644 extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala new file mode 100644 index 00000000000..352a0562c5d --- /dev/null +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala @@ -0,0 +1,32 @@ +package pl.touk.nussknacker.engine.util + +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} +import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor + +object AdditionalComponentConfigsForRuntimeExtractor { + + // This is done to reduce data sent to Flink + def getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] + ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = { + getAdditionalConfigsWithDictParametersEditors(additionalComponentConfigs) + } + + // This function filters additional configs provided by AdditionalUIConfigProvider + // to include only component and parameter configs with Dictionary editors. + def getAdditionalConfigsWithDictParametersEditors( + additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] + ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = additionalComponentConfigs + .map { case (componentId, componentAdditionalConfig) => + val parametersWithDictEditors = componentAdditionalConfig.parameterConfigs.filter { + case (_, additionalUiConfig) => + additionalUiConfig.valueEditor match { + case Some(_: ValueInputWithDictEditor) => true + case _ => false + } + } + componentId -> componentAdditionalConfig.copy(parameterConfigs = parametersWithDictEditors) + } + .filter(_._2.parameterConfigs.nonEmpty) + +} diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala similarity index 88% rename from engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala rename to designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala index dc653309720..568c72d6304 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractorTest.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.management.periodic +package pl.touk.nussknacker.engine.util import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -10,13 +10,13 @@ import pl.touk.nussknacker.engine.api.component.{ } import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ValueInputWithDictEditor} -import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractorTest.{ +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractorTest.{ componentConfigWithDictionaryEditorInParameter, componentConfigWithOnlyDictEditorParameters, componentConfigWithoutDictionaryEditorInParameter } -class AdditionalDictComponentConfigsExtractorTest extends AnyFunSuite with Matchers { +class AdditionalComponentConfigsForRuntimeExtractorTest extends AnyFunSuite with Matchers { test("should filter only components and parameters with dictionary editors") { val additionalConfig = Map( @@ -24,7 +24,7 @@ class AdditionalDictComponentConfigsExtractorTest extends AnyFunSuite with Match DesignerWideComponentId("componentB") -> componentConfigWithoutDictionaryEditorInParameter, ) val filteredResult = - AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors(additionalConfig) + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime(additionalConfig) filteredResult shouldBe Map( DesignerWideComponentId("componentA") -> componentConfigWithOnlyDictEditorParameters @@ -33,7 +33,7 @@ class AdditionalDictComponentConfigsExtractorTest extends AnyFunSuite with Match } -object AdditionalDictComponentConfigsExtractorTest { +object AdditionalComponentConfigsForRuntimeExtractorTest { private val parameterAWithDictEditor = ( ParameterName("parameterA"), diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala index 378d0d3d415..fd0a3063793 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, User} +import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData, DeploymentId, User} import pl.touk.nussknacker.ui.db.DbRef import pl.touk.nussknacker.ui.process.ScenarioQuery import pl.touk.nussknacker.ui.process.fragment.{DefaultFragmentRepository, FragmentResolver} @@ -52,7 +52,7 @@ class DefaultProcessingTypeDeployedScenariosProvider( deployingUser, Map.empty, NodesDeploymentData.empty, - Map.empty + AdditionalModelConfigs.empty ) val deployedScenarioDataTry = scenarioResolver.resolveScenario(details.json).map { resolvedScenario => diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index 815c46b6384..c3b1fdc7ef7 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -20,7 +20,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ -import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser} import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess} @@ -334,8 +334,10 @@ class DeploymentService( user.toManagerUser, additionalDeploymentData, nodesDeploymentData, - AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( - additionalComponentConfigs.forProcessingType(processDetails.processingType).getOrElse(Map.empty) + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs.forProcessingType(processDetails.processingType).getOrElse(Map.empty) + ) ) ) } yield DeployedScenarioData(processDetails.toEngineProcessVersion, deploymentData, resolvedCanonicalProcess) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala index 6621e5cac88..fbab402bfa7 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala @@ -10,11 +10,15 @@ import pl.touk.nussknacker.engine.api.component.{ NodesDeploymentData } import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, ProcessingType, VersionId} import pl.touk.nussknacker.engine.api.{ProcessVersion => RuntimeVersionData} -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId => LegacyDeploymentId} -import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor +import pl.touk.nussknacker.engine.deployment.{ + AdditionalModelConfigs, + DeploymentData, + DeploymentId => LegacyDeploymentId +} import pl.touk.nussknacker.engine.newdeployment.DeploymentId +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import pl.touk.nussknacker.restmodel.validation.ValidationResults.ValidationErrors import pl.touk.nussknacker.security.Permission import pl.touk.nussknacker.security.Permission.Permission @@ -162,14 +166,11 @@ class DeploymentService( ): EitherT[Future, RunDeploymentError, Unit] = { val runtimeVersionData = processVersionFor(scenarioMetadata, scenarioGraphVersion) // TODO: It shouldn't be needed - val dumbDeploymentData = DeploymentData( + val dumbDeploymentData = createDeploymentData( LegacyDeploymentId(""), - user.toManagerUser, - Map.empty, + user, NodesDeploymentData.empty, - AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( - additionalComponentConfigs.forProcessingType(scenarioMetadata.processingType)(user).getOrElse(Map.empty) - ) + scenarioMetadata.processingType ) for { result <- EitherT[Future, RunDeploymentError, Unit]( @@ -198,14 +199,11 @@ class DeploymentService( command: RunDeploymentCommand ): EitherT[Future, RunDeploymentError, Unit] = { val runtimeVersionData = processVersionFor(scenarioMetadata, scenarioGraphVersion) - val deploymentData = DeploymentData( + val deploymentData = createDeploymentData( toLegacyDeploymentId(command.id), - command.user.toManagerUser, - additionalDeploymentData = Map.empty, + command.user, command.nodesDeploymentData, - AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( - additionalComponentConfigs.forProcessingType(scenarioMetadata.processingType)(command.user).getOrElse(Map.empty) - ) + scenarioMetadata.processingType ) dmDispatcher .deploymentManagerUnsafe(scenarioMetadata.processingType)(command.user) @@ -227,6 +225,23 @@ class DeploymentService( EitherT.pure(()) } + private def createDeploymentData( + deploymentId: LegacyDeploymentId, + loggedUser: LoggedUser, + nodesData: NodesDeploymentData, + processingType: ProcessingType + ) = DeploymentData( + deploymentId = deploymentId, + user = loggedUser.toManagerUser, + additionalDeploymentData = Map.empty, + nodesData = nodesData, + additionalModelConfigs = AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs.forProcessingType(processingType)(loggedUser).getOrElse(Map.empty) + ) + ) + ) + private def handleFailureDuringDeploymentRequesting( deploymentId: DeploymentId, ex: Throwable diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala index 6b9a0ce5e3f..822727dc6d4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala @@ -15,7 +15,6 @@ import pl.touk.nussknacker.engine.compile.ProcessValidator import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.definition.test.ModelDataTestInfoProvider import pl.touk.nussknacker.engine.dict.ProcessDictSubstitutor -import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One} import pl.touk.nussknacker.engine.{ConfigWithUnresolvedVersion, DeploymentManagerDependencies, ModelDependencies} diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index 0433d1314bd..0c01b8408aa 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -14,7 +14,12 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId} +import pl.touk.nussknacker.engine.deployment.{ + AdditionalModelConfigs, + DeploymentData, + DeploymentId, + ExternalDeploymentId +} import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory} import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues @@ -218,7 +223,7 @@ class NotificationServiceTest user.toManagerUser, additionalDeploymentData, nodesDeploymentData, - Map.empty + AdditionalModelConfigs.empty ), processDetails.json ) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala index a5bae81e80b..da869ea2d97 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala @@ -4,7 +4,7 @@ import java.io.File import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig -import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.{CirceUtil, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData @@ -27,7 +27,7 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { s"Model version ${processVersion.modelVersion}. Deploying user [id=${deploymentData.user.id}, name=${deploymentData.user.name}]" ) val config: Config = readConfigFromArgs(args) - val modelData = ModelData.duringFlinkExecution(config, deploymentData.additionalConfigsFromProvider) + val modelData = ModelData.duringFlinkExecution(ModelConfigs(config, deploymentData.additionalModelConfigs)) val env = getExecutionEnvironment runProcess( env, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index 78da3969f8b..62e2efce685 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} @@ -37,7 +37,9 @@ object FlinkTestMain extends FlinkRunner { process, scenarioTestData, processVersion, - DeploymentData.empty.copy(additionalConfigsFromProvider = modelData.additionalConfigsFromProvider), + DeploymentData.empty.copy(additionalModelConfigs = + AdditionalModelConfigs(modelData.additionalConfigsFromProvider) + ), configuration ).runTest } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala index 8d378cb6a97..ab8ce3c2f26 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala @@ -20,13 +20,14 @@ import pl.touk.nussknacker.engine.graph.node.Case import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.ThreadUtils -import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.component.{ ComponentAdditionalConfig, DesignerWideComponentId, ParameterAdditionalUIConfig } import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ValueInputWithDictEditor} +import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs import pl.touk.nussknacker.engine.graph.expression.Expression import java.util.{Date, UUID} @@ -737,7 +738,9 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) // We need to set context loader to avoid forking in sbt - val modelData = ModelData.duringFlinkExecution(config, additionalConfigsFromProvider) + val modelData = ModelData.duringFlinkExecution( + ModelConfigs(config, AdditionalModelConfigs(additionalConfigsFromProvider)) + ) ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.configuration()) } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala deleted file mode 100644 index 2fa5b7b78e1..00000000000 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/AdditionalDictComponentConfigsExtractor.scala +++ /dev/null @@ -1,28 +0,0 @@ -package pl.touk.nussknacker.engine.management.periodic - -import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} -import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor - -object AdditionalDictComponentConfigsExtractor { - - // This function filters additional configs provided by AdditionalUIConfigProvider - // to include only component and parameter configs with Dictionary editors. - // This is done to reduce data sent to Flink as only configs regarding this editor are required to be known during execution. - def getAdditionalConfigsWithDictParametersEditors( - additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] - ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = { - additionalComponentConfigs - .map { case (componentId, componentAdditionalConfig) => - val parametersWithDictEditors = componentAdditionalConfig.parameterConfigs.filter { - case (_, additionalUiConfig) => - additionalUiConfig.valueEditor match { - case Some(_: ValueInputWithDictEditor) => true - case _ => false - } - } - componentId -> componentAdditionalConfig.copy(parameterConfigs = parametersWithDictEditors) - } - .filter(_._2.parameterConfigs.nonEmpty) - } - -} diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index a2bc79a9fda..e346c92ec08 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId} +import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData, DeploymentId} import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.{ DeploymentStatus, EngineStatusesToReschedule, @@ -27,6 +27,7 @@ import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesReposi import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus import pl.touk.nussknacker.engine.management.periodic.model._ import pl.touk.nussknacker.engine.management.periodic.service._ +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import java.time.chrono.ChronoLocalDateTime import java.time.temporal.ChronoUnit @@ -407,8 +408,8 @@ class PeriodicProcessService( additionalDeploymentDataProvider.prepareAdditionalData(deployment), // TODO: in the future we could allow users to specify nodes data during schedule requesting NodesDeploymentData.empty, - AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors( - configsFromProvider + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime(configsFromProvider) ) ) val deploymentWithJarData = deployment.periodicProcess.deploymentData diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index 3d206c6d4fb..f379f73891b 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -19,7 +19,13 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.Proble import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId, User} +import pl.touk.nussknacker.engine.deployment.{ + AdditionalModelConfigs, + DeploymentData, + DeploymentId, + ExternalDeploymentId, + User +} import pl.touk.nussknacker.engine.management.rest.HttpFlinkClient import pl.touk.nussknacker.engine.management.rest.flinkRestModel._ import pl.touk.nussknacker.engine.testing.LocalModelData @@ -59,7 +65,7 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu User("user1", "User 1"), Map.empty, NodesDeploymentData.empty, - Map.empty + AdditionalModelConfigs.empty ) private val returnedJobId = "jobId" diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 80de9d12d58..e8f7b2da13d 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.lite.metrics.dropwizard.{DropwizardMetricsProv import pl.touk.nussknacker.engine.{BaseModelData, CustomProcessValidator, DeploymentManagerDependencies, ModelData} import pl.touk.nussknacker.lite.manager.{LiteDeploymentManager, LiteDeploymentManagerProvider} import pl.touk.nussknacker.engine.newdeployment +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{Await, ExecutionContext, Future} @@ -126,14 +127,10 @@ class EmbeddedDeploymentManager( // We make sure that we don't let deploy a scenario when any parameter editor was modified to dictionary one by AdditionalUIConfigProvider // as that would result in failure during compilation before execution private def ensureAdditionalConfigsDoNotContainDictionaryEditors(deploymentData: DeploymentData): Unit = { - val configsWithDictEditors = deploymentData.additionalConfigsFromProvider.filter( - _._2.parameterConfigs.exists { case (_, parameterConfig) => - parameterConfig.valueEditor match { - case Some(_: ValueInputWithDictEditor) => true - case _ => false - } - } - ) + val configsWithDictEditors = + AdditionalComponentConfigsForRuntimeExtractor.getAdditionalConfigsWithDictParametersEditors( + deploymentData.additionalModelConfigs.additionalConfigsFromProvider + ) if (configsWithDictEditors.nonEmpty) { throw new IllegalArgumentException( "Parameter editor modification to ValueInputWithDictEditor by AdditionalUIConfigProvider is not supported for Lite engine" diff --git a/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala b/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala index a81ab729573..d4cd99dc792 100644 --- a/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala +++ b/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader -import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.{JobData, LiteStreamMetaData, ProcessVersion, RequestResponseMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.lite.RunnableScenarioInterpreter @@ -25,7 +25,7 @@ object RunnableScenarioInterpreterFactory extends LazyLogging { ): RunnableScenarioInterpreter = { val modelConfig: Config = runtimeConfig.getConfig("modelConfig") val modelData = ModelData.duringExecution( - modelConfig, + ModelConfigs(modelConfig), ModelClassLoader(modelConfig.as[List[String]]("classPath"), workingDirectoryOpt = None), resolveConfigs = true ) diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala new file mode 100644 index 00000000000..7bdc2207b14 --- /dev/null +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala @@ -0,0 +1,15 @@ +package pl.touk.nussknacker.engine.deployment + +import io.circe.generic.JsonCodec +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} + +@JsonCodec +case class AdditionalModelConfigs( + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] +) + +object AdditionalModelConfigs { + + def empty: AdditionalModelConfigs = AdditionalModelConfigs(Map.empty) + +} diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala index 09d9053003e..3e59150483a 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala @@ -1,18 +1,14 @@ package pl.touk.nussknacker.engine.deployment import io.circe.generic.JsonCodec -import pl.touk.nussknacker.engine.api.component.{ - ComponentAdditionalConfig, - DesignerWideComponentId, - NodesDeploymentData -} +import pl.touk.nussknacker.engine.api.component.NodesDeploymentData @JsonCodec case class DeploymentData( deploymentId: DeploymentId, user: User, additionalDeploymentData: Map[String, String], nodesData: NodesDeploymentData, - additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] + additionalModelConfigs: AdditionalModelConfigs ) object DeploymentData { @@ -20,7 +16,13 @@ object DeploymentData { val systemUser: User = User("system", "system") val empty: DeploymentData = - DeploymentData(DeploymentId(""), systemUser, Map.empty, NodesDeploymentData.empty, Map.empty) + DeploymentData( + DeploymentId(""), + systemUser, + Map.empty, + NodesDeploymentData.empty, + AdditionalModelConfigs.empty + ) def withDeploymentId(deploymentIdString: String) = DeploymentData( @@ -28,7 +30,7 @@ object DeploymentData { systemUser, Map.empty, NodesDeploymentData.empty, - Map.empty + AdditionalModelConfigs.empty ) } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala index 53dc8f62f4a..43f00f94bd9 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala @@ -14,12 +14,12 @@ import pl.touk.nussknacker.engine.api.dict.{DictServicesFactory, EngineDictRegis import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.{ProcessConfigCreator, ProcessObjectDependencies} import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode -import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode.FinalDefinition import pl.touk.nussknacker.engine.definition.model.{ ModelDefinition, ModelDefinitionExtractor, ModelDefinitionWithClasses } +import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs import pl.touk.nussknacker.engine.dict.DictServicesFactoryLoader import pl.touk.nussknacker.engine.migration.ProcessMigrations import pl.touk.nussknacker.engine.modelconfig._ @@ -58,11 +58,12 @@ object ModelData extends LazyLogging { // Also a classloader is correct so we don't need to build the new one // This tiny method is Flink specific so probably the interpreter module is not the best one // but it is very convenient to keep in near normal, duringExecution method - def duringFlinkExecution( - inputConfig: Config, - additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] - ): ModelData = { - duringExecution(inputConfig, ModelClassLoader.empty, resolveConfigs = false, additionalConfigsFromProvider) + def duringFlinkExecution(modelConfigs: ModelConfigs): ModelData = { + duringExecution( + modelConfigs, + ModelClassLoader.empty, + resolveConfigs = false, + ) } // On the runtime side, we get only model config, not the whole processing type config, @@ -71,19 +72,18 @@ object ModelData extends LazyLogging { // we don't use not allowed components for a given category // and that the scenario doesn't violate validators introduced by additionalConfigsFromProvider def duringExecution( - inputConfig: Config, + modelConfigs: ModelConfigs, modelClassLoader: ModelClassLoader, resolveConfigs: Boolean, - additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty ): ModelData = { def resolveInputConfigDuringExecution(modelConfigLoader: ModelConfigLoader): InputConfigDuringExecution = { if (resolveConfigs) { modelConfigLoader.resolveInputConfigDuringExecution( - ConfigWithUnresolvedVersion(modelClassLoader.classLoader, inputConfig), + ConfigWithUnresolvedVersion(modelClassLoader.classLoader, modelConfigs.modelInputConfig), modelClassLoader.classLoader ) } else { - InputConfigDuringExecution(inputConfig) + InputConfigDuringExecution(modelConfigs.modelInputConfig) } } ClassLoaderModelData( @@ -91,7 +91,7 @@ object ModelData extends LazyLogging { modelClassLoader = modelClassLoader, category = None, determineDesignerWideId = id => DesignerWideComponentId(id.toString), - additionalConfigsFromProvider = additionalConfigsFromProvider, + additionalConfigsFromProvider = modelConfigs.additionalModelConfigs.additionalConfigsFromProvider, shouldIncludeConfigCreator = _ => true, shouldIncludeComponentProvider = _ => true, componentDefinitionExtractionMode = ComponentDefinitionExtractionMode.FinalDefinition @@ -104,6 +104,11 @@ object ModelData extends LazyLogging { } +final case class ModelConfigs( + modelInputConfig: Config, + additionalModelConfigs: AdditionalModelConfigs = AdditionalModelConfigs.empty +) + final case class ModelDependencies( additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig], determineDesignerWideId: ComponentId => DesignerWideComponentId, From fd8d0b77c9694af6c0e21b7ffc14b6f270298e19 Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz Date: Fri, 15 Nov 2024 14:23:09 +0100 Subject: [PATCH 4/5] Added changelog entry --- docs/Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/Changelog.md b/docs/Changelog.md index cd3132a2435..9e45ab6c331 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -12,6 +12,7 @@ * [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries * [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation +* [#7123](https://github.com/TouK/nussknacker/pull/7123) Fix deployments for scenarios with dict editors after model reload ## 1.18 From 064b5d065ae65fb7fb7f598f00a94565a023bc55 Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz Date: Tue, 19 Nov 2024 09:45:25 +0100 Subject: [PATCH 5/5] review comments --- ...onalComponentConfigsForRuntimeExtractor.scala | 2 +- .../process/deployment/DeploymentService.scala | 14 +++++++++----- .../newdeployment/DeploymentService.scala | 14 +++++++++----- .../embedded/EmbeddedDeploymentManager.scala | 16 ++++++---------- .../deployment/AdditionalModelConfigs.scala | 2 +- 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala index 352a0562c5d..6490977cf9a 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala @@ -14,7 +14,7 @@ object AdditionalComponentConfigsForRuntimeExtractor { // This function filters additional configs provided by AdditionalUIConfigProvider // to include only component and parameter configs with Dictionary editors. - def getAdditionalConfigsWithDictParametersEditors( + private def getAdditionalConfigsWithDictParametersEditors( additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = additionalComponentConfigs .map { case (componentId, componentAdditionalConfig) => diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index c3b1fdc7ef7..8a6c8a78fc4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -334,11 +334,7 @@ class DeploymentService( user.toManagerUser, additionalDeploymentData, nodesDeploymentData, - AdditionalModelConfigs( - AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( - additionalComponentConfigs.forProcessingType(processDetails.processingType).getOrElse(Map.empty) - ) - ) + getAdditionalModelConfigsRequiredForRuntime(processDetails.processingType) ) } yield DeployedScenarioData(processDetails.toEngineProcessVersion, deploymentData, resolvedCanonicalProcess) } @@ -422,6 +418,14 @@ class DeploymentService( ) } + private def getAdditionalModelConfigsRequiredForRuntime(processingType: ProcessingType)(implicit user: LoggedUser) = { + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs.forProcessingType(processingType).getOrElse(Map.empty) + ) + ) + } + // TODO: check deployment id to be sure that returned status is for given deployment override def getProcessState( processIdWithName: ProcessIdWithName diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala index fbab402bfa7..f30004cb49e 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala @@ -235,11 +235,7 @@ class DeploymentService( user = loggedUser.toManagerUser, additionalDeploymentData = Map.empty, nodesData = nodesData, - additionalModelConfigs = AdditionalModelConfigs( - AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( - additionalComponentConfigs.forProcessingType(processingType)(loggedUser).getOrElse(Map.empty) - ) - ) + additionalModelConfigs = getAdditionalModelConfigsRequiredForRuntime(processingType, loggedUser) ) private def handleFailureDuringDeploymentRequesting( @@ -287,6 +283,14 @@ class DeploymentService( ) } + private def getAdditionalModelConfigsRequiredForRuntime(processingType: ProcessingType, loggedUser: LoggedUser) = { + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs.forProcessingType(processingType)(loggedUser).getOrElse(Map.empty) + ) + ) + } + } object DeploymentService { diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index e8f7b2da13d..7a7242a36d4 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -100,7 +100,7 @@ class EmbeddedDeploymentManager( case DMRunDeploymentCommand(processVersion, deploymentData, canonicalProcess, updateStrategy) => Future { ensureReplaceDeploymentUpdateStrategy(updateStrategy) - ensureAdditionalConfigsDoNotContainDictionaryEditors(deploymentData) + ensureAdditionalComponentsConfigsAreEmpty(deploymentData) deployScenarioClosingOldIfNeeded( processVersion, deploymentData, @@ -124,16 +124,12 @@ class EmbeddedDeploymentManager( } } - // We make sure that we don't let deploy a scenario when any parameter editor was modified to dictionary one by AdditionalUIConfigProvider - // as that would result in failure during compilation before execution - private def ensureAdditionalConfigsDoNotContainDictionaryEditors(deploymentData: DeploymentData): Unit = { - val configsWithDictEditors = - AdditionalComponentConfigsForRuntimeExtractor.getAdditionalConfigsWithDictParametersEditors( - deploymentData.additionalModelConfigs.additionalConfigsFromProvider - ) - if (configsWithDictEditors.nonEmpty) { + // We make sure that we don't let deploy a scenario when any component was modified by AdditionalUIConfigProvider + // as it could potentially result in failure during compilation before execution + private def ensureAdditionalComponentsConfigsAreEmpty(deploymentData: DeploymentData): Unit = { + if (deploymentData.additionalModelConfigs.additionalConfigsFromProvider.nonEmpty) { throw new IllegalArgumentException( - "Parameter editor modification to ValueInputWithDictEditor by AdditionalUIConfigProvider is not supported for Lite engine" + "Component config modification by AdditionalUIConfigProvider is not supported for Lite engine" ) } } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala index 7bdc2207b14..65fbdea3716 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala @@ -4,7 +4,7 @@ import io.circe.generic.JsonCodec import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} @JsonCodec -case class AdditionalModelConfigs( +final case class AdditionalModelConfigs( additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] )