From f2d20a90284d5ed4a93ff251acda7d0d46f1535a Mon Sep 17 00:00:00 2001 From: Hermann Mayer Date: Thu, 14 Nov 2024 12:29:15 +0100 Subject: [PATCH] Added support for custom topic names and topic configurations. (#44) Signed-off-by: Hermann Mayer --- CHANGELOG.md | 5 +- lib/rimless/consumer.rb | 15 ++++- lib/rimless/kafka_helpers.rb | 11 +++- spec/rimless/consumer_spec.rb | 98 ++++++++++++++++++------------ spec/rimless/kafka_helpers_spec.rb | 5 ++ 5 files changed, 92 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e91748d..ac07737 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ ### next -* TODO: Replace this bullet point with an actual description of a change. +* Added support for custom topic names via the `full_name:` keyword argument on + the consumer routing table (#44) +* Added support to pass a block to the routing table + (`Rimless.consumer.topics`) to add custom topic configurations (#44) ### 1.7.7 (19 September 2024) diff --git a/lib/rimless/consumer.rb b/lib/rimless/consumer.rb index b7dd5d5..2f833fc 100644 --- a/lib/rimless/consumer.rb +++ b/lib/rimless/consumer.rb @@ -126,12 +126,23 @@ def configure(&block) # topics({ app: :test_app, name: :admins } => YourConsumer) # topics({ app: :test_app, names: %i[users admins] } => YourConsumer) # + # Examples: + # + # topics do + # topic('name') do + # consumer CustomConsumer + # end + # end + # # @param topics [Hash{Hash => Class}] the topic to consumer mapping + # @yield the given block on the routing table # # rubocop:disable Metrics/MethodLength because of the Karafka DSL - def topics(topics) + def topics(topics = [], &block) consumer_groups.draw do - consumer_group Rimless.configuration.client_id do + consumer_group(Rimless.configuration.client_id) do + instance_exec(&block) if block_given? + topics.each do |topic_parts, dest_consumer| Rimless.consumer.topic_names(topic_parts).each do |topic_name| topic(topic_name) do diff --git a/lib/rimless/kafka_helpers.rb b/lib/rimless/kafka_helpers.rb index e424648..a4ea080 100644 --- a/lib/rimless/kafka_helpers.rb +++ b/lib/rimless/kafka_helpers.rb @@ -19,14 +19,21 @@ module KafkaHelpers # Rimless.topic(:users, app: 'test-api') # @example Mix and match # Rimless.topic(name: 'test', app: :fancy_app) + # @example Full name - use as is + # Rimless.topic(full_name: 'my.custom.topic.name') # - # rubocop:disable Metrics/AbcSize because of the usage flexibility + # rubocop:disable Metrics/MethodLength because of the usage flexibility + # rubocop:disable Metrics/AbcSize dito # rubocop:disable Metrics/CyclomaticComplexity dito + # rubocop:disable Metrics/PerceivedComplexity dito def topic(*args) opts = args.last name = args.first if [String, Symbol].member?(args.first.class) if opts.is_a?(Hash) + # When we got a full name, we use it as is + return opts[:full_name] if opts.key? :full_name + name = opts[:name] if opts.key?(:name) app = opts[:app] if opts.key?(:app) end @@ -38,8 +45,10 @@ def topic(*args) "#{Rimless.topic_prefix(app)}#{name}".tr('_', '-') end + # rubocop:enable Metrics/MethodLength # rubocop:enable Metrics/AbcSize # rubocop:enable Metrics/CyclomaticComplexity + # rubocop:enable Metrics/PerceivedComplexity # Send a single message to Apache Kafka. The data is encoded according to # the given Apache Avro schema. The destination Kafka topic may be a diff --git a/spec/rimless/consumer_spec.rb b/spec/rimless/consumer_spec.rb index 91a3b41..abd8dd9 100644 --- a/spec/rimless/consumer_spec.rb +++ b/spec/rimless/consumer_spec.rb @@ -38,56 +38,78 @@ describe '.topics' do let(:topics) { Rimless.consumer.consumer_groups.first.topics } - before do - described_class.topics(topic1: Rimless::BaseConsumer, - topic2: Rimless::BaseConsumer) - end + before { described_class.consumer_groups.clear } - it 'configures a single consumer group' do - expect(Rimless.consumer.consumer_groups.count).to be(1) - end + context 'with topics, without block' do + before do + described_class.topics(topic1: Rimless::BaseConsumer, + topic2: Rimless::BaseConsumer) + end - it 'configures the correct consumer group name' do - expect(Rimless.consumer.consumer_groups.first.name).to \ - eql('test-app') - end + it 'configures a single consumer group' do + expect(Rimless.consumer.consumer_groups.count).to be(1) + end - it 'configures two topics' do - expect(topics.count).to be(2) - end + it 'configures the correct consumer group name' do + expect(Rimless.consumer.consumer_groups.first.name).to \ + eql('test-app') + end - it 'configures the first topic name correctly' do - expect(topics.first.name).to eql('test.test-app.topic1') - end + it 'configures two topics' do + expect(topics.count).to be(2) + end - it 'configures the first topic consumer correctly' do - expect(topics.first.consumer).to be(Rimless::BaseConsumer) - end + it 'configures the first topic name correctly' do + expect(topics.first.name).to eql('test.test-app.topic1') + end - it 'configures the first topic worker correctly' do - expect(topics.first.worker).to be(Rimless::ConsumerJob) - end + it 'configures the first topic consumer correctly' do + expect(topics.first.consumer).to be(Rimless::BaseConsumer) + end - it 'configures the first topic interchanger correctly' do - expect(topics.first.interchanger).to \ - be_a(Rimless::Karafka::Base64Interchanger) - end + it 'configures the first topic worker correctly' do + expect(topics.first.worker).to be(Rimless::ConsumerJob) + end - it 'configures the second topic name correctly' do - expect(topics.last.name).to eql('test.test-app.topic2') - end + it 'configures the first topic interchanger correctly' do + expect(topics.first.interchanger).to \ + be_a(Rimless::Karafka::Base64Interchanger) + end - it 'configures the second topic consumer correctly' do - expect(topics.last.consumer).to be(Rimless::BaseConsumer) - end + it 'configures the second topic name correctly' do + expect(topics.last.name).to eql('test.test-app.topic2') + end + + it 'configures the second topic consumer correctly' do + expect(topics.last.consumer).to be(Rimless::BaseConsumer) + end + + it 'configures the second topic worker correctly' do + expect(topics.last.worker).to be(Rimless::ConsumerJob) + end - it 'configures the second topic worker correctly' do - expect(topics.last.worker).to be(Rimless::ConsumerJob) + it 'configures the second topic interchanger correctly' do + expect(topics.last.interchanger).to \ + be_a(Rimless::Karafka::Base64Interchanger) + end end - it 'configures the second topic interchanger correctly' do - expect(topics.last.interchanger).to \ - be_a(Rimless::Karafka::Base64Interchanger) + context 'without topics, but a block' do + before do + described_class.topics do + topic('my.custom.topic') do + consumer Rimless::BaseConsumer + end + end + end + + it 'configures the topic name correctly' do + expect(topics.last.name).to eql('my.custom.topic') + end + + it 'configures the topic consumer correctly' do + expect(topics.last.consumer).to be(Rimless::BaseConsumer) + end end end diff --git a/spec/rimless/kafka_helpers_spec.rb b/spec/rimless/kafka_helpers_spec.rb index e320a8b..ff30664 100644 --- a/spec/rimless/kafka_helpers_spec.rb +++ b/spec/rimless/kafka_helpers_spec.rb @@ -31,6 +31,11 @@ expect(described_class.topic(name: :new_customers, app: :test_api)).to \ eql('test.test-api.new-customers') end + + it 'returns the full name when given' do + expect(described_class.topic(full_name: 'my.custom.topic')).to \ + eql('my.custom.topic') + end end describe '.sync_message' do