diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ec48d72..7b5b3c07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - **[Feature]** Provide ability to reproduce a given message to the same topic partition with all the details from the per message explorer view. - **[Feature]** Provide "surrounding" navigation link that allows to view the given message in the context of its surrounding. Useful for debugging of failures where the batch context may be relevant. - **[Feature]** Allow for time based lookups per topic partition. +- **[Feature]** Introduce Offsets Health inspection view for frozen LSO lookups with `lso_threshold` configuration option. - [Improvement] Support pattern subscriptions details in the routing view both by displaying the pattern as well as expanded routing details. - [Improvement] Collect total number of threads per process for the process details view. - [Improvement] Normalize naming of metrics to better reflect what they do (in reports and in the Web UI). @@ -53,6 +54,8 @@ - [Improvement] Use replication factor of two by default (if not overridden) for Web UI topics when there is more than one broker. - [Improvement] Show a warning when replication factor of 1 is used for Web UI topics in production. - [Improvement] Collect extra additional metrics useful for hanging transactions detection. +- [Improvement] Reorganize how the Health view looks. +- [Improvement] Hide all private Kafka topics by default in the explorer. Configurable with `show_internal_topics` config setting. - [Fix] Return 402 status instead of 500 on Pro features that are not available in OSS. - [Fix] Fix a case where errors would not be visible without Rails due to the `String#first` usage. - [Fix] Fix a case where live-poll would be disabled but would still update data. diff --git a/Gemfile.lock b/Gemfile.lock index afd30bfb..88a270e3 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,7 +4,7 @@ PATH karafka-web (0.7.0) erubi (~> 1.4) karafka (>= 2.2.0, < 3.0.0) - karafka-core (>= 2.2.1, < 3.0.0) + karafka-core (>= 2.2.2, < 3.0.0) roda (~> 3.68, >= 3.69) tilt (~> 2.0) @@ -31,7 +31,7 @@ GEM thor (>= 0.20) waterdrop (>= 2.6.6, < 3.0.0) zeitwerk (~> 2.3) - karafka-core (2.2.1) + karafka-core (2.2.2) concurrent-ruby (>= 1.1) karafka-rdkafka (>= 0.13.1, < 0.14.0) karafka-rdkafka (0.13.4) diff --git a/karafka-web.gemspec b/karafka-web.gemspec index 28736761..f595275f 100644 --- a/karafka-web.gemspec +++ b/karafka-web.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'erubi', '~> 1.4' spec.add_dependency 'karafka', '>= 2.2.0', '< 3.0.0' - spec.add_dependency 'karafka-core', '>= 2.2.1', '< 3.0.0' + spec.add_dependency 'karafka-core', '>= 2.2.2', '< 3.0.0' spec.add_dependency 'roda', '~> 3.68', '>= 3.69' spec.add_dependency 'tilt', '~> 2.0' diff --git a/lib/karafka/web/config.rb b/lib/karafka/web/config.rb index dd7514be..0419d027 100644 --- a/lib/karafka/web/config.rb +++ b/lib/karafka/web/config.rb @@ -94,9 +94,18 @@ class Config # UI cache to improve performance of views that reuse states that are not often changed setting :cache, default: Ui::Lib::TtlCache.new(60_000 * 5) + # Should we display internal topics of Kafka. The once starting with `__` + # By default we do not display them as they are not usable from regular users perspective + setting :show_internal_topics, default: false + # How many elements should we display on pages that support pagination setting :per_page, default: 25 + # Time beyond which the last stable offset freeze is considered a risk + # (unless same as high). This is used to show on the UI that there may be a hanging + # transaction that will cause given consumer group to halt processing and wait + setting :lso_threshold, default: 5 * 60 * 1_000 + # Allows to manage visibility of payload, headers and message key in the UI # In some cases you may want to limit what is being displayed due to the type of data you # are dealing with diff --git a/lib/karafka/web/contracts/config.rb b/lib/karafka/web/contracts/config.rb index cbd537e1..57f04fed 100644 --- a/lib/karafka/web/contracts/config.rb +++ b/lib/karafka/web/contracts/config.rb @@ -52,6 +52,7 @@ class Config < Web::Contracts::Base required(:secret) { |val| val.is_a?(String) && val.length >= 64 } end + required(:show_internal_topics) { |val| [true, false].include?(val) } required(:cache) { |val| !val.nil? } required(:per_page) { |val| val.is_a?(Integer) && val >= 1 && val <= 100 } required(:visibility_filter) { |val| !val.nil? } diff --git a/lib/karafka/web/processing/consumers/aggregators/metrics.rb b/lib/karafka/web/processing/consumers/aggregators/metrics.rb index 50d89bb6..0d0a3545 100644 --- a/lib/karafka/web/processing/consumers/aggregators/metrics.rb +++ b/lib/karafka/web/processing/consumers/aggregators/metrics.rb @@ -113,6 +113,12 @@ def materialize_consumers_groups_current_state .map { |p_details| p_details.fetch(:hi_offset) } .reject(&:negative?) + # Last stable offsets freeze durations - we pick the max freeze to indicate + # the longest open transaction that potentially may be hanging + ls_offsets_fd = partitions_data + .map { |p_details| p_details.fetch(:ls_offset_fd) } + .reject(&:negative?) + # If there is no lag that would not be negative, it means we did not mark # any messages as consumed on this topic in any partitions, hence we cannot # compute lag easily @@ -127,7 +133,11 @@ def materialize_consumers_groups_current_state cgs[group_name][topic_name] = { lag_stored: lags_stored.sum, lag: lags.sum, - pace: offsets_hi.sum + pace: offsets_hi.sum, + # Take max last stable offset duration without any change. This can + # indicate a hanging transaction, because the offset will not move forward + # and will stay with a growing freeze duration when stuck + ls_offset_fd: ls_offsets_fd.max } end end diff --git a/lib/karafka/web/processing/consumers/contracts/topic_stats.rb b/lib/karafka/web/processing/consumers/contracts/topic_stats.rb index b470dc31..b1ca97e7 100644 --- a/lib/karafka/web/processing/consumers/contracts/topic_stats.rb +++ b/lib/karafka/web/processing/consumers/contracts/topic_stats.rb @@ -12,6 +12,7 @@ class TopicStats < Web::Contracts::Base required(:lag_stored) { |val| val.is_a?(Integer) } required(:lag) { |val| val.is_a?(Integer) } required(:pace) { |val| val.is_a?(Integer) } + required(:ls_offset_fd) { |val| val.is_a?(Integer) && val >= 0 } end end end diff --git a/lib/karafka/web/tracking/consumers/contracts/partition.rb b/lib/karafka/web/tracking/consumers/contracts/partition.rb index 04de47bd..0fd4549f 100644 --- a/lib/karafka/web/tracking/consumers/contracts/partition.rb +++ b/lib/karafka/web/tracking/consumers/contracts/partition.rb @@ -15,10 +15,13 @@ class Partition < Web::Contracts::Base required(:lag_stored) { |val| val.is_a?(Integer) } required(:lag_stored_d) { |val| val.is_a?(Integer) } required(:committed_offset) { |val| val.is_a?(Integer) } + required(:committed_offset_fd) { |val| val.is_a?(Integer) && val >= 0 } required(:stored_offset) { |val| val.is_a?(Integer) } + required(:stored_offset_fd) { |val| val.is_a?(Integer) && val >= 0 } required(:fetch_state) { |val| val.is_a?(String) && !val.empty? } required(:poll_state) { |val| val.is_a?(String) && !val.empty? } required(:hi_offset) { |val| val.is_a?(Integer) } + required(:hi_offset_fd) { |val| val.is_a?(Integer) && val >= 0 } required(:lo_offset) { |val| val.is_a?(Integer) } required(:eof_offset) { |val| val.is_a?(Integer) } required(:ls_offset) { |val| val.is_a?(Integer) } diff --git a/lib/karafka/web/tracking/consumers/listeners/statistics.rb b/lib/karafka/web/tracking/consumers/listeners/statistics.rb index a2ea95f6..8bd6f2b5 100644 --- a/lib/karafka/web/tracking/consumers/listeners/statistics.rb +++ b/lib/karafka/web/tracking/consumers/listeners/statistics.rb @@ -109,9 +109,14 @@ def extract_partition_metrics(pt_stats) 'consumer_lag_stored', 'consumer_lag_stored_d', 'committed_offset', + # Can be useful to track the frequency of flushes when there is progress + 'committed_offset_fd', 'stored_offset', + # Can be useful to track the frequency of flushes when there is progress + 'stored_offset_fd', 'fetch_state', 'hi_offset', + 'hi_offset_fd', 'lo_offset', 'eof_offset', 'ls_offset', diff --git a/lib/karafka/web/ui/controllers/cluster.rb b/lib/karafka/web/ui/controllers/cluster.rb index f33475b3..d173c0d3 100644 --- a/lib/karafka/web/ui/controllers/cluster.rb +++ b/lib/karafka/web/ui/controllers/cluster.rb @@ -35,10 +35,13 @@ def index # @return [Array] array with topics to be displayed sorted in an alphabetical # order def displayable_topics(cluster_info) - cluster_info - .topics - .reject { |topic| topic[:topic_name] == '__consumer_offsets' } - .sort_by { |topic| topic[:topic_name] } + all = cluster_info + .topics + .sort_by { |topic| topic[:topic_name] } + + return all if ::Karafka::Web.config.ui.show_internal_topics + + all.reject { |topic| topic[:topic_name].start_with?('__') } end end end diff --git a/lib/karafka/web/ui/helpers/application_helper.rb b/lib/karafka/web/ui/helpers/application_helper.rb index 3b5196ff..503617d2 100644 --- a/lib/karafka/web/ui/helpers/application_helper.rb +++ b/lib/karafka/web/ui/helpers/application_helper.rb @@ -156,6 +156,22 @@ def offset_with_label(topic_name, partition_id, offset, explore: false) end end + # @param details [::Karafka::Web::Ui::Models::Partition] partition information with + # lso risk state info + # @return [String] background classes for row marking + def lso_risk_state_bg(details) + case details.lso_risk_state + when :active + '' + when :at_risk + 'bg-warning bg-opacity-25' + when :stopped + 'bg-danger bg-opacity-25' + else + raise ::Karafka::Errors::UnsupportedCaseError + end + end + # Returns the view title html code # # @param title [String] page title diff --git a/lib/karafka/web/ui/lib/hash_proxy.rb b/lib/karafka/web/ui/lib/hash_proxy.rb index 5186852f..0ff88838 100644 --- a/lib/karafka/web/ui/lib/hash_proxy.rb +++ b/lib/karafka/web/ui/lib/hash_proxy.rb @@ -12,16 +12,17 @@ module Lib # It is mostly used for flat hashes. # # It is in a way similar to openstruct but has abilities to dive deep into objects + # + # It is not super fast but it is enough for the UI and how deep structures we have. class HashProxy + extend Forwardable + + def_delegators :@hash, :[], :[]=, :key?, :each, :find + # @param hash [Hash] hash we want to convert to a proxy def initialize(hash) @hash = hash - end - - # @param key [Object] hash key - # @return [Object] key content or nil if missing - def [](key) - @hash[key] + @visited = [] end # @return [Original hash] @@ -35,7 +36,12 @@ def to_h def method_missing(method_name, *args, &block) return super unless args.empty? && block.nil? + @visited.clear + result = deep_find(@hash, method_name.to_sym) + + @visited.clear + result.nil? ? super : result end @@ -51,6 +57,12 @@ def respond_to_missing?(method_name, include_private = false) # @param obj [Object] local scope of iterating # @param key [Symbol, String] key we are looking for def deep_find(obj, key) + # Prevent circular dependency lookups by making sure we do not check the same object + # multiple times + return nil if @visited.include?(obj) + + @visited << obj + if obj.respond_to?(:key?) && obj.key?(key) obj[key] elsif obj.respond_to?(:each) diff --git a/lib/karafka/web/ui/models/health.rb b/lib/karafka/web/ui/models/health.rb index a8f3d224..06eaa027 100644 --- a/lib/karafka/web/ui/models/health.rb +++ b/lib/karafka/web/ui/models/health.rb @@ -31,7 +31,7 @@ def fetch_topics_data(state, stats) stats[cg_name] ||= { topics: {} } stats[cg_name][:topics][t_name] ||= {} - stats[cg_name][:topics][t_name][pt_id] = partition.to_h + stats[cg_name][:topics][t_name][pt_id] = partition stats[cg_name][:topics][t_name][pt_id][:process] = process end end diff --git a/lib/karafka/web/ui/models/metrics/charts/topics.rb b/lib/karafka/web/ui/models/metrics/charts/topics.rb index 7d53bc8a..3d01dc70 100644 --- a/lib/karafka/web/ui/models/metrics/charts/topics.rb +++ b/lib/karafka/web/ui/models/metrics/charts/topics.rb @@ -66,6 +66,40 @@ def topics_pace topics.each_value(&:compact!) topics.to_json end + + # @return [String] JSON with per-topic, highest LSO freeze duration. Useful for + # debugging of issues arising from hanging transactions + def max_lso_time + topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } } + + @data.to_h.each do |topic, metrics| + topic_without_cg = topic.split('[').first + + metrics.each do |current| + ls_offset = current.last[:ls_offset] || 0 + ls_offset_fd = current.last[:ls_offset_fd] || 0 + hi_offset = current.last[:hi_offset] || 0 + + # We convert this to seconds from milliseconds due to our Web UI precision + # Reporting is in ms for consistency + normalized_fd = (ls_offset_fd / 1_000).round + # In case ls_offset and hi_offset are the same, it means we're reached eof + # and we just don't have more data. In cases like this, LSO freeze duration + # will grow because LSO will remain unchanged, but it does not mean it is + # frozen. It means there is just no more data in the topic partition + # This means we need to nullify this case, otherwise it would report, that + # lso is hanging. + normalized_fd = 0 if ls_offset == hi_offset + + topics[topic_without_cg][current.first] << normalized_fd + end + end + + topics.each_value(&:compact!) + topics.each_value { |metrics| metrics.transform_values!(&:max) } + topics.transform_values! { |values| values.to_a.sort_by!(&:first) } + topics.to_json + end end end end diff --git a/lib/karafka/web/ui/models/partition.rb b/lib/karafka/web/ui/models/partition.rb index 8f25b9ae..1c8078a5 100644 --- a/lib/karafka/web/ui/models/partition.rb +++ b/lib/karafka/web/ui/models/partition.rb @@ -6,11 +6,32 @@ module Ui module Models # Single topic partition data representation model class Partition < Lib::HashProxy - # @return [Integer] lag - # @note We check the presence because prior to schema version 1.2.0, this metrics was - # not reported from the processes - def lag - to_h.fetch(:lag, -1) + # @return [Symbol] one of three states in which LSO can be in the correlation to given + # partition in the context of a consumer group. + # + # @note States descriptions: + # - `:active` all good. No hanging transactions, processing is ok + # - `:at_risk` - there may be hanging transactions but they do not affect processing + # before being stuck. This means, that the transaction still may be finished + # without affecting the processing, hence not having any impact. + # - `:stopped` - we have reached a hanging LSO and we cannot move forward despite more + # data being available. Unless the hanging transaction is killed or it finishes, + # we will not move forward. + def lso_risk_state + # If last stable is falling behind the high watermark + if ls_offset < hi_offset + # But it is changing and moving fast enough, it does not mean it is stuck + return :active if ls_offset_fd < ::Karafka::Web.config.ui.lso_threshold + + # If it is stuck but we still have work to do, this is not a tragic situation because + # maybe it will unstuck before we reach it + return :at_risk if (committed_offset || 0) < ls_offset + + # If it is not changing and falling behind high, it is stuck + :stopped + else + :active + end end end end diff --git a/lib/karafka/web/ui/pro/app.rb b/lib/karafka/web/ui/pro/app.rb index b0371dad..c22c540f 100644 --- a/lib/karafka/web/ui/pro/app.rb +++ b/lib/karafka/web/ui/pro/app.rb @@ -135,9 +135,20 @@ class App < Ui::Base end end - r.get 'health' do + r.on 'health' do controller = Controllers::Health.new(params) - controller.index + + r.get 'offsets' do + controller.offsets + end + + r.get 'overview' do + controller.overview + end + + r.get do + r.redirect root_path('health/overview') + end end r.get 'cluster' do diff --git a/lib/karafka/web/ui/pro/controllers/explorer.rb b/lib/karafka/web/ui/pro/controllers/explorer.rb index b151fc7c..74bac88c 100644 --- a/lib/karafka/web/ui/pro/controllers/explorer.rb +++ b/lib/karafka/web/ui/pro/controllers/explorer.rb @@ -24,9 +24,12 @@ class Explorer < Ui::Controllers::Base def index @topics = Models::ClusterInfo .topics - .reject { |topic| topic[:topic_name] == '__consumer_offsets' } .sort_by { |topic| topic[:topic_name] } + unless ::Karafka::Web.config.ui.show_internal_topics + @topics.reject! { |topic| topic[:topic_name].start_with?('__') } + end + respond end diff --git a/lib/karafka/web/ui/pro/controllers/health.rb b/lib/karafka/web/ui/pro/controllers/health.rb index 92c05fe9..5466b0df 100644 --- a/lib/karafka/web/ui/pro/controllers/health.rb +++ b/lib/karafka/web/ui/pro/controllers/health.rb @@ -19,12 +19,20 @@ module Controllers # Health state controller class Health < Ui::Controllers::Base # Displays the current system state - def index + def overview current_state = Models::ConsumersState.current! @stats = Models::Health.current(current_state) respond end + + # Displays details about offsets and their progression/statuses + def offsets + # Same data as overview but presented differently + overview + + respond + end end end end diff --git a/lib/karafka/web/ui/pro/views/dashboard/index.erb b/lib/karafka/web/ui/pro/views/dashboard/index.erb index 39ba7f0f..ccb5ac1d 100644 --- a/lib/karafka/web/ui/pro/views/dashboard/index.erb +++ b/lib/karafka/web/ui/pro/views/dashboard/index.erb @@ -15,6 +15,7 @@ <%== partial 'shared/tab_nav', locals: { title: 'Batches', id: 'batches' } %> <%== partial 'shared/tab_nav', locals: { title: 'Lags stored', id: 'lags-stored' } %> <%== partial 'shared/tab_nav', locals: { title: 'Topics pace', id: 'topics-pace' } %> + <%== partial 'shared/tab_nav', locals: { title: 'Max LSO time', id: 'max-lso-time' } %>
@@ -35,6 +36,10 @@
<%== partial 'shared/chart', locals: { data: @topics_charts.topics_pace, id: 'topics-pace' } %>
+ +
+ <%== partial 'shared/chart', locals: { data: @topics_charts.max_lso_time, id: 'max-lso-time' } %> +
diff --git a/lib/karafka/web/ui/pro/views/health/_breadcrumbs.erb b/lib/karafka/web/ui/pro/views/health/_breadcrumbs.erb index 1dcbd383..f87f94e0 100644 --- a/lib/karafka/web/ui/pro/views/health/_breadcrumbs.erb +++ b/lib/karafka/web/ui/pro/views/health/_breadcrumbs.erb @@ -3,3 +3,19 @@ Consumers groups health + +<% if current_path.include?('/overview') %> + +<% end %> + +<% if current_path.include?('/offsets') %> + +<% end %> diff --git a/lib/karafka/web/ui/pro/views/health/_no_data.erb b/lib/karafka/web/ui/pro/views/health/_no_data.erb new file mode 100644 index 00000000..465bc041 --- /dev/null +++ b/lib/karafka/web/ui/pro/views/health/_no_data.erb @@ -0,0 +1,9 @@ +
+
+
+ +
+
+
diff --git a/lib/karafka/web/ui/pro/views/health/_partition.erb b/lib/karafka/web/ui/pro/views/health/_partition.erb index 2909fda0..d24dd691 100644 --- a/lib/karafka/web/ui/pro/views/health/_partition.erb +++ b/lib/karafka/web/ui/pro/views/health/_partition.erb @@ -1,37 +1,39 @@ - - - <%= topic_name %> - + <%= partition_id %> - <%== lag_with_label details[:lag_stored] %> + <%== lag_with_label details.lag_stored %> - - <%= details[:lag_stored_d] %> + + <%= details.lag_stored_d %> - <%== offset_with_label topic_name, partition_id, details[:committed_offset] %> + <%== offset_with_label topic_name, partition_id, details.committed_offset %> - <%== offset_with_label topic_name, partition_id, details[:stored_offset] %> + <%== offset_with_label topic_name, partition_id, details.stored_offset %> + + + + <%= details.fetch_state %> + - - <%= details[:fetch_state] %> + + <%= details.poll_state %> - - <%= details[:poll_state] %> + + <%= details.lso_risk_state %> - - <%= details[:process].name %> + + <%= details.process.name %> diff --git a/lib/karafka/web/ui/pro/views/health/_partition_offset.erb b/lib/karafka/web/ui/pro/views/health/_partition_offset.erb new file mode 100644 index 00000000..a360ba63 --- /dev/null +++ b/lib/karafka/web/ui/pro/views/health/_partition_offset.erb @@ -0,0 +1,40 @@ + + + <%= partition_id %> + + + <%== lag_with_label details.lag_stored %> + + + <%== offset_with_label topic_name, partition_id, details.committed_offset %> + + + <%== relative_time(Time.now - details.committed_offset_fd / 1_000) %> + + + <%== offset_with_label topic_name, partition_id, details.stored_offset %> + + + <%== relative_time(Time.now - details.stored_offset_fd / 1_000) %> + + + <%== offset_with_label topic_name, partition_id, details.lo_offset %> + + + <%== offset_with_label topic_name, partition_id, details.hi_offset %> + + + <%== relative_time(Time.now - details.hi_offset_fd / 1_000) %> + + + <%== offset_with_label topic_name, partition_id, details.ls_offset %> + + + <%== relative_time(Time.now - details.ls_offset_fd / 1_000) %> + + + + <%= details.lso_risk_state %> + + + diff --git a/lib/karafka/web/ui/pro/views/health/_tabs.erb b/lib/karafka/web/ui/pro/views/health/_tabs.erb new file mode 100644 index 00000000..5e2598fb --- /dev/null +++ b/lib/karafka/web/ui/pro/views/health/_tabs.erb @@ -0,0 +1,27 @@ +
+
+
+ + + +
+
+
diff --git a/lib/karafka/web/ui/pro/views/health/index.erb b/lib/karafka/web/ui/pro/views/health/index.erb deleted file mode 100644 index 41951775..00000000 --- a/lib/karafka/web/ui/pro/views/health/index.erb +++ /dev/null @@ -1,71 +0,0 @@ -<%== view_title('Consumers groups health') %> - -<% if @stats.empty? %> -
-
-
- -
-
-
-<% end %> - -<% @stats.each do |cg_name, details| %> -
-
-
-

<%= cg_name %>

-
-
-
- -
-
- - - - - - - - - - - - - - - - <% details[:topics].sort_by(&:first).each do |topic_name, partitions| %> - <% partitions.sort_by(&:first).each do |partition_id, details| %> - <%== - partial( - 'health/partition', - locals: { - topic_name: topic_name, - partition_id: partition_id, - details: details - } - ) - %> - <% end %> - <% end %> - -
TopicPartitionLag storedLag stored trendCommitted offsetStored offsetFetch statePoll stateProcess name
-
-
- -
-
- - Last rebalance: - - <%== relative_time(details[:rebalanced_at]) %> - - -
-
-
-<% end %> diff --git a/lib/karafka/web/ui/pro/views/health/offsets.erb b/lib/karafka/web/ui/pro/views/health/offsets.erb new file mode 100644 index 00000000..b6f7e597 --- /dev/null +++ b/lib/karafka/web/ui/pro/views/health/offsets.erb @@ -0,0 +1,71 @@ +<%== view_title('Consumers groups offsets details') %> + +<% if @stats.empty? %> + <%== partial 'health/no_data' %> +<% else %> + <%== partial 'health/tabs' %> +<% end %> + +<% @stats.each_with_index do |(cg_name, details), index| %> +
+
+
+

<%= cg_name %>

+
+ +
+ + Last rebalance: + + <%== relative_time(details[:rebalanced_at]) %> + + +
+
+ +
+
+ <% topics = details[:topics].sort_by(&:first) %> + <% topics.each_with_index do |(topic_name, partitions), index| %> + + + + + + + + + + + + + + + + + + + + + + <% partitions.sort_by(&:first).each do |partition_id, details| %> + <%== + partial( + 'health/partition_offset', + locals: { + topic_name: topic_name, + partition_id: partition_id, + details: details + } + ) + %> + <% end %> + +
+
<%= topic_name %>
+
PartitionLag storedCommitted offsetCommitted offset changeStored offsetStored offset changeLow offsetHigh offsetHigh offset changeLast stable offsetLast stable offset changeLast stable offset state
+ <% end %> +
+
+
+<% end %> diff --git a/lib/karafka/web/ui/pro/views/health/overview.erb b/lib/karafka/web/ui/pro/views/health/overview.erb new file mode 100644 index 00000000..b135ccd3 --- /dev/null +++ b/lib/karafka/web/ui/pro/views/health/overview.erb @@ -0,0 +1,68 @@ +<%== view_title('Consumers groups overview') %> + +<% if @stats.empty? %> + <%== partial 'health/no_data' %> +<% else %> + <%== partial 'health/tabs' %> +<% end %> + +<% @stats.each_with_index do |(cg_name, details), index| %> +
+
+
+

<%= cg_name %>

+
+ +
+ + Last rebalance: + + <%== relative_time(details[:rebalanced_at]) %> + + +
+
+ +
+
+ <% topics = details[:topics].sort_by(&:first) %> + <% topics.each_with_index do |(topic_name, partitions), index| %> + + + + + + + + + + + + + + + + + + + <% partitions.sort_by(&:first).each do |partition_id, details| %> + <%== + partial( + 'health/partition', + locals: { + topic_name: topic_name, + partition_id: partition_id, + details: details + } + ) + %> + <% end %> + +
+
<%= topic_name %>
+
PartitionLag storedLag stored trendCommitted offsetStored offsetFetch statePoll stateLSO stateProcess name
+ <% end %> +
+
+
+<% end %> diff --git a/lib/karafka/web/ui/views/cluster/_partition.erb b/lib/karafka/web/ui/views/cluster/_partition.erb index 267eb01e..f007da88 100644 --- a/lib/karafka/web/ui/views/cluster/_partition.erb +++ b/lib/karafka/web/ui/views/cluster/_partition.erb @@ -1,24 +1,22 @@ <% topic = partition[:topic] %> -<% if topic[:topic_name] != '__consumer_offsets' %> - - - - <%= topic[:topic_name] %> - - - - <%= partition[:partition_id] %> - - - - <%= partition[:leader] %> - - - <%= partition[:replica_count] %> - - - <%= partition[:in_sync_replica_brokers] %> - - -<% end %> + + + + <%= topic[:topic_name] %> + + + + <%= partition[:partition_id] %> + + + + <%= partition[:leader] %> + + + <%= partition[:replica_count] %> + + + <%= partition[:in_sync_replica_brokers] %> + + diff --git a/spec/fixtures/consumer_report.json b/spec/fixtures/consumer_report.json index 14ad2877..07d58ebc 100644 --- a/spec/fixtures/consumer_report.json +++ b/spec/fixtures/consumer_report.json @@ -68,9 +68,16 @@ "lag_stored": 1, "lag_stored_d": -3, "committed_offset": 327343, + "committed_offset_fd": 5000, "stored_offset": 327355, + "stored_offset_fd": 5, "fetch_state": "active", "hi_offset": 327356, + "hi_offset_fd": 100, + "lo_offset": 0, + "eof_offset": 327356, + "ls_offset": 200, + "ls_offset_fd": 1000, "id": 0, "poll_state": "active" } @@ -85,9 +92,16 @@ "lag_stored": -1, "lag_stored_d": 0, "committed_offset": -1001, + "committed_offset_fd": 0, "stored_offset": -1001, + "stored_offset_fd": 0, "fetch_state": "active", "hi_offset": 0, + "hi_offset_fd": 100, + "lo_offset": 0, + "eof_offset": 0, + "ls_offset": 0, + "ls_offset_fd": 0, "id": 0, "poll_state": "active" } @@ -102,9 +116,16 @@ "lag_stored": -1, "lag_stored_d": 0, "committed_offset": 27, + "committed_offset_fd": 0, "stored_offset": -1001, + "stored_offset_fd": 0, "fetch_state": "active", "hi_offset": 27, + "hi_offset_fd": 100, + "lo_offset": 0, + "eof_offset": 0, + "ls_offset": 0, + "ls_offset_fd": 0, "id": 0, "poll_state": "active" } diff --git a/spec/lib/karafka/web/contracts/config_spec.rb b/spec/lib/karafka/web/contracts/config_spec.rb index e8ce8bfa..5f7aa401 100644 --- a/spec/lib/karafka/web/contracts/config_spec.rb +++ b/spec/lib/karafka/web/contracts/config_spec.rb @@ -37,6 +37,7 @@ key: 'some_key', secret: 'a' * 64 }, + show_internal_topics: true, cache: Object.new, per_page: 50, visibility_filter: Object.new @@ -168,5 +169,17 @@ it { expect(contract.call(params)).not_to be_success } end + + context 'when show_internal_topics is nil' do + before { params[:ui][:show_internal_topics] = nil } + + it { expect(contract.call(params)).not_to be_success } + end + + context 'when show_internal_topics is not boolean' do + before { params[:ui][:show_internal_topics] = '1' } + + it { expect(contract.call(params)).not_to be_success } + end end end diff --git a/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb b/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb index 82ba6bcf..7a92c692 100644 --- a/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb +++ b/spec/lib/karafka/web/processing/consumers/contracts/topic_stats_spec.rb @@ -7,7 +7,8 @@ { lag_stored: 10, lag: 5, - pace: 3 + pace: 3, + ls_offset_fd: 2 } end @@ -52,4 +53,22 @@ it { expect(contract.call(topic_stats)).not_to be_success } end + + context 'when ls_offset_fd is not a number' do + before { topic_stats[:ls_offset_fd] = 'test' } + + it { expect(contract.call(topic_stats)).not_to be_success } + end + + context 'when ls_offset_fd is less than 0' do + before { topic_stats[:ls_offset_fd] = -2 } + + it { expect(contract.call(topic_stats)).not_to be_success } + end + + context 'when ls_offset_fd is missing' do + before { topic_stats.delete(:ls_offset_fd) } + + it { expect(contract.call(topic_stats)).not_to be_success } + end end diff --git a/spec/lib/karafka/web/tracking/consumers/contracts/consumer_group_spec.rb b/spec/lib/karafka/web/tracking/consumers/contracts/consumer_group_spec.rb index 722f11cc..ad59e0ef 100644 --- a/spec/lib/karafka/web/tracking/consumers/contracts/consumer_group_spec.rb +++ b/spec/lib/karafka/web/tracking/consumers/contracts/consumer_group_spec.rb @@ -25,11 +25,14 @@ lag_stored: 0, lag_stored_d: 0, committed_offset: 18, + committed_offset_fd: 0, stored_offset: 18, + stored_offset_fd: 0, fetch_state: 'active', id: 0, poll_state: 'active', hi_offset: 10, + hi_offset_fd: 10, lag_d: 10, lag: 10, lo_offset: 0, diff --git a/spec/lib/karafka/web/tracking/consumers/contracts/partition_spec.rb b/spec/lib/karafka/web/tracking/consumers/contracts/partition_spec.rb index c97569cc..ace9d135 100644 --- a/spec/lib/karafka/web/tracking/consumers/contracts/partition_spec.rb +++ b/spec/lib/karafka/web/tracking/consumers/contracts/partition_spec.rb @@ -11,10 +11,13 @@ lag: 2, lag_d: -1, committed_offset: 0, + committed_offset_fd: 0, stored_offset: 0, + stored_offset_fd: 0, fetch_state: 'active', poll_state: 'active', hi_offset: 1, + hi_offset_fd: 0, lo_offset: 0, eof_offset: 0, ls_offset: 0, @@ -63,8 +66,11 @@ lag lag_d committed_offset + committed_offset_fd stored_offset + stored_offset_fd hi_offset + hi_offset_fd lo_offset eof_offset ls_offset @@ -84,9 +90,16 @@ end end - context 'when ls_offset_fd is less than 0' do - before { config[:ls_offset_fd] = -1 } + %i[ + committed_offset_fd + stored_offset_fd + ls_offset_fd + hi_offset_fd + ].each do |fd| + context "when #{fd} is less than 0" do + before { config[fd] = -1 } - it { expect(contract.call(config)).not_to be_success } + it { expect(contract.call(config)).not_to be_success } + end end end diff --git a/spec/lib/karafka/web/tracking/consumers/contracts/report_spec.rb b/spec/lib/karafka/web/tracking/consumers/contracts/report_spec.rb index 05ac2826..b899aa74 100644 --- a/spec/lib/karafka/web/tracking/consumers/contracts/report_spec.rb +++ b/spec/lib/karafka/web/tracking/consumers/contracts/report_spec.rb @@ -73,11 +73,14 @@ lag: 0, lag_d: 0, committed_offset: 18, + committed_offset_fd: 0, stored_offset: 18, + stored_offset_fd: 0, fetch_state: 'active', id: 0, poll_state: 'active', hi_offset: 1, + hi_offset_fd: 1, lo_offset: 0, eof_offset: 0, ls_offset: 0, diff --git a/spec/lib/karafka/web/tracking/consumers/contracts/subscription_group_spec.rb b/spec/lib/karafka/web/tracking/consumers/contracts/subscription_group_spec.rb index 6078ff65..dac735ef 100644 --- a/spec/lib/karafka/web/tracking/consumers/contracts/subscription_group_spec.rb +++ b/spec/lib/karafka/web/tracking/consumers/contracts/subscription_group_spec.rb @@ -17,8 +17,11 @@ lag: 0, lag_d: 0, committed_offset: 100, + committed_offset_fd: 0, stored_offset: 95, + stored_offset_fd: 0, hi_offset: 2, + hi_offset_fd: 2, lo_offset: 0, eof_offset: 0, ls_offset: 0, diff --git a/spec/lib/karafka/web/tracking/consumers/contracts/topic_spec.rb b/spec/lib/karafka/web/tracking/consumers/contracts/topic_spec.rb index 899c2065..e62fc120 100644 --- a/spec/lib/karafka/web/tracking/consumers/contracts/topic_spec.rb +++ b/spec/lib/karafka/web/tracking/consumers/contracts/topic_spec.rb @@ -14,10 +14,13 @@ lag: 2, lag_d: 1, committed_offset: 100, + committed_offset_fd: 0, stored_offset: 95, + stored_offset_fd: 0, fetch_state: 'active', poll_state: 'active', hi_offset: 1, + hi_offset_fd: 1, lo_offset: 0, eof_offset: 0, ls_offset: 0, diff --git a/spec/lib/karafka/web/ui/models/health_spec.rb b/spec/lib/karafka/web/ui/models/health_spec.rb index c5f7883e..dc2622af 100644 --- a/spec/lib/karafka/web/ui/models/health_spec.rb +++ b/spec/lib/karafka/web/ui/models/health_spec.rb @@ -89,9 +89,10 @@ keys = %i[ lag lag_d lag_stored lag_stored_d committed_offset stored_offset fetch_state hi_offset id - poll_state process - ] - expect(sg[:topics][:default][:partitions]['0'.to_sym].keys).to eq(keys) + poll_state process hi_offset_fd stored_offset_fd lo_offset ls_offset ls_offset_fd + eof_offset committed_offset_fd + ].sort + expect(sg[:topics][:default][:partitions]['0'.to_sym].keys.sort).to eq(keys) end end end diff --git a/spec/lib/karafka/web/ui/models/partition_spec.rb b/spec/lib/karafka/web/ui/models/partition_spec.rb index cff3db61..8d9ef896 100644 --- a/spec/lib/karafka/web/ui/models/partition_spec.rb +++ b/spec/lib/karafka/web/ui/models/partition_spec.rb @@ -3,17 +3,51 @@ RSpec.describe_current do subject(:partition) { described_class.new(data) } - let(:data) { {} } + let(:hi_offset) { 100 } + let(:ls_offset) { 100 } + let(:ls_offset_fd) { 100 } + let(:committed_offset) { 100 } - describe '#lag' do - context 'when not available' do - it { expect(partition.lag).to eq(-1) } + let(:data) do + { + hi_offset: hi_offset, + ls_offset: ls_offset, + ls_offset_fd: ls_offset_fd, + committed_offset: committed_offset + } + end + + describe '#lso_risk_state' do + let(:lso_risk_state) { partition.lso_risk_state } + + context 'when ls_offset is not behind hi_offset' do + it { expect(lso_risk_state).to eq(:active) } + end + + context 'when ls_offset behind hi_offset but within threshold' do + let(:hi_offset) { 100 } + let(:ls_offset) { 60 } + let(:ls_offset_fd) { 5 } + + it { expect(lso_risk_state).to eq(:active) } + end + + context 'when ls_offset behind hi_offset behind threshold but we are not there' do + let(:hi_offset) { 100 } + let(:ls_offset) { 60 } + let(:ls_offset_fd) { 10 * 60 * 1_000 } + let(:committed_offset) { ls_offset - 10 } + + it { expect(lso_risk_state).to eq(:at_risk) } end - context 'when available' do - let(:data) { { lag: 100 } } + context 'when ls_offset behind hi_offset behind threshold and we are there' do + let(:hi_offset) { 100 } + let(:ls_offset) { 60 } + let(:ls_offset_fd) { 10 * 60 * 1_000 } + let(:committed_offset) { ls_offset } - it { expect(partition.lag).to eq(100) } + it { expect(lso_risk_state).to eq(:stopped) } end end end diff --git a/spec/lib/karafka/web/ui/pro/controllers/explorer_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/explorer_spec.rb index 4c8e1025..d13cf4eb 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/explorer_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/explorer_spec.rb @@ -6,9 +6,13 @@ let(:topic) { create_topic(partitions: partitions) } let(:partitions) { 1 } let(:removed_or_compacted) { 'This message has either been removed or compacted' } + let(:internal_topic) { "__#{SecureRandom.uuid}" } describe '#index' do - before { get 'explorer' } + before do + create_topic(topic_name: internal_topic) + get 'explorer' + end it do expect(response).to be_ok @@ -19,6 +23,7 @@ expect(body).to include(topics_config.consumers.metrics) expect(body).to include(topics_config.consumers.reports) expect(body).to include(topics_config.errors) + expect(body).not_to include(internal_topic) end context 'when there are no topics' do @@ -35,6 +40,25 @@ expect(body).to include('There are no available topics in the current cluster') end end + + context 'when internal topics should be displayed' do + before do + allow(::Karafka::Web.config.ui).to receive(:show_internal_topics).and_return(true) + get 'explorer' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include(topics_config.consumers.states) + expect(body).to include(topics_config.consumers.metrics) + expect(body).to include(topics_config.consumers.reports) + expect(body).to include(topics_config.errors) + expect(body).to include(internal_topic) + end + end end describe '#topic' do diff --git a/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb index 7bf8f523..6857ef72 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb @@ -3,11 +3,24 @@ RSpec.describe_current do subject(:app) { Karafka::Web::Ui::Pro::App } - describe '#index' do + let(:partition_scope) do + %w[ + consumer_groups + example_app6_app + subscription_groups + c4ca4238a0b9_0 + topics + default + partitions + 0 + ] + end + + describe '#overview' do context 'when no report data' do before do topics_config.consumers.reports = create_topic - get 'health' + get 'health/overview' end it do @@ -20,7 +33,7 @@ end context 'when data is present' do - before { get 'health' } + before { get 'health/overview' } it do expect(response).to be_ok @@ -32,4 +45,96 @@ end end end + + describe '#offsets' do + let(:reports_topic) { topics_config.consumers.reports = create_topic } + + context 'when no report data' do + before do + reports_topic + get 'health/offsets' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('No health data is available') + expect(body).not_to include('bg-warning') + expect(body).not_to include('bg-danger') + end + end + + context 'when data is present' do + before { get 'health/offsets' } + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('Not available until first offset') + expect(body).to include('327355') + expect(body).not_to include('bg-warning') + expect(body).not_to include('bg-danger') + end + end + + context 'when one of partitions is at risk due to LSO' do + before do + report = JSON.parse(fixtures_file('consumer_report.json')) + + partition_data = report.dig(*partition_scope) + + partition_data['committed_offset'] = 1_000 + partition_data['ls_offset'] = 3_000 + partition_data['ls_offset_fd'] = 1_000_000_000 + + produce(reports_topic, report.to_json) + + get 'health/offsets' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('Not available until first offset') + expect(body).to include('bg-warning') + expect(body).to include('at_risk') + expect(body).not_to include('bg-danger') + expect(body).not_to include('stopped') + end + end + + context 'when one of partitions is stopped due to LSO' do + before do + report = JSON.parse(fixtures_file('consumer_report.json')) + + partition_data = report.dig(*partition_scope) + + partition_data['committed_offset'] = 3_000 + partition_data['ls_offset'] = 3_000 + partition_data['ls_offset_fd'] = 1_000_000_000 + + produce(reports_topic, report.to_json) + + get 'health/offsets' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('Not available until first offset') + expect(body).to include('bg-danger') + expect(body).to include('stopped') + expect(body).not_to include('at_risk') + expect(body).not_to include('bg-warning') + end + end + end end