Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better processing crash errors #162

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/karafka/web/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@ module Processing
# If you see this error, it probably means, that you did not bootstrap Web-UI correctly
MissingConsumersStateError = Class.new(BaseError)

# Similar to the above. It should be created during install
# Raised when we try to materialize the state but the consumers states topic does not
# exist and we do not have a way to get the initial state.
# It differs from the above because above indicates that the topic exists but that there
# is no initial state, while this indicates, that there is no consumers states topic.
MissingConsumersStatesTopicError = Class.new(BaseError)

# Similar to the above. It should be created during install / migration
MissingConsumersMetricsError = Class.new(BaseError)

# Similar to the one related to consumers states
MissingConsumersMetricsTopicError = Class.new(BaseError)

# This error occurs when consumer running older version of the web-ui tries to materialize
# states from newer versions. Karafka Web-UI provides only backwards compatibility, so
# you need to have an up-to-date consumer materializing reported states.
Expand Down
29 changes: 27 additions & 2 deletions lib/karafka/web/installer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def install(replication_factor: 1)
puts 'Creating necessary topics and populating state data...'
puts
Management::CreateTopics.new.call(replication_factor)
puts
wait_for_topics
Management::CreateInitialStates.new.call
puts
Management::ExtendBootFile.new.call
Expand All @@ -36,6 +36,7 @@ def migrate(replication_factor: 1)
puts 'Creating necessary topics and populating state data...'
puts
Management::CreateTopics.new.call(replication_factor)
wait_for_topics
Management::CreateInitialStates.new.call
puts
puts("Migration #{green('completed')}. Have fun!")
Expand All @@ -51,7 +52,7 @@ def reset(replication_factor: 1)
Management::DeleteTopics.new.call
puts
Management::CreateTopics.new.call(replication_factor)
puts
wait_for_topics
Management::CreateInitialStates.new.call
puts
puts("Resetting #{green('completed')}. Have fun!")
Expand All @@ -74,6 +75,30 @@ def uninstall
def enable!
Management::Enable.new.call
end

private

# Waits with a message, that we are waiting on topics
# This is not doing much, just waiting as there are some cases that it takes a bit of time
# for Kafka to actually propagate new topics knowledge across the cluster. We give it that
# bit of time just in case.
def wait_for_topics
puts
print 'Waiting for the topics to synchronize in the cluster'
wait(5)
puts
end

# Waits for given number of seconds and prints `.` every second.
# @param time_in_seconds [Integer] time of wait
def wait(time_in_seconds)
time_in_seconds.times do
sleep(1)
print '.'
end

print "\n"
end
end
end
end
4 changes: 4 additions & 0 deletions lib/karafka/web/processing/consumers/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def current!
return metrics_message.payload if metrics_message

raise(::Karafka::Web::Errors::Processing::MissingConsumersMetricsError)
rescue Rdkafka::RdkafkaError => e
raise(e) unless e.code == :unknown_partition

raise(::Karafka::Web::Errors::Processing::MissingConsumersMetricsTopicError)
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/karafka/web/processing/consumers/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def current!
return state_message.payload if state_message

raise(::Karafka::Web::Errors::Processing::MissingConsumersStateError)
rescue Rdkafka::RdkafkaError => e
raise(e) unless e.code == :unknown_partition

raise(::Karafka::Web::Errors::Processing::MissingConsumersStatesTopicError)
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions spec/lib/karafka/web/processing/consumers/metrics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
it { expect { metrics }.to raise_error(expected_error) }
end

context 'when metrics topic does not exist' do
let(:expected_error) do
::Karafka::Web::Errors::Processing::MissingConsumersMetricsTopicError
end

before { Karafka::Web.config.topics.consumers.metrics = SecureRandom.uuid }

it { expect { metrics }.to raise_error(expected_error) }
end

context 'when current state exists' do
before { produce(metrics_topic, fixture) }

Expand Down
10 changes: 10 additions & 0 deletions spec/lib/karafka/web/processing/consumers/state_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
it { expect { state }.to raise_error(expected_error) }
end

context 'when states topic does not exist' do
let(:expected_error) do
::Karafka::Web::Errors::Processing::MissingConsumersStatesTopicError
end

before { Karafka::Web.config.topics.consumers.states = SecureRandom.uuid }

it { expect { state }.to raise_error(expected_error) }
end

context 'when current state exists' do
before { produce(states_topic, fixture) }

Expand Down