From be0fe74e1b8da840c2f43a248cd70163aa37c1f2 Mon Sep 17 00:00:00 2001 From: gskrobisz Date: Wed, 25 Sep 2024 18:29:28 +0200 Subject: [PATCH] upd --- .../api/component/NodesDeploymentData.scala | 31 ++-------- .../engine/api/process/Source.scala | 9 ++- .../ui/api/DeploymentApiHttpService.scala | 5 +- .../description/DeploymentApiEndpoints.scala | 15 +---- .../newactivity/ActivityInfoService.scala | 34 +++++------ .../test/base/it/NuResourcesTest.scala | 4 -- .../ui/api/ActivityInfoResourcesSpec.scala | 12 ++-- ...DeploymentApiHttpServiceBusinessSpec.scala | 2 +- ...tApiHttpServiceDeploymentCommentSpec.scala | 6 +- docs-internal/api/nu-designer-openapi.yaml | 33 ++--------- .../api/process/FlinkCustomNodeContext.scala | 2 +- .../flink/table/source/TableSourceTest.scala | 9 +-- .../flink/table/source/TableSource.scala | 5 +- .../kafka/source/flink/FlinkKafkaSource.scala | 35 +++++++---- .../sample/source/BoundedSource.scala | 58 ++++++++++--------- .../activity/ActivityInfoProvider.scala | 4 +- .../ModelDataActivityInfoProvider.scala | 15 +++-- 17 files changed, 123 insertions(+), 156 deletions(-) diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala index 7a9151fbf91..f43126590f5 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala @@ -1,15 +1,18 @@ package pl.touk.nussknacker.engine.api.component -import cats.syntax.functor._ import io.circe.{Decoder, Encoder} -import io.circe.generic.auto._ -import io.circe.syntax._ import pl.touk.nussknacker.engine.api.NodeId +import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData final case class NodesDeploymentData(dataByNodeId: Map[NodeId, NodeDeploymentData]) object NodesDeploymentData { + // Raw deployment parameters (name -> value) that are used as additional node configuration during deployment. + // Each node can be provided with dedicated set of parameters. + // TODO: consider replacing NodeDeploymentData with Json + type NodeDeploymentData = Map[String, String] + val empty: NodesDeploymentData = NodesDeploymentData(Map.empty) implicit val nodesDeploymentDataEncoder: Encoder[NodesDeploymentData] = Encoder @@ -20,25 +23,3 @@ object NodesDeploymentData { Decoder.decodeMap[NodeId, NodeDeploymentData].map(NodesDeploymentData(_)) } - -sealed trait NodeDeploymentData - -final case class SqlFilteringExpression(sqlExpression: String) extends NodeDeploymentData - -final case class KafkaSourceOffset(offsetResetStrategy: Long) extends NodeDeploymentData - -object NodeDeploymentData { - - implicit val nodeDeploymentDataEncoder: Encoder[NodeDeploymentData] = - Encoder.instance { - case s: SqlFilteringExpression => s.asJson - case o: KafkaSourceOffset => o.asJson - } - - implicit val nodeDeploymentDataDecoder: Decoder[NodeDeploymentData] = - List[Decoder[NodeDeploymentData]]( - Decoder[SqlFilteringExpression].widen, - Decoder[KafkaSourceOffset].widen - ).reduceLeft(_ or _) - -} diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Source.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Source.scala index ed783d853c0..0a213e5bc6b 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Source.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Source.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.api.process import pl.touk.nussknacker.engine.api.component.Component._ -import pl.touk.nussknacker.engine.api.component.{Component, ProcessingMode} +import pl.touk.nussknacker.engine.api.component.{Component, ParameterConfig, ProcessingMode} import pl.touk.nussknacker.engine.api.context.ContextTransformation import pl.touk.nussknacker.engine.api.definition.{Parameter, WithExplicitTypesToExtract} import pl.touk.nussknacker.engine.api.parameter.ParameterName @@ -49,8 +49,13 @@ trait TestWithParametersSupport[+T] { self: Source => def parametersToTestData(params: Map[ParameterName, AnyRef]): T } +/** + * Used to define Source parameters for each activity + * e.g. + * {"DEPLOY": { "parametername": ...parameter configuration... } + */ trait WithActivityParameters { self: Source => - def activityParametersDefinition: Map[String, List[Parameter]] + def activityParametersDefinition: Map[String, Map[String, ParameterConfig]] } /** diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpService.scala index 5284d34e724..24461e8def3 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpService.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.ui.api +import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.ProblemDeploymentStatus import pl.touk.nussknacker.ui.api.description.DeploymentApiEndpoints import pl.touk.nussknacker.ui.api.description.DeploymentApiEndpoints.Dtos._ @@ -29,7 +30,9 @@ class DeploymentApiHttpService( RunDeploymentCommand( id = deploymentId, scenarioName = request.scenarioName, - nodesDeploymentData = request.nodesDeploymentData, + nodesDeploymentData = NodesDeploymentData(request.nodesDeploymentData.map { case (n, p) => + (n, Map("sqlExpression" -> p)) + }), user = loggedUser ), request.comment diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala index 8e14c688134..db1fb23c53f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala @@ -4,7 +4,6 @@ import cats.data.NonEmptyList import derevo.circe.{decoder, encoder} import derevo.derive import pl.touk.nussknacker.engine.api.NodeId -import pl.touk.nussknacker.engine.api.component.{NodeDeploymentData, NodesDeploymentData, SqlFilteringExpression} import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{ EmptyProcess, ExpressionParserCompilationError, @@ -48,9 +47,7 @@ class DeploymentApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseE .example( RunDeploymentRequest( scenarioName = ProcessName("scenario1"), - NodesDeploymentData( - Map(NodeId("sourceNodeId1") -> SqlFilteringExpression("field1 = 'value'")) - ), + nodesDeploymentData = Map(NodeId("sourceNodeId1") -> "field1 = 'value'"), comment = None ) ) @@ -197,7 +194,7 @@ object DeploymentApiEndpoints { @derive(encoder, decoder, schema) final case class RunDeploymentRequest( scenarioName: ProcessName, - nodesDeploymentData: NodesDeploymentData, + nodesDeploymentData: Map[NodeId, String], // nodeId -> single parameter value comment: Option[ApiCallComment] ) @@ -210,13 +207,7 @@ object DeploymentApiEndpoints { modifiedAt: Instant ) - implicit val nodeDeploymentDataCodec: Schema[NodeDeploymentData] = Schema.derived - - implicit val nodesDeploymentDataCodec: Schema[NodesDeploymentData] = Schema - .schemaForMap[NodeId, NodeDeploymentData](_.id) - .map[NodesDeploymentData]((map: Map[NodeId, NodeDeploymentData]) => Some(NodesDeploymentData(map)))( - _.dataByNodeId - ) + implicit val nodesDeploymentDataCodec: Schema[Map[NodeId, String]] = Schema.schemaForMap[NodeId, String](_.id) sealed trait RunDeploymentError diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityInfoService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityInfoService.scala index be78ee6193d..0704c6f3017 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityInfoService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityInfoService.scala @@ -1,14 +1,12 @@ package pl.touk.nussknacker.ui.process.newactivity -import pl.touk.nussknacker.engine.api.definition.StringParameterEditor +import pl.touk.nussknacker.engine.api.NodeId +import pl.touk.nussknacker.engine.api.definition.RawParameterEditor import pl.touk.nussknacker.engine.api.graph.ScenarioGraph import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.api.typed.CanBeSubclassDeterminer -import pl.touk.nussknacker.engine.api.typed.typing.Typed import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.definition.activity.ActivityInfoProvider -import pl.touk.nussknacker.restmodel.definition.UISourceParameters -import pl.touk.nussknacker.ui.definition.DefinitionsService +import pl.touk.nussknacker.restmodel.definition.UiScenarioPropertyConfig import pl.touk.nussknacker.ui.process.label.ScenarioLabel import pl.touk.nussknacker.ui.security.api.LoggedUser import pl.touk.nussknacker.ui.uiresolving.UIProcessResolver @@ -16,6 +14,7 @@ import pl.touk.nussknacker.ui.uiresolving.UIProcessResolver // TODO: move to ActivityService? execute node compilation only once with ScenarioTestService? class ActivityInfoService(activityInfoProvider: ActivityInfoProvider, processResolver: UIProcessResolver) { + // TODO: use UiActivityParameterConfig instead of UiScenarioPropertyConfig def getActivityParameters( scenarioGraph: ScenarioGraph, processName: ProcessName, @@ -23,17 +22,22 @@ class ActivityInfoService(activityInfoProvider: ActivityInfoProvider, processRes labels: List[ScenarioLabel] )( implicit user: LoggedUser - ): Map[String, List[UISourceParameters]] = { + ): Map[String, Map[NodeId, Map[String, UiScenarioPropertyConfig]]] = { val canonical = toCanonicalProcess(scenarioGraph, processName, isFragment, labels) activityInfoProvider .getActivityParameters(canonical) .map { case (activityName, nodeParamsMap) => activityName -> nodeParamsMap .map { case (nodeId, params) => - UISourceParameters(nodeId, params.map(DefinitionsService.createUIParameter)) + NodeId(nodeId) -> params.map { case (name, value) => + name -> UiScenarioPropertyConfig( + value.defaultValue, + value.editor.getOrElse(RawParameterEditor), + value.label, + value.hintText + ) + } } - .map(assignUserFriendlyEditor) - .toList } } @@ -47,16 +51,4 @@ class ActivityInfoService(activityInfoProvider: ActivityInfoProvider, processRes processResolver.validateAndResolve(scenarioGraph, processName, isFragment, labels) } - // copied from ScenarioTestService - private def assignUserFriendlyEditor(uiSourceParameter: UISourceParameters): UISourceParameters = { - val adaptedParameters = uiSourceParameter.parameters.map { uiParameter => - if (CanBeSubclassDeterminer.canBeSubclassOf(uiParameter.typ, Typed.apply(classOf[String])).isValid) { - uiParameter.copy(editor = StringParameterEditor) - } else { - uiParameter - } - } - uiSourceParameter.copy(parameters = adaptedParameters) - } - } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala index 43d66bffd59..ae6f2822264 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala @@ -57,10 +57,6 @@ import pl.touk.nussknacker.ui.processreport.ProcessCounter import pl.touk.nussknacker.ui.security.api.{LoggedUser, RealLoggedUser} import pl.touk.nussknacker.ui.util.{MultipartUtils, NuPathMatchers} import slick.dbio.DBIOAction -import pl.touk.nussknacker.engine.definition.activity.ModelDataActivityInfoProvider -import pl.touk.nussknacker.ui.LoadableConfigBasedNussknackerConfig -import pl.touk.nussknacker.ui.process.newactivity.ActivityInfoService -import pl.touk.nussknacker.ui.process.newactivity.ScenarioActivityService import java.net.URI import scala.concurrent.{ExecutionContext, Future} diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala index 6fa9d2e521f..408546f2721 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.ui.api import io.restassured.RestAssured.`given` import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse -import org.hamcrest.Matchers.equalTo +import org.hamcrest.Matchers.{emptyOrNullString, equalTo, is, notNullValue} import org.scalatest.freespec.AnyFreeSpecLike import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.test.base.it.{NuItTest, WithSimplifiedConfigScenarioHelper} @@ -38,12 +38,10 @@ class ActivityInfoResourcesSpec .Then() .statusCode(200) .body( - "DEPLOY[0].sourceId", - equalTo("sourceWithParametersId"), - "DEPLOY[0].parameters[0].name", - equalTo("offset"), - "DEPLOY[0].parameters[0].typ.display", - equalTo("Long") + "DEPLOY.sourceWithParametersId.offset", + notNullValue(), + "DEPLOY.sourceWithParametersId.offset.defaultValue", + is(emptyOrNullString()) ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala index c4620212221..5f12530554a 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala @@ -65,7 +65,7 @@ class DeploymentApiHttpServiceBusinessSpec private val correctDeploymentRequest = s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} + | "$sourceNodeId": "`date` = '2024-01-01'" | } |}""".stripMargin diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala index e6042ead2a1..aedab7354bb 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala @@ -79,7 +79,7 @@ class DeploymentApiHttpServiceDeploymentCommentSpec .jsonBody(s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} + | "$sourceNodeId": "`date` = '2024-01-01'" | } |}""".stripMargin) .put(s"$nuDesignerHttpAddress/api/deployments/${DeploymentId.generate}") @@ -99,7 +99,7 @@ class DeploymentApiHttpServiceDeploymentCommentSpec .jsonBody(s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} + | "$sourceNodeId": "`date` = '2024-01-01'" | }, | "comment": "deployment comment not matching configured pattern" |}""".stripMargin) @@ -121,7 +121,7 @@ class DeploymentApiHttpServiceDeploymentCommentSpec .jsonBody(s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} + | "$sourceNodeId": "`date` = '2024-01-01'" | }, | "comment": "comment with $configuredPhrase" |}""".stripMargin) diff --git a/docs-internal/api/nu-designer-openapi.yaml b/docs-internal/api/nu-designer-openapi.yaml index 38d2261b4e0..01ad538b04c 100644 --- a/docs-internal/api/nu-designer-openapi.yaml +++ b/docs-internal/api/nu-designer-openapi.yaml @@ -817,8 +817,7 @@ paths: example: scenarioName: scenario1 nodesDeploymentData: - sourceNodeId1: - sqlExpression: field1 = 'value' + sourceNodeId1: field1 = 'value' required: true responses: '202': @@ -4807,15 +4806,6 @@ components: JsonParameterEditor: title: JsonParameterEditor type: object - KafkaSourceOffset: - title: KafkaSourceOffset - type: object - required: - - offset - properties: - offset: - type: integer - format: int64 LayoutData: title: LayoutData type: object @@ -4867,11 +4857,11 @@ components: type: object additionalProperties: $ref: '#/components/schemas/Map_TypingResultInJson' - Map_NodeId_NodeDeploymentData: - title: Map_NodeId_NodeDeploymentData + Map_NodeId_String: + title: Map_NodeId_String type: object additionalProperties: - $ref: '#/components/schemas/NodeDeploymentData' + type: string Map_String: title: Map_String type: object @@ -5299,11 +5289,6 @@ components: type: string type: $ref: '#/components/schemas/NodeTypes12' - NodeDeploymentData: - title: NodeDeploymentData - oneOf: - - $ref: '#/components/schemas/KafkaSourceOffset' - - $ref: '#/components/schemas/SqlFilteringExpression' NodeTypes: title: NodeTypes type: string @@ -5734,7 +5719,7 @@ components: scenarioName: type: string nodesDeploymentData: - $ref: '#/components/schemas/Map_NodeId_NodeDeploymentData' + $ref: '#/components/schemas/Map_NodeId_String' comment: type: - string @@ -6044,14 +6029,6 @@ components: SpelTemplateParameterEditor: title: SpelTemplateParameterEditor type: object - SqlFilteringExpression: - title: SqlFilteringExpression - type: object - required: - - sqlExpression - properties: - sqlExpression: - type: string SqlParameterEditor: title: SqlParameterEditor type: object diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkCustomNodeContext.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkCustomNodeContext.scala index 25e7e8b2502..b58aa347af2 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkCustomNodeContext.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkCustomNodeContext.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.flink.api.process import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.typeinfo.TypeInformation -import pl.touk.nussknacker.engine.api.component.NodeDeploymentData +import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.process.ComponentUseCase import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala index 36e2b17f08f..8857b738315 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala @@ -7,11 +7,12 @@ import org.scalatest.LoneElement import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.NodeId -import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData, SqlFilteringExpression} +import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, StubbedCatalogFactory} +import pl.touk.nussknacker.engine.flink.table.source.TableSource.SQL_EXPRESSION_PARAMETER_NAME import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode @@ -87,7 +88,7 @@ class TableSourceTest val result = runner .runWithoutData[Row]( scenario, - nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true"))) + nodesData = NodesDeploymentData(Map(NodeId("start") -> Map(SQL_EXPRESSION_PARAMETER_NAME -> "true = true"))) ) .validValue result.errors shouldBe empty @@ -127,7 +128,7 @@ class TableSourceTest val resultWithoutFiltering = runnerWithCatalogConfiguration .runWithoutData[Row]( scenario, - nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true"))) + nodesData = NodesDeploymentData(Map(NodeId("start") -> Map(SQL_EXPRESSION_PARAMETER_NAME -> "true = true"))) ) .validValue resultWithoutFiltering.errors shouldBe empty @@ -136,7 +137,7 @@ class TableSourceTest val resultWithFiltering = runnerWithCatalogConfiguration .runWithoutData[Row]( scenario, - nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = false"))) + nodesData = NodesDeploymentData(Map(NodeId("start") -> Map(SQL_EXPRESSION_PARAMETER_NAME -> "true = false"))) ) .validValue resultWithFiltering.errors shouldBe empty diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala index 23c7dc55588..5f475f58314 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala @@ -6,7 +6,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment import org.apache.flink.table.api.{DataTypes, Schema} import org.apache.flink.table.catalog.Column.{ComputedColumn, MetadataColumn, PhysicalColumn} import org.apache.flink.types.Row -import pl.touk.nussknacker.engine.api.component.SqlFilteringExpression import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.{ @@ -52,7 +51,8 @@ class TableSource( val selectQuery = tableEnv.from(tableDefinition.tableId.toString) val finalQuery = flinkNodeContext.nodeDeploymentData - .collect { case SqlFilteringExpression(sqlExpression) => + .flatMap(_.get(SQL_EXPRESSION_PARAMETER_NAME)) + .collect { case sqlExpression => tableEnv.executeSql( s"CREATE TEMPORARY VIEW $filteringInternalViewName AS SELECT * FROM ${tableDefinition.tableId} WHERE $sqlExpression" ) @@ -126,5 +126,6 @@ class TableSource( } object TableSource { + val SQL_EXPRESSION_PARAMETER_NAME = "sqlExpression" private val filteringInternalViewName = "filteringView" } diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala index 3f01e894d96..f05c08aa1ac 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala @@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase} import org.apache.kafka.clients.consumer.ConsumerRecord import pl.touk.nussknacker.engine.api.NodeId -import pl.touk.nussknacker.engine.api.component.KafkaSourceOffset +import pl.touk.nussknacker.engine.api.component.ParameterConfig import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, Parameter} import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy @@ -82,18 +82,28 @@ class FlinkKafkaSource[T]( protected lazy val topics: NonEmptyList[TopicName.ForSource] = preparedTopics.map(_.prepared) - override def activityParametersDefinition: Map[String, List[Parameter]] = { - import pl.touk.nussknacker.engine.spel.SpelExtension._ - val defaultValue = if (kafkaConfig.forceLatestRead.contains(true)) Some("'LATEST'".spel) else Some("'NONE'".spel) - val offsetResetStrategyValues = List( - FixedExpressionValue("'LATEST'", "LATEST"), - FixedExpressionValue("'EARLIEST'", "EARLIEST"), - FixedExpressionValue("'NONE'", "NONE"), + private val OFFSET_RESET_STRATEGY_PARAM_NAME = "offsetResetStrategy" + + override def activityParametersDefinition: Map[String, Map[String, ParameterConfig]] = { + val defaultValue = if (kafkaConfig.forceLatestRead.contains(true)) Some("LATEST") else Some("NONE") + val editor = Some( + FixedValuesParameterEditor( + List( + FixedExpressionValue("LATEST", "LATEST"), + FixedExpressionValue("EARLIEST", "EARLIEST"), + FixedExpressionValue("NONE", "NONE"), + ) + ) ) Map( - ScenarioActionName.Deploy.value -> List( - Parameter(ParameterName("offsetResetStrategy"), Typed.apply[String]) - .copy(editor = Some(FixedValuesParameterEditor(offsetResetStrategyValues)), defaultValue = defaultValue), + ScenarioActionName.Deploy.value -> Map( + OFFSET_RESET_STRATEGY_PARAM_NAME -> ParameterConfig( + defaultValue = defaultValue, + editor = editor, + validators = None, + label = None, + hintText = None + ), ) ) } @@ -104,7 +114,8 @@ class FlinkKafkaSource[T]( flinkNodeContext: FlinkCustomNodeContext ): SourceFunction[T] = { // TODO: handle deployment parameters -> offset - val deploymentDataOpt = flinkNodeContext.nodeDeploymentData.collect { case d: KafkaSourceOffset => d } + val offsetResetStrategy = + flinkNodeContext.nodeDeploymentData.flatMap(_.get(OFFSET_RESET_STRATEGY_PARAM_NAME)).getOrElse() topics.toList.foreach(KafkaUtils.setToLatestOffsetIfNeeded(kafkaConfig, _, consumerGroupId)) createFlinkSource(consumerGroupId, flinkNodeContext) } diff --git a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala index 5b40c849edc..4cce041c505 100644 --- a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala +++ b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala @@ -2,13 +2,8 @@ package pl.touk.nussknacker.engine.management.sample.source import org.apache.flink.streaming.api.datastream.DataStreamSource import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import pl.touk.nussknacker.engine.api.component.{KafkaSourceOffset, UnboundedStreamComponent} -import pl.touk.nussknacker.engine.api.definition.{ - FixedExpressionValue, - FixedValuesParameterEditor, - Parameter, - StringParameterEditor -} +import pl.touk.nussknacker.engine.api.component.{ParameterConfig, UnboundedStreamComponent} +import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, RawParameterEditor} import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.{SourceFactory, WithActivityParameters} @@ -33,23 +28,34 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone def source(@ParamName("elements") elements: java.util.List[Any]) = new CollectionSource[Any](elements.asScala.toList, None, Unknown) with WithActivityParameters { - override def activityParametersDefinition: Map[String, List[Parameter]] = { - - import pl.touk.nussknacker.engine.spel.SpelExtension._ - - val offsetResetStrategyValues = List( - FixedExpressionValue("'LATEST'", "LATEST"), - FixedExpressionValue("'EARLIEST'", "EARLIEST"), - FixedExpressionValue("'NONE'", "NONE"), + override def activityParametersDefinition: Map[String, Map[String, ParameterConfig]] = { + val fixedValuesEditor = Some( + FixedValuesParameterEditor( + List( + FixedExpressionValue("LATEST", "LATEST"), + FixedExpressionValue("EARLIEST", "EARLIEST"), + FixedExpressionValue("NONE", "NONE"), + ) + ) ) - Map( - ScenarioActionName.Deploy.value -> List( - Parameter(ParameterName("offset"), Typed.apply[Long]), - Parameter(ParameterName("sometext"), Typed.apply[String]) - .copy(editor = Some(StringParameterEditor), defaultValue = Some("'example'".spel)), - Parameter(ParameterName("offsetResetStrategy"), Typed.apply[String]) - .copy(editor = Some(FixedValuesParameterEditor(offsetResetStrategyValues))), + ScenarioActionName.Deploy.value -> Map( + "offset" -> ParameterConfig( + defaultValue = None, + editor = Some(RawParameterEditor), + validators = None, + label = None, + hintText = Some( + "Set offset to setup source to emit elements from specified start point in input collection. Empty field resets collection to the beginning." + ) + ), + "offsetResetStrategy" -> ParameterConfig( + defaultValue = Some("EARLIEST"), + editor = fixedValuesEditor, + validators = None, + label = None, + hintText = Some("Example of parameter with fixed values") + ), ) ) } @@ -59,10 +65,10 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone env: StreamExecutionEnvironment, flinkNodeContext: FlinkCustomNodeContext ): DataStreamSource[T] = { - val deploymentDataOpt = flinkNodeContext.nodeDeploymentData.collect { case d: KafkaSourceOffset => d } - val elementsWithOffset = deploymentDataOpt match { - case Some(data) => list.drop(data.offsetResetStrategy.toInt) - case _ => list + val offsetOpt = flinkNodeContext.nodeDeploymentData.flatMap(_.get("offset")) + val elementsWithOffset = offsetOpt match { + case Some(offset) => list.drop(offset.toInt) + case _ => list } super.createSourceStream(elementsWithOffset, env, flinkNodeContext) } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ActivityInfoProvider.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ActivityInfoProvider.scala index 0c679dad44a..1cf9ac7fcec 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ActivityInfoProvider.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ActivityInfoProvider.scala @@ -1,10 +1,10 @@ package pl.touk.nussknacker.engine.definition.activity -import pl.touk.nussknacker.engine.api.definition.Parameter +import pl.touk.nussknacker.engine.api.component.ParameterConfig import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess trait ActivityInfoProvider { - def getActivityParameters(scenario: CanonicalProcess): Map[String, Map[String, List[Parameter]]] + def getActivityParameters(scenario: CanonicalProcess): Map[String, Map[String, Map[String, ParameterConfig]]] } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala index dce50eb7160..32a57306a7d 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.definition.activity import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.definition.Parameter +import pl.touk.nussknacker.engine.api.component.ParameterConfig import pl.touk.nussknacker.engine.api.process.WithActivityParameters import pl.touk.nussknacker.engine.api.{MetaData, NodeId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -12,7 +12,9 @@ class ModelDataActivityInfoProvider(modelData: ModelData) extends CommonModelDataInfoProvider(modelData) with ActivityInfoProvider { - override def getActivityParameters(scenario: CanonicalProcess): Map[String, Map[String, List[Parameter]]] = { + override def getActivityParameters( + scenario: CanonicalProcess + ): Map[String, Map[String, Map[String, ParameterConfig]]] = { modelData.withThisAsContextClassLoader { val nodeToActivityToParameters = collectAllSources(scenario) .map(source => source.id -> getActivityParameters(source, scenario.metaData)) @@ -22,8 +24,8 @@ class ModelDataActivityInfoProvider(modelData: ModelData) } private def groupByActivity( - nodeToActivityToParameters: Map[String, Map[String, List[Parameter]]] - ): Map[String, Map[String, List[Parameter]]] = { + nodeToActivityToParameters: Map[String, Map[String, Map[String, ParameterConfig]]] + ): Map[String, Map[String, Map[String, ParameterConfig]]] = { val activityToNodeToParameters = for { (node, activityToParams) <- nodeToActivityToParameters.toList (activity, params) <- activityToParams.toList @@ -33,7 +35,10 @@ class ModelDataActivityInfoProvider(modelData: ModelData) .mapValuesNow(_.map(_._2).toMap) } - private def getActivityParameters(source: SourceNodeData, metaData: MetaData): Map[String, List[Parameter]] = { + private def getActivityParameters( + source: SourceNodeData, + metaData: MetaData + ): Map[String, Map[String, ParameterConfig]] = { modelData.withThisAsContextClassLoader { val compiledSource = prepareSourceObj(source)(metaData, NodeId(source.id)) compiledSource match {