From 72a7b1ac30cf621d8f1f1381035597629639fe96 Mon Sep 17 00:00:00 2001
From: Piotr Przybylski
Date: Thu, 26 Sep 2024 23:03:56 +0200
Subject: [PATCH] Kafka exceptionHandler: retry when message is too large, log
 when writing to Kafka fails

---
 docs/Changelog.md                             |  2 +
 .../exception/KafkaExceptionConsumer.scala    | 81 ++++++++++++++-----
 ...kaExceptionConsumerSerializationSpec.scala |  2 +-
 .../KafkaExceptionConsumerSpec.scala          | 57 +++++++++----
 .../KafkaErrorTopicInitializer.scala          |  2 +-
 ...afkaJsonExceptionSerializationSchema.scala |  2 +-
 6 files changed, 107 insertions(+), 39 deletions(-)

diff --git a/docs/Changelog.md b/docs/Changelog.md
index c09c4e9ee67..9eb31ecb5a7 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -62,6 +62,8 @@
 * Scenario Activity API implementation
 * [#6925](https://github.com/TouK/nussknacker/pull/6925) Fix situation when preset labels were presented as `null` when node didn't pass the validation.
 * [#6935](https://github.com/TouK/nussknacker/pull/6935) Spel: Scenario labels added to meta variable - `#meta.scenarioLabels`
+* [#6958](https://github.com/TouK/nussknacker/pull/6958) Handle too-large error events in the "Kafka" exceptionHandler by
+  retrying with a stripped context, and log a warning when writing to Kafka fails
 
 ## 1.17
 
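The changelog entry above summarizes the behaviour this patch adds to the consumer implementation in the next file: when the producer rejects a serialized error event as too large, the send is retried once with a smaller payload, and only a failure of that fallback is logged. As a minimal standalone sketch of the pattern over plain kafka-clients (the helper and payload names here are illustrative, not the PR's API):

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.RecordTooLargeException

import scala.concurrent.{ExecutionContext, Future, Promise}

object TooLargeRetrySketch {

  // Adapt the callback-based send(...) to a Future
  def send(
      producer: KafkaProducer[Array[Byte], Array[Byte]],
      record: ProducerRecord[Array[Byte], Array[Byte]]
  ): Future[RecordMetadata] = {
    val done = Promise[RecordMetadata]()
    producer.send(
      record,
      (metadata, exception) => if (exception != null) done.failure(exception) else done.success(metadata)
    )
    done.future
  }

  // Try the full payload first; on RecordTooLargeException retry once with a stripped one
  def sendWithFallback(
      producer: KafkaProducer[Array[Byte], Array[Byte]],
      topic: String,
      fullPayload: Array[Byte],
      strippedPayload: Array[Byte]
  )(implicit ec: ExecutionContext): Future[Unit] =
    send(producer, new ProducerRecord(topic, fullPayload))
      .recoverWith { case _: RecordTooLargeException =>
        send(producer, new ProducerRecord(topic, strippedPayload))
      }
      .map(_ => ())
}
```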
diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala
index 3ecf09f105e..ec1d080e769 100644
--- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala
+++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala
@@ -1,8 +1,11 @@
 package pl.touk.nussknacker.engine.kafka.exception
 
 import com.typesafe.config.Config
+import com.typesafe.scalalogging.LazyLogging
 import net.ceedubs.ficus.Ficus._
 import net.ceedubs.ficus.readers.ArbitraryTypeReader._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.errors.RecordTooLargeException
 import pl.touk.nussknacker.engine.api.MetaData
 import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo}
 import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
@@ -13,6 +16,8 @@ import pl.touk.nussknacker.engine.kafka.{DefaultProducerCreator, KafkaConfig, Ka
 import pl.touk.nussknacker.engine.util.SynchronousExecutionContextAndIORuntime
 import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig
 
+import scala.concurrent.{ExecutionContext, Future}
+
 class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {
 
   override def create(metaData: MetaData, exceptionHandlerConfig: Config): FlinkEspExceptionConsumer = {
@@ -24,7 +29,7 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {
     if (consumerConfig.useSharedProducer) {
       SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
     } else {
-      TempProducerKafkaExceptionConsumer(serializationSchema, producerCreator, errorTopicInitializer)
+      TempProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
     }
   }
 
@@ -42,21 +47,62 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {
 
 }
 
-case class TempProducerKafkaExceptionConsumer(
-    serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
-    kafkaProducerCreator: KafkaProducerCreator.Binary,
-    kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
-) extends FlinkEspExceptionConsumer {
+trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging {
+  protected val topic: String
+  protected val serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]]
+  protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
+  protected val metaData: MetaData
+
+  protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[Unit]
 
   override def open(context: EngineRuntimeContext): Unit = {
     super.open(context)
     kafkaErrorTopicInitializer.init()
   }
 
-  override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
-    KafkaUtils.sendToKafkaWithTempProducer(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
-      kafkaProducerCreator
-    )
+  override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
+    sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))
+      .recoverWith { case e: RecordTooLargeException =>
+        val scenario = metaData.id
+        val node     = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
+        val error    = exceptionInfo.throwable.message
+        logger.warn(
+          s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). ${e.getMessage}"
+        )
+
+        val lightExceptionInfo = exceptionInfo.copy(
+          context = exceptionInfo.context.copy(variables = Map.empty, parentContext = None)
+        )
+
+        sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis()))
+      }(ExecutionContext.Implicits.global)
+      .recover { case e: Throwable =>
+        val scenario = metaData.id
+        val node     = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
+        val error    = exceptionInfo.throwable.message
+
+        logger.warn(
+          s"Failed to write to $topic (scenario: $scenario, node: $node, error: $error): ${e.getMessage}",
+          e
+        )
+      }(ExecutionContext.Implicits.global)
+  }
+
+}
+
+case class TempProducerKafkaExceptionConsumer(
+    metaData: MetaData,
+    serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
+    kafkaProducerCreator: KafkaProducerCreator.Binary,
+    kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
+) extends BaseKafkaExceptionConsumer {
+
+  override protected val topic: String = Option(kafkaErrorTopicInitializer).map(_.exceptionHandlerConfig.topic).getOrElse("-")
+
+  override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[Unit] = {
+    KafkaUtils
+      .sendToKafkaWithTempProducer(record)(kafkaProducerCreator)
+      .map(_ => ())(ExecutionContext.global)
   }
 }
 
@@ -66,18 +112,15 @@ case class SharedProducerKafkaExceptionConsumer(
     serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
     kafkaProducerCreator: KafkaProducerCreator.Binary,
     kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
-) extends FlinkEspExceptionConsumer
+) extends BaseKafkaExceptionConsumer
     with WithSharedKafkaProducer {
 
-  override def open(context: EngineRuntimeContext): Unit = {
-    super.open(context)
-    kafkaErrorTopicInitializer.init()
-  }
+  // can be null in tests
+  override protected val topic: String =
+    Option(kafkaErrorTopicInitializer).map(_.exceptionHandlerConfig.topic).getOrElse("-")
 
-  override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
-    sendToKafka(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
-      SynchronousExecutionContextAndIORuntime.syncEc
-    )
+  override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[Unit] = {
+    sendToKafka(record)(SynchronousExecutionContextAndIORuntime.syncEc)
   }
 }
 
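Note that `TempProducerKafkaExceptionConsumer.topic` gets the same `Option(...)` null-guard as the shared-producer variant: the serialization spec below constructs it with a `null` initializer, so dereferencing it eagerly in the `val` would throw at construction time. To exercise the new fallback path in isolation, a hypothetical snippet using kafka-clients' `MockProducer` could look like the following (this is not a test from the PR, whose serialization spec wires a `MockProducer` through Nussknacker's own `MockProducerCreator`). With auto-completion disabled, `errorNext(...)` fails the oldest in-flight send with a chosen exception, so the `RecordTooLargeException` branch can be driven deterministically:

```scala
import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.serialization.ByteArraySerializer

// autoComplete = false: each send stays pending until completeNext()/errorNext()
val producer = new MockProducer[Array[Byte], Array[Byte]](
  false,
  new ByteArraySerializer,
  new ByteArraySerializer
)

producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("errors", "full-event".getBytes))
producer.errorNext(new RecordTooLargeException("record too large")) // fail the first send
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("errors", "stripped-event".getBytes))
producer.completeNext()                                             // let the retry succeed

assert(producer.history().size == 2)                                // both attempts recorded
```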
diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala
index 9be8cdf11a6..4b010a6e228 100644
--- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala
+++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala
@@ -44,7 +44,7 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers
 
   // null as we don't test open here...
   private val consumer =
-    TempProducerKafkaExceptionConsumer(serializationSchema, MockProducerCreator(mockProducer), null)
+    TempProducerKafkaExceptionConsumer(metaData, serializationSchema, MockProducerCreator(mockProducer), null)
 
   test("records event") {
     consumer.consume(exception)
diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala
index a181c837473..c9b05d920ec 100644
--- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala
+++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala
@@ -1,13 +1,16 @@
 package pl.touk.nussknacker.engine.kafka.exception
 
 import com.typesafe.config.ConfigValueFactory.fromAnyRef
+import io.circe.Json
+import io.circe.syntax.EncoderOps
+import org.scalatest.OptionValues
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
-import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.component.ComponentDefinition
 import pl.touk.nussknacker.engine.api.process.SinkFactory
 import pl.touk.nussknacker.engine.build.ScenarioBuilder
 import pl.touk.nussknacker.engine.flink.test.FlinkSpec
+import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.kafka.KafkaSpec
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes.SimpleRecord
@@ -17,22 +20,38 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
 
 import java.util.Date
 
-class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSpec with Matchers {
+class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with FlinkSpec with KafkaSpec with Matchers {
 
   import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer
 
-  private val topicName = "testingErrors"
-  protected var modelData: ModelData = _
+  test("should record errors on topic") {
+    val message = runTest("testProcess-shortString", stringVariable = "'short string'".spel)
+
+    val inputEvent = message.inputEvent.value
+    inputEvent.asObject.value.filterKeys(_ == "string").asJson shouldBe Json.obj(
+      "string" -> "short string".asJson
+    )
+  }
+
+  test("should record errors on topic - strips context from too large error input") {
+    // long string variable: 8 chars grown 6 times by a factor of 8, i.e. 8^7 = 2097152 chars (~2 MB)
+    val message =
+      runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel)
+
+    message.inputEvent.value shouldBe Json.obj()
+  }
+
+  private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = {
+    val topicName = s"$scenarioName.errors"
 
-  protected override def beforeAll(): Unit = {
-    super.beforeAll()
     val configWithExceptionHandler = config
       .withValue("exceptionHandler.type", fromAnyRef("Kafka"))
       .withValue("exceptionHandler.topic", fromAnyRef(topicName))
+      .withValue("exceptionHandler.includeInputEvent", fromAnyRef(true))
       .withValue("exceptionHandler.additionalParams.configurableKey", fromAnyRef("sampleValue"))
      .withValue("exceptionHandler.kafka", config.getConfig("kafka").root())
 
-    modelData = LocalModelData(
+    val modelData = LocalModelData(
      configWithExceptionHandler,
      List(
        ComponentDefinition(
@@ -42,28 +61,32 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSp
         ComponentDefinition("sink", SinkFactory.noParam(SampleNodes.MonitorEmptySink))
       )
     )
-  }
 
-  test("should record errors on topic") {
     val process = ScenarioBuilder
-      .streaming("testProcess")
+      .streaming(scenarioName)
       .source("source", "source")
+      .buildSimpleVariable("string", "string", stringVariable)
       .filter("shouldFail", "1/{0, 1}[0] != 10".spel)
       .emptySink("end", "sink")
 
     val env = flinkMiniCluster.createExecutionEnvironment()
     UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
 
-    env.withJobRunning(process.name.value) {
+    val message = env.withJobRunning(process.name.value) {
       val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head
-      consumed.key() shouldBe "testProcess-shouldFail"
-      consumed.message().nodeId shouldBe Some("shouldFail")
-      consumed.message().processName.value shouldBe "testProcess"
-      consumed.message().message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero")
-      consumed.message().exceptionInput shouldBe Some("1/{0, 1}[0] != 10")
-      consumed.message().additionalData shouldBe Map("configurableKey" -> "sampleValue")
+      consumed.key() shouldBe s"$scenarioName-shouldFail"
+
+      consumed.message()
     }
+    message.processName.value shouldBe scenarioName
+    message.nodeId shouldBe Some("shouldFail")
+    message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero")
+    message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10")
+    message.stackTrace.value should include("evaluation failed, message:")
+    message.additionalData shouldBe Map("configurableKey" -> "sampleValue")
+
+    message
   }
 
 }
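A quick sanity check of the comment in the new test: the SpEL expression starts from 8 characters and each of the 6 `replaceAll` rounds multiplies the length by 8, which lands well above the producer's default size limit of about 1 MB, so the send fails client-side with `RecordTooLargeException`. The same arithmetic in plain Scala (illustrative snippet, not part of the PR):

```scala
// Mirror of the SpEL size arithmetic: 8 * 8^6 = 8^7 characters
val grown = (1 to 6).foldLeft("x" * 8)((s, _) => s.replaceAll("x", "x" * 8))
assert(grown.length == 2097152) // ~2 MB of single-byte chars, over the default ~1 MB limit
```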
test("should record errors on topic - strips context from too large error input") { + // long string variable: 8^7 = 2097152 = 2 MB + val message = + runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel) + + message.inputEvent.value shouldBe Json.obj() + } + + private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = { + val topicName = s"$scenarioName.errors" - protected override def beforeAll(): Unit = { - super.beforeAll() val configWithExceptionHandler = config .withValue("exceptionHandler.type", fromAnyRef("Kafka")) .withValue("exceptionHandler.topic", fromAnyRef(topicName)) + .withValue("exceptionHandler.includeInputEvent", fromAnyRef(true)) .withValue("exceptionHandler.additionalParams.configurableKey", fromAnyRef("sampleValue")) .withValue("exceptionHandler.kafka", config.getConfig("kafka").root()) - modelData = LocalModelData( + val modelData = LocalModelData( configWithExceptionHandler, List( ComponentDefinition( @@ -42,28 +61,32 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSp ComponentDefinition("sink", SinkFactory.noParam(SampleNodes.MonitorEmptySink)) ) ) - } - test("should record errors on topic") { val process = ScenarioBuilder - .streaming("testProcess") + .streaming(scenarioName) .source("source", "source") + .buildSimpleVariable("string", "string", stringVariable) .filter("shouldFail", "1/{0, 1}[0] != 10".spel) .emptySink("end", "sink") val env = flinkMiniCluster.createExecutionEnvironment() UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process) - env.withJobRunning(process.name.value) { + val message = env.withJobRunning(process.name.value) { val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head - consumed.key() shouldBe "testProcess-shouldFail" - consumed.message().nodeId shouldBe Some("shouldFail") - consumed.message().processName.value shouldBe "testProcess" - consumed.message().message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero") - consumed.message().exceptionInput shouldBe Some("1/{0, 1}[0] != 10") - consumed.message().additionalData shouldBe Map("configurableKey" -> "sampleValue") + consumed.key() shouldBe s"$scenarioName-shouldFail" + + consumed.message() } + message.processName.value shouldBe scenarioName + message.nodeId shouldBe Some("shouldFail") + message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero") + message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10") + message.stackTrace.value should include("evaluation failed, message:") + message.additionalData shouldBe Map("configurableKey" -> "sampleValue") + + message } } diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala index 154dc492105..6223654ac81 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala @@ -10,7 +10,7 @@ import java.{lang, util} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig) +class 
diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala
index ff04709d36b..b27b6a35eb3 100644
--- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala
+++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala
@@ -28,7 +28,7 @@ class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig:
       .getBytes(StandardCharsets.UTF_8)
     val value = KafkaExceptionInfo(metaData, exceptionInfo, consumerConfig)
     val serializedValue = value.asJson.spaces2.getBytes(StandardCharsets.UTF_8)
-    new ProducerRecord(consumerConfig.topic, key, serializedValue)
+    new ProducerRecord(consumerConfig.topic, null, timestamp, key, serializedValue)
   }
 
 }
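For reference, the five-argument constructor used above is `ProducerRecord(topic, partition, timestamp, key, value)`: the `null` partition leaves partition assignment to the producer's partitioner, while the explicit timestamp makes the record carry the time supplied by the caller (the consumers pass `System.currentTimeMillis()` at serialization) rather than one assigned at send time. A minimal illustration with hypothetical values:

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// topic, partition (null = let the partitioner decide), timestamp, key, value
val record = new ProducerRecord[Array[Byte], Array[Byte]](
  "nussknacker-errors",
  null,
  System.currentTimeMillis(),
  "scenario-node".getBytes,
  "{}".getBytes
)
```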