Skip to content

Commit

Permalink
Added support for custom topic names and topic configurations. (#44)
Browse files Browse the repository at this point in the history
Signed-off-by: Hermann Mayer <[email protected]>
  • Loading branch information
Jack12816 authored Nov 14, 2024
1 parent 4e2b300 commit f2d20a9
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 42 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
### next

* TODO: Replace this bullet point with an actual description of a change.
* Added support for custom topic names via the `full_name:` keyword argument on
the consumer routing table (#44)
* Added support to pass a block to the routing table
(`Rimless.consumer.topics`) to add custom topic configurations (#44)

### 1.7.7 (19 September 2024)

Expand Down
15 changes: 13 additions & 2 deletions lib/rimless/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,23 @@ def configure(&block)
# topics({ app: :test_app, name: :admins } => YourConsumer)
# topics({ app: :test_app, names: %i[users admins] } => YourConsumer)
#
# Examples:
#
# topics do
# topic('name') do
# consumer CustomConsumer
# end
# end
#
# @param topics [Hash{Hash => Class}] the topic to consumer mapping
# @yield the given block on the routing table
#
# rubocop:disable Metrics/MethodLength because of the Karafka DSL
def topics(topics)
def topics(topics = [], &block)
consumer_groups.draw do
consumer_group Rimless.configuration.client_id do
consumer_group(Rimless.configuration.client_id) do
instance_exec(&block) if block_given?

topics.each do |topic_parts, dest_consumer|
Rimless.consumer.topic_names(topic_parts).each do |topic_name|
topic(topic_name) do
Expand Down
11 changes: 10 additions & 1 deletion lib/rimless/kafka_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ module KafkaHelpers
# Rimless.topic(:users, app: 'test-api')
# @example Mix and match
# Rimless.topic(name: 'test', app: :fancy_app)
# @example Full name - use as is
# Rimless.topic(full_name: 'my.custom.topic.name')
#
# rubocop:disable Metrics/AbcSize because of the usage flexibility
# rubocop:disable Metrics/MethodLength because of the usage flexibility
# rubocop:disable Metrics/AbcSize dito
# rubocop:disable Metrics/CyclomaticComplexity dito
# rubocop:disable Metrics/PerceivedComplexity dito
def topic(*args)
opts = args.last
name = args.first if [String, Symbol].member?(args.first.class)

if opts.is_a?(Hash)
# When we got a full name, we use it as is
return opts[:full_name] if opts.key? :full_name

name = opts[:name] if opts.key?(:name)
app = opts[:app] if opts.key?(:app)
end
Expand All @@ -38,8 +45,10 @@ def topic(*args)

"#{Rimless.topic_prefix(app)}#{name}".tr('_', '-')
end
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/CyclomaticComplexity
# rubocop:enable Metrics/PerceivedComplexity

# Send a single message to Apache Kafka. The data is encoded according to
# the given Apache Avro schema. The destination Kafka topic may be a
Expand Down
98 changes: 60 additions & 38 deletions spec/rimless/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,56 +38,78 @@
describe '.topics' do
let(:topics) { Rimless.consumer.consumer_groups.first.topics }

before do
described_class.topics(topic1: Rimless::BaseConsumer,
topic2: Rimless::BaseConsumer)
end
before { described_class.consumer_groups.clear }

it 'configures a single consumer group' do
expect(Rimless.consumer.consumer_groups.count).to be(1)
end
context 'with topics, without block' do
before do
described_class.topics(topic1: Rimless::BaseConsumer,
topic2: Rimless::BaseConsumer)
end

it 'configures the correct consumer group name' do
expect(Rimless.consumer.consumer_groups.first.name).to \
eql('test-app')
end
it 'configures a single consumer group' do
expect(Rimless.consumer.consumer_groups.count).to be(1)
end

it 'configures two topics' do
expect(topics.count).to be(2)
end
it 'configures the correct consumer group name' do
expect(Rimless.consumer.consumer_groups.first.name).to \
eql('test-app')
end

it 'configures the first topic name correctly' do
expect(topics.first.name).to eql('test.test-app.topic1')
end
it 'configures two topics' do
expect(topics.count).to be(2)
end

it 'configures the first topic consumer correctly' do
expect(topics.first.consumer).to be(Rimless::BaseConsumer)
end
it 'configures the first topic name correctly' do
expect(topics.first.name).to eql('test.test-app.topic1')
end

it 'configures the first topic worker correctly' do
expect(topics.first.worker).to be(Rimless::ConsumerJob)
end
it 'configures the first topic consumer correctly' do
expect(topics.first.consumer).to be(Rimless::BaseConsumer)
end

it 'configures the first topic interchanger correctly' do
expect(topics.first.interchanger).to \
be_a(Rimless::Karafka::Base64Interchanger)
end
it 'configures the first topic worker correctly' do
expect(topics.first.worker).to be(Rimless::ConsumerJob)
end

it 'configures the second topic name correctly' do
expect(topics.last.name).to eql('test.test-app.topic2')
end
it 'configures the first topic interchanger correctly' do
expect(topics.first.interchanger).to \
be_a(Rimless::Karafka::Base64Interchanger)
end

it 'configures the second topic consumer correctly' do
expect(topics.last.consumer).to be(Rimless::BaseConsumer)
end
it 'configures the second topic name correctly' do
expect(topics.last.name).to eql('test.test-app.topic2')
end

it 'configures the second topic consumer correctly' do
expect(topics.last.consumer).to be(Rimless::BaseConsumer)
end

it 'configures the second topic worker correctly' do
expect(topics.last.worker).to be(Rimless::ConsumerJob)
end

it 'configures the second topic worker correctly' do
expect(topics.last.worker).to be(Rimless::ConsumerJob)
it 'configures the second topic interchanger correctly' do
expect(topics.last.interchanger).to \
be_a(Rimless::Karafka::Base64Interchanger)
end
end

it 'configures the second topic interchanger correctly' do
expect(topics.last.interchanger).to \
be_a(Rimless::Karafka::Base64Interchanger)
context 'without topics, but a block' do
before do
described_class.topics do
topic('my.custom.topic') do
consumer Rimless::BaseConsumer
end
end
end

it 'configures the topic name correctly' do
expect(topics.last.name).to eql('my.custom.topic')
end

it 'configures the topic consumer correctly' do
expect(topics.last.consumer).to be(Rimless::BaseConsumer)
end
end
end

Expand Down
5 changes: 5 additions & 0 deletions spec/rimless/kafka_helpers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
expect(described_class.topic(name: :new_customers, app: :test_api)).to \
eql('test.test-api.new-customers')
end

it 'returns the full name when given' do
expect(described_class.topic(full_name: 'my.custom.topic')).to \
eql('my.custom.topic')
end
end

describe '.sync_message' do
Expand Down

0 comments on commit f2d20a9

Please sign in to comment.