diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index 2bee8838ee404..da4e0d5dbd458 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -425,6 +425,16 @@ CREATE TABLE dbz_ignore_case_json (
   topic = 'debezium_ignore_case_json'
 ) FORMAT DEBEZIUM ENCODE JSON
 
+# create kafka source with additional rdkafka properties
+statement ok
+create table source_with_rdkafka_props (v1 int, v2 varchar) with (
+  connector = 'kafka',
+  topic = 'kafka_1_partition_topic',
+  properties.bootstrap.server = 'message_queue:29092',
+  scan.startup.mode = 'earliest',
+  properties.queued.min.messages = 10000,
+  properties.queued.max.messages.kbytes = 65536
+) FORMAT PLAIN ENCODE JSON
+
 statement ok
 flush;
 
@@ -698,6 +708,11 @@ SELECT * FROM source_mv3 ORDER BY id;
 \x6b6b
 \x776561776566776566
 
+query I
+select count(*) from source_with_rdkafka_props
+----
+4
+
 statement ok
 drop materialized view source_mv1
 
@@ -769,3 +784,6 @@ DROP TABLE upsert_students;
 
 statement ok
 drop table dbz_ignore_case_json;
+
+statement ok
+drop table source_with_rdkafka_props;
diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index fc89deed9b3ef..25b25fa405268 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -21,7 +21,7 @@ use clickhouse::Client;
 use rdkafka::ClientConfig;
 use serde_derive::{Deserialize, Serialize};
 use serde_with::json::JsonString;
-use serde_with::serde_as;
+use serde_with::{serde_as, DisplayFromStr};
 
 use crate::aws_auth::AwsAuthProps;
 
@@ -104,6 +104,40 @@ pub struct KafkaCommon {
     /// Configurations for SASL/OAUTHBEARER.
     #[serde(rename = "properties.sasl.oauthbearer.config")]
     sasl_oathbearer_config: Option<String>,
+
+    #[serde(flatten)]
+    pub rdkafka_properties: RdKafkaPropertiesCommon,
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RdKafkaPropertiesCommon {
+    /// Maximum Kafka protocol request message size. Due to differing framing overhead between
+    /// protocol versions the producer is unable to reliably enforce a strict max message limit at
+    /// produce time and may exceed the maximum size by one message in protocol ProduceRequests;
+    /// the broker will enforce the topic's max.message.bytes limit.
+    #[serde(rename = "properties.message.max.bytes")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub message_max_bytes: Option<usize>,
+
+    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid
+    /// memory exhaustion in case of protocol hiccups. This value must be at least fetch.max.bytes
+    /// + 512 to allow for protocol overhead; the value is adjusted automatically unless the
+    /// configuration property is explicitly set.
+    #[serde(rename = "properties.receive.message.max.bytes")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub receive_message_max_bytes: Option<usize>,
+}
+
+impl RdKafkaPropertiesCommon {
+    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
+        if let Some(v) = self.message_max_bytes {
+            c.set("message.max.bytes", v.to_string());
+        }
+        if let Some(v) = self.receive_message_max_bytes {
+            c.set("receive.message.max.bytes", v.to_string());
+        }
+    }
 }
 
 impl KafkaCommon {
@@ -169,6 +203,10 @@ impl KafkaCommon {
         // Currently, we only support unsecured OAUTH.
         config.set("enable.sasl.oauthbearer.unsecure.jwt", "true");
     }
+
+    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
+        self.rdkafka_properties.set_client(c);
+    }
 }
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
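
The pattern above relies on `#[serde(flatten)]` plus `serde_with::DisplayFromStr` so that the string-valued `properties.*` WITH options deserialize into typed optional fields. A minimal standalone sketch of the same mechanism (struct and field names here are illustrative, not part of this patch; assumes the `serde`, `serde_with`, and `serde_json` crates):

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Debug, Deserialize)]
struct CommonOverrides {
    // Accepts the flat key "properties.message.max.bytes" and parses its string value as a usize.
    #[serde(rename = "properties.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_max_bytes: Option<usize>,
}

#[derive(Debug, Deserialize)]
struct DemoConfig {
    topic: String,
    // `flatten` lets the nested overrides share the flat key space of the outer config.
    #[serde(flatten)]
    overrides: CommonOverrides,
}

fn main() {
    // String-valued WITH options, as they arrive from the frontend.
    let with_options: HashMap<String, String> = HashMap::from([
        ("topic".to_string(), "test".to_string()),
        ("properties.message.max.bytes".to_string(), "12345".to_string()),
    ]);

    // Round-trip through a JSON value, mirroring how the tests in this patch parse a HashMap.
    let cfg: DemoConfig =
        serde_json::from_value(serde_json::to_value(&with_options).unwrap()).unwrap();
    assert_eq!(cfg.overrides.message_max_bytes, Some(12345));
    println!("{cfg:?}");
}
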
config.set("enable.sasl.oauthbearer.unsecure.jwt", "true"); } + + pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) { + self.rdkafka_properties.set_client(c); + } } #[derive(Deserialize, Serialize, Debug, Clone)] diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index a67e9c20a6600..98d221ef0040d 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -27,8 +27,9 @@ use rdkafka::ClientConfig; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; -use serde_derive::Deserialize; +use serde_derive::{Deserialize, Serialize}; use serde_json::Value; +use serde_with::{serde_as, DisplayFromStr}; use super::{ Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -68,8 +69,94 @@ const fn _default_force_append_only() -> bool { false } +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RdKafkaPropertiesProducer { + /// Maximum number of messages allowed on the producer queue. This queue is shared by all + /// topics and partitions. A value of 0 disables this limit. + #[serde(rename = "properties.queue.buffering.max.messages")] + #[serde_as(as = "Option")] + pub queue_buffering_max_messages: Option, + + /// Maximum total message size sum allowed on the producer queue. This queue is shared by all + /// topics and partitions. This property has higher priority than queue.buffering.max.messages. + #[serde(rename = "properties.queue.buffering.max.kbytes")] + #[serde_as(as = "Option")] + queue_buffering_max_kbytes: Option, + + /// Delay in milliseconds to wait for messages in the producer queue to accumulate before + /// constructing message batches (MessageSets) to transmit to brokers. A higher value allows + /// larger and more effective (less overhead, improved compression) batches of messages to + /// accumulate at the expense of increased message delivery latency. + #[serde(rename = "properties.queue.buffering.max.ms")] + #[serde_as(as = "Option")] + queue_buffering_max_ms: Option, + + /// When set to true, the producer will ensure that messages are successfully produced exactly + /// once and in the original produce order. The following configuration properties are adjusted + /// automatically (if not modified by the user) when idempotence is enabled: + /// max.in.flight.requests.per.connection=5 (must be less than or equal to 5), + /// retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer + /// will fail if user-supplied configuration is incompatible. + #[serde(rename = "properties.enable.idempotence")] + #[serde_as(as = "Option")] + enable_idempotence: Option, + + /// How many times to retry sending a failing Message. + #[serde(rename = "properties.message.send.max.retries")] + #[serde_as(as = "Option")] + message_send_max_retries: Option, + + /// The backoff time in milliseconds before retrying a protocol request. + #[serde(rename = "properties.retry.backoff.ms")] + #[serde_as(as = "Option")] + retry_backoff_ms: Option, + + /// Maximum number of messages batched in one MessageSet + #[serde(rename = "properties.batch.num.messages")] + #[serde_as(as = "Option")] + batch_num_messages: Option, + + /// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol + /// framing overhead. 
+    /// framing overhead. This limit is applied after the first message has been added to the
+    /// batch, regardless of the first message's size; this is to ensure that messages that exceed
+    /// batch.size are produced.
+    #[serde(rename = "properties.batch.size")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    batch_size: Option<usize>,
+}
+
+impl RdKafkaPropertiesProducer {
+    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
+        if let Some(v) = self.queue_buffering_max_messages {
+            c.set("queue.buffering.max.messages", v.to_string());
+        }
+        if let Some(v) = self.queue_buffering_max_kbytes {
+            c.set("queue.buffering.max.kbytes", v.to_string());
+        }
+        if let Some(v) = self.queue_buffering_max_ms {
+            c.set("queue.buffering.max.ms", v.to_string());
+        }
+        if let Some(v) = self.enable_idempotence {
+            c.set("enable.idempotence", v.to_string());
+        }
+        if let Some(v) = self.message_send_max_retries {
+            c.set("message.send.max.retries", v.to_string());
+        }
+        if let Some(v) = self.retry_backoff_ms {
+            c.set("retry.backoff.ms", v.to_string());
+        }
+        if let Some(v) = self.batch_num_messages {
+            c.set("batch.num.messages", v.to_string());
+        }
+        if let Some(v) = self.batch_size {
+            c.set("batch.size", v.to_string());
+        }
+    }
+}
+
+#[serde_as]
 #[derive(Debug, Clone, Deserialize)]
-#[serde(deny_unknown_fields)]
 pub struct KafkaConfig {
     #[serde(skip_serializing)]
     pub connector: String, // Must be "kafka" here.
@@ -118,6 +205,9 @@ pub struct KafkaConfig {
     /// the indices of the pk columns in the frontend, so we simply store the primary key here
     /// as a string.
     pub primary_key: Option<String>,
+
+    #[serde(flatten)]
+    pub rdkafka_properties: RdKafkaPropertiesProducer,
 }
 
 impl KafkaConfig {
@@ -139,6 +229,13 @@ impl KafkaConfig {
         }
         Ok(config)
     }
+
+    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
+        self.common.set_client(c);
+        self.rdkafka_properties.set_client(c);
+
+        tracing::info!("kafka client starts with: {:?}", c);
+    }
 }
 
 #[derive(Debug)]
@@ -431,6 +528,7 @@ impl KafkaTransactionConductor {
         let inner: ThreadedProducer = {
             let mut c = ClientConfig::new();
             config.common.set_security_properties(&mut c);
+            config.set_client(&mut c);
             c.set("bootstrap.servers", &config.common.brokers)
                 .set("message.timeout.ms", "5000");
             config.use_transaction = false;
@@ -504,6 +602,55 @@ mod test {
     use super::*;
     use crate::sink::utils::*;
 
+    #[test]
+    fn parse_rdkafka_props() {
+        let props: HashMap<String, String> = hashmap! {
+            // basic
+            "connector".to_string() => "kafka".to_string(),
+            "properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
+            "topic".to_string() => "test".to_string(),
+            "type".to_string() => "append-only".to_string(),
+            // RdKafkaPropertiesCommon
+            "properties.message.max.bytes".to_string() => "12345".to_string(),
+            "properties.receive.message.max.bytes".to_string() => "54321".to_string(),
+            // RdKafkaPropertiesProducer
+            "properties.queue.buffering.max.messages".to_string() => "114514".to_string(),
+            "properties.queue.buffering.max.kbytes".to_string() => "114514".to_string(),
+            "properties.queue.buffering.max.ms".to_string() => "114.514".to_string(),
+            "properties.enable.idempotence".to_string() => "false".to_string(),
+            "properties.message.send.max.retries".to_string() => "114514".to_string(),
+            "properties.retry.backoff.ms".to_string() => "114514".to_string(),
+            "properties.batch.num.messages".to_string() => "114514".to_string(),
+            "properties.batch.size".to_string() => "114514".to_string(),
+        };
+        let c = KafkaConfig::from_hashmap(props).unwrap();
+        assert_eq!(
+            c.rdkafka_properties.queue_buffering_max_ms,
+            Some(114.514f64)
+        );
+
+        let props: HashMap<String, String> = hashmap! {
+            // basic
+            "connector".to_string() => "kafka".to_string(),
+            "properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
+            "topic".to_string() => "test".to_string(),
+            "type".to_string() => "append-only".to_string(),
+
+            "properties.enable.idempotence".to_string() => "True".to_string(), // can only be 'true' or 'false'
+        };
+        assert!(KafkaConfig::from_hashmap(props).is_err());
+
+        let props: HashMap<String, String> = hashmap! {
+            // basic
+            "connector".to_string() => "kafka".to_string(),
+            "properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
+            "topic".to_string() => "test".to_string(),
+            "type".to_string() => "append-only".to_string(),
+            "properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), // usize cannot be negative
+        };
+        assert!(KafkaConfig::from_hashmap(props).is_err());
+    }
+
     #[test]
     fn parse_kafka_config() {
         let properties: HashMap<String, String> = hashmap! {
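
The `set_client` methods above all follow the same pattern: an override is written into the `rdkafka::ClientConfig` as a string only when the user supplied it, so librdkafka's own defaults stay in effect for everything left as `None`. A standalone sketch of that pattern (the struct and values are illustrative, not the types defined in this patch; assumes the `rdkafka` crate):

use rdkafka::ClientConfig;

// Illustrative subset of the producer overrides.
struct ProducerOverrides {
    queue_buffering_max_messages: Option<usize>,
    enable_idempotence: Option<bool>,
}

// Apply each override only when present, leaving the librdkafka defaults untouched otherwise.
fn apply(overrides: &ProducerOverrides, c: &mut ClientConfig) {
    if let Some(v) = overrides.queue_buffering_max_messages {
        // librdkafka takes every configuration value as a string, hence `to_string()`.
        c.set("queue.buffering.max.messages", v.to_string());
    }
    if let Some(v) = overrides.enable_idempotence {
        c.set("enable.idempotence", v.to_string());
    }
}

fn main() {
    let overrides = ProducerOverrides {
        queue_buffering_max_messages: Some(100_000),
        enable_idempotence: None, // left to librdkafka's default
    };

    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "localhost:9092");
    apply(&overrides, &mut config);

    // Only the supplied override ends up in the client config.
    assert_eq!(config.get("queue.buffering.max.messages"), Some("100000"));
    assert_eq!(config.get("enable.idempotence"), None);
}
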
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index e8f12c27ba9c8..815343a56148d 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -67,6 +67,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
         config.set("bootstrap.servers", &broker_address);
         config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
         common_props.set_security_properties(&mut config);
+        properties.set_client(&mut config);
         let mut scan_start_offset = match properties
             .scan_startup_mode
             .as_ref()
diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs
index 878f4ff8e78a3..66562cfe5bb37 100644
--- a/src/connector/src/source/kafka/mod.rs
+++ b/src/connector/src/source/kafka/mod.rs
@@ -15,6 +15,7 @@
 use std::time::Duration;
 
 use serde::Deserialize;
+use serde_with::{serde_as, DisplayFromStr};
 
 pub mod enumerator;
 pub mod private_link;
@@ -33,6 +34,51 @@ pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
 pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
 pub const PRIVATELINK_CONNECTION: &str = "privatelink";
 
+/// Properties for the rdkafka library. Leave a field as `None` to use the default value.
+/// These properties are not intended to be exposed to users in the majority of cases.
+///
+/// See also the librdkafka configuration reference (`CONFIGURATION.md`).
+#[serde_as]
+#[derive(Clone, Debug, Deserialize)]
+pub struct RdKafkaPropertiesConsumer {
+    /// Minimum number of messages per topic+partition librdkafka tries to maintain in the local
+    /// consumer queue.
+    #[serde(rename = "properties.queued.min.messages")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub queued_min_messages: Option<usize>,
+
+    #[serde(rename = "properties.queued.max.messages.kbytes")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub queued_max_messages_kbytes: Option<usize>,
+
+    /// Maximum time the broker may wait to fill the Fetch response with `fetch.min.bytes` of
+    /// messages.
+    #[serde(rename = "properties.fetch.wait.max.ms")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub fetch_wait_max_ms: Option<usize>,
+
+    /// How long to postpone the next fetch request for a topic+partition in case the current fetch
+    /// queue thresholds (`queued.min.messages` or `queued.max.messages.kbytes`) have been
+    /// exceeded. This property may need to be decreased if the queue thresholds are set low
+    /// and the application is experiencing long (~1s) delays between messages. Low values may
+    /// increase CPU utilization.
+    // FIXME: need to upgrade rdkafka to v2.2.0 to use this property
+    // #[serde(rename = "properties.fetch.queue.backoff.ms")]
+    // #[serde_as(as = "Option<DisplayFromStr>")]
+    // pub fetch_queue_backoff_ms: Option<usize>,
+
+    /// Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in
+    /// batches by the consumer and if the first message batch in the first non-empty partition of
+    /// the Fetch request is larger than this value, then the message batch will still be returned
+    /// to ensure the consumer can make progress. The maximum message batch size accepted by the
+    /// broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker
+    /// topic config). `fetch.max.bytes` is automatically adjusted upwards to be at least
+    /// `message.max.bytes` (consumer config).
+    #[serde(rename = "properties.fetch.max.bytes")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub fetch_max_bytes: Option<usize>,
+}
+
 #[derive(Clone, Debug, Deserialize)]
 pub struct KafkaProperties {
     /// This parameter is not intended to be exposed to users.
@@ -73,9 +119,89 @@ pub struct KafkaProperties {
     #[serde(flatten)]
     pub common: KafkaCommon,
+
+    #[serde(flatten)]
+    pub rdkafka_properties: RdKafkaPropertiesConsumer,
+}
+
+impl KafkaProperties {
+    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
+        self.common.set_client(c);
+        self.rdkafka_properties.set_client(c);
+
+        tracing::info!("kafka client starts with: {:?}", c);
+    }
 }
 
 const fn default_kafka_sync_call_timeout() -> Duration {
     Duration::from_secs(5)
 }
 
 const KAFKA_ISOLATION_LEVEL: &str = "read_committed";
+
+impl RdKafkaPropertiesConsumer {
+    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
+        if let Some(v) = &self.queued_min_messages {
+            c.set("queued.min.messages", v.to_string());
+        }
+        if let Some(v) = &self.queued_max_messages_kbytes {
+            c.set("queued.max.messages.kbytes", v.to_string());
+        }
+        if let Some(v) = &self.fetch_wait_max_ms {
+            c.set("fetch.wait.max.ms", v.to_string());
+        }
+        // if let Some(v) = &self.fetch_queue_backoff_ms {
+        //     c.set("fetch.queue.backoff.ms", v.to_string());
+        // }
+        if let Some(v) = &self.fetch_max_bytes {
+            c.set("fetch.max.bytes", v.to_string());
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashMap;
+
+    use maplit::hashmap;
+
+    use super::*;
+
+    #[test]
+    fn test_parse_config_consumer_common() {
+        let config: HashMap<String, String> = hashmap! {
+            // common
+            "properties.bootstrap.server".to_string() => "127.0.0.1:9092".to_string(),
+            "topic".to_string() => "test".to_string(),
+            // kafka props
+            "scan.startup.mode".to_string() => "earliest".to_string(),
+            // RdKafkaPropertiesCommon
+            "properties.message.max.bytes".to_string() => "12345".to_string(),
+            "properties.receive.message.max.bytes".to_string() => "54321".to_string(),
+            // RdKafkaPropertiesConsumer
+            "properties.queued.min.messages".to_string() => "114514".to_string(),
+            "properties.queued.max.messages.kbytes".to_string() => "114514".to_string(),
+            "properties.fetch.wait.max.ms".to_string() => "114514".to_string(),
+            "properties.fetch.max.bytes".to_string() => "114514".to_string(),
+        };
+
+        let props: KafkaProperties =
+            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
+
+        assert_eq!(props.scan_startup_mode, Some("earliest".to_string()));
+        assert_eq!(
+            props.common.rdkafka_properties.receive_message_max_bytes,
+            Some(54321)
+        );
+        assert_eq!(
+            props.common.rdkafka_properties.message_max_bytes,
+            Some(12345)
+        );
+        assert_eq!(props.rdkafka_properties.queued_min_messages, Some(114514));
+        assert_eq!(
+            props.rdkafka_properties.queued_max_messages_kbytes,
+            Some(114514)
+        );
+        assert_eq!(props.rdkafka_properties.fetch_wait_max_ms, Some(114514));
+        assert_eq!(props.rdkafka_properties.fetch_max_bytes, Some(114514));
+    }
+}
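
Because every override above goes through `DisplayFromStr`, malformed values are rejected when the properties are deserialized, which is what the negative cases in the sink test exercise. A standalone sketch of that behaviour (the struct here is illustrative; assumes the `serde`, `serde_with`, and `serde_json` crates):

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Debug, Deserialize)]
struct DemoOverrides {
    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    enable_idempotence: Option<bool>,

    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_kbytes: Option<usize>,
}

fn main() {
    // Well-formed string values parse into the typed fields.
    let ok: DemoOverrides = serde_json::from_str(
        r#"{"properties.enable.idempotence": "false", "properties.queue.buffering.max.kbytes": "65536"}"#,
    )
    .unwrap();
    assert_eq!(ok.enable_idempotence, Some(false));
    assert_eq!(ok.queue_buffering_max_kbytes, Some(65536));

    // `bool` only parses "true"/"false", so "True" is rejected at deserialization time.
    assert!(serde_json::from_str::<DemoOverrides>(
        r#"{"properties.enable.idempotence": "True", "properties.queue.buffering.max.kbytes": "65536"}"#,
    )
    .is_err());

    // `usize` rejects negative input, so a value like "-114514" fails as well.
    assert!(serde_json::from_str::<DemoOverrides>(
        r#"{"properties.enable.idempotence": "false", "properties.queue.buffering.max.kbytes": "-114514"}"#,
    )
    .is_err());
}
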
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index f75f5167e84a5..cafaafc6bcfdc 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -71,6 +71,10 @@ impl SplitReader for KafkaSplitReader {
         config.set("bootstrap.servers", bootstrap_servers);
 
         properties.common.set_security_properties(&mut config);
+        properties.set_client(&mut config);
+
+        // rdkafka fetching config
+        properties.rdkafka_properties.set_client(&mut config);
 
         if config.get("group.id").is_none() {
             config.set(