Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Sep 11, 2023
1 parent ceab04a commit cca7956
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 2 deletions.
12 changes: 11 additions & 1 deletion lib/karafka/web/processing/consumers/aggregators/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ def materialize_consumers_groups_current_state
.map { |p_details| p_details.fetch(:hi_offset) }
.reject(&:negative?)

# Last stable offsets freeze durations - we pick the max freeze to indicate
# the longest open transaction that potentially may be hanging
ls_offsets_fd = partitions_data
.map { |p_details| p_details.fetch(:ls_offset_fd) }
.reject(&:negative?)

# If there is no lag that would not be negative, it means we did not mark
# any messages as consumed on this topic in any partitions, hence we cannot
# compute lag easily
Expand All @@ -127,7 +133,11 @@ def materialize_consumers_groups_current_state
cgs[group_name][topic_name] = {
lag_stored: lags_stored.sum,
lag: lags.sum,
pace: offsets_hi.sum
pace: offsets_hi.sum,
# Take max last stable offset duration without any change. This can
# indicate a hanging transaction, because the offset will not move forward
# and will stay with a growing freeze duration when stuck
ls_offset_fd: ls_offsets_fd.max
}
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TopicStats < Web::Contracts::Base
required(:lag_stored) { |val| val.is_a?(Integer) }
required(:lag) { |val| val.is_a?(Integer) }
required(:pace) { |val| val.is_a?(Integer) }
required(:ls_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions lib/karafka/web/ui/models/metrics/charts/topics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ def topics_pace
topics.each_value(&:compact!)
topics.to_json
end

def max_lso_time
topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }

@data.to_h.each do |topic, metrics|
topic_without_cg = topic.split('[').first

metrics.each do |current|
topics[topic_without_cg][current.first] << ((current.last[:ls_offset_fd] || 0) / 1_000).round
end
end

topics.each_value(&:compact!)
topics.each_value { |metrics| metrics.transform_values!(&:max) }
topics.transform_values! { |values| values.to_a.sort_by!(&:first) }
topics.to_json
end
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/karafka/web/ui/pro/views/dashboard/index.erb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<%== partial 'shared/tab_nav', locals: { title: 'Batches', id: 'batches' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Lags stored', id: 'lags-stored' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Topics pace', id: 'topics-pace' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Max LSO time', id: 'max-lso-time' } %>
</ul>

<div class="tab-content">
Expand All @@ -35,6 +36,10 @@
<div class="tab-pane" id="topics-pace" role="tabpanel">
<%== partial 'shared/chart', locals: { data: @topics_charts.topics_pace, id: 'topics-pace' } %>
</div>

<div class="tab-pane" id="max-lso-time" role="tabpanel">
<%== partial 'shared/chart', locals: { data: @topics_charts.max_lso_time, id: 'max-lso-time' } %>
</div>
</div>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
{
lag_stored: 10,
lag: 5,
pace: 3
pace: 3,
lso_fd: 2
}
end

Expand Down Expand Up @@ -52,4 +53,22 @@

it { expect(contract.call(topic_stats)).not_to be_success }
end

context 'when lso_fd is not a number' do
before { topic_stats[:lso_fd] = 'test' }

it { expect(contract.call(topic_stats)).not_to be_success }
end

context 'when lso_fd is less than 0' do
before { topic_stats[:lso_fd] = -2 }

it { expect(contract.call(topic_stats)).not_to be_success }
end

context 'when lso_fd is missing' do
before { topic_stats.delete(:lso_fd) }

it { expect(contract.call(topic_stats)).not_to be_success }
end
end

0 comments on commit cca7956

Please sign in to comment.