From cca7956303dc91c5af4ec36e89fe539517799cfc Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Mon, 11 Sep 2023 08:38:17 +0200 Subject: [PATCH] wip --- .../consumers/aggregators/metrics.rb | 12 ++++++++++- .../consumers/contracts/topic_stats.rb | 1 + .../web/ui/models/metrics/charts/topics.rb | 17 +++++++++++++++ .../web/ui/pro/views/dashboard/index.erb | 5 +++++ .../consumers/contracts/topic_stats_spec.rb | 21 ++++++++++++++++++- 5 files changed, 54 insertions(+), 2 deletions(-) diff --git a/lib/karafka/web/processing/consumers/aggregators/metrics.rb b/lib/karafka/web/processing/consumers/aggregators/metrics.rb index 50d89bb6..0d0a3545 100644 --- a/lib/karafka/web/processing/consumers/aggregators/metrics.rb +++ b/lib/karafka/web/processing/consumers/aggregators/metrics.rb @@ -113,6 +113,12 @@ def materialize_consumers_groups_current_state .map { |p_details| p_details.fetch(:hi_offset) } .reject(&:negative?) + # Last stable offsets freeze durations - we pick the max freeze to indicate + # the longest open transaction that potentially may be hanging + ls_offsets_fd = partitions_data + .map { |p_details| p_details.fetch(:ls_offset_fd) } + .reject(&:negative?) + # If there is no lag that would not be negative, it means we did not mark # any messages as consumed on this topic in any partitions, hence we cannot # compute lag easily @@ -127,7 +133,11 @@ def materialize_consumers_groups_current_state cgs[group_name][topic_name] = { lag_stored: lags_stored.sum, lag: lags.sum, - pace: offsets_hi.sum + pace: offsets_hi.sum, + # Take max last stable offset duration without any change. 
This can + # indicate a hanging transaction, because the offset will not move forward + # and will stay with a growing freeze duration when stuck (0 if none reported) + ls_offset_fd: ls_offsets_fd.max || 0 } end end diff --git a/lib/karafka/web/processing/consumers/contracts/topic_stats.rb b/lib/karafka/web/processing/consumers/contracts/topic_stats.rb index b470dc31..b1ca97e7 100644 --- a/lib/karafka/web/processing/consumers/contracts/topic_stats.rb +++ b/lib/karafka/web/processing/consumers/contracts/topic_stats.rb @@ -12,6 +12,7 @@ class TopicStats < Web::Contracts::Base required(:lag_stored) { |val| val.is_a?(Integer) } required(:lag) { |val| val.is_a?(Integer) } required(:pace) { |val| val.is_a?(Integer) } + required(:ls_offset_fd) { |val| val.is_a?(Integer) && val >= 0 } end end end diff --git a/lib/karafka/web/ui/models/metrics/charts/topics.rb b/lib/karafka/web/ui/models/metrics/charts/topics.rb index 7d53bc8a..d3b9452e 100644 --- a/lib/karafka/web/ui/models/metrics/charts/topics.rb +++ b/lib/karafka/web/ui/models/metrics/charts/topics.rb @@ -66,6 +66,23 @@ def topics_pace topics.each_value(&:compact!) topics.to_json end + + def max_lso_time + topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } } + + @data.to_h.each do |topic, metrics| + topic_without_cg = topic.split('[').first + + metrics.each do |current| + topics[topic_without_cg][current.first] << ((current.last[:ls_offset_fd] || 0) / 1_000).round + end + end + + topics.each_value(&:compact!) + topics.each_value { |metrics| metrics.transform_values!(&:max) } + topics.transform_values! 
{ |values| values.to_a.sort_by!(&:first) } + topics.to_json + end end end end diff --git a/lib/karafka/web/ui/pro/views/dashboard/index.erb b/lib/karafka/web/ui/pro/views/dashboard/index.erb index 39ba7f0f..ccb5ac1d 100644 --- a/lib/karafka/web/ui/pro/views/dashboard/index.erb +++ b/lib/karafka/web/ui/pro/views/dashboard/index.erb @@ -15,6 +15,7 @@ <%== partial 'shared/tab_nav', locals: { title: 'Batches', id: 'batches' } %> <%== partial 'shared/tab_nav', locals: { title: 'Lags stored', id: 'lags-stored' } %> <%== partial 'shared/tab_nav', locals: { title: 'Topics pace', id: 'topics-pace' } %> + <%== partial 'shared/tab_nav', locals: { title: 'Max LSO time', id: 'max-lso-time' } %>
@@ -35,6 +36,10 @@
<%== partial 'shared/chart', locals: { data: @topics_charts.topics_pace, id: 'topics-pace' } %>
+ +
+ <%== partial 'shared/chart', locals: { data: @topics_charts.max_lso_time, id: 'max-lso-time' } %> +
diff --git a/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb b/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb index 82ba6bcf..7c54d747 100644 --- a/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb +++ b/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb @@ -7,7 +7,8 @@ { lag_stored: 10, lag: 5, - pace: 3 + pace: 3, + ls_offset_fd: 2 } end @@ -52,4 +53,22 @@ it { expect(contract.call(topic_stats)).not_to be_success } end + + context 'when ls_offset_fd is not a number' do + before { topic_stats[:ls_offset_fd] = 'test' } + + it { expect(contract.call(topic_stats)).not_to be_success } + end + + context 'when ls_offset_fd is less than 0' do + before { topic_stats[:ls_offset_fd] = -2 } + + it { expect(contract.call(topic_stats)).not_to be_success } + end + + context 'when ls_offset_fd is missing' do + before { topic_stats.delete(:ls_offset_fd) } + + it { expect(contract.call(topic_stats)).not_to be_success } + end end