From 2da639ffffa970da9f2217c451d6f8f9a708c739 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 20 Aug 2024 11:14:22 +0800 Subject: [PATCH] feat: support custom kafka group id prefix Signed-off-by: xxchan --- .../source_inline/kafka/consumer_group.slt | 44 ++++++++++++++++++- src/connector/src/sink/kafka.rs | 1 + src/connector/src/source/kafka/mod.rs | 14 ++++++ .../src/source/kafka/source/reader.rs | 6 ++- 4 files changed, 62 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index a71329e46509..69a3e004a53a 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -20,9 +20,23 @@ WITH( scan.startup.mode = 'earliest', ) FORMAT PLAIN ENCODE JSON; +# custom group id prefix +statement ok +CREATE SOURCE s2(x varchar) +WITH( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_consumer_group', + scan.startup.mode = 'earliest', + group.id.prefix = 'my_group' +) FORMAT PLAIN ENCODE JSON; + + statement ok CREATE MATERIALIZED VIEW mv AS SELECT * from s; +statement ok +CREATE MATERIALIZED VIEW mv2 AS SELECT * from s2; + query ? SELECT * FROM s order by x; ---- @@ -87,11 +101,37 @@ d e f + +query ? 
+SELECT * FROM mv2 order by x; +---- +a +b +c +d +e +f + + statement ok DROP SOURCE s CASCADE; +statement ok +DROP SOURCE s2 CASCADE; + +## fragment id is not deterministic so comment out +# system ok +# rpk group list +# --- +# BROKER GROUP STATE +# 0 my_group-8 Empty +# 0 rw-consumer-3 Empty +# 0 rw-consumer-4294967295 Empty +# 0 rw-consumer-7 Empty + + system ok pkill rpk -system ok -rpk topic delete test_consumer_group +# system ok +# rpk topic delete test_consumer_group diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 3a6914d2249c..588c5d99ae95 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -280,6 +280,7 @@ impl From for KafkaProperties { rdkafka_properties_consumer: Default::default(), privatelink_common: val.privatelink_common, aws_auth_props: val.aws_auth_props, + group_id_prefix: None, unknown_fields: Default::default(), } } diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 2360f9fb8a33..125bf73a3529 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -120,6 +120,20 @@ pub struct KafkaProperties { )] pub time_offset: Option, + /// Specify a custom consumer group id prefix for the source. + /// Defaults to `rw-consumer`. + /// + /// Notes: + /// - Each job (materialized view) will have a separate consumer group and + /// contain a generated suffix in the group id. + /// The consumer group will be `{group_id_prefix}-{fragment_id}`. + /// - The consumer group is solely for monitoring progress in some external + /// Kafka tools, and for authorization. RisingWave does not rely on committed + /// offsets, and does not join the consumer group. It just reports offsets + /// to the group.
+ #[serde(rename = "group.id.prefix")] + pub group_id_prefix: Option, + /// This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which /// combine both key and value fields of the Kafka message. /// TODO: Currently, `Option` can not be parsed here. diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 798069ddb12f..5ace1820b424 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -73,9 +73,13 @@ impl SplitReader for KafkaSplitReader { properties.common.set_security_properties(&mut config); properties.set_client(&mut config); + let group_id_prefix = properties + .group_id_prefix + .as_deref() + .unwrap_or("rw-consumer"); config.set( "group.id", - format!("rw-consumer-{}", source_ctx.fragment_id), + format!("{}-{}", group_id_prefix, source_ctx.fragment_id), ); let ctx_common = KafkaContextCommon::new(