diff --git a/integration_tests/kafka-cdc-sink/risingwave.sql b/integration_tests/kafka-cdc-sink/risingwave.sql
index 588731a33940..fabddc9229e2 100644
--- a/integration_tests/kafka-cdc-sink/risingwave.sql
+++ b/integration_tests/kafka-cdc-sink/risingwave.sql
@@ -22,6 +22,5 @@ connector = 'kafka',
 properties.bootstrap.server='message_queue:29092',
 topic = 'counts',
 type = 'debezium',
-use_transaction = 'false',
 primary_key = 'id'
 );
\ No newline at end of file
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index a5a524048bfd..498850091991 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -50,10 +50,6 @@ use crate::{
 
 pub const KAFKA_SINK: &str = "kafka";
 
-const fn _default_timeout() -> Duration {
-    Duration::from_secs(5)
-}
-
 const fn _default_max_retries() -> u32 {
     3
 }
@@ -62,12 +58,16 @@ const fn _default_retry_backoff() -> Duration {
     Duration::from_millis(100)
 }
 
-const fn _default_use_transaction() -> bool {
+const fn _default_force_append_only() -> bool {
     false
 }
 
-const fn _default_force_append_only() -> bool {
-    false
+const fn _default_message_timeout_ms() -> usize {
+    5000
+}
+
+const fn _default_max_in_flight_requests_per_connection() -> usize {
+    5
 }
 
 #[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
@@ -80,6 +80,8 @@ enum CompressionCodec {
     Zstd,
 }
 
+/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
+/// for the detailed meaning of these librdkafka producer properties
 #[serde_as]
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RdKafkaPropertiesProducer {
@@ -140,6 +142,24 @@ pub struct RdKafkaPropertiesProducer {
     #[serde(rename = "properties.compression.codec")]
     #[serde_as(as = "Option<DisplayFromStr>")]
     compression_codec: Option<CompressionCodec>,
+
+    /// Produce message timeout.
+    /// This value is used to limit the time a produced message waits for
+    /// successful delivery (including retries).
+    #[serde(
+        rename = "properties.message.timeout.ms",
+        default = "_default_message_timeout_ms"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    message_timeout_ms: usize,
+
+    /// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
+    #[serde(
+        rename = "properties.max.in.flight.requests.per.connection",
+        default = "_default_max_in_flight_requests_per_connection"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    max_in_flight_requests_per_connection: usize,
 }
 
 impl RdKafkaPropertiesProducer {
@@ -171,6 +191,11 @@ impl RdKafkaPropertiesProducer {
         if let Some(v) = &self.compression_codec {
             c.set("compression.codec", v.to_string());
         }
+        c.set("message.timeout.ms", self.message_timeout_ms.to_string());
+        c.set(
+            "max.in.flight.requests.per.connection",
+            self.max_in_flight_requests_per_connection.to_string(),
+        );
     }
 }
 
@@ -193,13 +218,6 @@ pub struct KafkaConfig {
     )]
     pub force_append_only: bool,
 
-    #[serde(
-        rename = "properties.timeout",
-        default = "_default_timeout",
-        deserialize_with = "deserialize_duration_from_string"
-    )]
-    pub timeout: Duration,
-
     #[serde(
         rename = "properties.retry.max",
         default = "_default_max_retries",
@@ -214,12 +232,6 @@ pub struct KafkaConfig {
     )]
     pub retry_interval: Duration,
 
-    #[serde(
-        default = "_default_use_transaction",
-        deserialize_with = "deserialize_bool_from_string"
-    )]
-    pub use_transaction: bool,
-
     /// We have parsed the primary key for an upsert kafka sink into a `usize` vector representing
     /// the indices of the pk columns in the frontend, so we simply store the primary key here
     /// as a string.
@@ -363,7 +375,7 @@ pub struct KafkaSinkWriter {
 }
 
 impl KafkaSinkWriter {
-    pub async fn new(mut config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
+    pub async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
         let inner: FutureProducer<PrivateLinkProducerContext> = {
             let mut c = ClientConfig::new();
 
@@ -372,10 +384,7 @@ impl KafkaSinkWriter {
             config.set_client(&mut c);
 
             // ClientConfig configuration
-            c.set("bootstrap.servers", &config.common.brokers)
-                .set("message.timeout.ms", "5000");
-            // Note that we will not use transaction during sinking, thus set it to false
-            config.use_transaction = false;
+            c.set("bootstrap.servers", &config.common.brokers);
 
             // Create the producer context, will be used to create the producer
             let producer_ctx = PrivateLinkProducerContext::new(
@@ -598,6 +607,8 @@ mod test {
             "properties.batch.num.messages".to_string() => "114514".to_string(),
             "properties.batch.size".to_string() => "114514".to_string(),
             "properties.compression.codec".to_string() => "zstd".to_string(),
+            "properties.message.timeout.ms".to_string() => "114514".to_string(),
+            "properties.max.in.flight.requests.per.connection".to_string() => "114514".to_string(),
         };
         let c = KafkaConfig::from_hashmap(props).unwrap();
         assert_eq!(
@@ -608,6 +619,11 @@ mod test {
             c.rdkafka_properties.compression_codec,
             Some(CompressionCodec::Zstd)
        );
+        assert_eq!(c.rdkafka_properties.message_timeout_ms, 114514);
+        assert_eq!(
+            c.rdkafka_properties.max_in_flight_requests_per_connection,
+            114514
+        );
 
         let props: HashMap<String, String> = hashmap! {
             // basic
@@ -649,12 +665,10 @@ mod test {
             "topic".to_string() => "test".to_string(),
             "type".to_string() => "append-only".to_string(),
             "force_append_only".to_string() => "true".to_string(),
-            "use_transaction".to_string() => "False".to_string(),
             "properties.security.protocol".to_string() => "SASL".to_string(),
             "properties.sasl.mechanism".to_string() => "SASL".to_string(),
             "properties.sasl.username".to_string() => "test".to_string(),
             "properties.sasl.password".to_string() => "test".to_string(),
-            "properties.timeout".to_string() => "10s".to_string(),
             "properties.retry.max".to_string() => "20".to_string(),
             "properties.retry.interval".to_string() => "500ms".to_string(),
         };
@@ -663,8 +677,6 @@ mod test {
         let config = KafkaConfig::from_hashmap(props).unwrap();
         assert_eq!(config.common.topic, "test");
         assert_eq!(config.r#type, "append-only");
         assert!(config.force_append_only);
-        assert!(!config.use_transaction);
-        assert_eq!(config.timeout, Duration::from_secs(10));
         assert_eq!(config.max_retry_num, 20);
         assert_eq!(config.retry_interval, Duration::from_millis(500));
@@ -677,8 +689,6 @@ mod test {
         };
         let config = KafkaConfig::from_hashmap(properties).unwrap();
         assert!(!config.force_append_only);
-        assert!(!config.use_transaction);
-        assert_eq!(config.timeout, Duration::from_secs(5));
         assert_eq!(config.max_retry_num, 3);
         assert_eq!(config.retry_interval, Duration::from_millis(100));
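Usage note: with `use_transaction` and `properties.timeout` removed, the two new librdkafka pass-through options are supplied directly in the sink's `WITH` clause and fall back to their defaults (5000 ms and 5) when omitted. A minimal sketch, assuming the `CREATE SINK` syntax used in `risingwave.sql` above; the sink and source names here are hypothetical:

```sql
-- Hypothetical sink mirroring integration_tests/kafka-cdc-sink/risingwave.sql;
-- only the last two options are new in this change.
CREATE SINK counts_sink FROM counts WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'counts',
    type = 'debezium',
    primary_key = 'id',
    -- librdkafka producer pass-throughs added by this diff
    properties.message.timeout.ms = '10000',                 -- default: 5000
    properties.max.in.flight.requests.per.connection = '5'   -- default: 5
);
```

Both options map one-to-one onto the librdkafka producer settings of the same name (the `properties.` prefix is stripped before the values are handed to `ClientConfig`).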