
Kafka exceptionHandler: retry when message is too large, log when writing to Kafka fails #6958

Merged · 9 commits · Oct 10, 2024
3 changes: 2 additions & 1 deletion build.sbt
@@ -892,7 +892,8 @@ lazy val kafkaUtils = (project in utils("kafka-utils"))
name := "nussknacker-kafka-utils",
libraryDependencies ++= {
Seq(
"org.apache.kafka" % "kafka-clients" % kafkaV
"org.apache.kafka" % "kafka-clients" % kafkaV,
"org.scalatest" %% "scalatest" % scalaTestV % Test,
)
}
// Depends on componentsApi because of dependency to NuExceptionInfo and NonTransientException -
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -69,6 +69,7 @@
* [#6952](https://github.com/TouK/nussknacker/pull/6952) Improvement: TypeInformation support for scala.Option
* [#6840](https://github.com/TouK/nussknacker/pull/6840) Introduce canCastTo, castTo and castToOrNull extension methods in SpeL.
* [#6974](https://github.com/TouK/nussknacker/pull/6974) Add SpeL suggestions for cast methods parameter.
* [#6958](https://github.com/TouK/nussknacker/pull/6958) Add message size limit in the "Kafka" exceptionHandler
* [#6988](https://github.com/TouK/nussknacker/pull/6988) Remove unused API classes: `MultiMap`, `TimestampedEvictableStateFunction`
* [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open.

6 changes: 6 additions & 0 deletions docs/MigrationGuide.md
@@ -33,6 +33,12 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* Migration API changes:
* POST `/api/migrate` supports v2 request format (with `scenarioLabels` field)

### Configuration changes

* [#6958](https://github.com/TouK/nussknacker/pull/6958) Added a message size limit in the "Kafka" exceptionHandler: `maxMessageBytes`.
Its default value reflects Kafka's default size limit of 1 MB (`max.message.bytes`); increase it if your
error topic allows larger messages. Remember to leave some margin for Kafka protocol overhead (100 bytes should be enough).
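
For illustration only, a minimal sketch of raising the limit for an error topic that accepts 5 MB messages, assuming the Flink-style `exceptionHandler` block with `type: Kafka` (the block name, its placement and the values depend on your engine configuration, see the engine configuration pages):

```hocon
exceptionHandler {
  type: Kafka
  topic: "errors"
  # 5 MB (5242880 bytes) minus a 100-byte margin for Kafka protocol overhead
  maxMessageBytes: 5242780
}
```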

### Other changes

* [#6692](https://github.com/TouK/nussknacker/pull/6692) Kryo serializers for `UnmodifiableCollection`, `scala.Product` etc.
18 changes: 9 additions & 9 deletions docs/integration/KafkaIntegration.md
@@ -227,15 +227,15 @@ Errors can be sent to specified Kafka topic in following json format (see below)
The following properties can be configured (see the appropriate engine page, [Lite](../../configuration/model/Lite#exception-handling) or [Flink](../../configuration/model/Flink#configuring-exception-handling), for where they should be set). An example configuration is shown after the table:

| Name | Default value | Description |
|------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. |
| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) |
| includeHost | true | Should name of host where error occurred (e.g. TaskManager in case of Flink) be included. Can be misleading if there are many network interfaces or hostname is improperly configured) |
| includeInputEvent | false | Should input event be serialized (can be large or contain sensitive data so use with care) |
| useSharedProducer | false | For better performance shared Kafka producer can be used (by default it's created and closed for each error), shared Producer is kind of experimental feature and should be used with care |
| additionalParams | {} | Map of fixed parameters that can be added to Kafka message |

| Name | Default value | Description |
|-----------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. |
| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) |
| maxMessageBytes        | 1048476       | Limits the Kafka message size by truncating variables from the input event (enabled by `includeInputEvent`). The default equals Kafka's default `max.message.bytes` of 1 MB (1048576 bytes) minus a 100-byte safety margin                                                                                                    |
| includeHost            | true          | Should the name of the host where the error occurred (e.g. TaskManager in case of Flink) be included. Can be misleading if there are many network interfaces or the hostname is improperly configured                                                                                                                          |
| includeInputEvent      | false         | Should the input event be serialized (it can be large or contain sensitive data, so use with care)                                                                                                                                                                                                                            |
| useSharedProducer      | false         | For better performance a shared Kafka producer can be used (by default a producer is created and closed for each error). The shared producer is an experimental feature and should be used with care                                                                                                                          |
| additionalParams | {} | Map of fixed parameters that can be added to Kafka message |
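
A minimal sketch of a complete handler configuration using the properties above (all values are illustrative; the handler block name, its placement and the Kafka connection settings depend on your engine, see the pages linked above):

```hocon
exceptionHandler {
  type: Kafka
  topic: "errors"
  stackTraceLengthLimit: 50
  # Kafka's default max.message.bytes (1048576) minus a 100-byte safety margin
  maxMessageBytes: 1048476
  includeHost: true
  includeInputEvent: false
  useSharedProducer: false
  additionalParams {
    environment: "test"
  }
}
```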

### Configuration for Flink engine

KafkaExceptionConsumerProvider.scala
@@ -1,8 +1,11 @@
package pl.touk.nussknacker.engine.kafka.exception

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.RecordTooLargeException
import pl.touk.nussknacker.engine.api.MetaData
import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo}
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
@@ -13,18 +16,20 @@ import pl.touk.nussknacker.engine.kafka.{DefaultProducerCreator, KafkaConfig, Ka
import pl.touk.nussknacker.engine.util.SynchronousExecutionContextAndIORuntime
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig

import scala.concurrent.{ExecutionContext, Future}

class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {

override def create(metaData: MetaData, exceptionHandlerConfig: Config): FlinkEspExceptionConsumer = {
val kafkaConfig = KafkaConfig.parseConfig(exceptionHandlerConfig)
val consumerConfig = exceptionHandlerConfig.rootAs[KafkaExceptionConsumerConfig]
val producerCreator = kafkaProducerCreator(kafkaConfig)
val serializationSchema = createSerializationSchema(metaData, consumerConfig)
val errorTopicInitializer = new KafkaErrorTopicInitializer(kafkaConfig, consumerConfig)
val errorTopicInitializer = new DefaultKafkaErrorTopicInitializer(kafkaConfig, consumerConfig)
if (consumerConfig.useSharedProducer) {
SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
} else {
TempProducerKafkaExceptionConsumer(serializationSchema, producerCreator, errorTopicInitializer)
TempProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
}
}

@@ -42,42 +47,88 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {

}

case class TempProducerKafkaExceptionConsumer(
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends FlinkEspExceptionConsumer {
trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging {
protected val serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]]
protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
protected val metaData: MetaData

private val topic: String = kafkaErrorTopicInitializer.topicName

protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_]

override def open(context: EngineRuntimeContext): Unit = {
super.open(context)
kafkaErrorTopicInitializer.init()
}

override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
KafkaUtils.sendToKafkaWithTempProducer(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
kafkaProducerCreator
)
override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))
.recoverWith { case e: RecordTooLargeException =>
// Kafka message size should be kept within acceptable size by serializer, but it may be incorrectly configured.
// We may also encounter this exception due to producer's 'buffer.memory' being set too low.
//
// Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible
// to correctly detect and handle this limit preemptively, because:
// * Kafka limits are imposed on compressed message sizes (with headers and protocol overhead)
// * there are limits on multiple levels: producer, topic, broker (access requires additional permissions)

val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message
logger.warn(
s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error)." +
s"Verify your configuration of Kafka producer, error logging and errors topic and set correct limits. ${e.getMessage}"
)

val lightExceptionInfo = exceptionInfo.copy(
context = exceptionInfo.context.copy(
variables = Map(
"!warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message: ${e.getMessage}",
),
parentContext = None
)
)

sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis()))
}(ExecutionContext.Implicits.global)
.recover { case e: Throwable =>
val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message

logger.warn(
s"Failed to write to $topic (scenario: $scenario, node: $node, error: $error): ${e.getMessage}",
e
)
}(ExecutionContext.Implicits.global)
}

}

case class SharedProducerKafkaExceptionConsumer(
case class TempProducerKafkaExceptionConsumer(
metaData: MetaData,
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends FlinkEspExceptionConsumer
with WithSharedKafkaProducer {
) extends BaseKafkaExceptionConsumer {

override def open(context: EngineRuntimeContext): Unit = {
super.open(context)
kafkaErrorTopicInitializer.init()
override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = {
KafkaUtils
.sendToKafkaWithTempProducer(record)(kafkaProducerCreator)
}

override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendToKafka(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
SynchronousExecutionContextAndIORuntime.syncEc
)
}

case class SharedProducerKafkaExceptionConsumer(
metaData: MetaData,
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends BaseKafkaExceptionConsumer
with WithSharedKafkaProducer {

override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = {
sendToKafka(record)(SynchronousExecutionContextAndIORuntime.syncEc)
}

}
KafkaExceptionConsumerSerializationSpec.scala
@@ -42,9 +42,13 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers

private val serializationSchema = new KafkaJsonExceptionSerializationSchema(metaData, consumerConfig)

// null as we don't test open here...
private val consumer =
TempProducerKafkaExceptionConsumer(serializationSchema, MockProducerCreator(mockProducer), null)
TempProducerKafkaExceptionConsumer(
metaData,
serializationSchema,
MockProducerCreator(mockProducer),
NoopKafkaErrorTopicInitializer
)

test("records event") {
consumer.consume(exception)