out_rdkafka2: Handle some of the exceptions as unrecoverable errors #510

Merged
merged 2 commits into master from some-of-errors-handled-as-unrecoverable-errors on Jul 10, 2024

Conversation

cosmo0920 (Contributor)

This is because the msg_size_too_large and topic_authorization_failed error codes should be treated as unrecoverable errors. In Fluentd, we need to mark them as unrecoverable by raising Fluent::UnrecoverableError, but the current implementation does not handle them that way.
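
For reference, a minimal sketch of the idea, assuming rdkafka-ruby's Rdkafka::RdkafkaError#code returns the error name as a symbol (e.g. :msg_size_too_large) and that raising Fluent::UnrecoverableError makes Fluentd stop retrying the chunk and back it up instead. The constant and method names below are hypothetical, not the exact diff; the real change sits in out_rdkafka2.rb's enqueue_with_retry (see the backtrace further down):

require "rdkafka"
require "fluent/error"

# Hypothetical illustration: error codes that a retry cannot fix are
# re-raised as Fluent::UnrecoverableError so Fluentd backs up the chunk
# instead of retrying it forever.
UNRECOVERABLE_ERROR_CODES = [:topic_authorization_failed, :msg_size_too_large].freeze

def produce_one(producer, topic, payload)
  producer.produce(topic: topic, payload: payload)
rescue Rdkafka::RdkafkaError => e
  if UNRECOVERABLE_ERROR_CODES.include?(e.code)
    # Message style matches the warning in the log below:
    # "Rejected due to Broker: Message size too large (msg_size_too_large)"
    raise Fluent::UnrecoverableError, "Rejected due to #{e}"
  end
  raise
end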

@cosmo0920 cosmo0920 force-pushed the some-of-errors-handled-as-unrecoverable-errors branch from d23d2e9 to de38e7d on June 25, 2024 08:12
@cosmo0920 cosmo0920 force-pushed the some-of-errors-handled-as-unrecoverable-errors branch from de38e7d to ce0274b on June 25, 2024 08:18
@ashie ashie (Member) left a comment

Looks good. I'm checking the actual behaviour.

@cosmo0920 cosmo0920 (Contributor, Author)

FYI: we use flog to generate log lines of arbitrary length. In this case, we use flog -b 1024000 -n 1 to create a line that is too large for the out_rdkafka2 plugin to consume.
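
If flog is not available, a hypothetical Ruby one-off can produce the same kind of oversized line, assuming a tcp input listening on port 5170 as in the test below:

require "socket"

# Send a single ~1 MB line (matching flog -b 1024000 -n 1) to the tcp input.
sock = TCPSocket.new("localhost", 5170)
sock.write(("a" * 1_024_000) + "\n")
sock.close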

@daipom daipom left a comment

Thanks for this fix!
I have confirmed that this feature works as intended.

Sample config:

<source>
  @type tcp
  tag test
  <parse>
    @type none
  </parse>
</source>

<match test.**>
  @type rdkafka2
  brokers localhost:9092
  topic fluentd-test
  <format>
    @type json
  </format>
  <inject>
    time_key time
  </inject>
  <buffer []>
    flush_mode immediate
    chunk_limit_size 100M
  </buffer>
</match>

Send 10M characters:

$ echo $(base64 -w0 /dev/urandom | head -c 10000000) | nc localhost 5170

Then, a msg_size_too_large error occurs, and it is handled as an unrecoverable error:

2024-07-10 18:58:53 +0900 [warn]: #0 Send exception occurred: Rejected due to Broker: Message size too large (msg_size_too_large) at /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:529:in `rescue in block in enqueue_with_retry'
2024-07-10 18:58:53 +0900 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="Rejected due to Broker: Message size too large (msg_size_too_large)"
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:529:in `rescue in block in enqueue_with_retry'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:484:in `block in enqueue_with_retry'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:483:in `loop'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:483:in `enqueue_with_retry'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:454:in `block in write'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:319:in `each'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:319:in `block in each'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:318:in `each'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/out_rdkafka2.rb:424:in `write'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1225:in `try_flush'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-07-10 18:58:53 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-07-10 18:58:53 +0900 [warn]: #0 bad chunk is moved to /tmp/fluent/backup/worker0/object_dc0/61ce1b1a61ef66b93c0d58386e983d46.log

@ashie ashie merged commit 548f0c4 into master Jul 10, 2024
6 of 9 checks passed
@ashie ashie deleted the some-of-errors-handled-as-unrecoverable-errors branch July 10, 2024 23:59
@ashie ashie (Member) commented Jul 10, 2024

Thanks!

@ashie ashie added this to the v0.19.3 milestone Jul 30, 2024