From ac072f57c342a0985fc4936c6af7bd0c9f39b02f Mon Sep 17 00:00:00 2001 From: Philipp Trulson Date: Fri, 11 Mar 2022 11:54:00 +0100 Subject: [PATCH] Support kafka 3 topics --- manifests/topic.pp | 25 ++++++++++++++++++++----- spec/defines/topic_spec.rb | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/manifests/topic.pp b/manifests/topic.pp index fa66a32..627b1cd 100644 --- a/manifests/topic.pp +++ b/manifests/topic.pp @@ -14,7 +14,10 @@ # # @param zookeeper # The connection string for the ZooKeeper connection in the form host:port. -# Multiple hosts can be given to allow fail-over. +# Multiple hosts can be given to allow fail-over. Kafka < 3.0.0 only! +# +# @param bootstrap_server +# The Kafka server to connect to in the form host:port. Kafka >= 2.2.0 only! # # @param replication_factor # The replication factor for each partition in the topic being created. If @@ -32,17 +35,29 @@ # See the Kafka documentation for full details on the topic configs. # define kafka::topic ( - String[1] $ensure = '', - String[1] $zookeeper = '', + Optional[String[1]] $ensure = undef, + Optional[String[1]] $zookeeper = undef, + Optional[String[1]] $bootstrap_server = undef, Integer $replication_factor = 1, Integer $partitions = 1, String[1] $bin_dir = '/opt/kafka/bin', Optional[Hash[String[1],String[1]]] $config = undef, ) { $_zookeeper = "--zookeeper ${zookeeper}" + $_bootstrap_server = "--bootstrap-server ${bootstrap_server}" $_replication_factor = "--replication-factor ${replication_factor}" $_partitions = "--partitions ${partitions}" + if !$zookeeper and !$bootstrap_server { + fail('Either zookeeper or bootstrap_server parameter must be defined!') + } + + if $zookeeper { + $_connection = $_zookeeper + } else { + $_connection = $_bootstrap_server + } + if $config { $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" } $_config = join($_config_array, ' ') @@ -53,8 +68,8 @@ if $ensure == 'present' { exec { "create topic ${name}": path => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}", - command => "kafka-topics.sh --create ${_zookeeper} ${_replication_factor} ${_partitions} --topic ${name} ${_config}", - unless => "kafka-topics.sh --list ${_zookeeper} | grep -x ${name}", + command => "kafka-topics.sh --create ${_connection} ${_replication_factor} ${_partitions} --topic ${name} ${_config}", + unless => "kafka-topics.sh --list ${_connection} | grep -x ${name}", } } } diff --git a/spec/defines/topic_spec.rb b/spec/defines/topic_spec.rb index 8fe4942..4ddea38 100644 --- a/spec/defines/topic_spec.rb +++ b/spec/defines/topic_spec.rb @@ -25,6 +25,39 @@ } end + context 'when create topic demo for kafka v3' do + let(:title) { 'demo' } + let :params do + { + 'ensure' => 'present', + 'bootstrap_server' => 'localhost:9092', + 'replication_factor' => 1, + 'partitions' => 1 + } + end + + it { + is_expected.to contain_exec('create topic demo').with( + command: 'kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic demo ' + ) + } + end + + context 'when create topic without zookeeper or bootstrap_server' do + let(:title) { 'demo' } + let :params do + { + 'ensure' => 'present', + 'replication_factor' => 1, + 'partitions' => 1 + } + end + + it { + is_expected.to compile.and_raise_error(%r{Either zookeeper or bootstrap_server parameter must be defined!}) + } + end + context 'when create topic demo with config' do let(:title) { 'demo' } let :params do