feat(kafka source): expose rdkafka fetch params (#11203)
Co-authored-by: tabVersion <[email protected]>
fuyufjh and tabVersion authored Jul 28, 2023
1 parent b93354d commit fb9a324
Showing 6 changed files with 337 additions and 3 deletions.
18 changes: 18 additions & 0 deletions e2e_test/source/basic/kafka.slt
@@ -425,6 +425,16 @@ CREATE TABLE dbz_ignore_case_json (
topic = 'debezium_ignore_case_json'
) FORMAT DEBEZIUM ENCODE JSON

# create kafka source with additional rdkafka properties
statement ok
create table source_with_rdkafka_props (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
properties.queued.min.messages = 10000,
properties.queued.max.messages.kbytes = 65536
) FORMAT PLAIN ENCODE JSON

statement ok
flush;
@@ -698,6 +708,11 @@ SELECT * FROM source_mv3 ORDER BY id;
\x6b6b
\x776561776566776566

query I
select count(*) from source_with_rdkafka_props
----
4

statement ok
drop materialized view source_mv1

@@ -769,3 +784,6 @@ DROP TABLE upsert_students;

statement ok
drop table dbz_ignore_case_json;

statement ok
drop table source_with_rdkafka_props;
40 changes: 39 additions & 1 deletion src/connector/src/common.rs
@@ -21,7 +21,7 @@ use clickhouse::Client;
use rdkafka::ClientConfig;
use serde_derive::{Deserialize, Serialize};
use serde_with::json::JsonString;
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};

use crate::aws_auth::AwsAuthProps;

@@ -104,6 +104,40 @@ pub struct KafkaCommon {
/// Configurations for SASL/OAUTHBEARER.
#[serde(rename = "properties.sasl.oauthbearer.config")]
sasl_oathbearer_config: Option<String>,

#[serde(flatten)]
pub rdkafka_properties: RdKafkaPropertiesCommon,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdKafkaPropertiesCommon {
/// Maximum Kafka protocol request message size. Due to differing framing overhead between
/// protocol versions, the producer is unable to reliably enforce a strict max message limit at
/// produce time and may exceed the maximum size by one message in protocol ProduceRequests;
/// the broker will enforce the topic's max.message.bytes limit.
#[serde(rename = "properties.message.max.bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub message_max_bytes: Option<usize>,

/// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid
/// memory exhaustion in case of protocol hiccups. This value must be at least fetch.max.bytes
/// + 512 to allow for protocol overhead; the value is adjusted automatically unless the
/// configuration property is explicitly set.
#[serde(rename = "properties.receive.message.max.bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub receive_message_max_bytes: Option<usize>,
}

impl RdKafkaPropertiesCommon {
pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
if let Some(v) = self.message_max_bytes {
c.set("message.max.bytes", v.to_string());
}
if let Some(v) = self.receive_message_max_bytes {
c.set("receive.message.max.bytes", v.to_string());
}
}
}

impl KafkaCommon {
@@ -169,6 +203,10 @@ impl KafkaCommon {
// Currently, we only support unsecured OAUTH.
config.set("enable.sasl.oauthbearer.unsecure.jwt", "true");
}

pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
self.rdkafka_properties.set_client(c);
}
}

#[derive(Deserialize, Serialize, Debug, Clone)]
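
A minimal, self-contained sketch of the deserialization pattern used by RdKafkaPropertiesCommon above: every WITH option arrives as a string, `#[serde_as(as = "Option<DisplayFromStr>")]` parses it into a typed Option field, and `#[serde(flatten)]` lets the nested struct share the outer key space. The struct names and the use of serde_json as an intermediate representation are illustrative assumptions, not RisingWave code.

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Debug, Deserialize)]
struct CommonSketch {
    // A missing key becomes None thanks to `default`; a present key is parsed from its string form.
    #[serde(rename = "properties.message.max.bytes", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_max_bytes: Option<usize>,
}

#[derive(Debug, Deserialize)]
struct ConfigSketch {
    topic: String,
    #[serde(flatten)]
    common: CommonSketch,
}

fn main() {
    let props: HashMap<String, String> = [
        ("topic".to_string(), "test".to_string()),
        ("properties.message.max.bytes".to_string(), "12345".to_string()),
    ]
    .into_iter()
    .collect();

    // Route the flat string map through serde_json::Value, mirroring how a WITH clause
    // is turned into a typed config struct.
    let config: ConfigSketch =
        serde_json::from_value(serde_json::to_value(props).unwrap()).unwrap();
    assert_eq!(config.common.message_max_bytes, Some(12345));
}
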
151 changes: 149 additions & 2 deletions src/connector/src/sink/kafka.rs
@@ -27,8 +27,9 @@ use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};

use super::{
Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
@@ -68,8 +69,94 @@ const fn _default_force_append_only() -> bool {
false
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdKafkaPropertiesProducer {
/// Maximum number of messages allowed on the producer queue. This queue is shared by all
/// topics and partitions. A value of 0 disables this limit.
#[serde(rename = "properties.queue.buffering.max.messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub queue_buffering_max_messages: Option<usize>,

/// Maximum total message size sum allowed on the producer queue. This queue is shared by all
/// topics and partitions. This property has higher priority than queue.buffering.max.messages.
#[serde(rename = "properties.queue.buffering.max.kbytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
queue_buffering_max_kbytes: Option<usize>,

/// Delay in milliseconds to wait for messages in the producer queue to accumulate before
/// constructing message batches (MessageSets) to transmit to brokers. A higher value allows
/// larger and more effective (less overhead, improved compression) batches of messages to
/// accumulate at the expense of increased message delivery latency.
#[serde(rename = "properties.queue.buffering.max.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
queue_buffering_max_ms: Option<f64>,

/// When set to true, the producer will ensure that messages are successfully produced exactly
/// once and in the original produce order. The following configuration properties are adjusted
/// automatically (if not modified by the user) when idempotence is enabled:
/// max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
/// retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. The producer
/// will fail if the user-supplied configuration is incompatible.
#[serde(rename = "properties.enable.idempotence")]
#[serde_as(as = "Option<DisplayFromStr>")]
enable_idempotence: Option<bool>,

/// How many times to retry sending a failing Message.
#[serde(rename = "properties.message.send.max.retries")]
#[serde_as(as = "Option<DisplayFromStr>")]
message_send_max_retries: Option<usize>,

/// The backoff time in milliseconds before retrying a protocol request.
#[serde(rename = "properties.retry.backoff.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
retry_backoff_ms: Option<usize>,

/// Maximum number of messages batched in one MessageSet.
#[serde(rename = "properties.batch.num.messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
batch_num_messages: Option<usize>,

/// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
/// framing overhead. This limit is applied after the first message has been added to the
/// batch, regardless of the first message's size; this ensures that messages exceeding
/// batch.size are still produced.
#[serde(rename = "properties.batch.size")]
#[serde_as(as = "Option<DisplayFromStr>")]
batch_size: Option<usize>,
}

impl RdKafkaPropertiesProducer {
pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
if let Some(v) = self.queue_buffering_max_messages {
c.set("queue.buffering.max.messages", v.to_string());
}
if let Some(v) = self.queue_buffering_max_kbytes {
c.set("queue.buffering.max.kbytes", v.to_string());
}
if let Some(v) = self.queue_buffering_max_ms {
c.set("queue.buffering.max.ms", v.to_string());
}
if let Some(v) = self.enable_idempotence {
c.set("enable.idempotence", v.to_string());
}
if let Some(v) = self.message_send_max_retries {
c.set("message.send.max.retries", v.to_string());
}
if let Some(v) = self.retry_backoff_ms {
c.set("retry.backoff.ms", v.to_string());
}
if let Some(v) = self.batch_num_messages {
c.set("batch.num.messages", v.to_string());
}
if let Some(v) = self.batch_size {
c.set("batch.size", v.to_string());
}
}
}

#[serde_as]
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
#[serde(skip_serializing)]
pub connector: String, // Must be "kafka" here.
@@ -118,6 +205,9 @@ pub struct KafkaConfig {
/// the indices of the pk columns in the frontend, so we simply store the primary key here
/// as a string.
pub primary_key: Option<String>,

#[serde(flatten)]
pub rdkafka_properties: RdKafkaPropertiesProducer,
}

impl KafkaConfig {
@@ -139,6 +229,13 @@ impl KafkaConfig {
}
Ok(config)
}

pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
self.common.set_client(c);
self.rdkafka_properties.set_client(c);

tracing::info!("kafka client starts with: {:?}", c);
}
}

#[derive(Debug)]
@@ -431,6 +528,7 @@ impl KafkaTransactionConductor {
let inner: ThreadedProducer<PrivateLinkProducerContext> = {
let mut c = ClientConfig::new();
config.common.set_security_properties(&mut c);
config.set_client(&mut c);
c.set("bootstrap.servers", &config.common.brokers)
.set("message.timeout.ms", "5000");
config.use_transaction = false;
@@ -504,6 +602,55 @@ mod test {
use super::*;
use crate::sink::utils::*;

#[test]
fn parse_rdkafka_props() {
let props: HashMap<String, String> = hashmap! {
// basic
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),
// RdKafkaPropertiesCommon
"properties.message.max.bytes".to_string() => "12345".to_string(),
"properties.receive.message.max.bytes".to_string() => "54321".to_string(),
// RdKafkaPropertiesProducer
"properties.queue.buffering.max.messages".to_string() => "114514".to_string(),
"properties.queue.buffering.max.kbytes".to_string() => "114514".to_string(),
"properties.queue.buffering.max.ms".to_string() => "114.514".to_string(),
"properties.enable.idempotence".to_string() => "false".to_string(),
"properties.message.send.max.retries".to_string() => "114514".to_string(),
"properties.retry.backoff.ms".to_string() => "114514".to_string(),
"properties.batch.num.messages".to_string() => "114514".to_string(),
"properties.batch.size".to_string() => "114514".to_string(),
};
let c = KafkaConfig::from_hashmap(props).unwrap();
assert_eq!(
c.rdkafka_properties.queue_buffering_max_ms,
Some(114.514f64)
);

let props: HashMap<String, String> = hashmap! {
// basic
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),

"properties.enable.idempotence".to_string() => "True".to_string(), // can only be 'true' or 'false'
};
assert!(KafkaConfig::from_hashmap(props).is_err());

let props: HashMap<String, String> = hashmap! {
// basic
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),
"properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), // usize cannot be negative
};
assert!(KafkaConfig::from_hashmap(props).is_err());
}

#[test]
fn parse_kafka_config() {
let properties: HashMap<String, String> = hashmap! {
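
For context on where these typed options end up: RdKafkaPropertiesProducer::set_client above copies each Some(_) value into an rdkafka ClientConfig, and librdkafka validates option names and values when the client is created. Below is a minimal sketch of that final step, with hypothetical hard-coded values standing in for what RisingWave reads from the WITH clause; the broker address is a placeholder.

use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

fn main() {
    // Hypothetical values; in RisingWave these come from the WITH clause via serde.
    let batch_num_messages: Option<usize> = Some(10_000);
    let queue_buffering_max_ms: Option<f64> = Some(10.0);

    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "localhost:9092");
    // Only options the user actually supplied are forwarded, mirroring set_client.
    if let Some(v) = batch_num_messages {
        config.set("batch.num.messages", v.to_string());
    }
    if let Some(v) = queue_buffering_max_ms {
        config.set("queue.buffering.max.ms", v.to_string());
    }

    // librdkafka rejects unknown option names or out-of-range values at creation time.
    let _producer: BaseProducer = config.create().expect("invalid librdkafka configuration");
}
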
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -67,6 +67,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
config.set("bootstrap.servers", &broker_address);
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
common_props.set_security_properties(&mut config);
properties.set_client(&mut config);
let mut scan_start_offset = match properties
.scan_startup_mode
.as_ref()
