-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
out_rdkafka2: Handle some of the exceptions as unrecoverable errors #510
Conversation
d23d2e9
to
de38e7d
Compare
This is because msg_size_too_large and topic_authorization_failed error codes should be treated as an unrecoverable error. In Fluentd, we need to mark as unrecoverable with raise Fluent::Unrecoverable. But current implementation does not handle them as unrecoverable errors. Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
de38e7d
to
ce0274b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, I'm checking actual behaviour.
FYI: we use flog to generate arbitrary length of logs. In this case, we use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this fix!
I have confirmed that this feature works as intended.
Sample config:
<source>
@type tcp
tag test
<parse>
@type none
</parse>
</source>
<match test.**>
@type rdkafka2
brokers localhost:9092
topic fluentd-test
<format>
@type json
</format>
<inject>
time_key time
</inject>
<buffer []>
flush_mode immediate
chunk_limit_size 100M
</buffer>
</match>
Send 10M characters:
$ echo $(base64 -w0 /dev/urandom | head -c 10000000) | nc localhost 5170
Then, msg_size_too_large
error occurs, and it is handled as an unrecoverable error.
2024-07-10 18:58:53 +0900 [warn]: #0 Send exception occurred: Rejected due to Broker: Message size too large (msg_size_too_large) at /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:529:in `rescue in block in enqueue_with_retry'
2024-07-10 18:58:53 +0900 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="Rejected due to Broker: Message size too large (msg_size_too_large)"
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:529:in `rescue in block in enqueue_with_retry'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:484:in `block in enqueue_with_retry'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:483:in `loop'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:483:in `enqueue_with_retry'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:454:in `block in write'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:319:in `each'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:319:in `block in each'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:318:in `each'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:424:in `write'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1225:in `try_flush'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-07-10 18:58:53 +0900 [warn]: #0 bad chunk is moved to /tmp/fluent/backup/worker0/object_dc0/61ce1b1a61ef66b93c0d58386e983d46.log
Thanks! |
This is because msg_size_too_large and topic_authorization_failed error codes should be treated as an unrecoverable error. In Fluentd, we need to mark as unrecoverable with raise Fluent::Unrecoverable. But current implementation does not handle them as unrecoverable errors.