diff --git a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala index dc8899248ac..d79c21b0d53 100644 --- a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala +++ b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.api.component.{ ComponentType, NussknackerVersion } +import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.kafka.KafkaConfig import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory @@ -29,8 +30,9 @@ class FlinkKafkaComponentProvider extends ComponentProvider { override def resolveConfigForExecution(config: Config): Config = config override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = { - val overriddenDependencies = TemporaryKafkaConfigMapping.prepareDependencies(config, dependencies) - val docsConfig: DocsConfig = DocsConfig(config) + val overriddenDependencies = TemporaryKafkaConfigMapping.prepareDependencies(config, dependencies) + val finalComponentDependencies = dependenciesWithDisabledNamespacingIfApplicable(config, overriddenDependencies) + val docsConfig: DocsConfig = DocsConfig(config) import docsConfig._ def universal(componentType: ComponentType) = s"DataSourcesAndSinks#kafka-$componentType" @@ -42,7 +44,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider { new UniversalKafkaSourceFactory( schemaRegistryClientFactory, universalSerdeProvider, - overriddenDependencies, + finalComponentDependencies, new FlinkKafkaSourceImplFactory(None) ) ).withRelativeDocs(universal(ComponentType.Source)), @@ -51,7 +53,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider { new UniversalKafkaSinkFactory( schemaRegistryClientFactory, universalSerdeProvider, - overriddenDependencies, + finalComponentDependencies, FlinkKafkaUniversalSinkImplFactory ) ).withRelativeDocs(universal(ComponentType.Sink)) @@ -62,6 +64,18 @@ class FlinkKafkaComponentProvider extends ComponentProvider { override def isAutoLoaded: Boolean = false + private def dependenciesWithDisabledNamespacingIfApplicable( + config: Config, + dependencies: ProcessObjectDependencies + ): ProcessObjectDependencies = { + val disableNamespacePath = "disableNamespace" + if (config.hasPath(disableNamespacePath) && config.getBoolean(disableNamespacePath)) { + dependencies.copy(namingStrategy = NamingStrategy(None)) + } else { + dependencies + } + } + } //FIXME: Kafka components should not depend directly on ProcessObjectDependencies, only on diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkNamespacedKafkaTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkNamespacedKafkaTest.scala index d0f84e594dc..de2b7f34270 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkNamespacedKafkaTest.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkNamespacedKafkaTest.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.defaultmodel -import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.config.ConfigValueFactory.fromAnyRef +import com.typesafe.config.{Config, ConfigFactory} import io.circe.Json import io.confluent.kafka.schemaregistry.json.JsonSchema import pl.touk.nussknacker.engine.api.process.TopicName @@ -12,13 +12,13 @@ import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransforme import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaVersionOption import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils -class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { +abstract class BaseFlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { import pl.touk.nussknacker.engine.spel.SpelExtension._ private val namespaceName = "ns" - private val inputTopic = TopicName.ForSource("input") - private val outputTopic = TopicName.ForSink("output") + protected val inputTopic = TopicName.ForSource("input") + protected val outputTopic = TopicName.ForSink("output") override lazy val config: Config = ConfigFactory .load() @@ -39,16 +39,19 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { | "value": "Jan" |}""".stripMargin - test("should send message to topic with appended namespace") { - val namespacedInputTopic = Namespaced(inputTopic) - val namespacedOutputTopic = Namespaced(outputTopic) + def runTest( + nuVisibleInputTopic: TopicName.ForSource, + externalInputTopicName: TopicName.ForSource, + nuVisibleOutputTopic: TopicName.ForSink, + externalOutputTopicName: TopicName.ForSink + ): Unit = { + val inputSubject = ConfluentUtils.topicSubject(externalInputTopicName.toUnspecialized, isKey = false) + val outputSubject = ConfluentUtils.topicSubject(externalOutputTopicName.toUnspecialized, isKey = false) - val inputSubject = ConfluentUtils.topicSubject(namespacedInputTopic.toUnspecialized, isKey = false) - val outputSubject = ConfluentUtils.topicSubject(namespacedOutputTopic.toUnspecialized, isKey = false) schemaRegistryMockClient.register(inputSubject, schema) schemaRegistryMockClient.register(outputSubject, schema) - sendAsJson(record, namespacedInputTopic) + sendAsJson(record, externalInputTopicName) val scenarioId = "scenarioId" val sourceId = "input" @@ -58,7 +61,7 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { .source( sourceId, "kafka", - KafkaUniversalComponentTransformer.topicParamName.value -> s"'${inputTopic.name}'".spel, + KafkaUniversalComponentTransformer.topicParamName.value -> s"'${nuVisibleInputTopic.name}'".spel, KafkaUniversalComponentTransformer.schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel ) .emptySink( @@ -66,7 +69,7 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { "kafka", KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel, KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel, - KafkaUniversalComponentTransformer.topicParamName.value -> s"'${outputTopic.name}'".spel, + KafkaUniversalComponentTransformer.topicParamName.value -> s"'${nuVisibleOutputTopic.name}'".spel, KafkaUniversalComponentTransformer.schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel, KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> s"true".spel, ) @@ -75,7 +78,7 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { val processed = kafkaClient .createConsumer() - .consumeWithJson[Json](namespacedOutputTopic.name) + .consumeWithJson[Json](externalOutputTopicName.name) .take(1) .map(_.message()) .toList @@ -105,4 +108,37 @@ class FlinkNamespacedKafkaTest extends FlinkWithKafkaSuite { } + protected def namespacedTopic[T <: TopicName](topic: T)(implicit ns: Namespaced[T]): T = Namespaced(topic) + +} + +class FlinkNamespacedKafkaTest extends BaseFlinkNamespacedKafkaTest { + + test("should send message to topic with appended namespace") { + val namespacedInputTopic = namespacedTopic(inputTopic) + val namespacedOutputTopic = namespacedTopic(outputTopic) + runTest( + nuVisibleInputTopic = inputTopic, + externalInputTopicName = namespacedInputTopic, + nuVisibleOutputTopic = outputTopic, + externalOutputTopicName = namespacedOutputTopic + ) + } + +} + +class FlinkDisabledNamespacedKafkaTest extends BaseFlinkNamespacedKafkaTest { + + override def kafkaComponentsConfig: Config = + super.kafkaComponentsConfig.withValue("disableNamespace", fromAnyRef(true)) + + test("should send message to topic without appended namespace when namespace is disabled for used kafka topic") { + runTest( + nuVisibleInputTopic = inputTopic, + externalInputTopicName = inputTopic, + nuVisibleOutputTopic = outputTopic, + externalOutputTopicName = outputTopic + ) + } + }