
Commit

remove null
piotrp committed Sep 27, 2024
1 parent a7dfedd commit f3adb3e
Showing 5 changed files with 34 additions and 14 deletions.
@@ -25,7 +25,7 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {
     val consumerConfig = exceptionHandlerConfig.rootAs[KafkaExceptionConsumerConfig]
     val producerCreator = kafkaProducerCreator(kafkaConfig)
     val serializationSchema = createSerializationSchema(metaData, consumerConfig)
-    val errorTopicInitializer = new KafkaErrorTopicInitializer(kafkaConfig, consumerConfig)
+    val errorTopicInitializer = new DefaultKafkaErrorTopicInitializer(kafkaConfig, consumerConfig)
     if (consumerConfig.useSharedProducer) {
       SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
     } else {
@@ -52,9 +52,7 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging {
   protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
   protected val metaData: MetaData
 
-  // can be null in tests
-  private val topic: String =
-    Option(kafkaErrorTopicInitializer).map(_.exceptionHandlerConfig.topic).getOrElse("-")
+  private val topic: String = kafkaErrorTopicInitializer.topicName
 
   protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_]
 
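Note: the second hunk above is where the commit title "remove null" shows up most directly. Instead of tolerating a null initializer in tests and falling back to "-", BaseKafkaExceptionConsumer now always receives a KafkaErrorTopicInitializer and simply asks it for topicName. A minimal, self-contained sketch of that Null Object approach follows; the names in it (ErrorTopicInitializer, RealErrorTopicInitializer, NoopErrorTopicInitializer, NullObjectDemo) are illustrative only and are not part of this commit.

// Sketch of the Null Object pattern applied in this commit (hypothetical names).
// Callers depend on a small trait; tests plug in a no-op object instead of null.
trait ErrorTopicInitializer {
  def topicName: String
  def init(): Unit
}

final class RealErrorTopicInitializer(configuredTopic: String) extends ErrorTopicInitializer {
  val topicName: String = configuredTopic
  def init(): Unit = println(s"ensuring topic '$topicName' exists")
}

object NoopErrorTopicInitializer extends ErrorTopicInitializer {
  val topicName: String = "-" // placeholder value, mirroring the commit's NoopKafkaErrorTopicInitializer
  def init(): Unit = ()       // nothing to prepare in a unit test
}

object NullObjectDemo extends App {
  // No Option(...) wrapping or getOrElse fallback is needed: every implementation answers topicName.
  Seq(new RealErrorTopicInitializer("errors"), NoopErrorTopicInitializer).foreach { initializer =>
    initializer.init()
    println(s"errors will be written to: ${initializer.topicName}")
  }
}

Tests then substitute the no-op object where they previously passed null, as the serialization spec below does with NoopKafkaErrorTopicInitializer.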
@@ -42,9 +42,13 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers {
 
   private val serializationSchema = new KafkaJsonExceptionSerializationSchema(metaData, consumerConfig)
 
-  // null as we don't test open here...
   private val consumer =
-    TempProducerKafkaExceptionConsumer(metaData, serializationSchema, MockProducerCreator(mockProducer), null)
+    TempProducerKafkaExceptionConsumer(
+      metaData,
+      serializationSchema,
+      MockProducerCreator(mockProducer),
+      NoopKafkaErrorTopicInitializer
+    )
 
   test("records event") {
     consumer.consume(exception)
@@ -17,7 +17,10 @@ import pl.touk.nussknacker.engine.api.process.TopicName
 import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
 import pl.touk.nussknacker.engine.api.{MetaData, VariableConstants}
 import pl.touk.nussknacker.engine.kafka.KafkaUtils
-import pl.touk.nussknacker.engine.kafka.exception.{KafkaErrorTopicInitializer, KafkaJsonExceptionSerializationSchema}
+import pl.touk.nussknacker.engine.kafka.exception.{
+  DefaultKafkaErrorTopicInitializer,
+  KafkaJsonExceptionSerializationSchema
+}
 import pl.touk.nussknacker.engine.lite.ScenarioInterpreterFactory.ScenarioInterpreterWithLifecycle
 import pl.touk.nussknacker.engine.lite.api.commonTypes.{ErrorType, ResultType}
 import pl.touk.nussknacker.engine.lite.api.interpreterTypes
@@ -72,7 +75,7 @@ class KafkaSingleScenarioTaskRun(
 
   def init(): Unit = {
     configSanityCheck()
-    new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init()
+    new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init()
 
     producer = prepareProducer
     consumer = prepareConsumer
@@ -7,7 +7,7 @@ import net.ceedubs.ficus.readers.EnumerationReader._
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.funsuite.AnyFunSuite
 import pl.touk.nussknacker.engine.kafka.KafkaSpec
-import pl.touk.nussknacker.engine.kafka.exception.KafkaErrorTopicInitializer
+import pl.touk.nussknacker.engine.kafka.exception.{DefaultKafkaErrorTopicInitializer, KafkaErrorTopicInitializer}
 import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter._
 import pl.touk.nussknacker.test.PatientScalaFutures
 
@@ -19,7 +19,7 @@ class KafkaErrorTopicInitializerTest extends AnyFunSuite with KafkaSpec with Matchers {
     val engineConfig = config
       .withValue("exceptionHandlingConfig.topic", fromAnyRef(topic))
       .as[KafkaInterpreterConfig]
-    new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig)
+    new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig)
   }
 
   test("should create topic if not exists") {
@@ -1,20 +1,28 @@
 package pl.touk.nussknacker.engine.kafka.exception
 
 import com.typesafe.scalalogging.LazyLogging
-import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic}
+import org.apache.kafka.clients.admin.{DescribeTopicsOptions, ListTopicsOptions, NewTopic}
 import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils}
 
-import java.util.Optional
+import java.util.{Collections, Optional}
 import java.util.concurrent.TimeUnit
 import java.{lang, util}
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
-class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerConfig: KafkaExceptionConsumerConfig)
-  extends LazyLogging {
+trait KafkaErrorTopicInitializer {
+  def topicName: String
+  def init(): Unit
+}
+
+class DefaultKafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig)
+    extends KafkaErrorTopicInitializer
+    with LazyLogging {
 
   private val timeoutSeconds = 5
 
+  val topicName: String = exceptionHandlerConfig.topic
+
   def init(): Unit = {
     val errorTopic = exceptionHandlerConfig.topic
     KafkaUtils.usingAdminClient(kafkaConfig) { admin =>
@@ -42,3 +50,10 @@ class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerConfig: KafkaExceptionConsumerConfig)
   }
 
 }
+
+object NoopKafkaErrorTopicInitializer extends KafkaErrorTopicInitializer {
+
+  override val topicName: String = "-"
+
+  override def init(): Unit = {}
+}
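The added DescribeTopicsOptions and Collections imports indicate that DefaultKafkaErrorTopicInitializer.init() (its body is collapsed in this diff) keeps doing its work through Kafka's AdminClient. For orientation only, a typical "create the error topic if it does not exist" routine built from the same admin classes might look like the sketch below; the topic name and broker address are placeholders, and this is not the commit's actual implementation.

// Hypothetical sketch of an error-topic bootstrap; not the code from this commit.
import java.util.{Collections, Optional, Properties}
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic}

object ErrorTopicBootstrapSketch extends App {
  val topic = "nussknacker-errors"                 // assumed topic name
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumed broker address
  val admin = AdminClient.create(props)
  try {
    val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(5000)).names().get(5, TimeUnit.SECONDS)
    if (!existing.contains(topic)) {
      // one partition, broker-default replication factor
      val newTopic = new NewTopic(topic, Optional.of(Int.box(1)), Optional.empty[java.lang.Short]())
      admin.createTopics(Collections.singletonList(newTopic)).all().get(5, TimeUnit.SECONDS)
    }
  } finally admin.close()
}

NoopKafkaErrorTopicInitializer skips all of this, which is what lets the serialization spec above construct its consumer without touching a broker.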
