diff --git a/CHANGELOG.md b/CHANGELOG.md index 49fc0e0f..524771b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## 0.7.5 (Unreleased) - [Enhancement] Update order of topics creation for the setup of Web to support zero-downtime setup of Web in running Karafka projects. +- [Fix] Fix a case where charts aggregated data would not include all topics. +- [Fix] Make sure, that most recent per partition data for Health is never overwritten by an old state from a previous partition owner. - [Fix] Cache assets for 1 year instead of 7 days. - [Fix] Remove source maps pointing to non-existing locations. - [Maintenance] Include license and copyrights notice for `timeago.js` that was missing in the JS min file. diff --git a/lib/karafka/web/processing/consumers/aggregators/metrics.rb b/lib/karafka/web/processing/consumers/aggregators/metrics.rb index f8913203..c8796890 100644 --- a/lib/karafka/web/processing/consumers/aggregators/metrics.rb +++ b/lib/karafka/web/processing/consumers/aggregators/metrics.rb @@ -86,65 +86,75 @@ def add_consumers_groups_metrics # Materializes the current state of consumers group data # - # At the moment we report only topics lags but the format we are using supports - # extending this information in the future if it would be needed. - # # @return [Hash] hash with nested consumers and their topics details structure # @note We do **not** report on a per partition basis because it would significantly # increase needed storage. 
def materialize_consumers_groups_current_state cgs = {} - @active_reports.each do |_, details| - details.fetch(:consumer_groups).each do |group_name, group_details| - group_details.fetch(:subscription_groups).each do |_sg_name, sg_details| - sg_details.fetch(:topics).each do |topic_name, topic_details| - partitions_data = topic_details.fetch(:partitions).values + iterate_partitions_data do |group_name, topic_name, partitions_data| + lags = partitions_data + .map { |p_details| p_details.fetch(:lag, -1) } + .reject(&:negative?) + + lags_stored = partitions_data + .map { |p_details| p_details.fetch(:lag_stored, -1) } + .reject(&:negative?) - lags = partitions_data - .map { |p_details| p_details.fetch(:lag, -1) } + offsets_hi = partitions_data + .map { |p_details| p_details.fetch(:hi_offset, -1) } .reject(&:negative?) - lags_stored = partitions_data - .map { |p_details| p_details.fetch(:lag_stored, -1) } - .reject(&:negative?) - - offsets_hi = partitions_data - .map { |p_details| p_details.fetch(:hi_offset, -1) } - .reject(&:negative?) - - # Last stable offsets freeze durations - we pick the max freeze to indicate - # the longest open transaction that potentially may be hanging - ls_offsets_fd = partitions_data - .map { |p_details| p_details.fetch(:ls_offset_fd, 0) } - .reject(&:negative?) - - # If there is no lag that would not be negative, it means we did not mark - # any messages as consumed on this topic in any partitions, hence we cannot - # compute lag easily - # We do not want to initialize any data for this topic, when there is nothing - # useful we could present - # - # In theory lag stored must mean that lag must exist but just to be sure we - # check both here - next if lags.empty? || lags_stored.empty? - - cgs[group_name] ||= {} - cgs[group_name][topic_name] = { - lag_stored: lags_stored.sum, - lag: lags.sum, - pace: offsets_hi.sum, - # Take max last stable offset duration without any change. 
This can - # indicate a hanging transaction, because the offset will not move forward - # and will stay with a growing freeze duration when stuck - ls_offset_fd: ls_offsets_fd.max - } + # Last stable offsets freeze durations - we pick the max freeze to indicate + # the longest open transaction that potentially may be hanging + ls_offsets_fd = partitions_data + .map { |p_details| p_details.fetch(:ls_offset_fd, 0) } + .reject(&:negative?) + + cgs[group_name] ||= {} + cgs[group_name][topic_name] = { + lag_stored: lags_stored.sum, + lag: lags.sum, + pace: offsets_hi.sum, + # Take max last stable offset duration without any change. This can + # indicate a hanging transaction, because the offset will not move forward + # and will stay with a growing freeze duration when stuck + ls_offset_fd: ls_offsets_fd.max || 0 + } + end + + cgs + end + + # Converts our reports data into an iterator per partition + # Compensates for a case where same partition data would be available for a short + # period of time in multiple processes reports due to rebalances. + def iterate_partitions_data + cgs_topics = Hash.new { |h, v| h[v] = Hash.new { |h2, v2| h2[v2] = {} } } + + # We need to sort them in case we have same reports containing data about same + # topics partitions. 
Mostly during shutdowns and rebalances + @active_reports + .values + .sort_by { |report| report.fetch(:dispatched_at) } + .map { |details| details.fetch(:consumer_groups) } + .each do |consumer_groups| + consumer_groups.each do |group_name, group_details| + group_details.fetch(:subscription_groups).each_value do |sg_details| + sg_details.fetch(:topics).each do |topic_name, topic_details| + topic_details.fetch(:partitions).each do |partition_id, partition_data| + cgs_topics[group_name][topic_name][partition_id] = partition_data + end + end end end end - end - cgs + cgs_topics.each do |group_name, topics_data| + topics_data.each do |topic_name, partitions_data| + yield(group_name, topic_name, partitions_data.values) + end + end end end end diff --git a/lib/karafka/web/processing/time_series_tracker.rb b/lib/karafka/web/processing/time_series_tracker.rb index cb81ef85..0c55cabe 100644 --- a/lib/karafka/web/processing/time_series_tracker.rb +++ b/lib/karafka/web/processing/time_series_tracker.rb @@ -116,9 +116,12 @@ def evict # available times << values.last unless values.empty? + # Keep the most recent state out of many that would come from the same time moment + # Squash in case there would be two events from the same time + times.reverse! times.uniq!(&:first) + times.reverse! 
- # Squash in case there would be two events from the same time times.sort_by!(&:first) @historicals[range_name] = times.last(limit) diff --git a/lib/karafka/web/ui/models/health.rb b/lib/karafka/web/ui/models/health.rb index 06eaa027..fe9be0de 100644 --- a/lib/karafka/web/ui/models/health.rb +++ b/lib/karafka/web/ui/models/health.rb @@ -62,7 +62,11 @@ def fetch_rebalance_ages(state, stats) # # @param state [State] def iterate_partitions(state) - processes = Processes.active(state) + # By default processes are sort by name and this is not what we want here + # We want to make sure that the newest data is processed the last, so we get + # the most accurate state in case of deployments and shutdowns, etc without the + # expired processes partitions data overwriting the newly created processes + processes = Processes.active(state).sort_by!(&:dispatched_at) processes.each do |process| process.consumer_groups.each do |consumer_group| diff --git a/spec/fixtures/multi_partition_reports/process_1.json b/spec/fixtures/multi_partition_reports/process_1.json new file mode 100644 index 00000000..d2f18c1c --- /dev/null +++ b/spec/fixtures/multi_partition_reports/process_1.json @@ -0,0 +1,177 @@ +{ + "schema_version": "1.2.3", + "type": "consumer", + "dispatched_at": 1695813057.892911, + "process": { + "started_at": 1695802487.537935, + "name": "shinra:1817527:7ad1ed4ce83a", + "status": "running", + "listeners": 2, + "workers": 2, + "memory_usage": 142036, + "memory_total_usage": 25765592, + "memory_size": 32763212, + "cpus": 8, + "threads": 25, + "cpu_usage": [ + 1.6, + 1.57, + 1.75 + ], + "tags": [ + "#b2e822e" + ] + }, + "versions": { + "ruby": "ruby 3.2.2-53 e51014", + "karafka": "2.2.6", + "karafka_core": "2.2.2", + "karafka_web": "0.7.4", + "waterdrop": "2.6.7", + "rdkafka": "0.13.5", + "librdkafka": "2.2.0" + }, + "stats": { + "busy": 0, + "enqueued": 0, + "utilization": 1.9147967048486074, + "total": { + "batches": 16, + "messages": 50, + "errors": 1, + "retries": 1, + 
"dead": 0 + } + }, + "consumer_groups": { + "example_app_app": { + "id": "example_app_app", + "subscription_groups": { + "c4ca4238a0b9_0": { + "id": "c4ca4238a0b9_0", + "state": { + "state": "up", + "join_state": "steady", + "stateage": 10565686, + "rebalance_age": 10563021, + "rebalance_cnt": 1, + "rebalance_reason": "Metadata for subscribed topic(s) has changed" + }, + "topics": { + "default": { + "name": "default", + "partitions": { + "0": { + "lag": 3, + "lag_d": 3, + "lag_stored": 0, + "lag_stored_d": 0, + "committed_offset": 162517, + "committed_offset_fd": 0, + "stored_offset": 162520, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 162520, + "hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 162520, + "ls_offset": 162520, + "ls_offset_d": 9, + "ls_offset_fd": 0, + "id": 0, + "poll_state": "active" + }, + "1": { + "lag": 6, + "lag_d": 3, + "lag_stored": 0, + "lag_stored_d": 0, + "committed_offset": 162936, + "committed_offset_fd": 0, + "stored_offset": 162942, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 162942, + "hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 162939, + "ls_offset": 162942, + "ls_offset_d": 18, + "ls_offset_fd": 0, + "id": 1, + "poll_state": "active" + }, + "2": { + "lag": 0, + "lag_d": 0, + "lag_stored": 0, + "lag_stored_d": 0, + "committed_offset": 162310, + "committed_offset_fd": 0, + "stored_offset": 162310, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 162310, + "hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 162310, + "ls_offset": 162310, + "ls_offset_d": 6, + "ls_offset_fd": 0, + "id": 2, + "poll_state": "active" + } + } + }, + "visits": { + "name": "visits", + "partitions": { + "0": { + "lag": 3, + "lag_d": -4, + "lag_stored": 3, + "lag_stored_d": -4, + "committed_offset": 135642, + "committed_offset_fd": 0, + "stored_offset": 135642, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 135645, + "hi_offset_fd": 0, + "lo_offset": 0, + 
"eof_offset": -1001, + "ls_offset": 135645, + "ls_offset_d": 7, + "ls_offset_fd": 5000, + "id": 0, + "poll_state": "paused" + } + } + } + } + } + } + }, + "example_app_karafka_web": { + "id": "example_app_karafka_web", + "subscription_groups": { + "c81e728d9d4c_1": { + "id": "c81e728d9d4c_1", + "state": { + "state": "up", + "join_state": "steady", + "stateage": 10565218, + "rebalance_age": 10562553, + "rebalance_cnt": 1, + "rebalance_reason": "Metadata for subscribed topic(s) has changed" + }, + "topics": { + } + } + } + } + }, + "jobs": [ + + ] +} diff --git a/spec/fixtures/multi_partition_reports/process_2.json b/spec/fixtures/multi_partition_reports/process_2.json new file mode 100644 index 00000000..447319b3 --- /dev/null +++ b/spec/fixtures/multi_partition_reports/process_2.json @@ -0,0 +1,182 @@ +{ + "schema_version": "1.2.3", + "type": "consumer", + "dispatched_at": 1695813054.4401045, + "process": { + "started_at": 1695802478.2217133, + "name": "shinra:1817047:0bddec5483b0", + "status": "running", + "listeners": 2, + "workers": 2, + "memory_usage": 142944, + "memory_total_usage": 25984244, + "memory_size": 32763212, + "cpus": 8, + "threads": 25, + "cpu_usage": [ + 1.56, + 1.56, + 1.75 + ], + "tags": [ + "#b2e822e" + ] + }, + "versions": { + "ruby": "ruby 3.2.2-53 e51014", + "karafka": "2.2.6", + "karafka_core": "2.2.2", + "karafka_web": "0.7.4", + "waterdrop": "2.6.7", + "rdkafka": "0.13.5", + "librdkafka": "2.2.0" + }, + "stats": { + "busy": 0, + "enqueued": 0, + "utilization": 1.953262352347374, + "total": { + "batches": 16, + "messages": 38, + "errors": 1, + "retries": 1, + "dead": 0 + } + }, + "consumer_groups": { + "example_app_app": { + "id": "example_app_app", + "subscription_groups": { + "c4ca4238a0b9_0": { + "id": "c4ca4238a0b9_0", + "state": { + "state": "up", + "join_state": "steady", + "stateage": 10575679, + "rebalance_age": 10563674, + "rebalance_cnt": 3, + "rebalance_reason": "rebalance in progress" + }, + "topics": { + "default": { + "name": 
"default", + "partitions": { + "3": { + "lag": 0, + "lag_d": -3, + "lag_stored": 0, + "lag_stored_d": 0, + "committed_offset": 163010, + "committed_offset_fd": 0, + "stored_offset": 163010, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 163010, + "hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 163010, + "ls_offset": 163010, + "ls_offset_d": 12, + "ls_offset_fd": 0, + "id": 3, + "poll_state": "active" + }, + "4": { + "lag": 6, + "lag_d": 3, + "lag_stored": 0, + "lag_stored_d": -3, + "committed_offset": 162416, + "committed_offset_fd": 0, + "stored_offset": 162422, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 162422, + "hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 162419, + "ls_offset": 162422, + "ls_offset_d": 9, + "ls_offset_fd": 0, + "id": 4, + "poll_state": "active" + } + } + }, + "visits": { + "name": "visits", + "partitions": { + "1": { + "lag": 2, + "lag_d": 1, + "lag_stored": 2, + "lag_stored_d": 1, + "committed_offset": 135419, + "committed_offset_fd": 0, + "stored_offset": 135419, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 135421, + "hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 135421, + "ls_offset": 135421, + "ls_offset_d": 11, + "ls_offset_fd": 0, + "id": 1, + "poll_state": "paused" + } + } + } + } + } + } + }, + "example_app_karafka_web": { + "id": "example_app_karafka_web", + "subscription_groups": { + "c81e728d9d4c_1": { + "id": "c81e728d9d4c_1", + "state": { + "state": "up", + "join_state": "steady", + "stateage": 10575551, + "rebalance_age": 10563545, + "rebalance_cnt": 3, + "rebalance_reason": "rebalance in progress" + }, + "topics": { + "karafka_consumers_reports": { + "name": "karafka_consumers_reports", + "partitions": { + "0": { + "lag": 0, + "lag_d": 0, + "lag_stored": 0, + "lag_stored_d": 0, + "committed_offset": 28972, + "committed_offset_fd": 0, + "stored_offset": 28972, + "stored_offset_fd": 0, + "fetch_state": "active", + "hi_offset": 28972, + 
"hi_offset_fd": 0, + "lo_offset": 0, + "eof_offset": 28972, + "ls_offset": 28972, + "ls_offset_d": 2, + "ls_offset_fd": 0, + "id": 0, + "poll_state": "active" + } + } + } + } + } + } + } + }, + "jobs": [ + + ] +} diff --git a/spec/lib/karafka/web/management/create_initial_states_spec.rb b/spec/lib/karafka/web/management/create_initial_states_spec.rb index 1ab8de2e..5bf254a0 100644 --- a/spec/lib/karafka/web/management/create_initial_states_spec.rb +++ b/spec/lib/karafka/web/management/create_initial_states_spec.rb @@ -14,7 +14,7 @@ end context 'when the consumers state already exists' do - before { produce(consumers_states_topic, fixtures_file('consumers_state.json')) } + before { produce(consumers_states_topic, Fixtures.file('consumers_state.json')) } it 'expect not to overwrite it' do create @@ -53,7 +53,7 @@ end context 'when the consumers metrics already exists' do - before { produce(consumers_metrics_topic, fixtures_file('consumers_metrics.json')) } + before { produce(consumers_metrics_topic, Fixtures.file('consumers_metrics.json')) } it 'expect not to overwrite it' do create diff --git a/spec/lib/karafka/web/processing/consumers/aggregators/metrics_spec.rb b/spec/lib/karafka/web/processing/consumers/aggregators/metrics_spec.rb new file mode 100644 index 00000000..01d0be7c --- /dev/null +++ b/spec/lib/karafka/web/processing/consumers/aggregators/metrics_spec.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +RSpec.describe_current do + subject(:metrics_aggregator) { described_class.new } + + let(:reports_topic) { Karafka::Web.config.topics.consumers.reports = create_topic } + let(:metrics_topic) { Karafka::Web.config.topics.consumers.metrics = create_topic } + let(:states_topic) { Karafka::Web.config.topics.consumers.states = create_topic } + let(:schema_manager) { Karafka::Web::Processing::Consumers::SchemaManager.new } + let(:state_aggregator) do + Karafka::Web::Processing::Consumers::Aggregators::State.new(schema_manager) + end + + before do + 
reports_topic + metrics_topic + states_topic + end + + context 'when there are no initial metrics' do + let(:expected_error) { Karafka::Web::Errors::Processing::MissingConsumersMetricsError } + + it { expect { metrics_aggregator.to_h }.to raise_error(expected_error) } + end + + context 'when there are initial metrics but no other data' do + before { Karafka::Web::Management::CreateInitialStates.new.call } + + it 'expect to have basic empty stats' do + hashed = metrics_aggregator.to_h + + expect(hashed[:aggregated]).to eq(days: [], hours: [], minutes: [], seconds: []) + expect(hashed[:consumer_groups]).to eq(days: [], hours: [], minutes: [], seconds: []) + expect(hashed[:schema_version]).to eq('1.0.0') + expect(hashed.key?(:dispatched_at)).to eq(true) + end + end + + context 'when we have data from a multi-topic, multi-partition setup' do + let(:process1_report) do + data = Fixtures.json('multi_partition_reports/process_1') + data[:dispatched_at] = Time.now.to_f + data + end + + let(:process2_report) do + data = Fixtures.json('multi_partition_reports/process_2') + data[:dispatched_at] = Time.now.to_f + data + end + + before do + Karafka::Web::Management::CreateInitialStates.new.call + + [process1_report, process2_report].each_with_index do |report, index| + state_aggregator.add(report, index) + metrics_aggregator.add_report(report) + metrics_aggregator.add_stats(state_aggregator.stats) + end + end + + it 'expected to compute multi-process states correctly for all the topics' do + topics1 = metrics_aggregator.to_h[:consumer_groups][:seconds][0][1][:example_app_app] + topics2 = metrics_aggregator.to_h[:consumer_groups][:seconds][0][1][:example_app_karafka_web] + + expect(topics1[:visits][:lag_stored]).to eq(5) + expect(topics1[:visits][:lag]).to eq(5) + expect(topics1[:visits][:pace]).to eq(271_066) + expect(topics1[:visits][:ls_offset_fd]).to eq(5_000) + + expect(topics1[:default][:lag_stored]).to eq(0) + expect(topics1[:default][:lag]).to eq(15) + 
expect(topics1[:default][:pace]).to eq(813_204) + expect(topics1[:default][:ls_offset_fd]).to eq(0) + + expect(topics2[:karafka_consumers_reports][:lag_stored]).to eq(0) + expect(topics2[:karafka_consumers_reports][:lag]).to eq(0) + expect(topics2[:karafka_consumers_reports][:pace]).to eq(28_972) + expect(topics2[:karafka_consumers_reports][:ls_offset_fd]).to eq(0) + end + end +end diff --git a/spec/lib/karafka/web/processing/consumers/metrics_spec.rb b/spec/lib/karafka/web/processing/consumers/metrics_spec.rb index da3ebe21..996d299c 100644 --- a/spec/lib/karafka/web/processing/consumers/metrics_spec.rb +++ b/spec/lib/karafka/web/processing/consumers/metrics_spec.rb @@ -2,7 +2,7 @@ RSpec.describe_current do let(:metrics_topic) { Karafka::Web.config.topics.consumers.metrics = create_topic } - let(:fixture) { fixtures_file('consumers_metrics.json') } + let(:fixture) { Fixtures.file('consumers_metrics.json') } describe '#current!' do subject(:metrics) { described_class.current! } diff --git a/spec/lib/karafka/web/processing/consumers/state_spec.rb b/spec/lib/karafka/web/processing/consumers/state_spec.rb index 8fbf505d..49bb9f2e 100644 --- a/spec/lib/karafka/web/processing/consumers/state_spec.rb +++ b/spec/lib/karafka/web/processing/consumers/state_spec.rb @@ -2,7 +2,7 @@ RSpec.describe_current do let(:states_topic) { Karafka::Web.config.topics.consumers.states = create_topic } - let(:fixture) { fixtures_file('consumers_state.json') } + let(:fixture) { Fixtures.file('consumers_state.json') } describe '#current!' do subject(:state) { described_class.current! 
} diff --git a/spec/lib/karafka/web/processing/time_series_tracker_spec.rb b/spec/lib/karafka/web/processing/time_series_tracker_spec.rb index ecd6ae22..a2fee81c 100644 --- a/spec/lib/karafka/web/processing/time_series_tracker_spec.rb +++ b/spec/lib/karafka/web/processing/time_series_tracker_spec.rb @@ -10,7 +10,7 @@ end context 'when there is existing data' do - let(:metrics) { JSON.parse(fixtures_file('consumers_metrics.json'), symbolize_names: true) } + let(:metrics) { Fixtures.json('consumers_metrics') } let(:existing) { metrics.fetch(:aggregated) } it { expect(tracker.to_h[:days]).to eq(existing[:days]) } @@ -28,10 +28,10 @@ end it 'expect to only keep the oldest one after materialization in a time window' do - expect(tracker.to_h[:days]).to eq([[1, { a: 1 }], [1_000_000, { a: 3 }]]) - expect(tracker.to_h[:hours]).to eq([[1, { a: 1 }], [1_000_000, { a: 3 }]]) - expect(tracker.to_h[:minutes]).to eq([[1, { a: 1 }], [1_000_000, { a: 3 }]]) - expect(tracker.to_h[:seconds]).to eq([[1, { a: 1 }], [1_000_000, { a: 3 }]]) + expect(tracker.to_h[:days]).to eq([[1, { a: 1 }], [1_000_000, { a: 4 }]]) + expect(tracker.to_h[:hours]).to eq([[1, { a: 1 }], [1_000_000, { a: 4 }]]) + expect(tracker.to_h[:minutes]).to eq([[1, { a: 1 }], [1_000_000, { a: 4 }]]) + expect(tracker.to_h[:seconds]).to eq([[1, { a: 1 }], [1_000_000, { a: 4 }]]) end end end diff --git a/spec/lib/karafka/web/tracking/consumers/listeners/statistics_spec.rb b/spec/lib/karafka/web/tracking/consumers/listeners/statistics_spec.rb index f9986718..3dd81466 100644 --- a/spec/lib/karafka/web/tracking/consumers/listeners/statistics_spec.rb +++ b/spec/lib/karafka/web/tracking/consumers/listeners/statistics_spec.rb @@ -3,7 +3,7 @@ RSpec.describe_current do subject(:listener) { described_class.new } - let(:statistics) { JSON.parse(fixtures_file('emitted_statistics.json')) } + let(:statistics) { Fixtures.json('emitted_statistics', symbolize_names: false) } let(:sampler) { 
::Karafka::Web.config.tracking.consumers.sampler } let(:sg_details) { sampler.consumer_groups['cgid'][:subscription_groups] } let(:default_p0) { sg_details['sgid'][:topics]['default'][:partitions][0] } diff --git a/spec/lib/karafka/web/ui/controllers/consumers_spec.rb b/spec/lib/karafka/web/ui/controllers/consumers_spec.rb index 8b05af23..001ad5b5 100644 --- a/spec/lib/karafka/web/ui/controllers/consumers_spec.rb +++ b/spec/lib/karafka/web/ui/controllers/consumers_spec.rb @@ -57,8 +57,8 @@ topics_config.consumers.states = states_topic topics_config.consumers.reports = reports_topic - data = JSON.parse(fixtures_file('consumers_state.json')) - base_report = JSON.parse(fixtures_file('consumer_report.json')) + data = Fixtures.json('consumers_state', symbolize_names: false) + base_report = Fixtures.json('consumer_report', symbolize_names: false) 100.times do |i| name = "shinra:#{i}:#{i}" diff --git a/spec/lib/karafka/web/ui/controllers/dashboard_spec.rb b/spec/lib/karafka/web/ui/controllers/dashboard_spec.rb index 353e9980..ab07dd77 100644 --- a/spec/lib/karafka/web/ui/controllers/dashboard_spec.rb +++ b/spec/lib/karafka/web/ui/controllers/dashboard_spec.rb @@ -52,7 +52,7 @@ defaults = ::Karafka::Web::Management::CreateInitialStates produce(states_topic, defaults::DEFAULT_STATE.to_json) - produce(metrics_topic, fixtures_file('consumers_single_metrics.json')) + produce(metrics_topic, Fixtures.file('consumers_single_metrics.json')) get 'dashboard' end diff --git a/spec/lib/karafka/web/ui/controllers/errors_spec.rb b/spec/lib/karafka/web/ui/controllers/errors_spec.rb index 60d3ebfe..1cab3aa0 100644 --- a/spec/lib/karafka/web/ui/controllers/errors_spec.rb +++ b/spec/lib/karafka/web/ui/controllers/errors_spec.rb @@ -5,7 +5,7 @@ let(:errors_topic) { create_topic } let(:no_errors) { 'There are no errors in this errors topic partition' } - let(:error_report) { fixtures_file('error.json') } + let(:error_report) { Fixtures.file('error.json') } before { topics_config.errors = 
errors_topic } diff --git a/spec/lib/karafka/web/ui/controllers/jobs_spec.rb b/spec/lib/karafka/web/ui/controllers/jobs_spec.rb index dd992caa..654a4de9 100644 --- a/spec/lib/karafka/web/ui/controllers/jobs_spec.rb +++ b/spec/lib/karafka/web/ui/controllers/jobs_spec.rb @@ -41,8 +41,8 @@ topics_config.consumers.states = states_topic topics_config.consumers.reports = reports_topic - data = JSON.parse(fixtures_file('consumers_state.json')) - base_report = JSON.parse(fixtures_file('consumer_report.json')) + data = Fixtures.json('consumers_state', symbolize_names: false) + base_report = Fixtures.json('consumer_report', symbolize_names: false) 100.times do |i| name = "shinra:#{i}:#{i}" diff --git a/spec/lib/karafka/web/ui/models/consumers_metrics_spec.rb b/spec/lib/karafka/web/ui/models/consumers_metrics_spec.rb index 3f238ab2..42fb5cee 100644 --- a/spec/lib/karafka/web/ui/models/consumers_metrics_spec.rb +++ b/spec/lib/karafka/web/ui/models/consumers_metrics_spec.rb @@ -4,8 +4,8 @@ subject(:metrics) { described_class } let(:metrics_topic) { create_topic } - let(:fixture) { fixtures_file('consumers_metrics.json') } - let(:fixture_hash) { JSON.parse(fixture, symbolize_names: true) } + let(:fixture) { Fixtures.file('consumers_metrics.json') } + let(:fixture_hash) { Fixtures.json('consumers_metrics') } before { Karafka::Web.config.topics.consumers.metrics = metrics_topic } @@ -20,7 +20,7 @@ before do allow(status.class).to receive(:new).and_return(status) allow(status).to receive(:enabled).and_return(OpenStruct.new(success?: false)) - produce(metrics_topic, fixtures_file('consumers_metrics.json')) + produce(metrics_topic, Fixtures.file('consumers_metrics.json')) end it { expect(metrics.current).to eq(false) } @@ -36,10 +36,10 @@ end context 'when there are more metrics and karafka-web is enabled' do - let(:fixture1) { fixtures_file('consumers_metrics.json') } - let(:fixture2) { fixtures_file('consumers_metrics.json') } - let(:fixture_hash1) { JSON.parse(fixture1, 
symbolize_names: true) } - let(:fixture_hash2) { JSON.parse(fixture2, symbolize_names: true) } + let(:fixture1) { Fixtures.file('consumers_metrics.json') } + let(:fixture2) { Fixtures.file('consumers_metrics.json') } + let(:fixture_hash1) { Fixtures.json('consumers_metrics') } + let(:fixture_hash2) { Fixtures.json('consumers_metrics') } before do fixture_hash2[:dispatched_at] = 1 diff --git a/spec/lib/karafka/web/ui/models/consumers_state_spec.rb b/spec/lib/karafka/web/ui/models/consumers_state_spec.rb index 274b95c1..16651897 100644 --- a/spec/lib/karafka/web/ui/models/consumers_state_spec.rb +++ b/spec/lib/karafka/web/ui/models/consumers_state_spec.rb @@ -4,8 +4,8 @@ subject(:state) { described_class } let(:states_topic) { create_topic } - let(:fixture) { fixtures_file('consumers_state.json') } - let(:fixture_hash) { JSON.parse(fixture, symbolize_names: true) } + let(:fixture) { Fixtures.file('consumers_state.json') } + let(:fixture_hash) { Fixtures.json('consumers_state') } before { Karafka::Web.config.topics.consumers.states = states_topic } @@ -20,7 +20,7 @@ before do allow(status.class).to receive(:new).and_return(status) allow(status).to receive(:enabled).and_return(OpenStruct.new(success?: false)) - produce(states_topic, fixtures_file('consumers_state.json')) + produce(states_topic, Fixtures.file('consumers_state.json')) end it { expect(state.current).to eq(false) } @@ -36,10 +36,10 @@ end context 'when there are more states and karafka-web is enabled' do - let(:fixture1) { fixtures_file('consumers_state.json') } - let(:fixture2) { fixtures_file('consumers_state.json') } - let(:fixture_hash1) { JSON.parse(fixture1, symbolize_names: true) } - let(:fixture_hash2) { JSON.parse(fixture2, symbolize_names: true) } + let(:fixture1) { Fixtures.file('consumers_state.json') } + let(:fixture2) { Fixtures.file('consumers_state.json') } + let(:fixture_hash1) { Fixtures.json('consumers_state') } + let(:fixture_hash2) { Fixtures.json('consumers_state') } before do 
fixture_hash2[:dispatched_at] = 1 diff --git a/spec/lib/karafka/web/ui/models/counters_spec.rb b/spec/lib/karafka/web/ui/models/counters_spec.rb index 3a866be1..56350998 100644 --- a/spec/lib/karafka/web/ui/models/counters_spec.rb +++ b/spec/lib/karafka/web/ui/models/counters_spec.rb @@ -3,7 +3,7 @@ RSpec.describe_current do subject(:stats) { described_class.new(state) } - let(:state) { JSON.parse(fixtures_file('consumers_state.json'), symbolize_names: true) } + let(:state) { Fixtures.json('consumers_state') } let(:errors_topic) { create_topic } before { Karafka::Web.config.topics.errors = errors_topic } diff --git a/spec/lib/karafka/web/ui/models/health_spec.rb b/spec/lib/karafka/web/ui/models/health_spec.rb index dc2622af..29bd9356 100644 --- a/spec/lib/karafka/web/ui/models/health_spec.rb +++ b/spec/lib/karafka/web/ui/models/health_spec.rb @@ -3,8 +3,8 @@ RSpec.describe_current do subject(:stats) { described_class.current(state) } - let(:state) { JSON.parse(fixtures_file('consumers_state.json'), symbolize_names: true) } - let(:report) { JSON.parse(fixtures_file('consumer_report.json'), symbolize_names: true) } + let(:state) { Fixtures.json('consumers_state') } + let(:report) { Fixtures.json('consumer_report') } let(:reports_topic) { create_topic } before { Karafka::Web.config.topics.consumers.reports = reports_topic } diff --git a/spec/lib/karafka/web/ui/models/job_spec.rb b/spec/lib/karafka/web/ui/models/job_spec.rb index 39a2542e..11e42ba6 100644 --- a/spec/lib/karafka/web/ui/models/job_spec.rb +++ b/spec/lib/karafka/web/ui/models/job_spec.rb @@ -4,8 +4,8 @@ subject(:job) { process.jobs.first } let(:process) { Karafka::Web::Ui::Models::Process.find(state, '1') } - let(:state) { JSON.parse(fixtures_file('consumers_state.json'), symbolize_names: true) } - let(:report) { JSON.parse(fixtures_file('consumer_report.json'), symbolize_names: true) } + let(:state) { Fixtures.json('consumers_state') } + let(:report) { Fixtures.json('consumer_report') } 
let(:reports_topic) { create_topic } before do diff --git a/spec/lib/karafka/web/ui/models/process_spec.rb b/spec/lib/karafka/web/ui/models/process_spec.rb index 80ac8308..69d9bb79 100644 --- a/spec/lib/karafka/web/ui/models/process_spec.rb +++ b/spec/lib/karafka/web/ui/models/process_spec.rb @@ -1,8 +1,8 @@ # frozen_string_literal: true RSpec.describe_current do - let(:state) { JSON.parse(fixtures_file('consumers_state.json'), symbolize_names: true) } - let(:report) { JSON.parse(fixtures_file('consumer_report.json'), symbolize_names: true) } + let(:state) { Fixtures.json('consumers_state') } + let(:report) { Fixtures.json('consumer_report') } let(:reports_topic) { create_topic } before do diff --git a/spec/lib/karafka/web/ui/models/processes_spec.rb b/spec/lib/karafka/web/ui/models/processes_spec.rb index a475c1f0..0807412a 100644 --- a/spec/lib/karafka/web/ui/models/processes_spec.rb +++ b/spec/lib/karafka/web/ui/models/processes_spec.rb @@ -1,8 +1,8 @@ # frozen_string_literal: true RSpec.describe_current do - let(:state) { JSON.parse(fixtures_file('consumers_state.json'), symbolize_names: true) } - let(:report) { JSON.parse(fixtures_file('consumer_report.json'), symbolize_names: true) } + let(:state) { Fixtures.json('consumers_state') } + let(:report) { Fixtures.json('consumer_report') } let(:reports_topic) { create_topic } before do @@ -21,7 +21,7 @@ context 'when requested processes are too old' do let(:report) do - report = JSON.parse(fixtures_file('consumer_report.json'), symbolize_names: true) + report = Fixtures.json('consumer_report') report[:dispatched_at] = 1_690_883_271 report end diff --git a/spec/lib/karafka/web/ui/models/status_spec.rb b/spec/lib/karafka/web/ui/models/status_spec.rb index 62c5ce64..5b6571dc 100644 --- a/spec/lib/karafka/web/ui/models/status_spec.rb +++ b/spec/lib/karafka/web/ui/models/status_spec.rb @@ -7,9 +7,9 @@ let(:reports_topic) { Karafka::Web.config.topics.consumers.reports = create_topic } let(:metrics_topic) { 
Karafka::Web.config.topics.consumers.metrics = create_topic } let(:states_topic) { Karafka::Web.config.topics.consumers.states = create_topic } - let(:state) { fixtures_file('consumers_state.json') } - let(:metrics) { fixtures_file('consumers_metrics.json') } - let(:report) { fixtures_file('consumer_report.json') } + let(:state) { Fixtures.file('consumers_state.json') } + let(:metrics) { Fixtures.file('consumers_metrics.json') } + let(:report) { Fixtures.file('consumer_report.json') } let(:all_topics) do errors_topic diff --git a/spec/lib/karafka/web/ui/pro/controllers/consumers_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/consumers_spec.rb index 6b4e7d2c..a397aa0d 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/consumers_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/consumers_spec.rb @@ -58,8 +58,8 @@ topics_config.consumers.states = states_topic topics_config.consumers.reports = reports_topic - data = JSON.parse(fixtures_file('consumers_state.json')) - base_report = JSON.parse(fixtures_file('consumer_report.json')) + data = Fixtures.json('consumers_state', symbolize_names: false) + base_report = Fixtures.json('consumer_report', symbolize_names: false) 100.times do |i| name = "shinra:#{i}:#{i}" @@ -160,7 +160,7 @@ before do topics_config.consumers.reports = reports_topic - report = JSON.parse(fixtures_file('consumer_report.json')) + report = Fixtures.json('consumer_report', symbolize_names: false) report['jobs'] = [] produce(reports_topic, report.to_json) @@ -203,7 +203,7 @@ before do topics_config.consumers.reports = reports_topic - report = JSON.parse(fixtures_file('consumer_report.json')) + report = Fixtures.json('consumer_report', symbolize_names: false) report['consumer_groups'] = {} produce(reports_topic, report.to_json) diff --git a/spec/lib/karafka/web/ui/pro/controllers/dashboard_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/dashboard_spec.rb index 58d08c4b..f3ce4754 100644 --- 
a/spec/lib/karafka/web/ui/pro/controllers/dashboard_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/dashboard_spec.rb @@ -52,7 +52,7 @@ defaults = ::Karafka::Web::Management::CreateInitialStates produce(states_topic, defaults::DEFAULT_STATE.to_json) - produce(metrics_topic, fixtures_file('consumers_single_metrics.json')) + produce(metrics_topic, Fixtures.file('consumers_single_metrics.json')) get 'dashboard' end diff --git a/spec/lib/karafka/web/ui/pro/controllers/errors_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/errors_spec.rb index 33df7c3f..4b930d17 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/errors_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/errors_spec.rb @@ -5,7 +5,7 @@ let(:errors_topic) { create_topic(partitions: partitions) } let(:partitions) { 2 } - let(:error_report) { fixtures_file('error.json') } + let(:error_report) { Fixtures.file('error.json') } let(:no_errors) { 'There are no errors in this errors topic partition' } before { topics_config.errors = errors_topic } diff --git a/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb index 6857ef72..e8c4491c 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/health_spec.rb @@ -83,7 +83,7 @@ context 'when one of partitions is at risk due to LSO' do before do - report = JSON.parse(fixtures_file('consumer_report.json')) + report = Fixtures.json('consumer_report', symbolize_names: false) partition_data = report.dig(*partition_scope) @@ -111,7 +111,7 @@ context 'when one of partitions is stopped due to LSO' do before do - report = JSON.parse(fixtures_file('consumer_report.json')) + report = Fixtures.json('consumer_report', symbolize_names: false) partition_data = report.dig(*partition_scope) diff --git a/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb index 5e779067..65e2d2ab 100644 --- 
a/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb @@ -41,8 +41,8 @@ topics_config.consumers.states = states_topic topics_config.consumers.reports = reports_topic - data = JSON.parse(fixtures_file('consumers_state.json')) - base_report = JSON.parse(fixtures_file('consumer_report.json')) + data = Fixtures.json('consumers_state', symbolize_names: false) + base_report = Fixtures.json('consumer_report', symbolize_names: false) 100.times do |i| name = "shinra:#{i}:#{i}" diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index e6844ea8..c38ff5f2 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -69,16 +69,6 @@ def self.token RSpec.extend RSpecLocator.new(__FILE__) include TopicsManagerHelper -# Fetches fixture content -# @param file_name [String] fixture file name -# @return [String] fixture content -def fixtures_file(file_name) - File - .dirname(__FILE__) - .then { |location| File.join(location, 'fixtures', file_name) } - .then { |fixture_path| File.read(fixture_path) } -end - module Karafka # Configuration for test env class App @@ -112,9 +102,9 @@ class App config.topics.errors = TOPICS[3] end -produce(TOPICS[0], fixtures_file('consumers_state.json')) -produce(TOPICS[1], fixtures_file('consumers_metrics.json')) -produce(TOPICS[2], fixtures_file('consumer_report.json')) +produce(TOPICS[0], Fixtures.file('consumers_state.json')) +produce(TOPICS[1], Fixtures.file('consumers_metrics.json')) +produce(TOPICS[2], Fixtures.file('consumer_report.json')) Karafka::Web.enable! 
diff --git a/spec/support/fixtures.rb b/spec/support/fixtures.rb new file mode 100644 index 00000000..b9ee4e4c --- /dev/null +++ b/spec/support/fixtures.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +# A simple wrapper to get fixtures both in plain text and in JSON if needed +class Fixtures + class << self + # Fetches fixture content + # + # @param file_name [String] fixture file name + # @return [String] fixture content + def file(file_name) + File + .dirname(__FILE__) + .then { |location| File.join(location, '../', 'fixtures', file_name) } + .then { |fixture_path| File.read(fixture_path) } + end + + # Fetches data from the fixture file and parses it as JSON + # + # @param file_name [String] fixture file name without the extension because `.json` is expected + # @param symbolize_names [Boolean] should we symbolize the keys when parsing + # @return [Array, Hash] deserialized JSON data + def json(file_name, symbolize_names: true) + JSON.parse( + file("#{file_name}.json"), + symbolize_names: symbolize_names + ) + end + end +end