Skip to content

Commit

Permalink
report lso and other offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Sep 10, 2023
1 parent 073d73a commit 786c96a
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
- [Improvement] Provide flash messages support.
- [Improvement] Use replication factor of two by default (if not overridden) for Web UI topics when there is more than one broker.
- [Improvement] Show a warning when replication factor of 1 is used for Web UI topics in production.
- [Improvement] Collect extra additional metrics useful for hanging transactions detection.
- [Fix] Return 402 status instead of 500 on Pro features that are not available in OSS.
- [Fix] Fix a case where errors would not be visible without Rails due to the `String#first` usage.
- [Fix] Fix a case where live-poll would be disabled but would still update data.
Expand Down
22 changes: 11 additions & 11 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ PATH
karafka-web (0.7.0)
erubi (~> 1.4)
karafka (>= 2.2.0, < 3.0.0)
karafka-core (>= 2.0.13, < 3.0.0)
karafka-core (>= 2.2.1, < 3.0.0)
roda (~> 3.68, >= 3.69)
tilt (~> 2.0)

GEM
remote: https://rubygems.org/
specs:
activesupport (7.0.7.2)
activesupport (7.0.8)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 1.6, < 2)
minitest (>= 5.1)
Expand All @@ -26,28 +26,28 @@ GEM
ffi (1.15.5)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
karafka (2.2.0)
karafka-core (>= 2.1.1, < 2.2.0)
karafka (2.2.1)
karafka-core (>= 2.2.0, < 2.3.0)
thor (>= 0.20)
waterdrop (>= 2.6.6, < 3.0.0)
zeitwerk (~> 2.3)
karafka-core (2.1.1)
karafka-core (2.2.1)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.1, < 0.14.0)
karafka-rdkafka (0.13.3)
karafka-rdkafka (0.13.4)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
mini_portile2 (2.8.4)
minitest (5.19.0)
minitest (5.20.0)
rack (3.0.8)
rack-test (2.1.0)
rack (>= 1.3)
rackup (0.2.3)
rack (>= 3.0.0.beta1)
webrick
rake (13.0.6)
roda (3.70.0)
roda (3.71.0)
rack
rspec (3.12.0)
rspec-core (~> 3.12.0)
Expand All @@ -72,11 +72,11 @@ GEM
tilt (2.2.0)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
waterdrop (2.6.6)
waterdrop (2.6.7)
karafka-core (>= 2.1.1, < 3.0.0)
zeitwerk (~> 2.3)
webrick (1.8.1)
zeitwerk (2.6.8)
zeitwerk (2.6.11)

PLATFORMS
x86_64-linux
Expand All @@ -91,4 +91,4 @@ DEPENDENCIES
simplecov

BUNDLED WITH
2.4.12
2.4.19
2 changes: 1 addition & 1 deletion karafka-web.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|

spec.add_dependency 'erubi', '~> 1.4'
spec.add_dependency 'karafka', '>= 2.2.0', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.0.13', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.2.1', '< 3.0.0'
spec.add_dependency 'roda', '~> 3.68', '>= 3.69'
spec.add_dependency 'tilt', '~> 2.0'

Expand Down
7 changes: 7 additions & 0 deletions lib/karafka/web/tracking/consumers/contracts/partition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ class Partition < Web::Contracts::Base
required(:lag_stored_d) { |val| val.is_a?(Integer) }
required(:committed_offset) { |val| val.is_a?(Integer) }
required(:stored_offset) { |val| val.is_a?(Integer) }
required(:fetch_state) { |val| val.is_a?(String) && !val.empty? }
required(:poll_state) { |val| val.is_a?(String) && !val.empty? }
required(:hi_offset) { |val| val.is_a?(Integer) }
required(:lo_offset) { |val| val.is_a?(Integer) }
required(:eof_offset) { |val| val.is_a?(Integer) }
required(:ls_offset) { |val| val.is_a?(Integer) }
required(:ls_offset_d) { |val| val.is_a?(Integer) }
required(:ls_offset_fd) { |val| val.is_a?(Integer) && val >= 0 }
end
end
end
Expand Down
8 changes: 7 additions & 1 deletion lib/karafka/web/tracking/consumers/listeners/statistics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ def extract_partition_metrics(pt_stats)
'committed_offset',
'stored_offset',
'fetch_state',
'hi_offset'
'hi_offset',
'lo_offset',
'eof_offset',
'ls_offset',
# Two below can be useful for detection of hanging transactions
'ls_offset_d',
'ls_offset_fd'
)

# Rename as we do not need `consumer_` prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
lag_d: -1,
committed_offset: 0,
stored_offset: 0,
hi_offset: 1
fetch_state: 'active',
poll_state: 'active',
hi_offset: 1,
lo_offset: 0,
eof_offset: 0,
ls_offset: 0,
ls_offset_d: 0,
ls_offset_fd: 0
}
end

Expand All @@ -26,6 +33,29 @@
it { expect(contract.call(config)).not_to be_success }
end

%i[
fetch_state
poll_state
].each do |state|
context "when #{state} is not present" do
before { config.delete(state) }

it { expect(contract.call(config)).not_to be_success }
end

context "when #{state} is not a string" do
before { config[state] = rand }

it { expect(contract.call(config)).not_to be_success }
end

context "when #{state} is empty" do
before { config[state] = '' }

it { expect(contract.call(config)).not_to be_success }
end
end

%i[
id
lag_stored
Expand All @@ -35,6 +65,11 @@
committed_offset
stored_offset
hi_offset
lo_offset
eof_offset
ls_offset
ls_offset_d
ls_offset_fd
].each do |key|
context "when #{key} is not numeric" do
before { config[key] = '2' }
Expand All @@ -48,4 +83,10 @@
it { expect(contract.call(config)).not_to be_success }
end
end

context 'when ls_offset_fd is less than 0' do
before { config[:ls_offset_fd] = -1 }

it { expect(contract.call(config)).not_to be_success }
end
end

0 comments on commit 786c96a

Please sign in to comment.