diff --git a/lib/karafka/web/ui/models/partition.rb b/lib/karafka/web/ui/models/partition.rb index 6c3a6df7..1c8078a5 100644 --- a/lib/karafka/web/ui/models/partition.rb +++ b/lib/karafka/web/ui/models/partition.rb @@ -21,7 +21,7 @@ def lso_risk_state # If last stable is falling behind the high watermark if ls_offset < hi_offset # But it is changing and moving fast enough, it does not mean it is stuck - return :active unless ls_offset_fd >= ::Karafka::Web.config.ui.lso_threshold + return :active if ls_offset_fd < ::Karafka::Web.config.ui.lso_threshold # If it is stuck but we still have work to do, this is not a tragic situation because # maybe it will unstuck before we reach it diff --git a/spec/lib/karafka/web/ui/models/partition_spec.rb b/spec/lib/karafka/web/ui/models/partition_spec.rb index fdd25338..8d9ef896 100644 --- a/spec/lib/karafka/web/ui/models/partition_spec.rb +++ b/spec/lib/karafka/web/ui/models/partition_spec.rb @@ -3,9 +3,51 @@ RSpec.describe_current do subject(:partition) { described_class.new(data) } - let(:data) { {} } + let(:hi_offset) { 100 } + let(:ls_offset) { 100 } + let(:ls_offset_fd) { 100 } + let(:committed_offset) { 100 } + + let(:data) do + { + hi_offset: hi_offset, + ls_offset: ls_offset, + ls_offset_fd: ls_offset_fd, + committed_offset: committed_offset + } + end describe '#lso_risk_state' do - pending + let(:lso_risk_state) { partition.lso_risk_state } + + context 'when ls_offset is not behind hi_offset' do + it { expect(lso_risk_state).to eq(:active) } + end + + context 'when ls_offset behind hi_offset but within threshold' do + let(:hi_offset) { 100 } + let(:ls_offset) { 60 } + let(:ls_offset_fd) { 5 } + + it { expect(lso_risk_state).to eq(:active) } + end + + context 'when ls_offset behind hi_offset behind threshold but we are not there' do + let(:hi_offset) { 100 } + let(:ls_offset) { 60 } + let(:ls_offset_fd) { 10 * 60 * 1_000 } + let(:committed_offset) { ls_offset - 10 } + + it { expect(lso_risk_state).to eq(:at_risk) } + end + + context 'when ls_offset behind hi_offset behind threshold and we are there' do + let(:hi_offset) { 100 } + let(:ls_offset) { 60 } + let(:ls_offset_fd) { 10 * 60 * 1_000 } + let(:committed_offset) { ls_offset } + + it { expect(lso_risk_state).to eq(:stopped) } + end end end diff --git a/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb index ed71f791..6857ef72 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb @@ -3,7 +3,20 @@ RSpec.describe_current do subject(:app) { Karafka::Web::Ui::Pro::App } - describe '#index' do + let(:partition_scope) do + %w[ + consumer_groups + example_app6_app + subscription_groups + c4ca4238a0b9_0 + topics + default + partitions + 0 + ] + end + + describe '#overview' do context 'when no report data' do before do topics_config.consumers.reports = create_topic @@ -32,4 +45,96 @@ end end end + + describe '#offsets' do + let(:reports_topic) { topics_config.consumers.reports = create_topic } + + context 'when no report data' do + before do + reports_topic + get 'health/offsets' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('No health data is available') + expect(body).not_to include('bg-warning') + expect(body).not_to include('bg-danger') + end + end + + context 'when data is present' do + before { get 'health/offsets' } + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('Not available until first offset') + expect(body).to include('327355') + expect(body).not_to include('bg-warning') + expect(body).not_to include('bg-danger') + end + end + + context 'when one of partitions is at risk due to LSO' do + before do + report = JSON.parse(fixtures_file('consumer_report.json')) + + partition_data = report.dig(*partition_scope) + + partition_data['committed_offset'] = 1_000 + partition_data['ls_offset'] = 3_000 + partition_data['ls_offset_fd'] = 1_000_000_000 + + produce(reports_topic, report.to_json) + + get 'health/offsets' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('Not available until first offset') + expect(body).to include('bg-warning') + expect(body).to include('at_risk') + expect(body).not_to include('bg-danger') + expect(body).not_to include('stopped') + end + end + + context 'when one of partitions is stopped due to LSO' do + before do + report = JSON.parse(fixtures_file('consumer_report.json')) + + partition_data = report.dig(*partition_scope) + + partition_data['committed_offset'] = 3_000 + partition_data['ls_offset'] = 3_000 + partition_data['ls_offset_fd'] = 1_000_000_000 + + produce(reports_topic, report.to_json) + + get 'health/offsets' + end + + it do + expect(response).to be_ok + expect(body).to include(breadcrumbs) + expect(body).not_to include(pagination) + expect(body).not_to include(support_message) + expect(body).to include('Not available until first offset') + expect(body).to include('bg-danger') + expect(body).to include('stopped') + expect(body).not_to include('at_risk') + expect(body).not_to include('bg-warning') + end + end + end end