diff --git a/CHANGELOG.md b/CHANGELOG.md index b29c51d8..efb19c57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.10.4 (Unreleased) - **[Breaking]** Drop Ruby `3.0` support according to the EOL schedule. +- [Enhancement] Extract producers tracking `sync_threshold` into an internal config - [Fix] Toggle menu button post-turbo refresh stops working. ## 0.10.3 (2024-09-17) diff --git a/lib/karafka/web/config.rb b/lib/karafka/web/config.rb index 40e9d52e..ef818587 100644 --- a/lib/karafka/web/config.rb +++ b/lib/karafka/web/config.rb @@ -76,7 +76,7 @@ class Config # Reports the metrics collected in the consumer sampler setting :reporter, default: Tracking::Consumers::Reporter.new - # Minimum number of messages to produce to produce them in sync mode + # Minimum number of messages to produce them in sync mode # This acts as a small back-off not to overload the system in case we would have # extremely big number of errors and reports happening setting :sync_threshold, default: 50 @@ -98,6 +98,11 @@ class Config end setting :producers do + # Minimum number of messages to produce them in sync mode + # This acts as a small back-off not to overload the system in case we would have + # extremely big number of errors happening + setting :sync_threshold, default: 25 + # Reports the metrics collected in the producer sampler setting :reporter, default: Tracking::Producers::Reporter.new diff --git a/lib/karafka/web/contracts/config.rb b/lib/karafka/web/contracts/config.rb index 8fb26c5c..bb19990a 100644 --- a/lib/karafka/web/contracts/config.rb +++ b/lib/karafka/web/contracts/config.rb @@ -41,6 +41,7 @@ class Config < Web::Contracts::Base required(:reporter) { |val| !val.nil? } required(:sampler) { |val| !val.nil? } required(:listeners) { |val| val.is_a?(Array) } + required(:sync_threshold) { |val| val.is_a?(Integer) && val.positive? } end end diff --git a/lib/karafka/web/tracking/producers/reporter.rb b/lib/karafka/web/tracking/producers/reporter.rb index 131e2d3a..0bc3bc9b 100644 --- a/lib/karafka/web/tracking/producers/reporter.rb +++ b/lib/karafka/web/tracking/producers/reporter.rb @@ -10,13 +10,6 @@ module Producers # because there is no expectation on immediate status updates for producers and their # dispatch flow is always periodic based. class Reporter < Tracking::Reporter - # Minimum number of messages to produce to produce them in sync mode - # This acts as a small back-off not to overload the system in case we would have - # extremely big number of errors happening - PRODUCE_SYNC_THRESHOLD = 25 - - private_constant :PRODUCE_SYNC_THRESHOLD - # This mutex is shared between tracker and samplers so there is no case where metrics # would be collected same time tracker reports MUTEX = Mutex.new @@ -82,7 +75,7 @@ def sampler # normal operations we should not have that many messages to dispatch and it should not # slowdown any processing. def produce(messages) - if messages.count >= PRODUCE_SYNC_THRESHOLD + if messages.count >= ::Karafka::Web.config.tracking.producers.sync_threshold ::Karafka::Web.producer.produce_many_sync(messages) else ::Karafka::Web.producer.produce_many_async(messages) diff --git a/spec/lib/karafka/web/contracts/config_spec.rb b/spec/lib/karafka/web/contracts/config_spec.rb index 7c1d2e01..983382e5 100644 --- a/spec/lib/karafka/web/contracts/config_spec.rb +++ b/spec/lib/karafka/web/contracts/config_spec.rb @@ -28,7 +28,8 @@ producers: { reporter: Object.new, sampler: Object.new, - listeners: [] + listeners: [], + sync_threshold: 10 } }, processing: { @@ -119,6 +120,18 @@ it { expect(contract.call(params)).not_to be_success } end + context 'when producers sync_threshold is less than 0' do + before { params[:tracking][:producers][:sync_threshold] = -1 } + + it { expect(contract.call(params)).not_to be_success } + end + + context 'when producers sync_threshold is not an integer' do + before { params[:tracking][:producers][:sync_threshold] = 1.1 } + + it { expect(contract.call(params)).not_to be_success } + end + %i[consumers producers].each do |entity| context "when checking #{entity} scoped data" do %i[reporter sampler].each do |field|