From fda9487947a349dacb38e1aefe7e238824f7bd13 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Mon, 28 Aug 2023 22:31:28 +0200 Subject: [PATCH] Use Cleaner API for Pro procesing (#98) --- Gemfile.lock | 8 +++---- karafka-web.gemspec | 2 +- lib/karafka/web/processing/consumer.rb | 31 ++++++++++++++------------ 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 607ebbb3..288accfe 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,7 +3,7 @@ PATH specs: karafka-web (0.7.0) erubi (~> 1.4) - karafka (>= 2.1.8, < 3.0.0) + karafka (>= 2.1.13, < 3.0.0) karafka-core (>= 2.0.13, < 3.0.0) roda (~> 3.68, >= 3.69) tilt (~> 2.0) @@ -26,10 +26,10 @@ GEM ffi (1.15.5) i18n (1.14.1) concurrent-ruby (~> 1.0) - karafka (2.1.8) + karafka (2.1.13) karafka-core (>= 2.1.1, < 2.2.0) thor (>= 0.20) - waterdrop (>= 2.6.2, < 3.0.0) + waterdrop (>= 2.6.6, < 3.0.0) zeitwerk (~> 2.3) karafka-core (2.1.1) concurrent-ruby (>= 1.1) @@ -72,7 +72,7 @@ GEM tilt (2.2.0) tzinfo (2.0.6) concurrent-ruby (~> 1.0) - waterdrop (2.6.5) + waterdrop (2.6.6) karafka-core (>= 2.1.1, < 3.0.0) zeitwerk (~> 2.3) webrick (1.8.1) diff --git a/karafka-web.gemspec b/karafka-web.gemspec index 42d0e3c5..50f60a6a 100644 --- a/karafka-web.gemspec +++ b/karafka-web.gemspec @@ -17,7 +17,7 @@ Gem::Specification.new do |spec| spec.licenses = %w[LGPL-3.0 Commercial] spec.add_dependency 'erubi', '~> 1.4' - spec.add_dependency 'karafka', '>= 2.1.8', '< 3.0.0' + spec.add_dependency 'karafka', '>= 2.1.13', '< 3.0.0' spec.add_dependency 'karafka-core', '>= 2.0.13', '< 3.0.0' spec.add_dependency 'roda', '~> 3.68', '>= 3.69' spec.add_dependency 'tilt', '~> 2.0' diff --git a/lib/karafka/web/processing/consumer.rb b/lib/karafka/web/processing/consumer.rb index 536d0e5a..8d823f59 100644 --- a/lib/karafka/web/processing/consumer.rb +++ b/lib/karafka/web/processing/consumer.rb @@ -33,25 +33,28 @@ def consume consumers_messages = messages.select { |message| message.payload[:type] == 'consumer' } # If there is even one incompatible message, we need to stop - if consumers_messages.all? { |message| @schema_manager.compatible?(message) } - consumers_messages.each do |message| - # We need to run the aggregations on each message in order to compensate for - # potential lags. - @state_aggregator.add(message.payload, message.offset) - @metrics_aggregator.add_report(message.payload) - @metrics_aggregator.add_stats(@state_aggregator.stats) + consumers_messages.each do |message| + unless @schema_manager.compatible?(message) + dispatch + + raise ::Karafka::Web::Errors::Processing::IncompatibleSchemaError end - return unless periodic_flush? + # We need to run the aggregations on each message in order to compensate for + # potential lags. + @state_aggregator.add(message.payload, message.offset) + @metrics_aggregator.add_report(message.payload) + @metrics_aggregator.add_stats(@state_aggregator.stats) + + # Optimize memory usage in pro + message.clean! if Karafka.pro? + end - dispatch + return unless periodic_flush? - mark_as_consumed(messages.last) - else - dispatch + dispatch - raise ::Karafka::Web::Errors::Processing::IncompatibleSchemaError - end + mark_as_consumed(messages.last) end # Flush final state on shutdown