diff --git a/build.sbt b/build.sbt index e2b291643f2..4e353991f6c 100644 --- a/build.sbt +++ b/build.sbt @@ -892,7 +892,8 @@ lazy val kafkaUtils = (project in utils("kafka-utils")) name := "nussknacker-kafka-utils", libraryDependencies ++= { Seq( - "org.apache.kafka" % "kafka-clients" % kafkaV + "org.apache.kafka" % "kafka-clients" % kafkaV, + "org.scalatest" %% "scalatest" % scalaTestV % Test, ) } // Depends on componentsApi because of dependency to NuExceptionInfo and NonTransientException - diff --git a/docs/Changelog.md b/docs/Changelog.md index a2f4ff7c08c..7ca1455bed4 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -69,6 +69,7 @@ * [#6952](https://github.com/TouK/nussknacker/pull/6952) Improvement: TypeInformation support for scala.Option * [#6840](https://github.com/TouK/nussknacker/pull/6840) Introduce canCastTo, castTo and castToOrNull extension methods in SpeL. * [#6974](https://github.com/TouK/nussknacker/pull/6974) Add SpeL suggestions for cast methods parameter. +* [#6958](https://github.com/TouK/nussknacker/pull/6958) Add message size limit in the "Kafka" exceptionHandler * [#6988](https://github.com/TouK/nussknacker/pull/6988) Remove unused API classes: `MultiMap`, `TimestampedEvictableStateFunction` * [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open. diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 5c1153b26b0..5e53cd9cc94 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -33,6 +33,12 @@ To see the biggest differences please consult the [changelog](Changelog.md). * Migration API changes: * POST `/api/migrate` supports v2 request format (with `scenarioLabels` field) +### Configuration changes + +* [#6958](https://github.com/TouK/nussknacker/pull/6958) Added message size limit in the "Kafka" exceptionHandler: `maxMessageBytes`. + Its default value reflects Kafka's default size limit of 1 MB (`max.message.bytes`), you need to increase it if your + error topic allows for larger messages. Remember to add some margin for Kafka protocol overhead (100 bytes should be enough). + ### Other changes * [#6692](https://github.com/TouK/nussknacker/pull/6692) Kryo serializers for `UnmodifiableCollection`, `scala.Product` etc. diff --git a/docs/integration/KafkaIntegration.md b/docs/integration/KafkaIntegration.md index aba90eedd29..fd2c4edef06 100644 --- a/docs/integration/KafkaIntegration.md +++ b/docs/integration/KafkaIntegration.md @@ -227,15 +227,15 @@ Errors can be sent to specified Kafka topic in following json format (see below Following properties can be configured (please look at correct engine page : [Lite](../../configuration/model/Lite#exception-handling) or [Flink](../../configuration/model/Flink#configuring-exception-handling), to see where they should be set): -| Name | Default value | Description | -|------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. 
due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. | -| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) | -| includeHost | true | Should name of host where error occurred (e.g. TaskManager in case of Flink) be included. Can be misleading if there are many network interfaces or hostname is improperly configured) | -| includeInputEvent | false | Should input event be serialized (can be large or contain sensitive data so use with care) | -| useSharedProducer | false | For better performance shared Kafka producer can be used (by default it's created and closed for each error), shared Producer is kind of experimental feature and should be used with care | -| additionalParams | {} | Map of fixed parameters that can be added to Kafka message | - +| Name | Default value | Description | +|-----------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. | +| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) | +| maxMessageBytes | 1048476 | Limit Kafka message length by truncating variables from input event (enabled by `includeInputEvent`), defaults to Kafka's default `max.message.bytes` of 1 MB with 100 bytes of safety margin | +| includeHost | true | Should name of host where error occurred (e.g. TaskManager in case of Flink) be included. 
Can be misleading if there are many network interfaces or hostname is improperly configured) | +| includeInputEvent | false | Should input event be serialized (can be large or contain sensitive data so use with care) | +| useSharedProducer | false | For better performance shared Kafka producer can be used (by default it's created and closed for each error), shared Producer is kind of experimental feature and should be used with care | +| additionalParams | {} | Map of fixed parameters that can be added to Kafka message | ### Configuration for Flink engine diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index 3ecf09f105e..411f790e210 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -1,8 +1,11 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.config.Config +import com.typesafe.scalalogging.LazyLogging import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.RecordTooLargeException import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo} import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext @@ -13,6 +16,8 @@ import pl.touk.nussknacker.engine.kafka.{DefaultProducerCreator, KafkaConfig, Ka import pl.touk.nussknacker.engine.util.SynchronousExecutionContextAndIORuntime import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig +import scala.concurrent.{ExecutionContext, Future} + class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { override def create(metaData: MetaData, exceptionHandlerConfig: Config): FlinkEspExceptionConsumer = { @@ -20,11 +25,11 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { val consumerConfig = exceptionHandlerConfig.rootAs[KafkaExceptionConsumerConfig] val producerCreator = kafkaProducerCreator(kafkaConfig) val serializationSchema = createSerializationSchema(metaData, consumerConfig) - val errorTopicInitializer = new KafkaErrorTopicInitializer(kafkaConfig, consumerConfig) + val errorTopicInitializer = new DefaultKafkaErrorTopicInitializer(kafkaConfig, consumerConfig) if (consumerConfig.useSharedProducer) { SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer) } else { - TempProducerKafkaExceptionConsumer(serializationSchema, producerCreator, errorTopicInitializer) + TempProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer) } } @@ -42,42 +47,88 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { } -case class TempProducerKafkaExceptionConsumer( - serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]], - kafkaProducerCreator: KafkaProducerCreator.Binary, - kafkaErrorTopicInitializer: KafkaErrorTopicInitializer -) extends FlinkEspExceptionConsumer { +trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging { + protected val 
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]] + protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer + protected val metaData: MetaData + + private val topic: String = kafkaErrorTopicInitializer.topicName + + protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] override def open(context: EngineRuntimeContext): Unit = { super.open(context) kafkaErrorTopicInitializer.init() } - override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { - KafkaUtils.sendToKafkaWithTempProducer(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))( - kafkaProducerCreator - ) + override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { + sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis())) + .recoverWith { case e: RecordTooLargeException => + // Kafka message size should be kept within acceptable size by serializer, but it may be incorrectly configured. + // We may also encounter this exception due to producer's 'buffer.memory' being set too low. + // + // Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible + // to correctly detect and handle this limit preemptively, because: + // * Kafka limits are imposed on compressed message sizes (with headers and protocol overhead) + // * there are limits on multiple levels: producer, topic, broker (access requires additional permissions) + + val scenario = metaData.id + val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") + val error = exceptionInfo.throwable.message + logger.warn( + s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error)." + + s"Verify your configuration of Kafka producer, error logging and errors topic and set correct limits. 
${e.getMessage}" + ) + + val lightExceptionInfo = exceptionInfo.copy( + context = exceptionInfo.context.copy( + variables = Map( + "!warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message: ${e.getMessage}", + ), + parentContext = None + ) + ) + + sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis())) + }(ExecutionContext.Implicits.global) + .recover { case e: Throwable => + val scenario = metaData.id + val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") + val error = exceptionInfo.throwable.message + + logger.warn( + s"Failed to write to $topic (scenario: $scenario, node: $node, error: $error): ${e.getMessage}", + e + ) + }(ExecutionContext.Implicits.global) } } -case class SharedProducerKafkaExceptionConsumer( +case class TempProducerKafkaExceptionConsumer( metaData: MetaData, serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]], kafkaProducerCreator: KafkaProducerCreator.Binary, kafkaErrorTopicInitializer: KafkaErrorTopicInitializer -) extends FlinkEspExceptionConsumer - with WithSharedKafkaProducer { +) extends BaseKafkaExceptionConsumer { - override def open(context: EngineRuntimeContext): Unit = { - super.open(context) - kafkaErrorTopicInitializer.init() + override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = { + KafkaUtils + .sendToKafkaWithTempProducer(record)(kafkaProducerCreator) } - override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { - sendToKafka(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))( - SynchronousExecutionContextAndIORuntime.syncEc - ) +} + +case class SharedProducerKafkaExceptionConsumer( + metaData: MetaData, + serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]], + kafkaProducerCreator: KafkaProducerCreator.Binary, + kafkaErrorTopicInitializer: KafkaErrorTopicInitializer +) extends BaseKafkaExceptionConsumer + with WithSharedKafkaProducer { + + override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = { + sendToKafka(record)(SynchronousExecutionContextAndIORuntime.syncEc) } } diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala index 9be8cdf11a6..7b5df7153e8 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala @@ -42,9 +42,13 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers private val serializationSchema = new KafkaJsonExceptionSerializationSchema(metaData, consumerConfig) - // null as we don't test open here... 
private val consumer = - TempProducerKafkaExceptionConsumer(serializationSchema, MockProducerCreator(mockProducer), null) + TempProducerKafkaExceptionConsumer( + metaData, + serializationSchema, + MockProducerCreator(mockProducer), + NoopKafkaErrorTopicInitializer + ) test("records event") { consumer.consume(exception) diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala index a181c837473..7ebaa242c62 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala @@ -1,13 +1,14 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.config.ConfigValueFactory.fromAnyRef +import org.scalatest.{EitherValues, OptionValues} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.process.SinkFactory import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.kafka.KafkaSpec import pl.touk.nussknacker.engine.process.helpers.SampleNodes import pl.touk.nussknacker.engine.process.helpers.SampleNodes.SimpleRecord @@ -17,22 +18,56 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import java.util.Date -class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSpec with Matchers { +class KafkaExceptionConsumerSpec + extends AnyFunSuite + with OptionValues + with FlinkSpec + with KafkaSpec + with Matchers + with EitherValues { import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer - private val topicName = "testingErrors" - protected var modelData: ModelData = _ + test("should record errors on topic") { + val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel) + + val inputEvent = extractInputEventMap(message) + (inputEvent - "input") shouldBe Map( + "string" -> "short string" + ) + } + + test("should record errors on topic - strips context from too large error input") { + // long string variable: 8^7 = 2097152 = 2 MB + val message = runTest( + "testProcess-longString", + stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel, + maxMessageBytes = 5242880 + ) + + val inputEvent = extractInputEventMap(message) + inputEvent.keySet shouldBe Set("!warning") + inputEvent("!warning") should startWith( + "variables truncated, they didn't fit within max allowed size of a Kafka message:" + ) + } + + private def runTest( + scenarioName: String, + stringVariable: Expression, + maxMessageBytes: Int = 1048576 + ): KafkaExceptionInfo = { + val topicName = s"$scenarioName.errors" - protected override def beforeAll(): Unit = { - super.beforeAll() val configWithExceptionHandler = config .withValue("exceptionHandler.type", fromAnyRef("Kafka")) .withValue("exceptionHandler.topic", fromAnyRef(topicName)) + .withValue("exceptionHandler.maxMessageBytes", fromAnyRef(maxMessageBytes)) + .withValue("exceptionHandler.includeInputEvent", fromAnyRef(true)) 
.withValue("exceptionHandler.additionalParams.configurableKey", fromAnyRef("sampleValue")) .withValue("exceptionHandler.kafka", config.getConfig("kafka").root()) - modelData = LocalModelData( + val modelData = LocalModelData( configWithExceptionHandler, List( ComponentDefinition( @@ -42,28 +77,35 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSp ComponentDefinition("sink", SinkFactory.noParam(SampleNodes.MonitorEmptySink)) ) ) - } - test("should record errors on topic") { val process = ScenarioBuilder - .streaming("testProcess") + .streaming(scenarioName) .source("source", "source") + .buildSimpleVariable("string", "string", stringVariable) .filter("shouldFail", "1/{0, 1}[0] != 10".spel) .emptySink("end", "sink") val env = flinkMiniCluster.createExecutionEnvironment() UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process) - env.withJobRunning(process.name.value) { + val message = env.withJobRunning(process.name.value) { val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head - consumed.key() shouldBe "testProcess-shouldFail" - consumed.message().nodeId shouldBe Some("shouldFail") - consumed.message().processName.value shouldBe "testProcess" - consumed.message().message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero") - consumed.message().exceptionInput shouldBe Some("1/{0, 1}[0] != 10") - consumed.message().additionalData shouldBe Map("configurableKey" -> "sampleValue") + consumed.key() shouldBe s"$scenarioName-shouldFail" + + consumed.message() } + message.processName.value shouldBe scenarioName + message.nodeId shouldBe Some("shouldFail") + message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero") + message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10") + message.stackTrace.value should include("evaluation failed, message:") + message.additionalData shouldBe Map("configurableKey" -> "sampleValue") + + message } + def extractInputEventMap(message: KafkaExceptionInfo): Map[String, String] = + message.inputEvent.value.as[Map[String, String]].value + } diff --git a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala index a510f7ab5da..f2a15d4860f 100644 --- a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala +++ b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala @@ -17,7 +17,10 @@ import pl.touk.nussknacker.engine.api.process.TopicName import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext import pl.touk.nussknacker.engine.api.{MetaData, VariableConstants} import pl.touk.nussknacker.engine.kafka.KafkaUtils -import pl.touk.nussknacker.engine.kafka.exception.{KafkaErrorTopicInitializer, KafkaJsonExceptionSerializationSchema} +import pl.touk.nussknacker.engine.kafka.exception.{ + DefaultKafkaErrorTopicInitializer, + KafkaJsonExceptionSerializationSchema +} import pl.touk.nussknacker.engine.lite.ScenarioInterpreterFactory.ScenarioInterpreterWithLifecycle import pl.touk.nussknacker.engine.lite.api.commonTypes.{ErrorType, ResultType} import pl.touk.nussknacker.engine.lite.api.interpreterTypes @@ -72,7 +75,7 @@ class KafkaSingleScenarioTaskRun( def init(): Unit = { configSanityCheck() - new 
KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init() + new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init() producer = prepareProducer consumer = prepareConsumer diff --git a/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala b/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala index 27c1a1e3e97..f613d63e014 100644 --- a/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala +++ b/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala @@ -7,7 +7,7 @@ import net.ceedubs.ficus.readers.EnumerationReader._ import org.scalatest.matchers.should.Matchers import org.scalatest.funsuite.AnyFunSuite import pl.touk.nussknacker.engine.kafka.KafkaSpec -import pl.touk.nussknacker.engine.kafka.exception.KafkaErrorTopicInitializer +import pl.touk.nussknacker.engine.kafka.exception.{DefaultKafkaErrorTopicInitializer, KafkaErrorTopicInitializer} import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter._ import pl.touk.nussknacker.test.PatientScalaFutures @@ -19,7 +19,7 @@ class KafkaErrorTopicInitializerTest extends AnyFunSuite with KafkaSpec with Mat val engineConfig = config .withValue("exceptionHandlingConfig.topic", fromAnyRef(topic)) .as[KafkaInterpreterConfig] - new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig) + new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig) } test("should create topic if not exists") { diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala index 154dc492105..fefd33fd32b 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala @@ -1,39 +1,46 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.scalalogging.LazyLogging -import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic} +import org.apache.kafka.clients.admin.{DescribeTopicsOptions, ListTopicsOptions, NewTopic} import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils} -import java.util.Optional +import java.util.{Collections, Optional} import java.util.concurrent.TimeUnit import java.{lang, util} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig) - extends LazyLogging { +trait KafkaErrorTopicInitializer { + def topicName: String + def init(): Unit +} + +class DefaultKafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig) + extends KafkaErrorTopicInitializer + with LazyLogging { private val timeoutSeconds = 5 + val topicName: String = exceptionHandlerConfig.topic + def init(): Unit = { - val errorTopic = exceptionHandlerConfig.topic KafkaUtils.usingAdminClient(kafkaConfig) { admin => val topicNames = admin .listTopics(new ListTopicsOptions().timeoutMs(timeoutSeconds * 1000)) .names() .get(timeoutSeconds, 
TimeUnit.SECONDS) - val topicExists = topicNames.asScala.contains(errorTopic) + val topicExists = topicNames.asScala.contains(topicName) if (topicExists) { - logger.debug("Topic exists, skipping") + logger.debug(s"Topic $topicName already exists, skipping") } else { - logger.info(s"Creating error topic: $errorTopic with default configs, please check if the values are correct") - val errorTopicConfig = new NewTopic(errorTopic, Optional.empty[Integer](), Optional.empty[lang.Short]()) + logger.info(s"Creating error topic: $topicName with default configs, please check if the values are correct") + val errorTopicConfig = new NewTopic(topicName, Optional.empty[Integer](), Optional.empty[lang.Short]()) try { admin.createTopics(util.Arrays.asList(errorTopicConfig)).all().get(timeoutSeconds, TimeUnit.SECONDS) } catch { case NonFatal(e) => throw new IllegalStateException( - s"Failed to create $errorTopic (${e.getMessage}), cannot run scenario properly", + s"Failed to create $topicName (${e.getMessage}), cannot run scenario properly", e ) } @@ -42,3 +49,10 @@ class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfi } } + +object NoopKafkaErrorTopicInitializer extends KafkaErrorTopicInitializer { + + override val topicName: String = "-" + + override def init(): Unit = {} +} diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala index f0d91e8a05f..ad29ca11ec4 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala @@ -4,6 +4,8 @@ case class KafkaExceptionConsumerConfig( topic: String, // quite large to be able to show nested exception stackTraceLengthLimit: Int = 50, + // 1 MB (default max.message.bytes) - 100 bytes of margin for protocol overhead + maxMessageBytes: Int = 1048476, includeHost: Boolean = true, includeInputEvent: Boolean = false, // by default we use temp producer, as it's more robust diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala index ff04709d36b..59171725b35 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala @@ -1,10 +1,12 @@ package pl.touk.nussknacker.engine.kafka.exception -import io.circe.Json +import io.circe.{Json, JsonObject} import io.circe.generic.JsonCodec import io.circe.syntax.EncoderOps import org.apache.kafka.clients.producer.ProducerRecord -import pl.touk.nussknacker.engine.api.{Context, MetaData} +import org.apache.kafka.common.record.DefaultRecordBatch +import org.apache.kafka.common.utils.Utils +import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema @@ -14,11 +16,133 @@ import java.io.{PrintWriter, StringWriter} import java.lang import java.net.InetAddress 
import java.nio.charset.StandardCharsets +import scala.annotation.tailrec import scala.io.Source +object KafkaJsonExceptionSerializationSchema { + private[exception] val recordOverhead = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + private val indentLength = 2 + private val warningKey = "!warning" + + private def serializeValueWithSizeLimit(value: KafkaExceptionInfo, maxValueBytes: Int): Array[Byte] = { + val valueJson = value.asJson + val serializedValue = valueJson.spaces2.getBytes(StandardCharsets.UTF_8) + if (value.inputEvent.isDefined && serializedValue.length > maxValueBytes) { + valueJson.hcursor + .downField("inputEvent") + .withFocus { inputEventJson => + inputEventJson.asObject + .map { inputEventJsonObject => + val indentLevel = 1 + removeVariablesOverSizeLimit( + inputEventJsonObject, + indentLevel, + serializedValue.length, + maxValueBytes + ).toJson + } + .getOrElse(inputEventJson) + } + .top + .getOrElse(valueJson) + .spaces2 + .getBytes(StandardCharsets.UTF_8) + } else { + serializedValue + } + } + + // noinspection ScalaWeakerAccess + def removeVariablesOverSizeLimit( + containerObject: JsonObject, + containerIndentLevel: Int, + valueBytes: Int, + maxValueBytes: Int + ): JsonObject = { + if (valueBytes <= maxValueBytes) { + return containerObject + } + + // use a placeholder of the same length as the number of digits in valueBytes, + // because the final value will never be larger than that + val removedBytesPlaceholder = "$" * valueBytes.toString.length + // text below will increase JSON size, but we are working under the assumption that its size is much smaller + // than the size of inputEvent keys that will get removed + val messageTemplate = + s"variables truncated, original object had $valueBytes bytes, which was more then the max allowed length of $maxValueBytes bytes. " + + s"Removed variables ($removedBytesPlaceholder bytes)" + + val indentBytes = indentLength * (containerIndentLevel + 1) + // |{ + // | "inputEvent" : { + // | "!warning": ... + // (note: line below uses '' as quotes because Scala 2.12 can't handle escaped "") + val warningBytes = indentBytes + Utils.utf8Length(s"'$warningKey' : ${messageTemplate.asJson.spaces2},\n") + val bytesToCut = valueBytes + warningBytes - maxValueBytes + + val variablesWithLength = countVariableLengths(containerObject, indentBytes) + val (variablesToRemove, removedBytes) = calculateKeysToRemove(variablesWithLength, bytesToCut) + + if (removedBytes <= warningBytes) { + // this may happen only when doing tests with really low size limits + return containerObject + } + + val finalMessage = messageTemplate.replace(removedBytesPlaceholder, removedBytes.toString) + + variablesToRemove.toSeq.sorted.mkString(": ", ", ", "") + + containerObject + .filterKeys(key => !variablesToRemove.contains(key)) + .add(warningKey, finalMessage.asJson) + } + + private[exception] def countVariableLengths(json: JsonObject, indentBytes: Int): List[(String, Int)] = { + // Each removed key will be moved to a message that enumerates all removed keys, e.g. when the variable + // 'my_variable_name' is removed from: + // | "inputEvent" : { + // | "my_variable_name" : "value", + // | } + // it be transformed into a partial string: `, my_variable_name` (2-byte separator + variable name). + // Because of that the length of variable name and the last two characters (`,\n`) aren't added to counted size, + // as the space they take up will get reused. 
+ // + // Assumption that all values are terminated by `,\n` may make us overestimate a single variable by one byte, + // but it's safer to remove more and still fit within given size limit. + val jsonKeyOverhead = indentBytes + 2 /* key quotes */ + 3 /* ` : ` */ + + json.toList + .map { case (key, value) => + val serializedVariable = value.spaces2 + val variableBytes = Utils.utf8Length(serializedVariable) + (indentBytes * serializedVariable.count(_ == '\n')) + key -> (jsonKeyOverhead + variableBytes) + } + } + + private def calculateKeysToRemove(keysWithLength: List[(String, Int)], bytesToRemove: Int): (Set[String], Int) = { + @tailrec + def collectKeysToRemove( + allKeys: List[(String, Int)], + keysToRemove: List[String], + bytesSoFar: Int + ): (Set[String], Int) = { + allKeys match { + case (key, bytes) :: tail if bytesSoFar < bytesToRemove => + collectKeysToRemove(tail, key :: keysToRemove, bytes + bytesSoFar) + case _ => + (keysToRemove.toSet, bytesSoFar) + } + } + + collectKeysToRemove(keysWithLength.sortBy { case (_, length) => length }.reverse, List.empty, 0) + } + +} + class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig: KafkaExceptionConsumerConfig) extends KafkaSerializationSchema[NuExceptionInfo[NonTransientException]] { + import KafkaJsonExceptionSerializationSchema._ + override def serialize( exceptionInfo: NuExceptionInfo[NonTransientException], timestamp: lang.Long @@ -26,9 +150,11 @@ class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig: val key = s"${metaData.name}-${exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("")}" .getBytes(StandardCharsets.UTF_8) + val value = KafkaExceptionInfo(metaData, exceptionInfo, consumerConfig) - val serializedValue = value.asJson.spaces2.getBytes(StandardCharsets.UTF_8) - new ProducerRecord(consumerConfig.topic, key, serializedValue) + val maxValueBytes = consumerConfig.maxMessageBytes - key.length - recordOverhead + val serializedValue = serializeValueWithSizeLimit(value, maxValueBytes) + new ProducerRecord(consumerConfig.topic, null, timestamp, key, serializedValue) } } diff --git a/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala new file mode 100644 index 00000000000..5e9b932e71b --- /dev/null +++ b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala @@ -0,0 +1,109 @@ +package pl.touk.nussknacker.engine.kafka.exception + +import io.circe.{Decoder, JsonObject, KeyDecoder} +import io.circe.parser.decode +import io.circe.syntax.EncoderOps +import org.scalatest.EitherValues +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo} +import pl.touk.nussknacker.engine.api.{Context, MetaData, StreamMetaData} + +import java.nio.charset.StandardCharsets +import java.time.Instant + +class KafkaJsonExceptionSerializationSchemaSpec extends AnyFunSuite with Matchers with EitherValues { + private val timestamp = Instant.now() + private val exception = NonTransientException("input", "message", timestamp) + private implicit val mapDecoder: Decoder[Map[String, String]] = + Decoder.decodeMap(KeyDecoder.decodeKeyString, Decoder.decodeString) + + test("variable sizes are calculated correctly") { + def 
jsonBytes(variables: Map[String, Any]): Int = { + createKafkaExceptionInfo(variables).asJson.spaces2.length + } + + def serializedSize(variables: Map[String, Any], variableName: String): Int = { + jsonBytes(variables) - jsonBytes(variables - variableName) + } + + val variables = Map( + "str" -> "12345", + "int" -> 12345, + "number" -> 123.45, + "array" -> Seq(1, 2), + "object" -> Map("a" -> 1, "b" -> 2), + "complexObject" -> Map("a" -> 1, "b" -> Seq(1, 2, "c"), "c" -> Map.empty, "d" -> Map("e" -> 1)) + ) + val expectedSizes = variables.map { case (k, _) => + // remove bytes taken up by serialized key name + k -> (serializedSize(variables, k) - 2 - k.length) + } + + val inputEvent = createKafkaExceptionInfo(variables).asJson.hcursor.downField("inputEvent").as[JsonObject].value + KafkaJsonExceptionSerializationSchema + .countVariableLengths(inputEvent, indentBytes = 4) + .toMap shouldBe expectedSizes + } + + test("strips longest inputEvent entries") { + val baseContext = Map( + "longVariable" -> "", + "shortVariable1" -> "111", + "shortVariable2" -> "222222", + "shortVariable3" -> "shortVariableValue" + ) + val largeContext = baseContext ++ Map("longVariable" -> "1".repeat(400)) + + val (baseRecord, key) = getSerializedJson(baseContext) + decode[KafkaExceptionInfo](baseRecord).value.inputEvent.get shouldBe baseContext.asJson + + // limit that will remove longVariable, we need to account for the added !warning key + val maxRecord1Length = baseRecord.length + 350 + val (record1, _) = getSerializedJson(largeContext, maxMessageBytes = maxRecord1Length) + val inputEvent1 = decode[KafkaExceptionInfo](record1).value.inputEvent.get.as[Map[String, String]].value + + (inputEvent1.keySet - "!warning") shouldBe (largeContext.keySet - "longVariable") + inputEvent1("!warning") should startWith("variables truncated") + inputEvent1("!warning") should endWith(": longVariable") + record1.length should be <= maxRecord1Length + + // add padding to size limit, variable removal rounds some calculations + val maxRecord2Length = record1.length + key.length + KafkaJsonExceptionSerializationSchema.recordOverhead + 1 + val (record2, _) = getSerializedJson(largeContext, maxMessageBytes = maxRecord2Length) + val inputEvent2 = decode[KafkaExceptionInfo](record2).value.inputEvent.get.as[Map[String, String]].value + + inputEvent2.keySet shouldBe inputEvent1.keySet + record2.length should equal(record1.length) + } + + private def getSerializedJson( + contextVariables: Map[String, Any], + maxMessageBytes: Int = 1048576, + ): (String, String) = { + val serializer = serializerWithMaxMessageLength(maxMessageBytes) + val record = serializer.serialize(createData(contextVariables), timestamp.toEpochMilli) + + (new String(record.value(), StandardCharsets.UTF_8), new String(record.key(), StandardCharsets.UTF_8)) + } + + private def serializerWithMaxMessageLength(maxMessageLength: Int): KafkaJsonExceptionSerializationSchema = { + val config = KafkaExceptionConsumerConfig( + topic = "topic", + maxMessageBytes = maxMessageLength, + includeInputEvent = true + ) + new KafkaJsonExceptionSerializationSchema(MetaData("test", StreamMetaData()), config) + } + + private def createData(contextVariables: Map[String, Any]): NuExceptionInfo[NonTransientException] = { + NuExceptionInfo(None, exception, Context("contextId", contextVariables)) + } + + private def createKafkaExceptionInfo(variables: Map[String, Any]) = KafkaExceptionInfo( + MetaData("test", StreamMetaData()), + createData(variables), + KafkaExceptionConsumerConfig(topic = "topic", 
includeInputEvent = true) + ) + +}
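
The `MigrationGuide.md` and `KafkaIntegration.md` changes above document the new `exceptionHandler.maxMessageBytes` option. Below is a minimal sketch of setting it programmatically with Typesafe Config, in the same style as the Flink test in this patch; the topic name and the 2 MB limit are illustrative values, assuming an error topic that accepts messages of that size.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.ConfigValueFactory.fromAnyRef

object KafkaExceptionHandlerConfigExample {

  // Base model configuration; in a real deployment this comes from the model config file.
  private val base: Config = ConfigFactory.empty()

  // Keys match the exception handler options documented in KafkaIntegration.md;
  // the topic name and the 2 MB limit are illustrative.
  val withKafkaExceptionHandler: Config = base
    .withValue("exceptionHandler.type", fromAnyRef("Kafka"))
    .withValue("exceptionHandler.topic", fromAnyRef("nu-errors"))
    // only raise maxMessageBytes if the error topic's max.message.bytes allows it,
    // keeping ~100 bytes of margin for Kafka protocol overhead
    .withValue("exceptionHandler.maxMessageBytes", fromAnyRef(2 * 1024 * 1024 - 100))
    .withValue("exceptionHandler.includeInputEvent", fromAnyRef(true))
}
```

The same keys can equally be set directly in the model's HOCON configuration file, under the engine-specific `exceptionHandler` section referenced in the documentation.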
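The comments in `KafkaJsonExceptionSerializationSchema` above describe how the value budget is derived (configured `maxMessageBytes` minus the record key length and `RECORD_BATCH_OVERHEAD`) and how oversized `inputEvent` variables are dropped in favour of a `!warning` entry. The sketch below exercises that behaviour using only the classes added in this patch; the scenario name, context variables and printed output are illustrative, and the exact truncation outcome depends on the serialized JSON layout.

```scala
import java.time.Instant
import pl.touk.nussknacker.engine.api.{Context, MetaData, StreamMetaData}
import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo}
import pl.touk.nussknacker.engine.kafka.exception.{
  KafkaExceptionConsumerConfig,
  KafkaJsonExceptionSerializationSchema
}

object MaxMessageBytesDemo extends App {

  // Mirrors the documented default: 1 MB max.message.bytes minus ~100 bytes of protocol margin.
  val config = KafkaExceptionConsumerConfig(
    topic = "nu-errors", // illustrative topic name
    maxMessageBytes = 1048476,
    includeInputEvent = true
  )

  val serializer =
    new KafkaJsonExceptionSerializationSchema(MetaData("demo-scenario", StreamMetaData()), config)

  // An input event with one variable far larger than the configured limit.
  val oversized = NuExceptionInfo(
    None,
    NonTransientException("input", "message", Instant.now()),
    Context("ctx-1", Map("bigVariable" -> ("x" * 2000000), "smallVariable" -> "ok"))
  )

  val record = serializer.serialize(oversized, System.currentTimeMillis())

  // The serialized value should stay within the budget; the oversized variable should be
  // replaced by a "!warning" entry in inputEvent describing what was removed.
  println(s"serialized value size: ${record.value().length} bytes")
}
```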