Skip to content

Commit

Permalink
LSO reporting + observability (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Sep 12, 2023
1 parent ceab04a commit 194d7d8
Show file tree
Hide file tree
Showing 40 changed files with 697 additions and 150 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- **[Feature]** Provide ability to reproduce a given message to the same topic partition with all the details from the per message explorer view.
- **[Feature]** Provide "surrounding" navigation link that allows to view the given message in the context of its surrounding. Useful for debugging of failures where the batch context may be relevant.
- **[Feature]** Allow for time based lookups per topic partition.
- **[Feature]** Introduce Offsets Health inspection view for frozen LSO lookups with `lso_threshold` configuration option.
- [Improvement] Support pattern subscriptions details in the routing view both by displaying the pattern as well as expanded routing details.
- [Improvement] Collect total number of threads per process for the process details view.
- [Improvement] Normalize naming of metrics to better reflect what they do (in reports and in the Web UI).
Expand Down Expand Up @@ -53,6 +54,8 @@
- [Improvement] Use replication factor of two by default (if not overridden) for Web UI topics when there is more than one broker.
- [Improvement] Show a warning when replication factor of 1 is used for Web UI topics in production.
- [Improvement] Collect extra additional metrics useful for hanging transactions detection.
- [Improvement] Reorganize how the Health view looks.
- [Improvement] Hide all private Kafka topics by default in the explorer. Configurable with `show_internal_topics` config setting.
- [Fix] Return 402 status instead of 500 on Pro features that are not available in OSS.
- [Fix] Fix a case where errors would not be visible without Rails due to the `String#first` usage.
- [Fix] Fix a case where live-poll would be disabled but would still update data.
Expand Down
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ PATH
karafka-web (0.7.0)
erubi (~> 1.4)
karafka (>= 2.2.0, < 3.0.0)
karafka-core (>= 2.2.1, < 3.0.0)
karafka-core (>= 2.2.2, < 3.0.0)
roda (~> 3.68, >= 3.69)
tilt (~> 2.0)

Expand All @@ -31,7 +31,7 @@ GEM
thor (>= 0.20)
waterdrop (>= 2.6.6, < 3.0.0)
zeitwerk (~> 2.3)
karafka-core (2.2.1)
karafka-core (2.2.2)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.1, < 0.14.0)
karafka-rdkafka (0.13.4)
Expand Down
2 changes: 1 addition & 1 deletion karafka-web.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|

spec.add_dependency 'erubi', '~> 1.4'
spec.add_dependency 'karafka', '>= 2.2.0', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.2.1', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.2.2', '< 3.0.0'
spec.add_dependency 'roda', '~> 3.68', '>= 3.69'
spec.add_dependency 'tilt', '~> 2.0'

Expand Down
9 changes: 9 additions & 0 deletions lib/karafka/web/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,18 @@ class Config
# UI cache to improve performance of views that reuse states that are not often changed
setting :cache, default: Ui::Lib::TtlCache.new(60_000 * 5)

# Should we display internal topics of Kafka. The once starting with `__`
# By default we do not display them as they are not usable from regular users perspective
setting :show_internal_topics, default: false

# How many elements should we display on pages that support pagination
setting :per_page, default: 25

# Time beyond which the last stable offset freeze is considered a risk
# (unless same as high). This is used to show on the UI that there may be a hanging
# transaction that will cause given consumer group to halt processing and wait
setting :lso_threshold, default: 5 * 60 * 1_000

# Allows to manage visibility of payload, headers and message key in the UI
# In some cases you may want to limit what is being displayed due to the type of data you
# are dealing with
Expand Down
1 change: 1 addition & 0 deletions lib/karafka/web/contracts/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Config < Web::Contracts::Base
required(:secret) { |val| val.is_a?(String) && val.length >= 64 }
end

required(:show_internal_topics) { |val| [true, false].include?(val) }
required(:cache) { |val| !val.nil? }
required(:per_page) { |val| val.is_a?(Integer) && val >= 1 && val <= 100 }
required(:visibility_filter) { |val| !val.nil? }
Expand Down
12 changes: 11 additions & 1 deletion lib/karafka/web/processing/consumers/aggregators/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ def materialize_consumers_groups_current_state
.map { |p_details| p_details.fetch(:hi_offset) }
.reject(&:negative?)

# Last stable offsets freeze durations - we pick the max freeze to indicate
# the longest open transaction that potentially may be hanging
ls_offsets_fd = partitions_data
.map { |p_details| p_details.fetch(:ls_offset_fd) }
.reject(&:negative?)

# If there is no lag that would not be negative, it means we did not mark
# any messages as consumed on this topic in any partitions, hence we cannot
# compute lag easily
Expand All @@ -127,7 +133,11 @@ def materialize_consumers_groups_current_state
cgs[group_name][topic_name] = {
lag_stored: lags_stored.sum,
lag: lags.sum,
pace: offsets_hi.sum
pace: offsets_hi.sum,
# Take max last stable offset duration without any change. This can
# indicate a hanging transaction, because the offset will not move forward
# and will stay with a growing freeze duration when stuck
ls_offset_fd: ls_offsets_fd.max
}
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TopicStats < Web::Contracts::Base
required(:lag_stored) { |val| val.is_a?(Integer) }
required(:lag) { |val| val.is_a?(Integer) }
required(:pace) { |val| val.is_a?(Integer) }
required(:ls_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/karafka/web/tracking/consumers/contracts/partition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ class Partition < Web::Contracts::Base
required(:lag_stored) { |val| val.is_a?(Integer) }
required(:lag_stored_d) { |val| val.is_a?(Integer) }
required(:committed_offset) { |val| val.is_a?(Integer) }
required(:committed_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
required(:stored_offset) { |val| val.is_a?(Integer) }
required(:stored_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
required(:fetch_state) { |val| val.is_a?(String) && !val.empty? }
required(:poll_state) { |val| val.is_a?(String) && !val.empty? }
required(:hi_offset) { |val| val.is_a?(Integer) }
required(:hi_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
required(:lo_offset) { |val| val.is_a?(Integer) }
required(:eof_offset) { |val| val.is_a?(Integer) }
required(:ls_offset) { |val| val.is_a?(Integer) }
Expand Down
5 changes: 5 additions & 0 deletions lib/karafka/web/tracking/consumers/listeners/statistics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,14 @@ def extract_partition_metrics(pt_stats)
'consumer_lag_stored',
'consumer_lag_stored_d',
'committed_offset',
# Can be useful to track the frequency of flushes when there is progress
'committed_offset_fd',
'stored_offset',
# Can be useful to track the frequency of flushes when there is progress
'stored_offset_fd',
'fetch_state',
'hi_offset',
'hi_offset_fd',
'lo_offset',
'eof_offset',
'ls_offset',
Expand Down
11 changes: 7 additions & 4 deletions lib/karafka/web/ui/controllers/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ def index
# @return [Array<Hash>] array with topics to be displayed sorted in an alphabetical
# order
def displayable_topics(cluster_info)
cluster_info
.topics
.reject { |topic| topic[:topic_name] == '__consumer_offsets' }
.sort_by { |topic| topic[:topic_name] }
all = cluster_info
.topics
.sort_by { |topic| topic[:topic_name] }

return all if ::Karafka::Web.config.ui.show_internal_topics

all.reject { |topic| topic[:topic_name].start_with?('__') }
end
end
end
Expand Down
16 changes: 16 additions & 0 deletions lib/karafka/web/ui/helpers/application_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,22 @@ def offset_with_label(topic_name, partition_id, offset, explore: false)
end
end

# @param details [::Karafka::Web::Ui::Models::Partition] partition information with
# lso risk state info
# @return [String] background classes for row marking
def lso_risk_state_bg(details)
case details.lso_risk_state
when :active
''
when :at_risk
'bg-warning bg-opacity-25'
when :stopped
'bg-danger bg-opacity-25'
else
raise ::Karafka::Errors::UnsupportedCaseError
end
end

# Returns the view title html code
#
# @param title [String] page title
Expand Down
24 changes: 18 additions & 6 deletions lib/karafka/web/ui/lib/hash_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ module Lib
# It is mostly used for flat hashes.
#
# It is in a way similar to openstruct but has abilities to dive deep into objects
#
# It is not super fast but it is enough for the UI and how deep structures we have.
class HashProxy
extend Forwardable

def_delegators :@hash, :[], :[]=, :key?, :each, :find

# @param hash [Hash] hash we want to convert to a proxy
def initialize(hash)
@hash = hash
end

# @param key [Object] hash key
# @return [Object] key content or nil if missing
def [](key)
@hash[key]
@visited = []
end

# @return [Original hash]
Expand All @@ -35,7 +36,12 @@ def to_h
def method_missing(method_name, *args, &block)
return super unless args.empty? && block.nil?

@visited.clear

result = deep_find(@hash, method_name.to_sym)

@visited.clear

result.nil? ? super : result
end

Expand All @@ -51,6 +57,12 @@ def respond_to_missing?(method_name, include_private = false)
# @param obj [Object] local scope of iterating
# @param key [Symbol, String] key we are looking for
def deep_find(obj, key)
# Prevent circular dependency lookups by making sure we do not check the same object
# multiple times
return nil if @visited.include?(obj)

@visited << obj

if obj.respond_to?(:key?) && obj.key?(key)
obj[key]
elsif obj.respond_to?(:each)
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web/ui/models/health.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def fetch_topics_data(state, stats)

stats[cg_name] ||= { topics: {} }
stats[cg_name][:topics][t_name] ||= {}
stats[cg_name][:topics][t_name][pt_id] = partition.to_h
stats[cg_name][:topics][t_name][pt_id] = partition
stats[cg_name][:topics][t_name][pt_id][:process] = process
end
end
Expand Down
34 changes: 34 additions & 0 deletions lib/karafka/web/ui/models/metrics/charts/topics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,40 @@ def topics_pace
topics.each_value(&:compact!)
topics.to_json
end

# @return [String] JSON with per-topic, highest LSO freeze duration. Useful for
# debugging of issues arising from hanging transactions
def max_lso_time
topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }

@data.to_h.each do |topic, metrics|
topic_without_cg = topic.split('[').first

metrics.each do |current|
ls_offset = current.last[:ls_offset] || 0
ls_offset_fd = current.last[:ls_offset_fd] || 0
hi_offset = current.last[:hi_offset] || 0

# We convert this to seconds from milliseconds due to our Web UI precision
# Reporting is in ms for consistency
normalized_fd = (ls_offset_fd / 1_000).round
# In case ls_offset and hi_offset are the same, it means we're reached eof
# and we just don't have more data. In cases like this, LSO freeze duration
# will grow because LSO will remain unchanged, but it does not mean it is
# frozen. It means there is just no more data in the topic partition
# This means we need to nullify this case, otherwise it would report, that
# lso is hanging.
normalized_fd = 0 if ls_offset == hi_offset

topics[topic_without_cg][current.first] << normalized_fd
end
end

topics.each_value(&:compact!)
topics.each_value { |metrics| metrics.transform_values!(&:max) }
topics.transform_values! { |values| values.to_a.sort_by!(&:first) }
topics.to_json
end
end
end
end
Expand Down
31 changes: 26 additions & 5 deletions lib/karafka/web/ui/models/partition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,32 @@ module Ui
module Models
# Single topic partition data representation model
class Partition < Lib::HashProxy
# @return [Integer] lag
# @note We check the presence because prior to schema version 1.2.0, this metrics was
# not reported from the processes
def lag
to_h.fetch(:lag, -1)
# @return [Symbol] one of three states in which LSO can be in the correlation to given
# partition in the context of a consumer group.
#
# @note States descriptions:
# - `:active` all good. No hanging transactions, processing is ok
# - `:at_risk` - there may be hanging transactions but they do not affect processing
# before being stuck. This means, that the transaction still may be finished
# without affecting the processing, hence not having any impact.
# - `:stopped` - we have reached a hanging LSO and we cannot move forward despite more
# data being available. Unless the hanging transaction is killed or it finishes,
# we will not move forward.
def lso_risk_state
# If last stable is falling behind the high watermark
if ls_offset < hi_offset
# But it is changing and moving fast enough, it does not mean it is stuck
return :active if ls_offset_fd < ::Karafka::Web.config.ui.lso_threshold

# If it is stuck but we still have work to do, this is not a tragic situation because
# maybe it will unstuck before we reach it
return :at_risk if (committed_offset || 0) < ls_offset

# If it is not changing and falling behind high, it is stuck
:stopped
else
:active
end
end
end
end
Expand Down
15 changes: 13 additions & 2 deletions lib/karafka/web/ui/pro/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,20 @@ class App < Ui::Base
end
end

r.get 'health' do
r.on 'health' do
controller = Controllers::Health.new(params)
controller.index

r.get 'offsets' do
controller.offsets
end

r.get 'overview' do
controller.overview
end

r.get do
r.redirect root_path('health/overview')
end
end

r.get 'cluster' do
Expand Down
5 changes: 4 additions & 1 deletion lib/karafka/web/ui/pro/controllers/explorer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ class Explorer < Ui::Controllers::Base
def index
@topics = Models::ClusterInfo
.topics
.reject { |topic| topic[:topic_name] == '__consumer_offsets' }
.sort_by { |topic| topic[:topic_name] }

unless ::Karafka::Web.config.ui.show_internal_topics
@topics.reject! { |topic| topic[:topic_name].start_with?('__') }
end

respond
end

Expand Down
10 changes: 9 additions & 1 deletion lib/karafka/web/ui/pro/controllers/health.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ module Controllers
# Health state controller
class Health < Ui::Controllers::Base
# Displays the current system state
def index
def overview
current_state = Models::ConsumersState.current!
@stats = Models::Health.current(current_state)

respond
end

# Displays details about offsets and their progression/statuses
def offsets
# Same data as overview but presented differently
overview

respond
end
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/karafka/web/ui/pro/views/dashboard/index.erb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<%== partial 'shared/tab_nav', locals: { title: 'Batches', id: 'batches' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Lags stored', id: 'lags-stored' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Topics pace', id: 'topics-pace' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Max LSO time', id: 'max-lso-time' } %>
</ul>

<div class="tab-content">
Expand All @@ -35,6 +36,10 @@
<div class="tab-pane" id="topics-pace" role="tabpanel">
<%== partial 'shared/chart', locals: { data: @topics_charts.topics_pace, id: 'topics-pace' } %>
</div>

<div class="tab-pane" id="max-lso-time" role="tabpanel">
<%== partial 'shared/chart', locals: { data: @topics_charts.max_lso_time, id: 'max-lso-time' } %>
</div>
</div>
</div>
</div>
Expand Down
Loading

0 comments on commit 194d7d8

Please sign in to comment.