Skip to content

Commit

Permalink
Use Cleaner API for Pro procesing (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Aug 28, 2023
1 parent 4f53e56 commit fda9487
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
8 changes: 4 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ PATH
specs:
karafka-web (0.7.0)
erubi (~> 1.4)
karafka (>= 2.1.8, < 3.0.0)
karafka (>= 2.1.13, < 3.0.0)
karafka-core (>= 2.0.13, < 3.0.0)
roda (~> 3.68, >= 3.69)
tilt (~> 2.0)
Expand All @@ -26,10 +26,10 @@ GEM
ffi (1.15.5)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
karafka (2.1.8)
karafka (2.1.13)
karafka-core (>= 2.1.1, < 2.2.0)
thor (>= 0.20)
waterdrop (>= 2.6.2, < 3.0.0)
waterdrop (>= 2.6.6, < 3.0.0)
zeitwerk (~> 2.3)
karafka-core (2.1.1)
concurrent-ruby (>= 1.1)
Expand Down Expand Up @@ -72,7 +72,7 @@ GEM
tilt (2.2.0)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
waterdrop (2.6.5)
waterdrop (2.6.6)
karafka-core (>= 2.1.1, < 3.0.0)
zeitwerk (~> 2.3)
webrick (1.8.1)
Expand Down
2 changes: 1 addition & 1 deletion karafka-web.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Gem::Specification.new do |spec|
spec.licenses = %w[LGPL-3.0 Commercial]

spec.add_dependency 'erubi', '~> 1.4'
spec.add_dependency 'karafka', '>= 2.1.8', '< 3.0.0'
spec.add_dependency 'karafka', '>= 2.1.13', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.0.13', '< 3.0.0'
spec.add_dependency 'roda', '~> 3.68', '>= 3.69'
spec.add_dependency 'tilt', '~> 2.0'
Expand Down
31 changes: 17 additions & 14 deletions lib/karafka/web/processing/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,28 @@ def consume
consumers_messages = messages.select { |message| message.payload[:type] == 'consumer' }

# If there is even one incompatible message, we need to stop
if consumers_messages.all? { |message| @schema_manager.compatible?(message) }
consumers_messages.each do |message|
# We need to run the aggregations on each message in order to compensate for
# potential lags.
@state_aggregator.add(message.payload, message.offset)
@metrics_aggregator.add_report(message.payload)
@metrics_aggregator.add_stats(@state_aggregator.stats)
consumers_messages.each do |message|
unless @schema_manager.compatible?(message)
dispatch

raise ::Karafka::Web::Errors::Processing::IncompatibleSchemaError
end

return unless periodic_flush?
# We need to run the aggregations on each message in order to compensate for
# potential lags.
@state_aggregator.add(message.payload, message.offset)
@metrics_aggregator.add_report(message.payload)
@metrics_aggregator.add_stats(@state_aggregator.stats)

# Optimize memory usage in pro
message.clean! if Karafka.pro?
end

dispatch
return unless periodic_flush?

mark_as_consumed(messages.last)
else
dispatch
dispatch

raise ::Karafka::Web::Errors::Processing::IncompatibleSchemaError
end
mark_as_consumed(messages.last)
end

# Flush final state on shutdown
Expand Down

0 comments on commit fda9487

Please sign in to comment.