Skip to content

Commit

Permalink
offsetResetStrategy parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
gskrobisz committed Sep 25, 2024
1 parent 18c1acd commit 82bc8e5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ sealed trait NodeDeploymentData

final case class SqlFilteringExpression(sqlExpression: String) extends NodeDeploymentData

final case class KafkaSourceOffset(offset: Long) extends NodeDeploymentData
final case class KafkaSourceOffset(offsetResetStrategy: Long) extends NodeDeploymentData

object NodeDeploymentData {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKaf
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.KafkaSourceOffset
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, Parameter}
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.parameter.ParameterName
Expand Down Expand Up @@ -82,11 +82,21 @@ class FlinkKafkaSource[T](

protected lazy val topics: NonEmptyList[TopicName.ForSource] = preparedTopics.map(_.prepared)

override def activityParametersDefinition: Map[String, List[Parameter]] = Map(
ScenarioActionName.Deploy.value -> List(
Parameter(ParameterName("offset"), Typed.apply[Long])
override def activityParametersDefinition: Map[String, List[Parameter]] = {
import pl.touk.nussknacker.engine.spel.SpelExtension._
val defaultValue = if (kafkaConfig.forceLatestRead.contains(true)) Some("'LATEST'".spel) else Some("'NONE'".spel)
val offsetResetStrategyValues = List(
FixedExpressionValue("'LATEST'", "LATEST"),
FixedExpressionValue("'EARLIEST'", "EARLIEST"),
FixedExpressionValue("'NONE'", "NONE"),
)
)
Map(
ScenarioActionName.Deploy.value -> List(
Parameter(ParameterName("offsetResetStrategy"), Typed.apply[String])
.copy(editor = Some(FixedValuesParameterEditor(offsetResetStrategyValues)), defaultValue = defaultValue),
)
)
}

@silent("deprecated")
protected def flinkSourceFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package pl.touk.nussknacker.engine.management.sample.source
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.api.component.{KafkaSourceOffset, UnboundedStreamComponent}
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.definition.{
FixedExpressionValue,
FixedValuesParameterEditor,
Parameter,
StringParameterEditor
}
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.{SourceFactory, WithActivityParameters}
Expand All @@ -28,11 +33,26 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone
def source(@ParamName("elements") elements: java.util.List[Any]) =
new CollectionSource[Any](elements.asScala.toList, None, Unknown) with WithActivityParameters {

override def activityParametersDefinition: Map[String, List[Parameter]] = Map(
ScenarioActionName.Deploy.value -> List(
Parameter(ParameterName("offset"), Typed.apply[Long])
override def activityParametersDefinition: Map[String, List[Parameter]] = {

import pl.touk.nussknacker.engine.spel.SpelExtension._

val offsetResetStrategyValues = List(
FixedExpressionValue("'LATEST'", "LATEST"),
FixedExpressionValue("'EARLIEST'", "EARLIEST"),
FixedExpressionValue("'NONE'", "NONE"),
)
)

Map(
ScenarioActionName.Deploy.value -> List(
Parameter(ParameterName("offset"), Typed.apply[Long]),
Parameter(ParameterName("sometext"), Typed.apply[String])
.copy(editor = Some(StringParameterEditor), defaultValue = Some("'example'".spel)),
Parameter(ParameterName("offsetResetStrategy"), Typed.apply[String])
.copy(editor = Some(FixedValuesParameterEditor(offsetResetStrategyValues))),
)
)
}

override protected def createSourceStream[T](
list: List[T],
Expand All @@ -41,7 +61,7 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone
): DataStreamSource[T] = {
val deploymentDataOpt = flinkNodeContext.nodeDeploymentData.collect { case d: KafkaSourceOffset => d }
val elementsWithOffset = deploymentDataOpt match {
case Some(data) => list.drop(data.offset.toInt)
case Some(data) => list.drop(data.offsetResetStrategy.toInt)
case _ => list
}
super.createSourceStream(elementsWithOffset, env, flinkNodeContext)
Expand Down

0 comments on commit 82bc8e5

Please sign in to comment.