Skip to content

Commit

Permalink
feat: allow kafka auto creating topics by `properties.allow.auto.crea…
Browse files Browse the repository at this point in the history
…te.topics` (#12766)
  • Loading branch information
tabVersion authored Oct 11, 2023
1 parent 93a7054 commit 2db6f94
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ enum CompressionCodec {
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdKafkaPropertiesProducer {
/// Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
#[serde(rename = "properties.allow.auto.create.topics")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub allow_auto_create_topics: Option<bool>,

/// Maximum number of messages allowed on the producer queue. This queue is shared by all
/// topics and partitions. A value of 0 disables this limit.
#[serde(rename = "properties.queue.buffering.max.messages")]
Expand Down Expand Up @@ -160,6 +165,9 @@ pub struct RdKafkaPropertiesProducer {

impl RdKafkaPropertiesProducer {
pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
if let Some(v) = self.allow_auto_create_topics {
c.set("allow.auto.create.topics", v.to_string());
}
if let Some(v) = self.queue_buffering_max_messages {
c.set("queue.buffering.max.messages", v.to_string());
}
Expand Down

0 comments on commit 2db6f94

Please sign in to comment.