diff --git a/lib/karafka/web/processing/consumer.rb b/lib/karafka/web/processing/consumer.rb index 53187183..0bf5cada 100644 --- a/lib/karafka/web/processing/consumer.rb +++ b/lib/karafka/web/processing/consumer.rb @@ -26,6 +26,7 @@ def initialize(*args) # We set this that way so we report with first batch and so we report as fast as possible @flushed_at = monotonic_now - @flush_interval + @changed = false end # Aggregates consumers state into a single current state representation @@ -59,6 +60,10 @@ def consume @state_aggregator.add(message.payload, message.offset) @metrics_aggregator.add_report(message.payload) @metrics_aggregator.add_stats(@state_aggregator.stats) + # Indicates that we had at least one report we used to enrich data + # If there were no state changes, there is no reason to flush data. This can occur + # when we had some messages but we skipped them for any reason + @changed = true # Optimize memory usage in pro message.clean! if Karafka.pro? @@ -73,20 +78,20 @@ def consume # Flush final state on shutdown def shutdown - return unless @state_aggregator - - materialize - validate! - flush + dispatch end private # Flushes the state of the Web-UI to the DB def dispatch + return unless @changed + materialize validate! flush + + @changed = false end # @return [Boolean] is it time to persist the new current state