diff --git a/README.md b/README.md index e4ad29d..743ef9c 100644 --- a/README.md +++ b/README.md @@ -558,6 +558,8 @@ You need to install rdkafka gem. # load of both Fluentd and Kafka when excessive messages are attempted # to send. Default is no limit. max_enqueue_bytes_per_second (integer) :default => nil + unrecoverable_error_codes (array) :default => ["topic_authorization_failed", "msg_size_too_large"] + `rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter: diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 0fc18e7..1703f78 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -126,6 +126,8 @@ class Fluent::Rdkafka2Output < Output config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second' config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name' + config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"], + :desc => 'Handle some of the error codes should be unrecoverable if specified' config_section :buffer do config_set_default :chunk_keys, ["topic"] @@ -522,7 +524,12 @@ def enqueue_with_retry(producer, topic, record_buf, message_key, partition, head raise e else - raise e + if unrecoverable_error_codes.include?(e.code.to_s) + # some of the errors should be handled as an unrecoverable error + raise Fluent::UnrecoverableError, "Rejected due to #{e}" + else + raise e + end end end end