Skip to content

Commit

Permalink
Add new status check for ability to deserialize the internal karafka …
Browse files Browse the repository at this point in the history
…states (#133)

* more status reporting

* remarks
  • Loading branch information
mensfeld authored Sep 18, 2023
1 parent 32b2833 commit 40048a3
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [Improvement] Use a five second cache for non-production environments to improve dev experience.
- [Improvement] Limit number of partitions listed on the Consumers view if they exceed 10 to improve readability and indicate, that there are more in OSS similar to Pro.
- [Improvement] Squash processes reports based on the key instead of payload skipping deserialization for duplicated reports.
- [Improvement] Make sure, that the Karafka topics present data can be deserialized and report on the status page if not.
- [Fix] Extensive data-poll on processes despite no processes being available.

## 0.7.1 (2023-09-15)
Expand Down
53 changes: 41 additions & 12 deletions lib/karafka/web/ui/models/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,46 +134,75 @@ def replication
)
end

# @return [Status::Step] Is the initial consumers state present in Kafka
# @return [Status::Step] Is the initial consumers state present in Kafka and that they
# can be deserialized
def initial_consumers_state
details = { issue_type: :presence }

if replication.success?
@current_state ||= Models::ConsumersState.current
status = @current_state ? :success : :failure
begin
@current_state ||= Models::ConsumersState.current
status = @current_state ? :success : :failure
rescue JSON::ParserError
status = :failure
details[:issue_type] = :deserialization
end
else
status = :halted
end

Step.new(
status,
nil
details
)
end

# @return [Status::Step] Is the initial consumers metrics record present in Kafka
# @return [Status::Step] Is the initial consumers metrics record present in Kafka and
# that they can be deserialized
def initial_consumers_metrics
details = { issue_type: :presence }

if initial_consumers_state.success?
@current_metrics ||= Models::ConsumersMetrics.current
status = @current_metrics ? :success : :failure
begin
@current_metrics ||= Models::ConsumersMetrics.current
status = @current_metrics ? :success : :failure
rescue JSON::ParserError
status = :failure
details[:issue_type] = :deserialization
end
else
status = :halted
end

Step.new(
status,
nil
details
)
end

# @return [Status::Step] Is there at least one active karafka server reporting to the
# Web UI
def live_reporting
# @return [Status::Step] could we read and operate on the current processes data (if any)
def consumers_reports
if initial_consumers_metrics.success?
@processes ||= Models::Processes.active(@current_state)
status = @processes.empty? ? :failure : :success
status = :success
else
status = :halted
end

Step.new(status, nil)
rescue JSON::ParserError
Step.new(:failure, nil)
end

# @return [Status::Step] Is there at least one active karafka server reporting to the
# Web UI
def live_reporting
status = if consumers_reports.success?
@processes.empty? ? :failure : :success
else
:halted
end

Step.new(
status,
nil
Expand Down
11 changes: 11 additions & 0 deletions lib/karafka/web/ui/views/status/failures/_consumers_reports.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<p>
At least one consumer report appears to be corrupted.
</p>

<p>
This issue typically arises when invalid messages have been sent to the Karafka consumers' reports topic or when the topic has been populated with data from a newer Karafka Web UI without updating it.
</p>

<p class="mb-0">
To resolve this, please first attempt to upgrade the Karafka Web UI. If the problem persists, execute <code>bundle exec karafka-web reset</code> to reset the Web UI.
</p>
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
<p>
The initial consumers metrics for the Web UI were not created.
</p>
<% if details[:issue_type] == :deserialization %>
<p>
The initial state of the consumers metrics appears to be corrupted.
</p>

<p>
It means that the <code>bundle exec karafka-web migrate</code> was not executed or failed.
</p>
<p>
This issue typically arises when invalid messages have been sent to the Karafka consumers' metrics topic or when the topic has been populated with data from a newer Karafka Web UI without updating it.
</p>

<p class="mb-0">
To fix this, you need to ensure that the <code>bundle exec karafka-web migrate</code> runs successfully.
</p>
<p class="mb-0">
To resolve this, please first attempt to upgrade the Karafka Web UI. If the problem persists, execute <code>bundle exec karafka-web reset</code> to reset the Web UI.
</p>
<% else %>
<p>
The initial consumers metrics for the Web UI were not created.
</p>

<p>
It means that the <code>bundle exec karafka-web migrate</code> was not executed or failed.
</p>

<p class="mb-0">
To fix this, you need to ensure that the <code>bundle exec karafka-web migrate</code> runs successfully.
</p>
<% end %>
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
<p>
The initial consumers state for the Web UI was not created.
</p>
<% if details[:issue_type] == :deserialization %>
<p>
The initial state of the consumers appears to be corrupted.
</p>

<p>
It means that the <code>bundle exec karafka-web install</code> was not executed or failed.
</p>
<p>
This issue typically arises when invalid messages have been sent to the Karafka consumers' state topic or when the topic has been populated with data from a newer Karafka Web UI without updating it.
</p>

<p class="mb-0">
To fix this, you need to ensure that the <code>bundle exec karafka-web migrate</code> runs successfully.
</p>
<p class="mb-0">
To resolve this, please first attempt to upgrade the Karafka Web UI. If the problem persists, execute <code>bundle exec karafka-web reset</code> to reset the Web UI.
</p>
<% else %>
<p>
The initial consumers state for the Web UI was not created.
</p>

<p>
It means that the <code>bundle exec karafka-web migrate</code> was not executed or failed.
</p>

<p class="mb-0">
To fix this, you need to ensure that the <code>bundle exec karafka-web migrate</code> runs successfully.
</p>
<% end %>
19 changes: 17 additions & 2 deletions lib/karafka/web/ui/views/status/show.erb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
partial(
"status/#{@status.initial_consumers_state.to_s}",
locals: {
title: 'Initial consumers state presence',
title: 'Initial consumers state',
description: partial(
'status/failures/initial_consumers_state',
locals: {
Expand All @@ -97,7 +97,7 @@
partial(
"status/#{@status.initial_consumers_metrics.to_s}",
locals: {
title: 'Initial consumers metrics presence',
title: 'Initial consumers metrics',
description: partial(
'status/failures/initial_consumers_metrics',
locals: {
Expand All @@ -108,6 +108,21 @@
)
%>

<%==
partial(
"status/#{@status.consumers_reports.to_s}",
locals: {
title: 'Consumers reports',
description: partial(
'status/failures/consumers_reports',
locals: {
details: @status.consumers_reports.details
}
)
}
)
%>

<%==
partial(
"status/#{@status.live_reporting.to_s}",
Expand Down
38 changes: 38 additions & 0 deletions spec/lib/karafka/web/ui/controllers/status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
expect(response).to be_ok
expect(body).to include(support_message)
expect(body).to include(breadcrumbs)
expect(body).not_to include('The initial state of the consumers appears to')
end
end

Expand Down Expand Up @@ -56,5 +57,42 @@
expect(body).not_to include('Please ensure all those topics have a replication')
end
end

context 'when consumers states topic received corrupted data' do
let(:states_topic) { create_topic }

before do
topics_config.consumers.states = states_topic
# Corrupted on purpose
produce(states_topic, '{')

get 'status'
end

it do
expect(response).to be_ok
expect(body).to include(support_message)
expect(body).to include(breadcrumbs)
expect(body).to include('The initial state of the consumers appears to')
end
end

context 'when consumers metrics topic received corrupted data' do
let(:metrics_topic) { create_topic }

before do
topics_config.consumers.metrics = metrics_topic
produce(metrics_topic, '{')

get 'status'
end

it do
expect(response).to be_ok
expect(body).to include(support_message)
expect(body).to include(breadcrumbs)
expect(body).to include('The initial state of the consumers metrics appears to')
end
end
end
end
Loading

0 comments on commit 40048a3

Please sign in to comment.