Skip to content

Commit

Permalink
evaluate value every time for count > 1
Browse files Browse the repository at this point in the history
  • Loading branch information
mslabek committed Dec 31, 2024
1 parent e863800 commit bc276d9
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.ResultsCollectingListenerHolder
import pl.touk.nussknacker.test.PatientScalaFutures

class SampleGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with PatientScalaFutures with Matchers with Inside {
class SampleGeneratorSourceFactorySpec
extends AnyFunSuite
with FlinkSpec
with PatientScalaFutures
with Matchers
with Inside {

test("should produce results for each element in list") {
val sinkId = "sinkId"
Expand Down Expand Up @@ -52,4 +57,40 @@ class SampleGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with P

}

test("should produce n individually evaluated results for n count") {
val sinkId = "sinkId"

val collectingListener = ResultsCollectingListenerHolder.registerListener
val model = LocalModelData(
ConfigFactory.empty(),
FlinkBaseComponentProvider.Components ::: FlinkBaseUnboundedComponentProvider.Components,
configCreator = new ConfigCreatorWithCollectingListener(collectingListener),
)
val scenario = ScenarioBuilder
.streaming("test")
.source(
"sample-generator",
"sample-generator",
"period" -> "T(java.time.Duration).ofSeconds(1)".spel,
"count" -> "2".spel,
"value" -> s"T(java.util.UUID).randomUUID".spel
)
.emptySink(sinkId, "dead-end")

val stoppableEnv = flinkMiniCluster.createExecutionEnvironment()
UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(scenario)

val id = stoppableEnv.executeAndWaitForStart(scenario.name.value)
try {
eventually {
val results = collectingListener.results.nodeResults.get(sinkId)
val emittedResults = results.toList.flatten.flatMap(_.variableTyped("input"))
emittedResults.size should be > 1
emittedResults.distinct.size shouldBe emittedResults.size
}
} finally {
stoppableEnv.cancel(id.getJobID)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
Expand Down Expand Up @@ -63,10 +64,10 @@ class SampleGeneratorSourceFactory(timestampAssigner: TimestampWatermarkHandler[
val stream = env
.addSource(new PeriodicFunction(period))
.map(_ => Context(processName.value))
.flatMap(value)(ctx)
.flatMap(new ContextMultiplierPerCountFunction(count))
.flatMap(ctx.lazyParameterHelper.lazyMapFunction(value))
.flatMap(
(value: ValueWithContext[AnyRef], out: Collector[AnyRef]) =>
1.to(count).map(_ => value.value).foreach(out.collect),
(value: ValueWithContext[AnyRef], out: Collector[AnyRef]) => out.collect(value.value),
TypeInformationDetection.instance.forType[AnyRef](value.returnType)
)

Expand All @@ -88,6 +89,14 @@ class SampleGeneratorSourceFactory(timestampAssigner: TimestampWatermarkHandler[
}
}

class ContextMultiplierPerCountFunction(count: Int) extends FlatMapFunction[Context, Context] with Serializable {

override def flatMap(context: Context, out: Collector[Context]): Unit = {
1.to(count).foreach(_ => out.collect(context))
}

}

}

@silent("deprecated")
Expand Down

0 comments on commit bc276d9

Please sign in to comment.