Skip to content

Commit

Permalink
[NU-1800] Add TemplateEvaluationResult to evaluate SpEL expression pa…
Browse files Browse the repository at this point in the history
…rts in LazyParameter (#7162)

Add TemplateEvaluationResult to evaluate SpEL expression parts in LazyParameter
---------
Co-authored-by: Arek Burdach <[email protected]>
  • Loading branch information
mslabek authored Nov 20, 2024
1 parent 1539ae5 commit 14a9a75
Show file tree
Hide file tree
Showing 17 changed files with 349 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pl.touk.nussknacker.engine.api

case class TemplateEvaluationResult(renderedParts: List[TemplateRenderedPart]) {
def renderedTemplate: String = renderedParts.map(_.value).mkString("")
}

sealed trait TemplateRenderedPart {
def value: String
}

object TemplateRenderedPart {
case class RenderedLiteral(value: String) extends TemplateRenderedPart

case class RenderedSubExpression(value: String) extends TemplateRenderedPart
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ object DatabaseQueryEnricher {

final val queryParamName: ParameterName = ParameterName("Query")

final val queryParamDeclaration =
ParameterDeclaration
.mandatory[String](queryParamName)
.withCreator(modify = _.copy(editor = Some(SqlParameterEditor)))
final val queryParam = Parameter[String](queryParamName).copy(editor = Some(SqlParameterEditor))

final val resultStrategyParamName: ParameterName = ParameterName("Result strategy")

Expand Down Expand Up @@ -132,7 +129,7 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid
): ContextTransformationDefinition = { case TransformationStep(Nil, _) =>
NextParameters(parameters =
resultStrategyParamDeclaration.createParameter() ::
queryParamDeclaration.createParameter() ::
queryParam ::
cacheTTLParamDeclaration.createParameter() :: Nil
)
}
Expand All @@ -142,14 +139,15 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid
): ContextTransformationDefinition = {
case TransformationStep(
(`resultStrategyParamName`, DefinedEagerParameter(strategyName: String, _)) ::
(`queryParamName`, DefinedEagerParameter(query: String, _)) ::
(`queryParamName`, DefinedEagerParameter(query: TemplateEvaluationResult, _)) ::
(`cacheTTLParamName`, _) :: Nil,
None
) =>
if (query.isEmpty) {
val renderedQuery = query.renderedTemplate
if (renderedQuery.isEmpty) {
FinalResults(context, errors = CustomNodeError("Query is missing", Some(queryParamName)) :: Nil, state = None)
} else {
parseQuery(context, dependencies, strategyName, query)
parseQuery(context, dependencies, strategyName, renderedQuery)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package pl.touk.nussknacker.sql.service

import pl.touk.nussknacker.engine.api.TemplateRenderedPart.RenderedLiteral
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, OutputVariableNameValue}
import pl.touk.nussknacker.engine.api.context.{OutputVar, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.{NodeId, TemplateEvaluationResult}
import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, SingleResultStrategy}
import pl.touk.nussknacker.sql.db.schema.MetaDataProviderFactory
import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest
Expand Down Expand Up @@ -32,8 +33,10 @@ class DatabaseQueryEnricherValidationTest extends BaseHsqlQueryEnricherTest {
service.TransformationStep(
List(
DatabaseQueryEnricher.resultStrategyParamName -> eagerValueParameter(SingleResultStrategy.name),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter("select from"),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter(
TemplateEvaluationResult(List(RenderedLiteral("select from")))
),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
),
None
)
Expand Down Expand Up @@ -62,8 +65,10 @@ class DatabaseQueryEnricherValidationTest extends BaseHsqlQueryEnricherTest {
service.TransformationStep(
List(
DatabaseQueryEnricher.resultStrategyParamName -> eagerValueParameter(ResultSetStrategy.name),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter("select * from persons"),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter(
TemplateEvaluationResult(List(RenderedLiteral("select * from persons")))
),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
),
None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import pl.touk.nussknacker.engine.definition.component.methodbased.MethodBasedCo
import pl.touk.nussknacker.engine.definition.component.{ComponentStaticDefinition, FragmentSpecificData}
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.TemplateEvaluationResult
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.restmodel.definition._
import pl.touk.nussknacker.ui.definition.DefinitionsService.{
ComponentUiConfigMode,
Expand Down Expand Up @@ -162,7 +164,7 @@ object DefinitionsService {
def createUIParameter(parameter: Parameter): UIParameter = {
UIParameter(
name = parameter.name.value,
typ = parameter.typ,
typ = toUIType(parameter.typ),
editor = parameter.finalEditor,
defaultValue = parameter.finalDefaultValue,
additionalVariables = parameter.additionalVariables.mapValuesNow(_.typingResult),
Expand All @@ -174,6 +176,10 @@ object DefinitionsService {
)
}

private def toUIType(typingResult: TypingResult): TypingResult = {
if (typingResult == Typed[TemplateEvaluationResult]) Typed[String] else typingResult
}

def createUIScenarioPropertyConfig(config: ScenarioPropertyConfig): UiScenarioPropertyConfig = {
val editor = UiScenarioPropertyEditorDeterminer.determine(config)
UiScenarioPropertyConfig(config.defaultValue, editor, config.label, config.hintText)
Expand Down
2 changes: 2 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries
* [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation
* [#7123](https://github.com/TouK/nussknacker/pull/7123) Fix deployments for scenarios with dict editors after model reload
* [#7162](https://github.com/TouK/nussknacker/pull/7162) Component API enhancement: ability to access information about
expression parts used in SpEL template

## 1.18

Expand Down
3 changes: 3 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#6988](https://github.com/TouK/nussknacker/pull/6988) Removed unused API classes: `MultiMap`, `TimestampedEvictableStateFunction`.
`MultiMap` was incorrectly handled by Flink's default Kryo serializer, so if you want to copy it to your code
you should write and register a proper serializer.
* [#7162](https://github.com/TouK/nussknacker/pull/7162) When component declares that requires parameter with either `SpelTemplateParameterEditor`
or `SqlParameterEditor` editor, in the runtime, for the expression evaluation result, will be used the new `TemplateEvaluationResult`
class instead of `String` class. To access the previous `String` use `TemplateEvaluationResult.renderedTemplate` method.
### REST API changes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package pl.touk.nussknacker.engine.flink

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.util.Collector
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression}
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.{BoundedStreamComponent, ComponentDefinition}
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedLazyParameter, NodeDependencyValue, SingleInputDynamicComponent}
import pl.touk.nussknacker.engine.api.definition.{NodeDependency, OutputVariableNameDependency, Parameter, SpelTemplateParameterEditor}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.api.process.{AbstractOneParamLazyParameterFunction, FlinkCustomNodeContext, FlinkCustomStreamTransformation}
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner
import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage

class SpelTemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers with ValidatedValuesDetailedMessage {

private lazy val runner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(
List(ComponentDefinition("spelTemplatePartsCustomTransformer", SpelTemplatePartsCustomTransformer))
)
.build()

test("flink custom transformer using spel template rendered parts") {
val scenario = ScenarioBuilder
.streaming("test")
.source("source", TestScenarioRunner.testDataSource)
.customNode(
"custom",
"output",
"spelTemplatePartsCustomTransformer",
"template" -> Expression.spelTemplate(s"Hello#{#input}")
)
.emptySink("sink", TestScenarioRunner.testResultSink, "value" -> "#output".spel)

val result = runner.runWithData(scenario, List(1, 2, 3), Boundedness.BOUNDED)
result.validValue.errors shouldBe empty
result.validValue.successes shouldBe List(
"[Hello]-literal[1]-subexpression",
"[Hello]-literal[2]-subexpression",
"[Hello]-literal[3]-subexpression"
)
}

}

object SpelTemplatePartsCustomTransformer
extends CustomStreamTransformer
with SingleInputDynamicComponent[FlinkCustomStreamTransformation]
with BoundedStreamComponent {

private val spelTemplateParameterName = ParameterName("template")

private val spelTemplateParameter = Parameter
.optional[String](spelTemplateParameterName)
.copy(
isLazyParameter = true,
editor = Some(SpelTemplateParameterEditor)
)

override type State = Unit

override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(
implicit nodeId: NodeId
): SpelTemplatePartsCustomTransformer.ContextTransformationDefinition = {
case TransformationStep(Nil, _) => NextParameters(List(spelTemplateParameter))
case TransformationStep((`spelTemplateParameterName`, DefinedLazyParameter(_)) :: Nil, _) =>
val outName = OutputVariableNameDependency.extract(dependencies)
FinalResults(context.withVariableUnsafe(outName, Typed[String]), List.empty)
}

override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency)

override def implementation(
params: Params,
dependencies: List[NodeDependencyValue],
finalState: Option[Unit]
): FlinkCustomStreamTransformation = {
val templateLazyParam: LazyParameter[TemplateEvaluationResult] =
params.extractUnsafe[LazyParameter[TemplateEvaluationResult]](spelTemplateParameterName)
FlinkCustomStreamTransformation {
(dataStream: DataStream[Context], flinkCustomNodeContext: FlinkCustomNodeContext) =>
dataStream.flatMap(
new AbstractOneParamLazyParameterFunction[TemplateEvaluationResult](
templateLazyParam,
flinkCustomNodeContext.lazyParameterHelper
) with FlatMapFunction[Context, ValueWithContext[String]] {
override def flatMap(value: Context, out: Collector[ValueWithContext[String]]): Unit = {
collectHandlingErrors(value, out) {
val templateResult = evaluateParameter(value)
val result = templateResult.renderedParts.map {
case RenderedLiteral(value) => s"[$value]-literal"
case RenderedSubExpression(value) => s"[$value]-subexpression"
}.mkString
ValueWithContext(result, value)
}
}
},
flinkCustomNodeContext.valueWithContextInfo.forClass[String]
).asInstanceOf[DataStream[ValueWithContext[AnyRef]]]
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ object LoggingService extends EagerService {
def prepare(
@ParamName("logger") @Nullable loggerName: String,
@ParamName("level") @DefaultValue("T(org.slf4j.event.Level).DEBUG") level: Level,
@ParamName("message") @SimpleEditor(`type` = SimpleEditorType.SPEL_TEMPLATE_EDITOR) message: LazyParameter[String]
@ParamName("message") @SimpleEditor(`type` = SimpleEditorType.SPEL_TEMPLATE_EDITOR) message: LazyParameter[
TemplateEvaluationResult
]
)(implicit metaData: MetaData, nodeId: NodeId): ServiceInvoker =
new ServiceInvoker {

Expand All @@ -31,7 +33,7 @@ object LoggingService extends EagerService {
collector: ServiceInvocationCollector,
componentUseCase: ComponentUseCase
): Future[Any] = {
val msg = message.evaluate(context)
val msg = message.evaluate(context).renderedTemplate
level match {
case Level.TRACE => logger.trace(msg)
case Level.DEBUG => logger.debug(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent
import pl.touk.nussknacker.engine.api.editor.{SimpleEditor, SimpleEditorType}
import pl.touk.nussknacker.engine.api.process.SourceFactory
import pl.touk.nussknacker.engine.api.typed.typing.Unknown
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName, TemplateEvaluationResult}
import pl.touk.nussknacker.engine.flink.util.source.CollectionSource

//It's only for test FE sql editor
object SqlSource extends SourceFactory with UnboundedStreamComponent {

@MethodToInvoke
def source(@ParamName("sql") @SimpleEditor(`type` = SimpleEditorType.SQL_EDITOR) sql: String) =
def source(@ParamName("sql") @SimpleEditor(`type` = SimpleEditorType.SQL_EDITOR) sql: TemplateEvaluationResult) =
new CollectionSource[Any](List.empty, None, Unknown)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package pl.touk.nussknacker.engine.schemedkafka

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.KafkaSchemaRegistryBasedValueSerializationSchemaFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider.createSchemaIdFromMessageExtractor
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalKafkaDeserializerFactory, UniversalSchemaValidator, UniversalSerializerFactory, UniversalToJsonFormatterFactory}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{
UniversalKafkaDeserializerFactory,
UniversalSchemaValidator,
UniversalSerializerFactory,
UniversalToJsonFormatterFactory
}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.flink.FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.definition.ParameterEditor
import pl.touk.nussknacker.engine.api.typed.supertype.ReturningSingleClassPromotionStrategy
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.{Hidden, HideToString}
import pl.touk.nussknacker.engine.api.{Hidden, HideToString, TemplateEvaluationResult}

import java.lang.reflect.{AccessibleObject, Member, Method}
import java.text.NumberFormat
Expand Down Expand Up @@ -109,8 +109,9 @@ object ClassExtractionSettings {
// we want only boxed types
ClassPredicate { case cl => cl.isPrimitive },
ExactClassPredicate[ReturningSingleClassPromotionStrategy],
// We use this type only programmable
// We use these types only programmable
ClassNamePredicate("pl.touk.nussknacker.engine.spel.SpelExpressionRepr"),
ExactClassPredicate[TemplateEvaluationResult],
)

lazy val ExcludedCollectionFunctionalClasses: List[ClassPredicate] = List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class OrderedDependencies(dependencies: List[NodeDependency]) extends Serializab
): List[Any] = {
dependencies.map {
case param: Parameter =>
values.getOrElse(param.name, throw new IllegalArgumentException(s"Missing parameter: ${param.name}"))
values.getOrElse(param.name, throw new IllegalArgumentException(s"Missing parameter: ${param.name.value}"))
case OutputVariableNameDependency =>
outputVariableNameOpt.getOrElse(throw MissingOutputVariableException)
case TypedNodeDependency(clazz) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import org.springframework.expression.spel.{
SpelParserConfiguration,
standard
}
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression}
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.dict.DictRegistry
import pl.touk.nussknacker.engine.api.exception.NonTransientException
import pl.touk.nussknacker.engine.api.generics.ExpressionParseError
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.typed.typing.{SingleTypingResult, TypingResult}
import pl.touk.nussknacker.engine.api.{Context, TemplateEvaluationResult, TemplateRenderedPart}
import pl.touk.nussknacker.engine.definition.clazz.ClassDefinitionSet
import pl.touk.nussknacker.engine.definition.globalvariables.ExpressionConfigDefinition
import pl.touk.nussknacker.engine.dict.{KeysDictTyper, LabelsDictTyper}
Expand Down Expand Up @@ -107,7 +108,28 @@ class SpelExpression(
return SpelExpressionRepr(parsed.parsed, ctx, globals, original).asInstanceOf[T]
}
val evaluationContext = evaluationContextPreparer.prepareEvaluationContext(ctx, globals)
parsed.getValue[T](evaluationContext, expectedClass)
flavour match {
case SpelExpressionParser.Standard =>
parsed.getValue[T](evaluationContext, expectedClass)
case SpelExpressionParser.Template =>
val parts = renderTemplateExpressionParts(evaluationContext)
TemplateEvaluationResult(parts).asInstanceOf[T]
}
}

private def renderTemplateExpressionParts(evaluationContext: EvaluationContext): List[TemplateRenderedPart] = {
def renderExpression(expression: Expression): List[TemplateRenderedPart] = expression match {
case literal: LiteralExpression => List(RenderedLiteral(literal.getExpressionString))
case spelExpr: org.springframework.expression.spel.standard.SpelExpression =>
// TODO: Should we use the same trick with re-parsing after ClassCastException as we use in ParsedSpelExpression?
List(RenderedSubExpression(spelExpr.getValue[String](evaluationContext, classOf[String])))
case composite: CompositeStringExpression => composite.getExpressions.toList.flatMap(renderExpression)
case other =>
throw new IllegalArgumentException(
s"Unsupported expression type: ${other.getClass.getName} for a template expression"
)
}
renderExpression(parsed.parsed)
}

private def logOnException[A](ctx: Context)(block: => A): A = {
Expand Down
Loading

0 comments on commit 14a9a75

Please sign in to comment.