From 565df127252c8459fafcee8d7533d7ab87444164 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Fri, 27 Sep 2024 16:11:31 +0200 Subject: [PATCH] better error message, remove warn log --- .../exception/KafkaExceptionConsumer.scala | 19 +++++++++++-------- .../KafkaExceptionConsumerSpec.scala | 7 +++++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index d5ee2f36fb2..75eda6a4404 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -64,15 +64,18 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogg override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis())) .recoverWith { case e: RecordTooLargeException => - val scenario = metaData.id - val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") - val error = exceptionInfo.throwable.message - logger.warn( - s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). ${e.getMessage}" - ) - + // Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible + // to correctly detect and handle this limit preemptively, because: + // * Kafka limits are imposed on compressed message sizes + // * there are limits on multiple levels: producer, topic, broker (access requires additional permissions) val lightExceptionInfo = exceptionInfo.copy( - context = exceptionInfo.context.copy(variables = Map.empty, parentContext = None) + context = exceptionInfo.context.copy( + variables = Map( + "warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message", + "truncationReason" -> e.getMessage + ), + parentContext = None + ) ) sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis())) diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala index c9b05d920ec..1809e1b69b2 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala @@ -28,7 +28,7 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel) val inputEvent = message.inputEvent.value - inputEvent.asObject.value.filterKeys(_ == "string").asJson shouldBe Json.obj( + inputEvent.asObject.value.filterKeys(_ != "input").asJson shouldBe Json.obj( "string" -> "short string".asJson ) } @@ -38,7 +38,10 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin val message = runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel) - message.inputEvent.value shouldBe Json.obj() + val inputEvent = message.inputEvent.value + inputEvent.asObject.value.filterKeys(k => k != "input" && k != "truncationReason").asJson shouldBe Json.obj( + "warning" -> "variables truncated, they didn't fit within max allowed size of a Kafka message".asJson + ) } private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = {