diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5c088f44..e0718f1a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
- [Improvement] Use a five second cache for non-production environments to improve dev experience.
- [Improvement] Limit number of partitions listed on the Consumers view if they exceed 10 to improve readability and indicate, that there are more in OSS similar to Pro.
- [Improvement] Squash processes reports based on the key instead of payload skipping deserialization for duplicated reports.
+- [Improvement] Make sure, that the Karafka topics present data can be deserialized and report on the status page if not.
- [Fix] Extensive data-poll on processes despite no processes being available.
## 0.7.1 (2023-09-15)
diff --git a/lib/karafka/web/ui/models/status.rb b/lib/karafka/web/ui/models/status.rb
index d4c208d7..2f12ccbe 100644
--- a/lib/karafka/web/ui/models/status.rb
+++ b/lib/karafka/web/ui/models/status.rb
@@ -134,46 +134,75 @@ def replication
)
end
- # @return [Status::Step] Is the initial consumers state present in Kafka
+ # @return [Status::Step] Is the initial consumers state present in Kafka and that they
+ # can be deserialized
def initial_consumers_state
+ details = { issue_type: :presence }
+
if replication.success?
- @current_state ||= Models::ConsumersState.current
- status = @current_state ? :success : :failure
+ begin
+ @current_state ||= Models::ConsumersState.current
+ status = @current_state ? :success : :failure
+ rescue JSON::ParserError
+ status = :failure
+ details[:issue_type] = :deserialization
+ end
else
status = :halted
end
Step.new(
status,
- nil
+ details
)
end
- # @return [Status::Step] Is the initial consumers metrics record present in Kafka
+ # @return [Status::Step] Is the initial consumers metrics record present in Kafka and
+ # that they can be deserialized
def initial_consumers_metrics
+ details = { issue_type: :presence }
+
if initial_consumers_state.success?
- @current_metrics ||= Models::ConsumersMetrics.current
- status = @current_metrics ? :success : :failure
+ begin
+ @current_metrics ||= Models::ConsumersMetrics.current
+ status = @current_metrics ? :success : :failure
+ rescue JSON::ParserError
+ status = :failure
+ details[:issue_type] = :deserialization
+ end
else
status = :halted
end
Step.new(
status,
- nil
+ details
)
end
- # @return [Status::Step] Is there at least one active karafka server reporting to the
- # Web UI
- def live_reporting
+ # @return [Status::Step] could we read and operate on the current processes data (if any)
+ def consumers_reports
if initial_consumers_metrics.success?
@processes ||= Models::Processes.active(@current_state)
- status = @processes.empty? ? :failure : :success
+ status = :success
else
status = :halted
end
+ Step.new(status, nil)
+ rescue JSON::ParserError
+ Step.new(:failure, nil)
+ end
+
+ # @return [Status::Step] Is there at least one active karafka server reporting to the
+ # Web UI
+ def live_reporting
+ status = if consumers_reports.success?
+ @processes.empty? ? :failure : :success
+ else
+ :halted
+ end
+
Step.new(
status,
nil
diff --git a/lib/karafka/web/ui/views/status/failures/_consumers_reports.erb b/lib/karafka/web/ui/views/status/failures/_consumers_reports.erb
new file mode 100644
index 00000000..e6ec06f2
--- /dev/null
+++ b/lib/karafka/web/ui/views/status/failures/_consumers_reports.erb
@@ -0,0 +1,11 @@
+
+ At least one consumer report appears to be corrupted.
+
+
+
+ This issue typically arises when invalid messages have been sent to the Karafka consumers' reports topic or when the topic has been populated with data from a newer Karafka Web UI without updating it.
+
+
+
+ To resolve this, please first attempt to upgrade the Karafka Web UI. If the problem persists, execute bundle exec karafka-web reset
to reset the Web UI.
+
diff --git a/lib/karafka/web/ui/views/status/failures/_initial_consumers_metrics.erb b/lib/karafka/web/ui/views/status/failures/_initial_consumers_metrics.erb
index f8ed400e..e47f921f 100644
--- a/lib/karafka/web/ui/views/status/failures/_initial_consumers_metrics.erb
+++ b/lib/karafka/web/ui/views/status/failures/_initial_consumers_metrics.erb
@@ -1,11 +1,25 @@
-
- The initial consumers metrics for the Web UI were not created.
-
+<% if details[:issue_type] == :deserialization %>
+
+ The initial state of the consumers metrics appears to be corrupted.
+
-
- It means that the bundle exec karafka-web migrate
was not executed or failed.
-
+
+ This issue typically arises when invalid messages have been sent to the Karafka consumers' metrics topic or when the topic has been populated with data from a newer Karafka Web UI without updating it.
+
-
- To fix this, you need to ensure that the bundle exec karafka-web migrate
runs successfully.
-
+
+ To resolve this, please first attempt to upgrade the Karafka Web UI. If the problem persists, execute bundle exec karafka-web reset
to reset the Web UI.
+
+<% else %>
+
+ The initial consumers metrics for the Web UI were not created.
+
+
+
+ It means that the bundle exec karafka-web migrate
was not executed or failed.
+
+
+
+ To fix this, you need to ensure that the bundle exec karafka-web migrate
runs successfully.
+
+<% end %>
diff --git a/lib/karafka/web/ui/views/status/failures/_initial_consumers_state.erb b/lib/karafka/web/ui/views/status/failures/_initial_consumers_state.erb
index ab8902af..4d6b2f88 100644
--- a/lib/karafka/web/ui/views/status/failures/_initial_consumers_state.erb
+++ b/lib/karafka/web/ui/views/status/failures/_initial_consumers_state.erb
@@ -1,11 +1,25 @@
-
- The initial consumers state for the Web UI was not created.
-
+<% if details[:issue_type] == :deserialization %>
+
+ The initial state of the consumers appears to be corrupted.
+
-
- It means that the bundle exec karafka-web install
was not executed or failed.
-
+
+ This issue typically arises when invalid messages have been sent to the Karafka consumers' state topic or when the topic has been populated with data from a newer Karafka Web UI without updating it.
+
-
- To fix this, you need to ensure that the bundle exec karafka-web migrate
runs successfully.
-
+
+ To resolve this, please first attempt to upgrade the Karafka Web UI. If the problem persists, execute bundle exec karafka-web reset
to reset the Web UI.
+
+<% else %>
+
+ The initial consumers state for the Web UI was not created.
+
+
+
+ It means that the bundle exec karafka-web migrate
was not executed or failed.
+
+
+
+ To fix this, you need to ensure that the bundle exec karafka-web migrate
runs successfully.
+
+<% end %>
diff --git a/lib/karafka/web/ui/views/status/show.erb b/lib/karafka/web/ui/views/status/show.erb
index d9d47450..e784dae2 100644
--- a/lib/karafka/web/ui/views/status/show.erb
+++ b/lib/karafka/web/ui/views/status/show.erb
@@ -82,7 +82,7 @@
partial(
"status/#{@status.initial_consumers_state.to_s}",
locals: {
- title: 'Initial consumers state presence',
+ title: 'Initial consumers state',
description: partial(
'status/failures/initial_consumers_state',
locals: {
@@ -97,7 +97,7 @@
partial(
"status/#{@status.initial_consumers_metrics.to_s}",
locals: {
- title: 'Initial consumers metrics presence',
+ title: 'Initial consumers metrics',
description: partial(
'status/failures/initial_consumers_metrics',
locals: {
@@ -108,6 +108,21 @@
)
%>
+ <%==
+ partial(
+ "status/#{@status.consumers_reports.to_s}",
+ locals: {
+ title: 'Consumers reports',
+ description: partial(
+ 'status/failures/consumers_reports',
+ locals: {
+ details: @status.consumers_reports.details
+ }
+ )
+ }
+ )
+ %>
+
<%==
partial(
"status/#{@status.live_reporting.to_s}",
diff --git a/spec/lib/karafka/web/ui/controllers/status_spec.rb b/spec/lib/karafka/web/ui/controllers/status_spec.rb
index bbb8288d..4372dd69 100644
--- a/spec/lib/karafka/web/ui/controllers/status_spec.rb
+++ b/spec/lib/karafka/web/ui/controllers/status_spec.rb
@@ -11,6 +11,7 @@
expect(response).to be_ok
expect(body).to include(support_message)
expect(body).to include(breadcrumbs)
+ expect(body).not_to include('The initial state of the consumers appears to')
end
end
@@ -56,5 +57,42 @@
expect(body).not_to include('Please ensure all those topics have a replication')
end
end
+
+ context 'when consumers states topic received corrupted data' do
+ let(:states_topic) { create_topic }
+
+ before do
+ topics_config.consumers.states = states_topic
+ # Corrupted on purpose
+ produce(states_topic, '{')
+
+ get 'status'
+ end
+
+ it do
+ expect(response).to be_ok
+ expect(body).to include(support_message)
+ expect(body).to include(breadcrumbs)
+ expect(body).to include('The initial state of the consumers appears to')
+ end
+ end
+
+ context 'when consumers metrics topic received corrupted data' do
+ let(:metrics_topic) { create_topic }
+
+ before do
+ topics_config.consumers.metrics = metrics_topic
+ produce(metrics_topic, '{')
+
+ get 'status'
+ end
+
+ it do
+ expect(response).to be_ok
+ expect(body).to include(support_message)
+ expect(body).to include(breadcrumbs)
+ expect(body).to include('The initial state of the consumers metrics appears to')
+ end
+ end
end
end
diff --git a/spec/lib/karafka/web/ui/models/status_spec.rb b/spec/lib/karafka/web/ui/models/status_spec.rb
index 71735aaa..62c5ce64 100644
--- a/spec/lib/karafka/web/ui/models/status_spec.rb
+++ b/spec/lib/karafka/web/ui/models/status_spec.rb
@@ -227,7 +227,7 @@
it 'expect to halt' do
expect(result.success?).to eq(false)
expect(result.to_s).to eq('halted')
- expect(result.details).to eq(nil)
+ expect(result.details).to eq({ issue_type: :presence })
expect(result.partial_namespace).to eq('failures')
end
end
@@ -238,7 +238,7 @@
it 'expect to fail' do
expect(result.success?).to eq(false)
expect(result.to_s).to eq('failure')
- expect(result.details).to eq(nil)
+ expect(result.details).to eq({ issue_type: :presence })
expect(result.partial_namespace).to eq('failures')
end
end
@@ -252,11 +252,25 @@
it 'expect all to be ok' do
expect(result.success?).to eq(true)
expect(result.to_s).to eq('success')
- expect(result.details).to eq(nil)
+ expect(result.details).to eq({ issue_type: :presence })
expect(result.partial_namespace).to eq('successes')
end
end
+ context 'when state is present but corrupted' do
+ before do
+ all_topics
+ produce(states_topic, '{')
+ end
+
+ it 'expect all to be ok' do
+ expect(result.success?).to eq(false)
+ expect(result.to_s).to eq('failure')
+ expect(result.details).to eq({ issue_type: :deserialization })
+ expect(result.partial_namespace).to eq('failures')
+ end
+ end
+
context 'when state is present but replication factor is 1 in prod' do
before do
all_topics
@@ -268,7 +282,7 @@
it 'expect all to be ok because replication is a warning' do
expect(result.success?).to eq(true)
expect(result.to_s).to eq('success')
- expect(result.details).to eq(nil)
+ expect(result.details).to eq({ issue_type: :presence })
expect(result.partial_namespace).to eq('successes')
end
end
@@ -288,7 +302,7 @@
it 'expect to halt' do
expect(result.success?).to eq(false)
expect(result.to_s).to eq('halted')
- expect(result.details).to eq(nil)
+ expect(result.details).to eq({ issue_type: :presence })
expect(result.partial_namespace).to eq('failures')
end
end
@@ -302,7 +316,22 @@
it 'expect to fail' do
expect(result.success?).to eq(false)
expect(result.to_s).to eq('failure')
- expect(result.details).to eq(nil)
+ expect(result.details).to eq({ issue_type: :presence })
+ expect(result.partial_namespace).to eq('failures')
+ end
+ end
+
+ context 'when initial consumers metrics are present but corrupted' do
+ before do
+ all_topics
+ produce(states_topic, state)
+ produce(metrics_topic, '{')
+ end
+
+ it 'expect to fail' do
+ expect(result.success?).to eq(false)
+ expect(result.to_s).to eq('failure')
+ expect(result.details).to eq({ issue_type: :deserialization })
expect(result.partial_namespace).to eq('failures')
end
end
@@ -310,6 +339,50 @@
context 'when state and metrics are present' do
before { ready_topics }
+ it 'expect all to be ok' do
+ expect(result.success?).to eq(true)
+ expect(result.to_s).to eq('success')
+ expect(result.details).to eq({ issue_type: :presence })
+ expect(result.partial_namespace).to eq('successes')
+ end
+ end
+ end
+
+ describe '#consumers_reports' do
+ subject(:result) { status.consumers_reports }
+
+ context 'when there is no initial consumers metrics state' do
+ before do
+ all_topics
+ produce(states_topic, state)
+ end
+
+ it 'expect to halt' do
+ expect(result.success?).to eq(false)
+ expect(result.to_s).to eq('halted')
+ expect(result.details).to eq(nil)
+ expect(result.partial_namespace).to eq('failures')
+ end
+ end
+
+ context 'when at least one process is active' do
+ before { ready_topics }
+
+ it 'expect all to be ok' do
+ expect(result.success?).to eq(true)
+ expect(result.to_s).to eq('success')
+ expect(result.details).to eq(nil)
+ expect(result.partial_namespace).to eq('successes')
+ end
+ end
+
+ context 'when there are no processes' do
+ before do
+ all_topics
+ produce(states_topic, state)
+ produce(metrics_topic, metrics)
+ end
+
it 'expect all to be ok' do
expect(result.success?).to eq(true)
expect(result.to_s).to eq('success')
@@ -317,15 +390,32 @@
expect(result.partial_namespace).to eq('successes')
end
end
+
+ context 'when process data is corrupted' do
+ before do
+ all_topics
+ produce(states_topic, state)
+ produce(metrics_topic, metrics)
+ produce(reports_topic, '{')
+ end
+
+ it 'expect all to be ok' do
+ expect(result.success?).to eq(false)
+ expect(result.to_s).to eq('failure')
+ expect(result.details).to eq(nil)
+ expect(result.partial_namespace).to eq('failures')
+ end
+ end
end
describe '#live_reporting' do
subject(:result) { status.live_reporting }
- context 'when there is no initial consumers metrics state' do
+ context 'when initial metrics state is corrupted' do
before do
all_topics
produce(states_topic, state)
+ produce(metrics_topic, '{')
end
it 'expect to halt' do