From 106fc6908b986c59bd55d4aefb4c32010170175a Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Thu, 26 Sep 2024 23:03:56 +0200 Subject: [PATCH 1/8] Kafka exceptionHandler: retry when message is too large, log when writing to Kafka fails --- docs/Changelog.md | 1 + .../exception/KafkaExceptionConsumer.scala | 79 ++++++++++++++----- ...kaExceptionConsumerSerializationSpec.scala | 2 +- .../KafkaExceptionConsumerSpec.scala | 57 +++++++++---- .../KafkaErrorTopicInitializer.scala | 2 +- ...afkaJsonExceptionSerializationSchema.scala | 2 +- 6 files changed, 103 insertions(+), 40 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 6b15d808e68..c53ae768f7e 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -64,6 +64,7 @@ * [#6952](https://github.com/TouK/nussknacker/pull/6952) Improvement: TypeInformation support for scala.Option * [#6840](https://github.com/TouK/nussknacker/pull/6840) Introduce canCastTo, castTo and castToOrNull extension methods in SpeL. * [#6974](https://github.com/TouK/nussknacker/pull/6974) Add SpeL suggestions for cast methods parameter. +* [#6958](https://github.com/TouK/nussknacker/pull/6958) Add message size limit in the "Kafka" exceptionHandler ## 1.17 diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index 3ecf09f105e..cefd3ddf0fc 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -1,8 +1,11 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.config.Config +import com.typesafe.scalalogging.LazyLogging import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.RecordTooLargeException import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo} import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext @@ -13,6 +16,8 @@ import pl.touk.nussknacker.engine.kafka.{DefaultProducerCreator, KafkaConfig, Ka import pl.touk.nussknacker.engine.util.SynchronousExecutionContextAndIORuntime import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig +import scala.concurrent.{ExecutionContext, Future} + class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { override def create(metaData: MetaData, exceptionHandlerConfig: Config): FlinkEspExceptionConsumer = { @@ -24,7 +29,7 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { if (consumerConfig.useSharedProducer) { SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer) } else { - TempProducerKafkaExceptionConsumer(serializationSchema, producerCreator, errorTopicInitializer) + TempProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer) } } @@ -42,42 +47,76 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { } -case class TempProducerKafkaExceptionConsumer( - serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]], - 
kafkaProducerCreator: KafkaProducerCreator.Binary, - kafkaErrorTopicInitializer: KafkaErrorTopicInitializer -) extends FlinkEspExceptionConsumer { +trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging { + protected val serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]] + protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer + protected val metaData: MetaData + + // can be null in tests + private val topic: String = + Option(kafkaErrorTopicInitializer).map(_.exceptionHandlerConfig.topic).getOrElse("-") + + protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] override def open(context: EngineRuntimeContext): Unit = { super.open(context) kafkaErrorTopicInitializer.init() } - override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { - KafkaUtils.sendToKafkaWithTempProducer(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))( - kafkaProducerCreator - ) + override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { + sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis())) + .recoverWith { case e: RecordTooLargeException => + val scenario = metaData.id + val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") + val error = exceptionInfo.throwable.message + logger.warn( + s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). ${e.getMessage}" + ) + + val lightExceptionInfo = exceptionInfo.copy( + context = exceptionInfo.context.copy(variables = Map.empty, parentContext = None) + ) + + sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis())) + }(ExecutionContext.Implicits.global) + .recover { case e: Throwable => + val scenario = metaData.id + val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") + val error = exceptionInfo.throwable.message + + logger.warn( + s"Failed to write to $topic (scenario: $scenario, node: $node, error: $error): ${e.getMessage}", + e + ) + }(ExecutionContext.Implicits.global) } } -case class SharedProducerKafkaExceptionConsumer( +case class TempProducerKafkaExceptionConsumer( metaData: MetaData, serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]], kafkaProducerCreator: KafkaProducerCreator.Binary, kafkaErrorTopicInitializer: KafkaErrorTopicInitializer -) extends FlinkEspExceptionConsumer - with WithSharedKafkaProducer { +) extends BaseKafkaExceptionConsumer { - override def open(context: EngineRuntimeContext): Unit = { - super.open(context) - kafkaErrorTopicInitializer.init() + override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = { + KafkaUtils + .sendToKafkaWithTempProducer(record)(kafkaProducerCreator) } - override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { - sendToKafka(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))( - SynchronousExecutionContextAndIORuntime.syncEc - ) +} + +case class SharedProducerKafkaExceptionConsumer( + metaData: MetaData, + serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]], + kafkaProducerCreator: KafkaProducerCreator.Binary, + kafkaErrorTopicInitializer: KafkaErrorTopicInitializer +) extends BaseKafkaExceptionConsumer + with WithSharedKafkaProducer { + + override protected def sendKafkaMessage(record: 
ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = { + sendToKafka(record)(SynchronousExecutionContextAndIORuntime.syncEc) } } diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala index 9be8cdf11a6..4b010a6e228 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala @@ -44,7 +44,7 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers // null as we don't test open here... private val consumer = - TempProducerKafkaExceptionConsumer(serializationSchema, MockProducerCreator(mockProducer), null) + TempProducerKafkaExceptionConsumer(metaData, serializationSchema, MockProducerCreator(mockProducer), null) test("records event") { consumer.consume(exception) diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala index a181c837473..c9b05d920ec 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala @@ -1,13 +1,16 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.config.ConfigValueFactory.fromAnyRef +import io.circe.Json +import io.circe.syntax.EncoderOps +import org.scalatest.OptionValues import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.process.SinkFactory import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.kafka.KafkaSpec import pl.touk.nussknacker.engine.process.helpers.SampleNodes import pl.touk.nussknacker.engine.process.helpers.SampleNodes.SimpleRecord @@ -17,22 +20,38 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import java.util.Date -class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSpec with Matchers { +class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with FlinkSpec with KafkaSpec with Matchers { import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer - private val topicName = "testingErrors" - protected var modelData: ModelData = _ + test("should record errors on topic") { + val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel) + + val inputEvent = message.inputEvent.value + inputEvent.asObject.value.filterKeys(_ == "string").asJson shouldBe Json.obj( + "string" -> "short string".asJson + ) + } + + test("should record errors on topic - strips context from too large error input") { + // long string variable: 8^7 = 2097152 = 2 MB + val message = + runTest("testProcess-longString", 
stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel) + + message.inputEvent.value shouldBe Json.obj() + } + + private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = { + val topicName = s"$scenarioName.errors" - protected override def beforeAll(): Unit = { - super.beforeAll() val configWithExceptionHandler = config .withValue("exceptionHandler.type", fromAnyRef("Kafka")) .withValue("exceptionHandler.topic", fromAnyRef(topicName)) + .withValue("exceptionHandler.includeInputEvent", fromAnyRef(true)) .withValue("exceptionHandler.additionalParams.configurableKey", fromAnyRef("sampleValue")) .withValue("exceptionHandler.kafka", config.getConfig("kafka").root()) - modelData = LocalModelData( + val modelData = LocalModelData( configWithExceptionHandler, List( ComponentDefinition( @@ -42,28 +61,32 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSp ComponentDefinition("sink", SinkFactory.noParam(SampleNodes.MonitorEmptySink)) ) ) - } - test("should record errors on topic") { val process = ScenarioBuilder - .streaming("testProcess") + .streaming(scenarioName) .source("source", "source") + .buildSimpleVariable("string", "string", stringVariable) .filter("shouldFail", "1/{0, 1}[0] != 10".spel) .emptySink("end", "sink") val env = flinkMiniCluster.createExecutionEnvironment() UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process) - env.withJobRunning(process.name.value) { + val message = env.withJobRunning(process.name.value) { val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head - consumed.key() shouldBe "testProcess-shouldFail" - consumed.message().nodeId shouldBe Some("shouldFail") - consumed.message().processName.value shouldBe "testProcess" - consumed.message().message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero") - consumed.message().exceptionInput shouldBe Some("1/{0, 1}[0] != 10") - consumed.message().additionalData shouldBe Map("configurableKey" -> "sampleValue") + consumed.key() shouldBe s"$scenarioName-shouldFail" + + consumed.message() } + message.processName.value shouldBe scenarioName + message.nodeId shouldBe Some("shouldFail") + message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero") + message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10") + message.stackTrace.value should include("evaluation failed, message:") + message.additionalData shouldBe Map("configurableKey" -> "sampleValue") + + message } } diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala index 154dc492105..6223654ac81 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala @@ -10,7 +10,7 @@ import java.{lang, util} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig) +class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerConfig: KafkaExceptionConsumerConfig) extends LazyLogging { private val timeoutSeconds = 5 diff --git 
a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala index ff04709d36b..b27b6a35eb3 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala @@ -28,7 +28,7 @@ class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig: .getBytes(StandardCharsets.UTF_8) val value = KafkaExceptionInfo(metaData, exceptionInfo, consumerConfig) val serializedValue = value.asJson.spaces2.getBytes(StandardCharsets.UTF_8) - new ProducerRecord(consumerConfig.topic, key, serializedValue) + new ProducerRecord(consumerConfig.topic, null, timestamp, key, serializedValue) } } From 22022e5d4f4f6aacac483395946babc74b796806 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Fri, 27 Sep 2024 11:02:09 +0200 Subject: [PATCH 2/8] remove null --- .../exception/KafkaExceptionConsumer.scala | 6 ++-- ...kaExceptionConsumerSerializationSpec.scala | 8 +++-- .../kafka/KafkaSingleScenarioTaskRun.scala | 7 ++-- .../KafkaErrorTopicInitializerTest.scala | 4 +-- .../KafkaErrorTopicInitializer.scala | 34 +++++++++++++------ 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index cefd3ddf0fc..d5ee2f36fb2 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -25,7 +25,7 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider { val consumerConfig = exceptionHandlerConfig.rootAs[KafkaExceptionConsumerConfig] val producerCreator = kafkaProducerCreator(kafkaConfig) val serializationSchema = createSerializationSchema(metaData, consumerConfig) - val errorTopicInitializer = new KafkaErrorTopicInitializer(kafkaConfig, consumerConfig) + val errorTopicInitializer = new DefaultKafkaErrorTopicInitializer(kafkaConfig, consumerConfig) if (consumerConfig.useSharedProducer) { SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer) } else { @@ -52,9 +52,7 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogg protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer protected val metaData: MetaData - // can be null in tests - private val topic: String = - Option(kafkaErrorTopicInitializer).map(_.exceptionHandlerConfig.topic).getOrElse("-") + private val topic: String = kafkaErrorTopicInitializer.topicName protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala index 4b010a6e228..7b5df7153e8 100644 --- 
a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSerializationSpec.scala @@ -42,9 +42,13 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers private val serializationSchema = new KafkaJsonExceptionSerializationSchema(metaData, consumerConfig) - // null as we don't test open here... private val consumer = - TempProducerKafkaExceptionConsumer(metaData, serializationSchema, MockProducerCreator(mockProducer), null) + TempProducerKafkaExceptionConsumer( + metaData, + serializationSchema, + MockProducerCreator(mockProducer), + NoopKafkaErrorTopicInitializer + ) test("records event") { consumer.consume(exception) diff --git a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala index a510f7ab5da..f2a15d4860f 100644 --- a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala +++ b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala @@ -17,7 +17,10 @@ import pl.touk.nussknacker.engine.api.process.TopicName import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext import pl.touk.nussknacker.engine.api.{MetaData, VariableConstants} import pl.touk.nussknacker.engine.kafka.KafkaUtils -import pl.touk.nussknacker.engine.kafka.exception.{KafkaErrorTopicInitializer, KafkaJsonExceptionSerializationSchema} +import pl.touk.nussknacker.engine.kafka.exception.{ + DefaultKafkaErrorTopicInitializer, + KafkaJsonExceptionSerializationSchema +} import pl.touk.nussknacker.engine.lite.ScenarioInterpreterFactory.ScenarioInterpreterWithLifecycle import pl.touk.nussknacker.engine.lite.api.commonTypes.{ErrorType, ResultType} import pl.touk.nussknacker.engine.lite.api.interpreterTypes @@ -72,7 +75,7 @@ class KafkaSingleScenarioTaskRun( def init(): Unit = { configSanityCheck() - new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init() + new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig).init() producer = prepareProducer consumer = prepareConsumer diff --git a/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala b/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala index 27c1a1e3e97..f613d63e014 100644 --- a/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala +++ b/engine/lite/kafka/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaErrorTopicInitializerTest.scala @@ -7,7 +7,7 @@ import net.ceedubs.ficus.readers.EnumerationReader._ import org.scalatest.matchers.should.Matchers import org.scalatest.funsuite.AnyFunSuite import pl.touk.nussknacker.engine.kafka.KafkaSpec -import pl.touk.nussknacker.engine.kafka.exception.KafkaErrorTopicInitializer +import pl.touk.nussknacker.engine.kafka.exception.{DefaultKafkaErrorTopicInitializer, KafkaErrorTopicInitializer} import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter._ import pl.touk.nussknacker.test.PatientScalaFutures @@ 
-19,7 +19,7 @@ class KafkaErrorTopicInitializerTest extends AnyFunSuite with KafkaSpec with Mat val engineConfig = config .withValue("exceptionHandlingConfig.topic", fromAnyRef(topic)) .as[KafkaInterpreterConfig] - new KafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig) + new DefaultKafkaErrorTopicInitializer(engineConfig.kafka, engineConfig.exceptionHandlingConfig) } test("should create topic if not exists") { diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala index 6223654ac81..fefd33fd32b 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaErrorTopicInitializer.scala @@ -1,39 +1,46 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.scalalogging.LazyLogging -import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic} +import org.apache.kafka.clients.admin.{DescribeTopicsOptions, ListTopicsOptions, NewTopic} import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils} -import java.util.Optional +import java.util.{Collections, Optional} import java.util.concurrent.TimeUnit import java.{lang, util} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerConfig: KafkaExceptionConsumerConfig) - extends LazyLogging { +trait KafkaErrorTopicInitializer { + def topicName: String + def init(): Unit +} + +class DefaultKafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig) + extends KafkaErrorTopicInitializer + with LazyLogging { private val timeoutSeconds = 5 + val topicName: String = exceptionHandlerConfig.topic + def init(): Unit = { - val errorTopic = exceptionHandlerConfig.topic KafkaUtils.usingAdminClient(kafkaConfig) { admin => val topicNames = admin .listTopics(new ListTopicsOptions().timeoutMs(timeoutSeconds * 1000)) .names() .get(timeoutSeconds, TimeUnit.SECONDS) - val topicExists = topicNames.asScala.contains(errorTopic) + val topicExists = topicNames.asScala.contains(topicName) if (topicExists) { - logger.debug("Topic exists, skipping") + logger.debug(s"Topic $topicName already exists, skipping") } else { - logger.info(s"Creating error topic: $errorTopic with default configs, please check if the values are correct") - val errorTopicConfig = new NewTopic(errorTopic, Optional.empty[Integer](), Optional.empty[lang.Short]()) + logger.info(s"Creating error topic: $topicName with default configs, please check if the values are correct") + val errorTopicConfig = new NewTopic(topicName, Optional.empty[Integer](), Optional.empty[lang.Short]()) try { admin.createTopics(util.Arrays.asList(errorTopicConfig)).all().get(timeoutSeconds, TimeUnit.SECONDS) } catch { case NonFatal(e) => throw new IllegalStateException( - s"Failed to create $errorTopic (${e.getMessage}), cannot run scenario properly", + s"Failed to create $topicName (${e.getMessage}), cannot run scenario properly", e ) } @@ -42,3 +49,10 @@ class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerC } } + +object NoopKafkaErrorTopicInitializer extends KafkaErrorTopicInitializer { + + override val topicName: String = "-" + + override def init(): Unit = {} +} From 
aa5ab9598aefa6c7cc0cf6a68c130f28d1c10359 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Fri, 27 Sep 2024 16:11:31 +0200 Subject: [PATCH 3/8] better error message, remove warn log --- .../exception/KafkaExceptionConsumer.scala | 19 +++++++++++-------- .../KafkaExceptionConsumerSpec.scala | 7 +++++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index d5ee2f36fb2..75eda6a4404 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -64,15 +64,18 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogg override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis())) .recoverWith { case e: RecordTooLargeException => - val scenario = metaData.id - val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") - val error = exceptionInfo.throwable.message - logger.warn( - s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). ${e.getMessage}" - ) - + // Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible + // to correctly detect and handle this limit preemptively, because: + // * Kafka limits are imposed on compressed message sizes + // * there are limits on multiple levels: producer, topic, broker (access requires additional permissions) val lightExceptionInfo = exceptionInfo.copy( - context = exceptionInfo.context.copy(variables = Map.empty, parentContext = None) + context = exceptionInfo.context.copy( + variables = Map( + "warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message", + "truncationReason" -> e.getMessage + ), + parentContext = None + ) ) sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis())) diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala index c9b05d920ec..1809e1b69b2 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala @@ -28,7 +28,7 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel) val inputEvent = message.inputEvent.value - inputEvent.asObject.value.filterKeys(_ == "string").asJson shouldBe Json.obj( + inputEvent.asObject.value.filterKeys(_ != "input").asJson shouldBe Json.obj( "string" -> "short string".asJson ) } @@ -38,7 +38,10 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin val message = runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 
'xxxxxxxx')".repeat(6)).spel) - message.inputEvent.value shouldBe Json.obj() + val inputEvent = message.inputEvent.value + inputEvent.asObject.value.filterKeys(k => k != "input" && k != "truncationReason").asJson shouldBe Json.obj( + "warning" -> "variables truncated, they didn't fit within max allowed size of a Kafka message".asJson + ) } private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = { From 28b7e733fc2aa9080fd0fedf3ae60a27b9203062 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Sun, 6 Oct 2024 22:40:01 +0200 Subject: [PATCH 4/8] smart variable pruning --- build.sbt | 3 +- docs/MigrationGuide.md | 6 + docs/integration/KafkaIntegration.md | 18 +-- .../exception/KafkaExceptionConsumer.scala | 17 ++- .../KafkaExceptionConsumerSpec.scala | 42 ++++-- .../KafkaExceptionConsumerConfig.scala | 2 + ...afkaJsonExceptionSerializationSchema.scala | 120 +++++++++++++++++- ...JsonExceptionSerializationSchemaSpec.scala | 108 ++++++++++++++++ 8 files changed, 287 insertions(+), 29 deletions(-) create mode 100644 utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala diff --git a/build.sbt b/build.sbt index 36546a0a884..da0efeb77a7 100644 --- a/build.sbt +++ b/build.sbt @@ -891,7 +891,8 @@ lazy val kafkaUtils = (project in utils("kafka-utils")) name := "nussknacker-kafka-utils", libraryDependencies ++= { Seq( - "org.apache.kafka" % "kafka-clients" % kafkaV + "org.apache.kafka" % "kafka-clients" % kafkaV, + "org.scalatest" %% "scalatest" % scalaTestV % Test, ) } // Depends on componentsApi because of dependency to NuExceptionInfo and NonTransientException - diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 0dcbf40e175..14a4cfa9339 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -23,6 +23,12 @@ To see the biggest differences please consult the [changelog](Changelog.md). * Migration API changes: * POST `/api/migrate` supports v2 request format (with `scenarioLabels` field) +### Configuration changes + +* [#6958](https://github.com/TouK/nussknacker/pull/6958) Added message size limit in the "Kafka" exceptionHandler: `maxMessageBytes`. + Its default value reflects Kafka's default size limit of 1 MB (`max.message.bytes`), you need to increase it if your + error topic allows for larger messages. Remember to add some margin for Kafka protocol overhead (100 bytes should be enough). + ### Other changes * [#6692](https://github.com/TouK/nussknacker/pull/6692) Kryo serializers for `UnmodifiableCollection`, `scala.Product` etc. 
diff --git a/docs/integration/KafkaIntegration.md b/docs/integration/KafkaIntegration.md index aba90eedd29..fd2c4edef06 100644 --- a/docs/integration/KafkaIntegration.md +++ b/docs/integration/KafkaIntegration.md @@ -227,15 +227,15 @@ Errors can be sent to specified Kafka topic in following json format (see below Following properties can be configured (please look at correct engine page : [Lite](../../configuration/model/Lite#exception-handling) or [Flink](../../configuration/model/Flink#configuring-exception-handling), to see where they should be set): -| Name | Default value | Description | -|------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. | -| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) | -| includeHost | true | Should name of host where error occurred (e.g. TaskManager in case of Flink) be included. Can be misleading if there are many network interfaces or hostname is improperly configured) | -| includeInputEvent | false | Should input event be serialized (can be large or contain sensitive data so use with care) | -| useSharedProducer | false | For better performance shared Kafka producer can be used (by default it's created and closed for each error), shared Producer is kind of experimental feature and should be used with care | -| additionalParams | {} | Map of fixed parameters that can be added to Kafka message | - +| Name | Default value | Description | +|-----------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. | +| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) | +| maxMessageBytes | 1048476 | Limit Kafka message length by truncating variables from input event (enabled by `includeInputEvent`), defaults to Kafka's default `max.message.bytes` of 1 MB with 100 bytes of safety margin | +| includeHost | true | Should name of host where error occurred (e.g. TaskManager in case of Flink) be included. 
Can be misleading if there are many network interfaces or hostname is improperly configured | +| includeInputEvent | false | Should input event be serialized (can be large or contain sensitive data so use with care) | +| useSharedProducer | false | For better performance shared Kafka producer can be used (by default it's created and closed for each error), shared Producer is kind of experimental feature and should be used with care | +| additionalParams | {} | Map of fixed parameters that can be added to Kafka message | ### Configuration for Flink engine diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala index 75eda6a4404..411f790e210 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala @@ -64,15 +64,26 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogg override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = { sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis())) .recoverWith { case e: RecordTooLargeException => + // Kafka message size should be kept within acceptable size by serializer, but it may be incorrectly configured. + // We may also encounter this exception due to producer's 'buffer.memory' being set too low. + // // Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible // to correctly detect and handle this limit preemptively, because: - // * Kafka limits are imposed on compressed message sizes + // * Kafka limits are imposed on compressed message sizes (with headers and protocol overhead) // * there are limits on multiple levels: producer, topic, broker (access requires additional permissions) + + val scenario = metaData.id + val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-") + val error = exceptionInfo.throwable.message + logger.warn( + s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). " + + s"Verify your configuration of Kafka producer, error logging and errors topic and set correct limits. 
${e.getMessage}" + ) + val lightExceptionInfo = exceptionInfo.copy( context = exceptionInfo.context.copy( variables = Map( - "warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message", - "truncationReason" -> e.getMessage + "!warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message: ${e.getMessage}", ), parentContext = None ) diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala index 1809e1b69b2..02d6d5389e0 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala @@ -1,9 +1,7 @@ package pl.touk.nussknacker.engine.kafka.exception import com.typesafe.config.ConfigValueFactory.fromAnyRef -import io.circe.Json -import io.circe.syntax.EncoderOps -import org.scalatest.OptionValues +import org.scalatest.{EitherValues, OptionValues} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.component.ComponentDefinition @@ -20,36 +18,51 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import java.util.Date -class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with FlinkSpec with KafkaSpec with Matchers { +class KafkaExceptionConsumerSpec + extends AnyFunSuite + with OptionValues + with FlinkSpec + with KafkaSpec + with Matchers + with EitherValues { import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer test("should record errors on topic") { val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel) - val inputEvent = message.inputEvent.value - inputEvent.asObject.value.filterKeys(_ != "input").asJson shouldBe Json.obj( - "string" -> "short string".asJson + val inputEvent = extractInputEventMap(message) + inputEvent.view.filterKeys(_ != "input").toMap shouldBe Map( + "string" -> "short string" ) } test("should record errors on topic - strips context from too large error input") { // long string variable: 8^7 = 2097152 = 2 MB - val message = - runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel) + val message = runTest( + "testProcess-longString", + stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel, + maxMessageBytes = 5242880 + ) - val inputEvent = message.inputEvent.value - inputEvent.asObject.value.filterKeys(k => k != "input" && k != "truncationReason").asJson shouldBe Json.obj( - "warning" -> "variables truncated, they didn't fit within max allowed size of a Kafka message".asJson + val inputEvent = extractInputEventMap(message) + inputEvent.keySet shouldBe Set("!warning") + inputEvent("!warning") should startWith( + "variables truncated, they didn't fit within max allowed size of a Kafka message:" ) } - private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = { + private def runTest( + scenarioName: String, + stringVariable: Expression, + maxMessageBytes: Int = 1048576 + ): KafkaExceptionInfo = { val topicName = s"$scenarioName.errors" val configWithExceptionHandler = config .withValue("exceptionHandler.type", fromAnyRef("Kafka")) 
.withValue("exceptionHandler.topic", fromAnyRef(topicName)) + .withValue("exceptionHandler.maxMessageBytes", fromAnyRef(maxMessageBytes)) .withValue("exceptionHandler.includeInputEvent", fromAnyRef(true)) .withValue("exceptionHandler.additionalParams.configurableKey", fromAnyRef("sampleValue")) .withValue("exceptionHandler.kafka", config.getConfig("kafka").root()) @@ -92,4 +105,7 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin message } + def extractInputEventMap(message: KafkaExceptionInfo): Map[String, String] = + message.inputEvent.value.as[Map[String, String]].value + } diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala index f0d91e8a05f..ad29ca11ec4 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerConfig.scala @@ -4,6 +4,8 @@ case class KafkaExceptionConsumerConfig( topic: String, // quite large to be able to show nested exception stackTraceLengthLimit: Int = 50, + // 1 MB (default max.message.bytes) - 100 bytes of margin for protocol overhead + maxMessageBytes: Int = 1048476, includeHost: Boolean = true, includeInputEvent: Boolean = false, // by default we use temp producer, as it's more robust diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala index b27b6a35eb3..5771d091944 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala @@ -1,10 +1,12 @@ package pl.touk.nussknacker.engine.kafka.exception -import io.circe.Json +import io.circe.{ACursor, Json} import io.circe.generic.JsonCodec import io.circe.syntax.EncoderOps import org.apache.kafka.clients.producer.ProducerRecord -import pl.touk.nussknacker.engine.api.{Context, MetaData} +import org.apache.kafka.common.record.DefaultRecordBatch +import org.apache.kafka.common.utils.Utils +import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema @@ -14,11 +16,122 @@ import java.io.{PrintWriter, StringWriter} import java.lang import java.net.InetAddress import java.nio.charset.StandardCharsets +import scala.annotation.tailrec import scala.io.Source +object KafkaJsonExceptionSerializationSchema { + private[exception] val recordOverhead = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + private val indentLength = 2 + private val warningKey = "!warning" + + private def serializeValueWithSizeLimit(value: KafkaExceptionInfo, maxValueBytes: Int): Array[Byte] = { + val valueJson = value.asJson + val serializedValue = valueJson.spaces2.getBytes(StandardCharsets.UTF_8) + if (value.inputEvent.isDefined && serializedValue.length > maxValueBytes) { + removeInputEventKeys(valueJson, serializedValue.length, 
maxValueBytes).spaces2.getBytes(StandardCharsets.UTF_8) + } else { + serializedValue + } + } + + private def removeInputEventKeys(value: Json, valueBytes: Int, maxValueBytes: Int): Json = { + if (valueBytes <= maxValueBytes) { + return value + } + + val removedBytesPlaceholder = "$" * valueBytes.toString.length + // text below will increase JSON size, but we are working under the assumption that its size is much smaller + // than the size of inputEvent keys that will get removed + val messageTemplate = + s"inputEvent truncated, original error event had $valueBytes bytes, which was more then the max allowed length of $maxValueBytes bytes. " + + s"Removed variables ($removedBytesPlaceholder bytes)" + + // |{ + // | "inputEvent" : { + // | "name": ... + val warningBytes = 2 * indentLength + Utils.utf8Length(s"\"$warningKey\" : ${messageTemplate.asJson.spaces2},\n") + val bytesToCut = valueBytes + warningBytes - maxValueBytes + + val variablesWithLength = countVariableLengths(value.hcursor.downField("inputEvent")) + val (variablesToRemove, removedBytes) = calculateKeysToRemove(variablesWithLength, bytesToCut) + + if (removedBytes <= warningBytes) { + // this may happen only when doing tests with really low size limits + return value + } + + val finalMessage = messageTemplate.replace(removedBytesPlaceholder, removedBytes.toString) + + variablesToRemove.toSeq.sorted.mkString(": ", ", ", "") + + value.hcursor + .downField("inputEvent") + .withFocus { json => + json.asObject + .map { inputEventObject => + inputEventObject + .filterKeys(key => !variablesToRemove.contains(key)) + .add(warningKey, finalMessage.asJson) + .toJson + } + .getOrElse(json) + } + .top + .getOrElse(value) + } + + private[exception] def countVariableLengths(inputEvent: ACursor): List[(String, Int)] = { + // Each removed key will be moved to a message that enumerates all removed keys, so this: + // | "inputEvent" : { + // | "variable_name" : "value", + // | } + // will be transformed into a partial string: `, variable_name` (2-byte separator + variable name). + // Because of that counted size doesn't include `,\n`, as these bytes will be reused. + // + // Assumption that all values are terminated by `,\n` may make us overestimate a single variable by one byte, + // but it's safer to remove more and still fit within given size limit. 
+ val jsonIndent = 2 * indentLength + val jsonKeyOverhead = jsonIndent + 2 /* key quotes */ + 3 /* ` : ` */ + + inputEvent.keys + .getOrElse(Iterable.empty) + .map { key => + val variableBytes = inputEvent + .get[Json](key) + .map { variable => + val serializedVariable = variable.spaces2 + Utils.utf8Length(serializedVariable) + (jsonIndent * serializedVariable.count(_ == '\n')) + } + .getOrElse(0) + key -> (jsonKeyOverhead + variableBytes) + } + .toList + } + + private def calculateKeysToRemove(keysWithLength: List[(String, Int)], bytesToRecover: Int): (Set[String], Int) = { + @tailrec + def collectKeysToRemove( + allKeys: List[(String, Int)], + keysToRemove: List[String], + bytesSoFar: Int + ): (Set[String], Int) = { + allKeys match { + case (key, bytes) :: tail if bytesSoFar < bytesToRecover => + collectKeysToRemove(tail, key :: keysToRemove, bytes + bytesSoFar) + case _ => + (keysToRemove.toSet, bytesSoFar) + } + } + + collectKeysToRemove(keysWithLength.sortBy { case (_, length) => length }.reverse, List.empty, 0) + } + +} + class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig: KafkaExceptionConsumerConfig) extends KafkaSerializationSchema[NuExceptionInfo[NonTransientException]] { + import KafkaJsonExceptionSerializationSchema._ + override def serialize( exceptionInfo: NuExceptionInfo[NonTransientException], timestamp: lang.Long @@ -27,7 +140,8 @@ class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig: s"${metaData.name}-${exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("")}" .getBytes(StandardCharsets.UTF_8) val value = KafkaExceptionInfo(metaData, exceptionInfo, consumerConfig) - val serializedValue = value.asJson.spaces2.getBytes(StandardCharsets.UTF_8) + val maxValueBytes = consumerConfig.maxMessageBytes - key.length - recordOverhead + val serializedValue = serializeValueWithSizeLimit(value, maxValueBytes) new ProducerRecord(consumerConfig.topic, null, timestamp, key, serializedValue) } diff --git a/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala new file mode 100644 index 00000000000..067a6d13fc6 --- /dev/null +++ b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala @@ -0,0 +1,108 @@ +package pl.touk.nussknacker.engine.kafka.exception + +import io.circe.{Decoder, KeyDecoder} +import io.circe.parser.decode +import io.circe.syntax.EncoderOps +import org.scalatest.EitherValues +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo} +import pl.touk.nussknacker.engine.api.{Context, MetaData, StreamMetaData} + +import java.nio.charset.StandardCharsets +import java.time.Instant + +class KafkaJsonExceptionSerializationSchemaSpec extends AnyFunSuite with Matchers with EitherValues { + private val timestamp = Instant.now() + private val exception = NonTransientException("input", "message", timestamp) + private implicit val mapDecoder: Decoder[Map[String, String]] = + Decoder.decodeMap(KeyDecoder.decodeKeyString, Decoder.decodeString) + + test("variable sizes are calculated correctly") { + def jsonBytes(variables: Map[String, Any]): Int = { + createKafkaExceptionInfo(variables).asJson.spaces2.length + } + + def serializedSize(variables: 
Map[String, Any], variableName: String): Int = { + jsonBytes(variables) - jsonBytes(variables - variableName) + } + + val variables = Map( + "str" -> "12345", + "int" -> 12345, + "number" -> 123.45, + "array" -> Seq(1, 2), + "object" -> Map("a" -> 1, "b" -> 2), + "complexObject" -> Map("a" -> 1, "b" -> Seq(1, 2, "c"), "c" -> Map.empty, "d" -> Map("e" -> 1)) + ) + val expectedSizes = variables.map { case (k, _) => + // remove bytes taken up by serialized key name + k -> (serializedSize(variables, k) - 2 - k.length) + } + + KafkaJsonExceptionSerializationSchema + .countVariableLengths(createKafkaExceptionInfo(variables).asJson.hcursor.downField("inputEvent")) + .toMap shouldBe expectedSizes + } + + test("strips longest inputEvent entries") { + val baseContext = Map( + "longVariable" -> "", + "shortVariable1" -> "111", + "shortVariable2" -> "222222", + "shortVariable3" -> "shortVariableValue" + ) + val largeContext = baseContext ++ Map("longVariable" -> "1".repeat(400)) + + val (baseRecord, key) = getSerializedJson(baseContext) + decode[KafkaExceptionInfo](baseRecord).value.inputEvent.get shouldBe baseContext.asJson + + // limit that will remove longVariable, we need to account for the added !warning key + val maxRecord1Length = baseRecord.length + 350 + val (record1, _) = getSerializedJson(largeContext, maxMessageBytes = maxRecord1Length) + val inputEvent1 = decode[KafkaExceptionInfo](record1).value.inputEvent.get.as[Map[String, String]].value + + inputEvent1.removed("!warning").keySet shouldBe largeContext.removed("longVariable").keySet + inputEvent1("!warning") should startWith("inputEvent truncated") + inputEvent1("!warning") should endWith(": longVariable") + record1.length should be <= maxRecord1Length + + // add padding to size limit, variable removal rounds some calculations + val maxRecord2Length = record1.length + key.length + KafkaJsonExceptionSerializationSchema.recordOverhead + 1 + val (record2, _) = getSerializedJson(largeContext, maxMessageBytes = maxRecord2Length) + val inputEvent2 = decode[KafkaExceptionInfo](record2).value.inputEvent.get.as[Map[String, String]].value + + inputEvent2.keySet shouldBe inputEvent1.keySet + record2.length should equal(record1.length) + } + + private def getSerializedJson( + contextVariables: Map[String, Any], + maxMessageBytes: Int = 1048576, + ): (String, String) = { + val serializer = serializerWithMaxMessageLength(maxMessageBytes) + val record = serializer.serialize(createData(contextVariables), timestamp.toEpochMilli) + + (new String(record.value(), StandardCharsets.UTF_8), new String(record.key(), StandardCharsets.UTF_8)) + } + + private def serializerWithMaxMessageLength(maxMessageLength: Int): KafkaJsonExceptionSerializationSchema = { + val config = KafkaExceptionConsumerConfig( + topic = "topic", + maxMessageBytes = maxMessageLength, + includeInputEvent = true + ) + new KafkaJsonExceptionSerializationSchema(MetaData("test", StreamMetaData()), config) + } + + private def createData(contextVariables: Map[String, Any]): NuExceptionInfo[NonTransientException] = { + NuExceptionInfo(None, exception, Context("contextId", contextVariables)) + } + + private def createKafkaExceptionInfo(variables: Map[String, Any]) = KafkaExceptionInfo( + MetaData("test", StreamMetaData()), + createData(variables), + KafkaExceptionConsumerConfig(topic = "topic", includeInputEvent = true) + ) + +} From bd4a310849b98982f139d79c6f4d164f2fdd0968 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Sun, 6 Oct 2024 23:02:38 +0200 Subject: [PATCH 5/8] make Scala 2.12 
happy --- .../exception/KafkaJsonExceptionSerializationSchema.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala index 5771d091944..686753100a2 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala @@ -49,7 +49,8 @@ object KafkaJsonExceptionSerializationSchema { // |{ // | "inputEvent" : { // | "name": ... - val warningBytes = 2 * indentLength + Utils.utf8Length(s"\"$warningKey\" : ${messageTemplate.asJson.spaces2},\n") + // (note: line below uses '' as quotes because Scala 2.12 can't handle escaped "") + val warningBytes = 2 * indentLength + Utils.utf8Length(s"'$warningKey' : ${messageTemplate.asJson.spaces2},\n") val bytesToCut = valueBytes + warningBytes - maxValueBytes val variablesWithLength = countVariableLengths(value.hcursor.downField("inputEvent")) From a79e6e2b5860e8fcfe2d92d141fa16da1f24b99e Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Sun, 6 Oct 2024 23:33:02 +0200 Subject: [PATCH 6/8] scala 2.12 --- .../engine/kafka/exception/KafkaExceptionConsumerSpec.scala | 2 +- .../exception/KafkaJsonExceptionSerializationSchemaSpec.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala index 02d6d5389e0..7ebaa242c62 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala @@ -32,7 +32,7 @@ class KafkaExceptionConsumerSpec val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel) val inputEvent = extractInputEventMap(message) - inputEvent.view.filterKeys(_ != "input").toMap shouldBe Map( + (inputEvent - "input") shouldBe Map( "string" -> "short string" ) } diff --git a/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala index 067a6d13fc6..97596601aee 100644 --- a/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala +++ b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala @@ -62,7 +62,7 @@ class KafkaJsonExceptionSerializationSchemaSpec extends AnyFunSuite with Matcher val (record1, _) = getSerializedJson(largeContext, maxMessageBytes = maxRecord1Length) val inputEvent1 = decode[KafkaExceptionInfo](record1).value.inputEvent.get.as[Map[String, String]].value - inputEvent1.removed("!warning").keySet shouldBe largeContext.removed("longVariable").keySet + (inputEvent1.keySet - "!warning") shouldBe (largeContext.keySet - "longVariable") inputEvent1("!warning") should 
startWith("inputEvent truncated") inputEvent1("!warning") should endWith(": longVariable") record1.length should be <= maxRecord1Length From b7af20bd064c5a18acd78582d07baefdaf8d7333 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Mon, 7 Oct 2024 21:52:50 +0200 Subject: [PATCH 7/8] fix variable name and comment --- .../KafkaJsonExceptionSerializationSchema.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala index 686753100a2..4fa7b2c82c3 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala @@ -39,6 +39,8 @@ object KafkaJsonExceptionSerializationSchema { return value } + // use a placeholder of the same length as the number of digits in valueBytes, + // because the final value will never be larger than that val removedBytesPlaceholder = "$" * valueBytes.toString.length // text below will increase JSON size, but we are working under the assumption that its size is much smaller // than the size of inputEvent keys that will get removed @@ -81,12 +83,14 @@ object KafkaJsonExceptionSerializationSchema { } private[exception] def countVariableLengths(inputEvent: ACursor): List[(String, Int)] = { - // Each removed key will be moved to a message that enumerates all removed keys, so this: + // Each removed key will be moved to a message that enumerates all removed keys, e.g. when the variable + // 'my_variable_name' is removed from: // | "inputEvent" : { - // | "variable_name" : "value", + // | "my_variable_name" : "value", // | } - // will be transformed into a partial string: `, variable_name` (2-byte separator + variable name). - // Because of that counted size doesn't include `,\n`, as these bytes will be reused. + // it will be transformed into a partial string: `, my_variable_name` (2-byte separator + variable name). + // Because of that the length of variable name and the last two characters (`,\n`) aren't added to counted size, + // as the space they take up will get reused. + // // Assumption that all values are terminated by `,\n` may make us overestimate a single variable by one byte, // but it's safer to remove more and still fit within given size limit. 
From 881c10be53c6bb94190b706fd8d2518179876656 Mon Sep 17 00:00:00 2001
From: Piotr Przybylski
Date: Wed, 9 Oct 2024 15:09:49 +0200
Subject: [PATCH 8/8] make variable removal public

---
 ...afkaJsonExceptionSerializationSchema.scala | 83 ++++++++++---------
 ...JsonExceptionSerializationSchemaSpec.scala |  7 +-
 2 files changed, 49 insertions(+), 41 deletions(-)

diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala
index 4fa7b2c82c3..59171725b35 100644
--- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala
+++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchema.scala
@@ -1,6 +1,6 @@
 package pl.touk.nussknacker.engine.kafka.exception

-import io.circe.{ACursor, Json}
+import io.circe.{Json, JsonObject}
 import io.circe.generic.JsonCodec
 import io.circe.syntax.EncoderOps
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -28,15 +28,39 @@ object KafkaJsonExceptionSerializationSchema {
     val valueJson = value.asJson
     val serializedValue = valueJson.spaces2.getBytes(StandardCharsets.UTF_8)
     if (value.inputEvent.isDefined && serializedValue.length > maxValueBytes) {
-      removeInputEventKeys(valueJson, serializedValue.length, maxValueBytes).spaces2.getBytes(StandardCharsets.UTF_8)
+      valueJson.hcursor
+        .downField("inputEvent")
+        .withFocus { inputEventJson =>
+          inputEventJson.asObject
+            .map { inputEventJsonObject =>
+              val indentLevel = 1
+              removeVariablesOverSizeLimit(
+                inputEventJsonObject,
+                indentLevel,
+                serializedValue.length,
+                maxValueBytes
+              ).toJson
+            }
+            .getOrElse(inputEventJson)
+        }
+        .top
+        .getOrElse(valueJson)
+        .spaces2
+        .getBytes(StandardCharsets.UTF_8)
     } else {
       serializedValue
     }
   }

-  private def removeInputEventKeys(value: Json, valueBytes: Int, maxValueBytes: Int): Json = {
+  // noinspection ScalaWeakerAccess
+  def removeVariablesOverSizeLimit(
+      containerObject: JsonObject,
+      containerIndentLevel: Int,
+      valueBytes: Int,
+      maxValueBytes: Int
+  ): JsonObject = {
     if (valueBytes <= maxValueBytes) {
-      return value
+      return containerObject
     }

     // use a placeholder of the same length as the number of digits in valueBytes,
@@ -45,44 +69,34 @@ object KafkaJsonExceptionSerializationSchema {
     // because the final value will never be larger than that
     val removedBytesPlaceholder = "$" * valueBytes.toString.length
     // text below will increase JSON size, but we are working under the assumption that its size is much smaller
     // than the size of inputEvent keys that will get removed
     val messageTemplate =
-      s"inputEvent truncated, original error event had $valueBytes bytes, which was more then the max allowed length of $maxValueBytes bytes. " +
" + + s"variables truncated, original object had $valueBytes bytes, which was more then the max allowed length of $maxValueBytes bytes. " + s"Removed variables ($removedBytesPlaceholder bytes)" + val indentBytes = indentLength * (containerIndentLevel + 1) // |{ // | "inputEvent" : { - // | "name": ... + // | "!warning": ... // (note: line below uses '' as quotes because Scala 2.12 can't handle escaped "") - val warningBytes = 2 * indentLength + Utils.utf8Length(s"'$warningKey' : ${messageTemplate.asJson.spaces2},\n") + val warningBytes = indentBytes + Utils.utf8Length(s"'$warningKey' : ${messageTemplate.asJson.spaces2},\n") val bytesToCut = valueBytes + warningBytes - maxValueBytes - val variablesWithLength = countVariableLengths(value.hcursor.downField("inputEvent")) + val variablesWithLength = countVariableLengths(containerObject, indentBytes) val (variablesToRemove, removedBytes) = calculateKeysToRemove(variablesWithLength, bytesToCut) if (removedBytes <= warningBytes) { // this may happen only when doing tests with really low size limits - return value + return containerObject } val finalMessage = messageTemplate.replace(removedBytesPlaceholder, removedBytes.toString) + variablesToRemove.toSeq.sorted.mkString(": ", ", ", "") - value.hcursor - .downField("inputEvent") - .withFocus { json => - json.asObject - .map { inputEventObject => - inputEventObject - .filterKeys(key => !variablesToRemove.contains(key)) - .add(warningKey, finalMessage.asJson) - .toJson - } - .getOrElse(json) - } - .top - .getOrElse(value) + containerObject + .filterKeys(key => !variablesToRemove.contains(key)) + .add(warningKey, finalMessage.asJson) } - private[exception] def countVariableLengths(inputEvent: ACursor): List[(String, Int)] = { + private[exception] def countVariableLengths(json: JsonObject, indentBytes: Int): List[(String, Int)] = { // Each removed key will be moved to a message that enumerates all removed keys, e.g. when the variable // 'my_variable_name' is removed from: // | "inputEvent" : { @@ -94,22 +108,14 @@ object KafkaJsonExceptionSerializationSchema { // // Assumption that all values are terminated by `,\n` may make us overestimate a single variable by one byte, // but it's safer to remove more and still fit within given size limit. 
-    val jsonIndent = 2 * indentLength
-    val jsonKeyOverhead = jsonIndent + 2 /* key quotes */ + 3 /* ` : ` */
-
-    inputEvent.keys
-      .getOrElse(Iterable.empty)
-      .map { key =>
-        val variableBytes = inputEvent
-          .get[Json](key)
-          .map { variable =>
-            val serializedVariable = variable.spaces2
-            Utils.utf8Length(serializedVariable) + (jsonIndent * serializedVariable.count(_ == '\n'))
-          }
-          .getOrElse(0)
+    val jsonKeyOverhead = indentBytes + 2 /* key quotes */ + 3 /* ` : ` */
+
+    json.toList
+      .map { case (key, value) =>
+        val serializedVariable = value.spaces2
+        val variableBytes = Utils.utf8Length(serializedVariable) + (indentBytes * serializedVariable.count(_ == '\n'))
         key -> (jsonKeyOverhead + variableBytes)
       }
-      .toList
   }

   private def calculateKeysToRemove(keysWithLength: List[(String, Int)], bytesToRemove: Int): (Set[String], Int) = {
@@ -144,6 +150,7 @@ class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig:
     val key = s"${metaData.name}-${exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("")}"
       .getBytes(StandardCharsets.UTF_8)
+
     val value = KafkaExceptionInfo(metaData, exceptionInfo, consumerConfig)
     val maxValueBytes = consumerConfig.maxMessageBytes - key.length - recordOverhead
     val serializedValue = serializeValueWithSizeLimit(value, maxValueBytes)
diff --git a/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala
index 97596601aee..5e9b932e71b 100644
--- a/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala
+++ b/utils/kafka-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaJsonExceptionSerializationSchemaSpec.scala
@@ -1,6 +1,6 @@
 package pl.touk.nussknacker.engine.kafka.exception

-import io.circe.{Decoder, KeyDecoder}
+import io.circe.{Decoder, JsonObject, KeyDecoder}
 import io.circe.parser.decode
 import io.circe.syntax.EncoderOps
 import org.scalatest.EitherValues
@@ -40,8 +40,9 @@ class KafkaJsonExceptionSerializationSchemaSpec extends AnyFunSuite with Matcher
       k -> (serializedSize(variables, k) - 2 - k.length)
     }

+    val inputEvent = createKafkaExceptionInfo(variables).asJson.hcursor.downField("inputEvent").as[JsonObject].value
     KafkaJsonExceptionSerializationSchema
-      .countVariableLengths(createKafkaExceptionInfo(variables).asJson.hcursor.downField("inputEvent"))
+      .countVariableLengths(inputEvent, indentBytes = 4)
       .toMap shouldBe expectedSizes
   }

@@ -63,7 +64,7 @@
     val inputEvent1 = decode[KafkaExceptionInfo](record1).value.inputEvent.get.as[Map[String, String]].value
     (inputEvent1.keySet - "!warning") shouldBe (largeContext.keySet - "longVariable")
-    inputEvent1("!warning") should startWith("inputEvent truncated")
+    inputEvent1("!warning") should startWith("variables truncated")
     inputEvent1("!warning") should endWith(": longVariable")
     record1.length should be <= maxRecord1Length
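
For the record, the `indentBytes = 4` passed to `countVariableLengths` in the updated spec follows from the `indentLength * (containerIndentLevel + 1)` formula introduced above: assuming `indentLength` is 2 (circe's `spaces2` printer indents by two spaces per nesting level) and `inputEvent` sits at indent level 1 inside the top-level error object, its variables are printed four spaces deep. A tiny sketch of that arithmetic (assumed values, not copied from the code):

val indentLength         = 2 // two spaces per nesting level with the spaces2 printer (assumption)
val containerIndentLevel = 1 // "inputEvent" lives directly under the top-level error object
val indentBytes          = indentLength * (containerIndentLevel + 1)
assert(indentBytes == 4)     // matches `indentBytes = 4` in the spec above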