Skip to content

Commit

Permalink
Kafka exceptionHandler: retry when message is too large, log when wri…
Browse files Browse the repository at this point in the history
…ting to Kafka fails
  • Loading branch information
piotrp committed Sep 26, 2024
1 parent a2e3279 commit 31b999c
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 39 deletions.
2 changes: 2 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
* Scenario Activity API implementation
* [#6925](https://github.com/TouK/nussknacker/pull/6925) Fix situation when preset labels were presented as `null` when node didn't pass the validation.
* [#6935](https://github.com/TouK/nussknacker/pull/6935) Spel: Scenario labels added to meta variable - `#meta.scenarioLabels`
* [#6958](https://github.com/TouK/nussknacker/pull/6958) Try to handle large contexts in the "Kafka" exceptionHandler,
log a warning when writing to Kafka fails

## 1.17

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package pl.touk.nussknacker.engine.kafka.exception

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.RecordTooLargeException
import pl.touk.nussknacker.engine.api.MetaData
import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo}
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
Expand All @@ -13,6 +16,8 @@ import pl.touk.nussknacker.engine.kafka.{DefaultProducerCreator, KafkaConfig, Ka
import pl.touk.nussknacker.engine.util.SynchronousExecutionContextAndIORuntime
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig

import scala.concurrent.{ExecutionContext, Future}

class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {

override def create(metaData: MetaData, exceptionHandlerConfig: Config): FlinkEspExceptionConsumer = {
Expand All @@ -24,7 +29,7 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {
if (consumerConfig.useSharedProducer) {
SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
} else {
TempProducerKafkaExceptionConsumer(serializationSchema, producerCreator, errorTopicInitializer)
TempProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
}
}

Expand All @@ -42,21 +47,62 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {

}

case class TempProducerKafkaExceptionConsumer(
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends FlinkEspExceptionConsumer {
trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging {
protected val topic: String
protected val serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]]
protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
protected val metaData: MetaData

protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[Unit]

override def open(context: EngineRuntimeContext): Unit = {
super.open(context)
kafkaErrorTopicInitializer.init()
}

override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
KafkaUtils.sendToKafkaWithTempProducer(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
kafkaProducerCreator
)
override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))
.recoverWith { case e: RecordTooLargeException =>
val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message
logger.warn(
s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). ${e.getMessage}"
)

val lightExceptionInfo = exceptionInfo.copy(
context = exceptionInfo.context.copy(variables = Map.empty, parentContext = None)
)

sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis()))
}(ExecutionContext.Implicits.global)
.recover { case e: Throwable =>
val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message

logger.warn(
s"Failed to write to $topic (scenario: $scenario, node: $node, error: $error): ${e.getMessage}",
e
)
}(ExecutionContext.Implicits.global)
}

}

case class TempProducerKafkaExceptionConsumer(
metaData: MetaData,
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends BaseKafkaExceptionConsumer {

override protected val topic: String = kafkaErrorTopicInitializer.exceptionHandlerConfig.topic

Check failure on line 100 in engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumer.scala

View workflow job for this annotation

GitHub Actions / REPORT-BackendTests-2.13

pl.touk.nussknacker.engine.kafka.exception.KafkaExceptionConsumerSerializationSpec ► (It is not a test it is a sbt.testing.SuiteSelector)

Failed test found in: engine/flink/kafka-components-utils/target/test-reports/TEST-pl.touk.nussknacker.engine.kafka.exception.KafkaExceptionConsumerSerializationSpec.xml Error: java.lang.NullPointerException
Raw output
java.lang.NullPointerException
	at pl.touk.nussknacker.engine.kafka.exception.TempProducerKafkaExceptionConsumer.<init>(KafkaExceptionConsumer.scala:100)
	at pl.touk.nussknacker.engine.kafka.exception.KafkaExceptionConsumerSerializationSpec.<init>(KafkaExceptionConsumerSerializationSpec.scala:47)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at java.base/java.lang.Class.newInstance(Class.java:584)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:454)
	at sbt.TestRunner.runTest$1(TestFramework.scala:153)
	at sbt.TestRunner.run(TestFramework.scala:168)
	at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:336)
	at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:296)
	at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
	at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
	at sbt.TestFunction.apply(TestFramework.scala:348)
	at sbt.Tests$.processRunnable$1(Tests.scala:475)
	at sbt.Tests$.$anonfun$makeSerial$1(Tests.scala:481)
	at sbt.std.Transform$$anon$3.$anonfun$apply$2(Transform.scala:47)
	at sbt.std.Transform$$anon$4.work(Transform.scala:69)
	at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
	at sbt.Execute.work(Execute.scala:292)
	at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
	at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[Unit] = {
KafkaUtils
.sendToKafkaWithTempProducer(record)(kafkaProducerCreator)
.map(_ => ())(ExecutionContext.global)
}

}
Expand All @@ -66,18 +112,13 @@ case class SharedProducerKafkaExceptionConsumer(
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends FlinkEspExceptionConsumer
) extends BaseKafkaExceptionConsumer
with WithSharedKafkaProducer {

override def open(context: EngineRuntimeContext): Unit = {
super.open(context)
kafkaErrorTopicInitializer.init()
}
override protected val topic: String = kafkaErrorTopicInitializer.exceptionHandlerConfig.topic

override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendToKafka(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
SynchronousExecutionContextAndIORuntime.syncEc
)
override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[Unit] = {
sendToKafka(record)(SynchronousExecutionContextAndIORuntime.syncEc)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers

// null as we don't test open here...
private val consumer =
TempProducerKafkaExceptionConsumer(serializationSchema, MockProducerCreator(mockProducer), null)
TempProducerKafkaExceptionConsumer(metaData, serializationSchema, MockProducerCreator(mockProducer), null)

test("records event") {
consumer.consume(exception)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package pl.touk.nussknacker.engine.kafka.exception

import com.typesafe.config.ConfigValueFactory.fromAnyRef
import io.circe.Json
import io.circe.syntax.EncoderOps
import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.process.SinkFactory
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.KafkaSpec
import pl.touk.nussknacker.engine.process.helpers.SampleNodes
import pl.touk.nussknacker.engine.process.helpers.SampleNodes.SimpleRecord
Expand All @@ -17,22 +20,38 @@ import pl.touk.nussknacker.engine.testing.LocalModelData

import java.util.Date

class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSpec with Matchers {
class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with FlinkSpec with KafkaSpec with Matchers {

import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer
private val topicName = "testingErrors"

protected var modelData: ModelData = _
test("should record errors on topic") {
val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel)

val inputEvent = message.inputEvent.value
inputEvent.asObject.value.filterKeys(_ == "string").asJson shouldBe Json.obj(
"string" -> "short string".asJson
)
}

test("should record errors on topic - strips context from too large error input") {
// long string variable: 8^7 = 2097152 = 2 MB
val message =
runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel)

message.inputEvent.value shouldBe Json.obj()
}

private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = {
val topicName = s"$scenarioName.errors"

protected override def beforeAll(): Unit = {
super.beforeAll()
val configWithExceptionHandler = config
.withValue("exceptionHandler.type", fromAnyRef("Kafka"))
.withValue("exceptionHandler.topic", fromAnyRef(topicName))
.withValue("exceptionHandler.includeInputEvent", fromAnyRef(true))
.withValue("exceptionHandler.additionalParams.configurableKey", fromAnyRef("sampleValue"))
.withValue("exceptionHandler.kafka", config.getConfig("kafka").root())

modelData = LocalModelData(
val modelData = LocalModelData(
configWithExceptionHandler,
List(
ComponentDefinition(
Expand All @@ -42,28 +61,32 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with FlinkSpec with KafkaSp
ComponentDefinition("sink", SinkFactory.noParam(SampleNodes.MonitorEmptySink))
)
)
}

test("should record errors on topic") {
val process = ScenarioBuilder
.streaming("testProcess")
.streaming(scenarioName)
.source("source", "source")
.buildSimpleVariable("string", "string", stringVariable)
.filter("shouldFail", "1/{0, 1}[0] != 10".spel)
.emptySink("end", "sink")

val env = flinkMiniCluster.createExecutionEnvironment()
UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
env.withJobRunning(process.name.value) {
val message = env.withJobRunning(process.name.value) {
val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head
consumed.key() shouldBe "testProcess-shouldFail"

consumed.message().nodeId shouldBe Some("shouldFail")
consumed.message().processName.value shouldBe "testProcess"
consumed.message().message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero")
consumed.message().exceptionInput shouldBe Some("1/{0, 1}[0] != 10")
consumed.message().additionalData shouldBe Map("configurableKey" -> "sampleValue")
consumed.key() shouldBe s"$scenarioName-shouldFail"

consumed.message()
}

message.processName.value shouldBe scenarioName
message.nodeId shouldBe Some("shouldFail")
message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero")
message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10")
message.stackTrace.value should include("evaluation failed, message:")
message.additionalData shouldBe Map("configurableKey" -> "sampleValue")

message
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.{lang, util}
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, exceptionHandlerConfig: KafkaExceptionConsumerConfig)
class KafkaErrorTopicInitializer(kafkaConfig: KafkaConfig, val exceptionHandlerConfig: KafkaExceptionConsumerConfig)
extends LazyLogging {

private val timeoutSeconds = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class KafkaJsonExceptionSerializationSchema(metaData: MetaData, consumerConfig:
.getBytes(StandardCharsets.UTF_8)
val value = KafkaExceptionInfo(metaData, exceptionInfo, consumerConfig)
val serializedValue = value.asJson.spaces2.getBytes(StandardCharsets.UTF_8)
new ProducerRecord(consumerConfig.topic, key, serializedValue)
new ProducerRecord(consumerConfig.topic, null, timestamp, key, serializedValue)
}

}
Expand Down

0 comments on commit 31b999c

Please sign in to comment.