Skip to content

Commit

Permalink
do not flush without a reason
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Sep 19, 2023
1 parent 1034250 commit 1977a8c
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions lib/karafka/web/processing/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def initialize(*args)

# We set this that way so we report with first batch and so we report as fast as possible
@flushed_at = monotonic_now - @flush_interval
@changed = false
end

# Aggregates consumers state into a single current state representation
Expand Down Expand Up @@ -59,6 +60,10 @@ def consume
@state_aggregator.add(message.payload, message.offset)
@metrics_aggregator.add_report(message.payload)
@metrics_aggregator.add_stats(@state_aggregator.stats)
# Indicates that we had at least one report we used to enrich data
# If there were no state changes, there is no reason to flush data. This can occur
# when we had some messages but we skipped them for any reason
@changed = true

# Optimize memory usage in pro
message.clean! if Karafka.pro?
Expand All @@ -73,20 +78,20 @@ def consume

# Flush final state on shutdown
def shutdown
return unless @state_aggregator

materialize
validate!
flush
dispatch
end

private

# Flushes the state of the Web-UI to the DB
def dispatch
return unless @changed

materialize
validate!
flush

@changed = false
end

# @return [Boolean] is it time to persist the new current state
Expand Down

0 comments on commit 1977a8c

Please sign in to comment.