Skip to content

Commit

Permalink
better error message, remove warn log
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrp committed Sep 27, 2024
1 parent 83e1418 commit 565df12
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,18 @@ trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogg
override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))
.recoverWith { case e: RecordTooLargeException =>
val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message
logger.warn(
s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error). ${e.getMessage}"
)

// Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible
// to correctly detect and handle this limit preemptively, because:
// * Kafka limits are imposed on compressed message sizes
// * there are limits on multiple levels: producer, topic, broker (access requires additional permissions)
val lightExceptionInfo = exceptionInfo.copy(
context = exceptionInfo.context.copy(variables = Map.empty, parentContext = None)
context = exceptionInfo.context.copy(
variables = Map(
"warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message",
"truncationReason" -> e.getMessage
),
parentContext = None
)
)

sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin
val message = runTest(s"testProcess-shortString", stringVariable = "'short string'".spel)

val inputEvent = message.inputEvent.value
inputEvent.asObject.value.filterKeys(_ == "string").asJson shouldBe Json.obj(
inputEvent.asObject.value.filterKeys(_ != "input").asJson shouldBe Json.obj(
"string" -> "short string".asJson
)
}
Expand All @@ -38,7 +38,10 @@ class KafkaExceptionConsumerSpec extends AnyFunSuite with OptionValues with Flin
val message =
runTest("testProcess-longString", stringVariable = ("'xxxxxxxx'" + ".replaceAll('x', 'xxxxxxxx')".repeat(6)).spel)

message.inputEvent.value shouldBe Json.obj()
val inputEvent = message.inputEvent.value
inputEvent.asObject.value.filterKeys(k => k != "input" && k != "truncationReason").asJson shouldBe Json.obj(
"warning" -> "variables truncated, they didn't fit within max allowed size of a Kafka message".asJson
)
}

private def runTest(scenarioName: String, stringVariable: Expression): KafkaExceptionInfo = {
Expand Down

0 comments on commit 565df12

Please sign in to comment.