Fix incorrect topic state data aggregation (#160)
mensfeld authored Sep 27, 2023
1 parent 2ef4c77 commit fd27f77
Showing 31 changed files with 593 additions and 114 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

 ## 0.7.5 (Unreleased)
 - [Enhancement] Update order of topics creation for the setup of Web to support zero-downtime setup of Web in running Karafka projects.
+- [Fix] Fix a case where charts aggregated data would not include all topics.
+- [Fix] Make sure that the most recent per-partition data for Health is never overwritten by an old state from a previous partition owner.
 - [Fix] Cache assets for 1 year instead of 7 days.
 - [Fix] Remove source maps pointing to non-existing locations.
 - [Maintenance] Include license and copyrights notice for `timeago.js` that was missing in the JS min file.
102 changes: 56 additions & 46 deletions lib/karafka/web/processing/consumers/aggregators/metrics.rb
@@ -86,65 +86,75 @@ def add_consumers_groups_metrics

   # Materializes the current state of consumers group data
   #
   # At the moment we report only topics lags but the format we are using supports
   # extending this information in the future if it would be needed.
   #
   # @return [Hash] hash with nested consumers and their topics details structure
   # @note We do **not** report on a per partition basis because it would significantly
   #   increase needed storage.
   def materialize_consumers_groups_current_state
     cgs = {}

-    @active_reports.each do |_, details|
-      details.fetch(:consumer_groups).each do |group_name, group_details|
-        group_details.fetch(:subscription_groups).each do |_sg_name, sg_details|
-          sg_details.fetch(:topics).each do |topic_name, topic_details|
-            partitions_data = topic_details.fetch(:partitions).values
-
-            lags = partitions_data
-                   .map { |p_details| p_details.fetch(:lag, -1) }
-                   .reject(&:negative?)
-
-            lags_stored = partitions_data
-                          .map { |p_details| p_details.fetch(:lag_stored, -1) }
-                          .reject(&:negative?)
-
-            offsets_hi = partitions_data
-                         .map { |p_details| p_details.fetch(:hi_offset, -1) }
-                         .reject(&:negative?)
-
-            # Last stable offsets freeze durations - we pick the max freeze to indicate
-            # the longest open transaction that potentially may be hanging
-            ls_offsets_fd = partitions_data
-                            .map { |p_details| p_details.fetch(:ls_offset_fd, 0) }
-                            .reject(&:negative?)
-
-            # If there is no lag that would not be negative, it means we did not mark
-            # any messages as consumed on this topic in any partitions, hence we cannot
-            # compute lag easily
-            # We do not want to initialize any data for this topic, when there is nothing
-            # useful we could present
-            #
-            # In theory lag stored must mean that lag must exist but just to be sure we
-            # check both here
-            next if lags.empty? || lags_stored.empty?
-
-            cgs[group_name] ||= {}
-            cgs[group_name][topic_name] = {
-              lag_stored: lags_stored.sum,
-              lag: lags.sum,
-              pace: offsets_hi.sum,
-              # Take max last stable offset duration without any change. This can
-              # indicate a hanging transaction, because the offset will not move forward
-              # and will stay with a growing freeze duration when stuck
-              ls_offset_fd: ls_offsets_fd.max
-            }
-          end
-        end
-      end
-    end
+    iterate_partitions_data do |group_name, topic_name, partitions_data|
+      lags = partitions_data
+             .map { |p_details| p_details.fetch(:lag, -1) }
+             .reject(&:negative?)
+
+      lags_stored = partitions_data
+                    .map { |p_details| p_details.fetch(:lag_stored, -1) }
+                    .reject(&:negative?)
+
+      offsets_hi = partitions_data
+                   .map { |p_details| p_details.fetch(:hi_offset, -1) }
+                   .reject(&:negative?)
+
+      # Last stable offsets freeze durations - we pick the max freeze to indicate
+      # the longest open transaction that potentially may be hanging
+      ls_offsets_fd = partitions_data
+                      .map { |p_details| p_details.fetch(:ls_offset_fd, 0) }
+                      .reject(&:negative?)
+
+      cgs[group_name] ||= {}
+      cgs[group_name][topic_name] = {
+        lag_stored: lags_stored.sum,
+        lag: lags.sum,
+        pace: offsets_hi.sum,
+        # Take max last stable offset duration without any change. This can
+        # indicate a hanging transaction, because the offset will not move forward
+        # and will stay with a growing freeze duration when stuck
+        ls_offset_fd: ls_offsets_fd.max || 0
+      }
+    end

     cgs
   end
+
+  # Converts our reports data into an iterator per partition
+  # Compensates for a case where same partition data would be available for a short
+  # period of time in multiple processes reports due to rebalances.
+  def iterate_partitions_data
+    cgs_topics = Hash.new { |h, v| h[v] = Hash.new { |h2, v2| h2[v2] = {} } }
+
+    # We need to sort them in case we have same reports containing data about same
+    # topics partitions. Mostly during shutdowns and rebalances
+    @active_reports
+      .values
+      .sort_by { |report| report.fetch(:dispatched_at) }
+      .map { |details| details.fetch(:consumer_groups) }
+      .each do |consumer_groups|
+        consumer_groups.each do |group_name, group_details|
+          group_details.fetch(:subscription_groups).each_value do |sg_details|
+            sg_details.fetch(:topics).each do |topic_name, topic_details|
+              topic_details.fetch(:partitions).each do |partition_id, partition_data|
+                cgs_topics[group_name][topic_name][partition_id] = partition_data
+              end
+            end
+          end
+        end
+      end
+
+    cgs_topics.each do |group_name, topics_data|
+      topics_data.each do |topic_name, partitions_data|
+        yield(group_name, topic_name, partitions_data.values)
+      end
+    end
+  end
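The heart of the fix is the new `iterate_partitions_data` helper: reports are ordered by `dispatched_at`, so when the same partition briefly appears in reports from two processes (for example around a rebalance or shutdown), the later report overwrites the earlier one before the per-topic sums are computed. Below is a minimal, self-contained sketch of that last-write-wins aggregation; the `reports` structure and the numbers are made up for illustration and are far simpler than the real report schema.

# Toy illustration of the aggregation fix: for a partition present in more
# than one process report, the report dispatched later wins.
reports = [
  # Stale report from the previous owner of partition 0
  { dispatched_at: 100.0, topics: { 'default' => { 0 => { lag: 50 } } } },
  # Fresh report from the current owner of partitions 0 and 1
  { dispatched_at: 200.0, topics: { 'default' => { 0 => { lag: 3 }, 1 => { lag: 6 } } } }
]

# Auto-vivifying nested hash: topic name => partition id => partition data
topics = Hash.new { |h, k| h[k] = {} }

# Sorting by dispatch time makes the most recent data the last write
reports.sort_by { |r| r[:dispatched_at] }.each do |report|
  report[:topics].each do |topic, partitions|
    partitions.each { |partition_id, data| topics[topic][partition_id] = data }
  end
end

# Per-topic lag is then a sum over deduplicated partitions, as in the new code
topics.each do |topic, partitions|
  lags = partitions.values.map { |p| p.fetch(:lag, -1) }.reject(&:negative?)
  puts "#{topic}: lag=#{lags.sum}"
end
# Prints "default: lag=9" (3 + 6) rather than 59, because the stale
# partition 0 entry was overwritten before summing.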
5 changes: 4 additions & 1 deletion lib/karafka/web/processing/time_series_tracker.rb
@@ -116,9 +116,12 @@ def evict
   # available
   times << values.last unless values.empty?

-  # Squash in case there would be two events from the same time
+  # Keep the most recent state out of many that would come from the same time moment
+  times.reverse!
   times.uniq!(&:first)
+  times.reverse!

+  # Squash in case there would be two events from the same time
   times.sort_by!(&:first)

   @historicals[range_name] = times.last(limit)
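The tracker change leans on how `Array#uniq` behaves: it keeps the first occurrence per key, so reversing before and after the dedup retains the most recent entry for each timestamp, and the final sort restores chronological order. A tiny sketch with made-up `[timestamp, value]` pairs:

# Two states reported for timestamp 20; we want to keep only the newer one
times = [[10, :old], [20, :old], [20, :new], [30, :new]]

# uniq! keeps the first occurrence per key, so reverse before and after
# to retain the last (most recent) entry for each timestamp
times.reverse!
times.uniq!(&:first)
times.reverse!

# Restore chronological order in case the squash disturbed it
times.sort_by!(&:first)

p times # => [[10, :old], [20, :new], [30, :new]]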
6 changes: 5 additions & 1 deletion lib/karafka/web/ui/models/health.rb
@@ -62,7 +62,11 @@ def fetch_rebalance_ages(state, stats)
   #
   # @param state [State]
   def iterate_partitions(state)
-    processes = Processes.active(state)
+    # By default processes are sorted by name and this is not what we want here.
+    # We want to make sure that the newest data is processed last, so we get
+    # the most accurate state in case of deployments and shutdowns, etc. without the
+    # expired processes partitions data overwriting the newly created processes.
+    processes = Processes.active(state).sort_by!(&:dispatched_at)

     processes.each do |process|
       process.consumer_groups.each do |consumer_group|
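In the Health view the last process to report a given partition effectively wins during iteration, so switching from the default name ordering to `dispatched_at` ordering is what keeps stale owners from masking fresh data. A small sketch of that ordering, using a hypothetical Struct in place of the richer objects returned by `Processes.active`:

# Hypothetical stand-in for the process objects returned by Processes.active
ProcessReport = Struct.new(:name, :dispatched_at, keyword_init: true)

processes = [
  ProcessReport.new(name: 'worker-b', dispatched_at: 200.0),
  ProcessReport.new(name: 'worker-a', dispatched_at: 100.0)
]

# Iterating in dispatch-time order means the freshest report is visited last,
# so its partition details are the ones left standing after any overwrites
processes.sort_by!(&:dispatched_at)

p processes.map(&:name) # => ["worker-a", "worker-b"]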
177 changes: 177 additions & 0 deletions spec/fixtures/multi_partition_reports/process_1.json
@@ -0,0 +1,177 @@
{
"schema_version": "1.2.3",
"type": "consumer",
"dispatched_at": 1695813057.892911,
"process": {
"started_at": 1695802487.537935,
"name": "shinra:1817527:7ad1ed4ce83a",
"status": "running",
"listeners": 2,
"workers": 2,
"memory_usage": 142036,
"memory_total_usage": 25765592,
"memory_size": 32763212,
"cpus": 8,
"threads": 25,
"cpu_usage": [
1.6,
1.57,
1.75
],
"tags": [
"#b2e822e"
]
},
"versions": {
"ruby": "ruby 3.2.2-53 e51014",
"karafka": "2.2.6",
"karafka_core": "2.2.2",
"karafka_web": "0.7.4",
"waterdrop": "2.6.7",
"rdkafka": "0.13.5",
"librdkafka": "2.2.0"
},
"stats": {
"busy": 0,
"enqueued": 0,
"utilization": 1.9147967048486074,
"total": {
"batches": 16,
"messages": 50,
"errors": 1,
"retries": 1,
"dead": 0
}
},
"consumer_groups": {
"example_app_app": {
"id": "example_app_app",
"subscription_groups": {
"c4ca4238a0b9_0": {
"id": "c4ca4238a0b9_0",
"state": {
"state": "up",
"join_state": "steady",
"stateage": 10565686,
"rebalance_age": 10563021,
"rebalance_cnt": 1,
"rebalance_reason": "Metadata for subscribed topic(s) has changed"
},
"topics": {
"default": {
"name": "default",
"partitions": {
"0": {
"lag": 3,
"lag_d": 3,
"lag_stored": 0,
"lag_stored_d": 0,
"committed_offset": 162517,
"committed_offset_fd": 0,
"stored_offset": 162520,
"stored_offset_fd": 0,
"fetch_state": "active",
"hi_offset": 162520,
"hi_offset_fd": 0,
"lo_offset": 0,
"eof_offset": 162520,
"ls_offset": 162520,
"ls_offset_d": 9,
"ls_offset_fd": 0,
"id": 0,
"poll_state": "active"
},
"1": {
"lag": 6,
"lag_d": 3,
"lag_stored": 0,
"lag_stored_d": 0,
"committed_offset": 162936,
"committed_offset_fd": 0,
"stored_offset": 162942,
"stored_offset_fd": 0,
"fetch_state": "active",
"hi_offset": 162942,
"hi_offset_fd": 0,
"lo_offset": 0,
"eof_offset": 162939,
"ls_offset": 162942,
"ls_offset_d": 18,
"ls_offset_fd": 0,
"id": 1,
"poll_state": "active"
},
"2": {
"lag": 0,
"lag_d": 0,
"lag_stored": 0,
"lag_stored_d": 0,
"committed_offset": 162310,
"committed_offset_fd": 0,
"stored_offset": 162310,
"stored_offset_fd": 0,
"fetch_state": "active",
"hi_offset": 162310,
"hi_offset_fd": 0,
"lo_offset": 0,
"eof_offset": 162310,
"ls_offset": 162310,
"ls_offset_d": 6,
"ls_offset_fd": 0,
"id": 2,
"poll_state": "active"
}
}
},
"visits": {
"name": "visits",
"partitions": {
"0": {
"lag": 3,
"lag_d": -4,
"lag_stored": 3,
"lag_stored_d": -4,
"committed_offset": 135642,
"committed_offset_fd": 0,
"stored_offset": 135642,
"stored_offset_fd": 0,
"fetch_state": "active",
"hi_offset": 135645,
"hi_offset_fd": 0,
"lo_offset": 0,
"eof_offset": -1001,
"ls_offset": 135645,
"ls_offset_d": 7,
"ls_offset_fd": 5000,
"id": 0,
"poll_state": "paused"
}
}
}
}
}
}
},
"example_app_karafka_web": {
"id": "example_app_karafka_web",
"subscription_groups": {
"c81e728d9d4c_1": {
"id": "c81e728d9d4c_1",
"state": {
"state": "up",
"join_state": "steady",
"stateage": 10565218,
"rebalance_age": 10562553,
"rebalance_cnt": 1,
"rebalance_reason": "Metadata for subscribed topic(s) has changed"
},
"topics": {
}
}
}
}
},
"jobs": [

]
}
