Disable namespace in Kafka component (#7326)
mslabek authored Dec 13, 2024
1 parent 671cc85 commit 5c6cbb9
Showing 2 changed files with 67 additions and 17 deletions.
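In short, this commit adds a `disableNamespace` boolean to the Flink Kafka component provider's configuration: when the flag is set, the provider replaces the inherited naming strategy with `NamingStrategy(None)` so topic names are not prefixed with a namespace, and the namespacing test suite is split into a namespaced and a namespace-disabled variant. A minimal sketch of the flag check, assuming the provider reads it from its own component config scope (the standalone config below is illustrative, not taken from the repository):

import com.typesafe.config.ConfigFactory

// Hypothetical component-level config; only the `disableNamespace` key comes from this commit.
val componentConfig = ConfigFactory.parseString("disableNamespace: true")

// Mirrors the defensive check added in FlinkKafkaComponentProvider:
// a missing key leaves namespacing enabled.
val namespacingDisabled =
  componentConfig.hasPath("disableNamespace") && componentConfig.getBoolean("disableNamespace")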
@@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.api.component.{
   ComponentType,
   NussknackerVersion
 }
+import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
 import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
 import pl.touk.nussknacker.engine.kafka.KafkaConfig
 import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory
@@ -29,8 +30,9 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
   override def resolveConfigForExecution(config: Config): Config = config

   override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
-    val overriddenDependencies = TemporaryKafkaConfigMapping.prepareDependencies(config, dependencies)
-    val docsConfig: DocsConfig = DocsConfig(config)
+    val overriddenDependencies = TemporaryKafkaConfigMapping.prepareDependencies(config, dependencies)
+    val finalComponentDependencies = dependenciesWithDisabledNamespacingIfApplicable(config, overriddenDependencies)
+    val docsConfig: DocsConfig = DocsConfig(config)
     import docsConfig._
     def universal(componentType: ComponentType) = s"DataSourcesAndSinks#kafka-$componentType"

@@ -42,7 +44,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
         new UniversalKafkaSourceFactory(
           schemaRegistryClientFactory,
           universalSerdeProvider,
-          overriddenDependencies,
+          finalComponentDependencies,
           new FlinkKafkaSourceImplFactory(None)
         )
       ).withRelativeDocs(universal(ComponentType.Source)),
@@ -51,7 +53,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
         new UniversalKafkaSinkFactory(
           schemaRegistryClientFactory,
           universalSerdeProvider,
-          overriddenDependencies,
+          finalComponentDependencies,
           FlinkKafkaUniversalSinkImplFactory
         )
       ).withRelativeDocs(universal(ComponentType.Sink))
@@ -62,6 +64,18 @@ class FlinkKafkaComponentProvider extends ComponentProvider {

   override def isAutoLoaded: Boolean = false

+  private def dependenciesWithDisabledNamespacingIfApplicable(
+      config: Config,
+      dependencies: ProcessObjectDependencies
+  ): ProcessObjectDependencies = {
+    val disableNamespacePath = "disableNamespace"
+    if (config.hasPath(disableNamespacePath) && config.getBoolean(disableNamespacePath)) {
+      dependencies.copy(namingStrategy = NamingStrategy(None))
+    } else {
+      dependencies
+    }
+  }
+
 }

 //FIXME: Kafka components should not depend directly on ProcessObjectDependencies, only on
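For intuition, here is a rough sketch of the effect the flag has on topic names, assuming the default namespacing prefixes the scenario-visible name with the configured namespace and an underscore separator (the externalTopicName helper and the separator are assumptions of this sketch, not code from the repository):

// Hypothetical helper mimicking how a namespace-aware naming strategy is expected to behave.
def externalTopicName(logicalName: String, namespace: Option[String]): String =
  namespace.fold(logicalName)(ns => s"${ns}_$logicalName")

// With a namespace configured, the scenario-visible topic "input" lands on Kafka as "ns_input"...
assert(externalTopicName("input", Some("ns")) == "ns_input")
// ...while with disableNamespace = true (NamingStrategy(None)) the name passes through unchanged.
assert(externalTopicName("input", None) == "input")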
@@ -1,7 +1,7 @@
 package pl.touk.nussknacker.defaultmodel

-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.ConfigValueFactory.fromAnyRef
+import com.typesafe.config.{Config, ConfigFactory}
 import io.circe.Json
 import io.confluent.kafka.schemaregistry.json.JsonSchema
 import pl.touk.nussknacker.engine.api.process.TopicName
@@ -12,13 +12,13 @@ import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaVersionOption
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils

-class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite {
+abstract class BaseFlinkNamespacedKafkaTest extends FlinkWithKafkaSuite {

   import pl.touk.nussknacker.engine.spel.SpelExtension._

   private val namespaceName = "ns"
-  private val inputTopic = TopicName.ForSource("input")
-  private val outputTopic = TopicName.ForSink("output")
+  protected val inputTopic = TopicName.ForSource("input")
+  protected val outputTopic = TopicName.ForSink("output")

   override lazy val config: Config = ConfigFactory
     .load()
@@ -39,16 +39,19 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite {
       | "value": "Jan"
       |}""".stripMargin

-  test("should send message to topic with appended namespace") {
-    val namespacedInputTopic = Namespaced(inputTopic)
-    val namespacedOutputTopic = Namespaced(outputTopic)
+  def runTest(
+      nuVisibleInputTopic: TopicName.ForSource,
+      externalInputTopicName: TopicName.ForSource,
+      nuVisibleOutputTopic: TopicName.ForSink,
+      externalOutputTopicName: TopicName.ForSink
+  ): Unit = {
+    val inputSubject = ConfluentUtils.topicSubject(externalInputTopicName.toUnspecialized, isKey = false)
+    val outputSubject = ConfluentUtils.topicSubject(externalOutputTopicName.toUnspecialized, isKey = false)

-    val inputSubject = ConfluentUtils.topicSubject(namespacedInputTopic.toUnspecialized, isKey = false)
-    val outputSubject = ConfluentUtils.topicSubject(namespacedOutputTopic.toUnspecialized, isKey = false)
     schemaRegistryMockClient.register(inputSubject, schema)
     schemaRegistryMockClient.register(outputSubject, schema)

-    sendAsJson(record, namespacedInputTopic)
+    sendAsJson(record, externalInputTopicName)

     val scenarioId = "scenarioId"
     val sourceId = "input"
@@ -58,15 +61,15 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite {
       .source(
         sourceId,
         "kafka",
-        KafkaUniversalComponentTransformer.topicParamName.value -> s"'${inputTopic.name}'".spel,
+        KafkaUniversalComponentTransformer.topicParamName.value -> s"'${nuVisibleInputTopic.name}'".spel,
         KafkaUniversalComponentTransformer.schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel
       )
       .emptySink(
         "output",
         "kafka",
         KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel,
         KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel,
-        KafkaUniversalComponentTransformer.topicParamName.value -> s"'${outputTopic.name}'".spel,
+        KafkaUniversalComponentTransformer.topicParamName.value -> s"'${nuVisibleOutputTopic.name}'".spel,
         KafkaUniversalComponentTransformer.schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel,
         KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> s"true".spel,
       )
@@ -75,7 +78,7 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite {
     val processed =
       kafkaClient
         .createConsumer()
-        .consumeWithJson[Json](namespacedOutputTopic.name)
+        .consumeWithJson[Json](externalOutputTopicName.name)
         .take(1)
         .map(_.message())
         .toList
@@ -105,4 +108,37 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite {

   }

+  protected def namespacedTopic[T <: TopicName](topic: T)(implicit ns: Namespaced[T]): T = Namespaced(topic)
+
 }
+
+class FlinkNamespacedKafkaTest extends BaseFlinkNamespacedKafkaTest {
+
+  test("should send message to topic with appended namespace") {
+    val namespacedInputTopic = namespacedTopic(inputTopic)
+    val namespacedOutputTopic = namespacedTopic(outputTopic)
+    runTest(
+      nuVisibleInputTopic = inputTopic,
+      externalInputTopicName = namespacedInputTopic,
+      nuVisibleOutputTopic = outputTopic,
+      externalOutputTopicName = namespacedOutputTopic
+    )
+  }
+
+}
+
+class FlinkDisabledNamespacedKafkaTest extends BaseFlinkNamespacedKafkaTest {
+
+  override def kafkaComponentsConfig: Config =
+    super.kafkaComponentsConfig.withValue("disableNamespace", fromAnyRef(true))
+
+  test("should send message to topic without appended namespace when namespace is disabled for used kafka topic") {
+    runTest(
+      nuVisibleInputTopic = inputTopic,
+      externalInputTopicName = inputTopic,
+      nuVisibleOutputTopic = outputTopic,
+      externalOutputTopicName = outputTopic
+    )
+  }
+
+}
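As a usage note, the per-suite override in FlinkDisabledNamespacedKafkaTest relies on plain Typesafe Config semantics: withValue returns a copy with the extra key set, so only that suite sees the flag. A standalone sketch of the pattern (the base key below is made up for illustration):

import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory.fromAnyRef

// Hypothetical base config standing in for kafkaComponentsConfig.
val base = ConfigFactory.parseString("someExistingSetting: 1")

// withValue does not mutate `base`; it returns a new Config with the flag added.
val withNamespaceDisabled = base.withValue("disableNamespace", fromAnyRef(true))

assert(!base.hasPath("disableNamespace"))
assert(withNamespaceDisabled.getBoolean("disableNamespace"))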
