diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 5ab9496d6633b..5377f03fdd3df 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -778,7 +778,7 @@ create table s25 (v1 int, v2 varchar) with ( properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest', properties.fetch.queue.backoff.ms = 250, - properties.queued.min.messages = 10000 + properties.queued.max.messages.kbytes = 65536 ) FORMAT PLAIN ENCODE JSON query I diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 6d2d712227983..aacd39315d6d2 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -62,6 +62,7 @@ pub struct RdKafkaProperties { /// exceeded. This property may need to be decreased if the queue thresholds are set low /// and the application is experiencing long (~1s) delays between messages. Low values may /// increase CPU utilization. + // FIXME: need to upgrade rdkafka to v2.2.0 to use this property #[serde(rename = "properties.fetch.queue.backoff.ms")] #[serde_as(as = "Option")] pub fetch_queue_backoff_ms: Option,