From f3adb3e947c030a10a0de2a1cfb04697953ff070 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Fri, 27 Sep 2024 11:02:09 +0200 Subject: [PATCH] remove null --- .../exception/KafkaExceptionConsumer.scala | 6 ++--- ...kaExceptionConsumerSerializationSpec.scala | 8 +++++-- .../kafka/KafkaSingleScenarioTaskRun.scala | 7 ++++-- .../KafkaErrorTopicInitializerTest.scala | 4 ++-- .../KafkaErrorTopicInitializer.scala | 23 +++++++++++++++---- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index cefd3ddf0fc..d5ee2f36fb2 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -25,7 +25,7 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { val consumerConfig = exceptionHandlerConfig.rootAs[KafkaExceptionConsumerConfig] val producerCreator = kafkaProducerCreator(kafkaConfig) val serializationSchema = createSerializationSchema(metaData, consumerConfig) - val errorTopicInitializer = new KafkaErrorTopicInitializer(kafkaConfig, consumerConfig) + val errorTopicInitializer = new DefaultKafkaErrorTopicInitializer(kafkaConfig, consumerConfig) if (consumerConfig.useSharedProducer) { SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer) } else { @@ -52,9 +52,7 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogg protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer protected val metaData: MetaData - // can be null in tests - private val topic: String = - Option(kafkaErrorTopicInitializer).map(_.exceptionHandlerConfig.topic).getOrElse("-") + private val topic: String = kafkaErrorTopicInitializer.topicName protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala index 4b010a6e228..7b5df7153e8 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala @@ -42,9 +42,13 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers private val serializationSchema = new KafkaJsonExceptionSerializationSchema(metaData, consumerConfig) - // null as we don't test open here... private val consumer = - TempProducerKafkaExceptionConsumer(metaData, serializationSchema, MockProducerCreator(mockProducer), null) + TempProducerKafkaExceptionConsumer( + metaData, + serializationSchema, + MockProducerCreator(mockProducer), + NoopKafkaErrorTopicInitializer + ) test("records event") { consumer.consume(exception) diff --git a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala index a510f7ab5da..f2a15d4860f 100644 --- a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala +++ b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala @@ -17,7 +17,10 @@ import pl.touk.nussknacker.engine.api.process.TopicName import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext import pl.touk.nussknacker.engine.api.{MetaData, VariableConstants} import pl.touk.nussknacker.engine.kafka.KafkaUtils -import pl.touk.nussknacker.engine.kafka.exception.{KafkaErrorTopicInitializer, KafkaJsonExceptionSerializationSchema} +import pl.touk.nussknacker.engine.kafka.exception.{ + DefaultKafkaErrorTopicInitializer, + KafkaJsonExceptionSerializationSchema +} import pl.touk.nussknacker.engine.lite.ScenarioInterpreterFactory.ScenarioInterpreterWithLifecycle import pl.touk.nussknacker.engine.lite.api.commonTypes.{ErrorType, ResultType} import pl.touk.nussknacker.engine.lite.api.interpreterTypes @@ -72,7 +75,7 @@ class KafkaSingleScenarioTaskRun( def init(): Unit = { configSanityCheck() - new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init() + new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init() producer = prepareProducer consumer = prepareConsumer diff --git a/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala b/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala index 27c1a1e3e97..f613d63e014 100644 --- a/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala +++ b/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala @@ -7,7 +7,7 @@ import net.ceedubs.ficus.readers.EnumerationReader._ import org.scalatest.matchers.should.Matchers import org.scalatest.funsuite.AnyFunSuite import pl.touk.nussknacker.engine.kafka.KafkaSpec -import pl.touk.nussknacker.engine.kafka.exception.KafkaErrorTopicInitializer +import pl.touk.nussknacker.engine.kafka.exception.{DefaultKafkaErrorTopicInitializer, KafkaErrorTopicInitializer} import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter._ import pl.touk.nussknacker.test.PatientScalaFutures @@ -19,7 +19,7 @@ class KafkaErrorTopicInitializerTest extends AnyFunSuite with KafkaSpec with Mat val engineConfig = config .withValue("exceptionHandlingConfig.topic", fromAnyRef(topic)) .as[KafkaInterpreterConfig] - new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig) + new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig) } test("should create topic if not exists") { diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala index 6223654ac81..56855514135 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala @@ -1,20 +1,28 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.scalalogging.LazyLogging -import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic} +import org.apache.kafka.clients.admin.{DescribeTopicsOptions, ListTopicsOptions, NewTopic} import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils} -import java.util.Optional +import java.util.{Collections, Optional} import java.util.concurrent.TimeUnit import java.{lang, util} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerConfig: KafkaExceptionConsumerConfig) - extends LazyLogging { +trait KafkaErrorTopicInitializer { + def topicName: String + def init(): Unit +} + +class DefaultKafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig) + extends KafkaErrorTopicInitializer + with LazyLogging { private val timeoutSeconds = 5 + val topicName: String = exceptionHandlerConfig.topic + def init(): Unit = { val errorTopic = exceptionHandlerConfig.topic KafkaUtils.usingAdminClient(kafkaConfig) { admin => @@ -42,3 +50,10 @@ class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerC } } + +object NoopKafkaErrorTopicInitializer extends KafkaErrorTopicInitializer { + + override val topicName: String = "-" + + override def init(): Unit = {} +}