Skip to content

Commit

Permalink
wi
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Sep 22, 2023
1 parent 430d780 commit f318abc
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 47 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Karafka Web changelog

## 0.7.5 (Unreleased)
- [Improvement] Normalize per-process job tables and health tables structure (topic name on top).

## 0.7.4 (2023-09-19)
- [Improvement] Skip aggregations on older schemas during upgrades. This only skips process-reports (that are going to be rolled) on the 5s window in case of an upgrade that should not be a rolling one anyhow. This simplifies the operations and minimizes the risk on breaking upgrades.
- [Fix] Fix not working `ps` for macOS.
Expand Down
1 change: 1 addition & 0 deletions lib/karafka/web/tracking/consumers/contracts/partition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Partition < Web::Contracts::Base
required(:stored_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
required(:fetch_state) { |val| val.is_a?(String) && !val.empty? }
required(:poll_state) { |val| val.is_a?(String) && !val.empty? }
required(:poll_state_ch) { |val| val.is_a?(Integer) && val >= 0 }
required(:hi_offset) { |val| val.is_a?(Integer) }
required(:hi_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
required(:lo_offset) { |val| val.is_a?(Integer) }
Expand Down
11 changes: 7 additions & 4 deletions lib/karafka/web/tracking/consumers/listeners/pausing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ class Pausing < Base
# Indicate pause
#
# @param event [Karafka::Core::Monitoring::Event]
def on_client_pause(event)
def on_consumer_consuming_pause(event)
track do |sampler|
sampler.pauses << pause_id(event)
sampler.pauses[pause_id(event)] = {
timeout: event[:timeout],
paused_till: monotonic_now + event[:timeout]
}
end
end

Expand All @@ -33,9 +36,9 @@ def on_client_resume(event)
def pause_id(event)
topic = event[:topic]
partition = event[:partition]
consumer_group_id = event[:subscription_group].consumer_group.id
subscription_group_id = event[:subscription_group].id

[consumer_group_id, topic, partition].join('-')
[subscription_group_id, topic, partition].join('-')
end
end
end
Expand Down
20 changes: 15 additions & 5 deletions lib/karafka/web/tracking/consumers/listeners/statistics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ def on_statistics_emitted(event)
}

topic_details[:partitions][pt_id] = metrics.merge(
id: pt_id,
id: pt_id
).merge(
# Pauses are stored on a consumer group since we do not process same topic
# twice in the multiple subscription groups
poll_state: poll_state(cg_id, topic_name, pt_id)
poll_details(sg_id, topic_name, pt_id)
)
end
end
Expand Down Expand Up @@ -136,10 +137,19 @@ def extract_partition_metrics(pt_stats)
# @param topic_name [String]
# @param pt_id [Integer]
# @return [String] poll state / is partition paused or not
def poll_state(cg_id, topic_name, pt_id)
pause_id = [cg_id, topic_name, pt_id].join('-')
def poll_details(sg_id, topic_name, pt_id)
pause_id = [sg_id, topic_name, pt_id].join('-')

sampler.pauses.include?(pause_id) ? 'paused' : 'active'
details = { poll_state: 'active', poll_state_ch: 0 }

pause_details = sampler.pauses[pause_id]

return details unless pause_details

{
poll_state: 'paused',
poll_state_ch: [(pause_details.fetch(:paused_till) - monotonic_now).round, 0].max
}
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web/tracking/consumers/sampler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def initialize
@consumer_groups = {}
@errors = []
@started_at = float_now
@pauses = Set.new
@pauses = {}
@jobs = {}
@shell = MemoizedShell.new
@memory_total_usage = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
<tr class="align-middle <%= lso_risk_state_bg(partition) %>">
<td>
<%= topic.name %>
</td>
<td>
<%= partition.id %>
</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,45 @@
</div>
</div>
<% else %>
<div class="row mb-5">
<div class="col-sm-12">
<table class="processes bg-white table table-hover table-bordered table-striped mb-0 align-middle">
<thead>
<tr class="align-middle">
<th>Topic</th>
<th>Partition</th>
<th>Lag stored</th>
<th>Lag stored trend</th>
<th>Committed offset</th>
<th>Stored offset</th>
<th>Fetch state</th>
<th>Poll state</th>
<th>LSO state</th>
</tr>
</thead>
<tbody>
<% subscription_group.topics.each do |topic| %>
<% topic.partitions.each do |partition| %>
<%==
partial(
'consumers/consumer/partition',
locals: {
topic: topic,
partition: partition,
subscription_group: subscription_group
}
)
%>
<% end %>
<% end %>
</tbody>
</table>
<% subscription_group.topics.each do |topic| %>
<div class="row mb-5">
<div class="col-sm-12">
<table class="processes bg-white table table-hover table-bordered table-striped mb-0 align-middle">
<thead>
<tr class="align-middle">
<th colspan="10">
<h5 class="mb-0">
<%= topic.name %>
</h5>
</th>
<tr class="align-middle">
</tr>
<th>Partition</th>
<th>Lag stored</th>
<th>Lag stored trend</th>
<th>Committed offset</th>
<th>Stored offset</th>
<th>Fetch state</th>
<th>Poll state</th>
<th>LSO state</th>
</tr>
</thead>
<tbody>
<% topic.partitions.each do |partition| %>
<%==
partial(
'consumers/consumer/partition',
locals: {
topic: topic,
partition: partition,
subscription_group: subscription_group
}
)
%>
<% end %>
</tbody>
</table>
</div>
</div>
</div>
<% end %>
<% end %>

0 comments on commit f318abc

Please sign in to comment.