Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support custom kafka group id prefix #18115

Merged
merged 7 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions e2e_test/source_inline/kafka/consumer_group.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,23 @@ WITH(
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON;

# custom group id prefix
statement ok
CREATE SOURCE s2(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_consumer_group',
scan.startup.mode = 'earliest',
group.id.prefix = 'my_group'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE MATERIALIZED VIEW mv AS SELECT * from s;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT * from s2;

query ?
SELECT * FROM s order by x;
----
Expand Down Expand Up @@ -87,9 +101,35 @@ d
e
f


query ?
SELECT * FROM mv2 order by x;
----
a
b
c
d
e
f


statement ok
DROP SOURCE s CASCADE;

statement ok
DROP SOURCE s2 CASCADE;

## The fragment id is not deterministic, so the check below is commented out.
# system ok
# rpk group list
# ---
# BROKER GROUP STATE
# 0 my_group-8 Empty
# 0 rw-consumer-3 Empty
# 0 rw-consumer-4294967295 Empty
# 0 rw-consumer-7 Empty


system ok
pkill rpk

Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ impl From<KafkaConfig> for KafkaProperties {
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
aws_auth_props: val.aws_auth_props,
group_id_prefix: None,
unknown_fields: Default::default(),
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/connector/src/source/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ pub struct KafkaProperties {
)]
pub time_offset: Option<String>,

/// Specify a custom consumer group id prefix for the source.
/// Defaults to `rw-consumer`.
///
/// Notes:
/// - Each job (materialized view) will have a separate consumer group, and
/// its group id contains a generated suffix.
/// The consumer group will be `{group_id_prefix}-{fragment_id}`.
/// - The consumer group is solely for monitoring progress in some external
/// Kafka tools, and for authorization. RisingWave does not rely on committed
/// offsets, and does not join the consumer group. It just reports offsets
/// to the group.
#[serde(rename = "group.id.prefix")]
pub group_id_prefix: Option<String>,

/// This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which
/// combine both key and value fields of the Kafka message.
/// TODO: Currently, `Option<bool>` can not be parsed here.
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ impl SplitReader for KafkaSplitReader {
properties.common.set_security_properties(&mut config);
properties.set_client(&mut config);

let group_id_prefix = properties
.group_id_prefix
.as_deref()
.unwrap_or("rw-consumer");
config.set(
"group.id",
format!("rw-consumer-{}", source_ctx.fragment_id),
format!("{}-{}", group_id_prefix, source_ctx.fragment_id),
);

let ctx_common = KafkaContextCommon::new(
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/with_options_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String {

// Generate the output
format!(
"# THIS FILE IS AUTO_GENERATED. DO NOT EDIT\n\n{}",
"# THIS FILE IS AUTO_GENERATED. DO NOT EDIT\n# UPDATE WITH: ./risedev generate-with-options\n\n{}",
serde_yaml::to_string(&struct_infos).unwrap()
)
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# THIS FILE IS AUTO_GENERATED. DO NOT EDIT
# UPDATE WITH: ./risedev generate-with-options

BigQueryConfig:
fields:
Expand Down
5 changes: 5 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# THIS FILE IS AUTO_GENERATED. DO NOT EDIT
# UPDATE WITH: ./risedev generate-with-options

DatagenProperties:
fields:
Expand Down Expand Up @@ -100,6 +101,10 @@ KafkaProperties:
alias:
- kafka.time.offset
- scan.startup.timestamp_millis
- name: group.id.prefix
field_type: String
comments: 'Specify a custom consumer group id prefix for the source. Defaults to `rw-consumer`. Notes: - Each job (materialized view) will have a separated consumer group and contains a generated suffix in the group id. The consumer group will be `{group_id_prefix}-{fragment_id}`. - The consumer group is solely for monitoring progress in some external Kafka tools, and for authorization. RisingWave does not rely on committed offsets, and does not join the consumer group. It just reports offsets to the group.'
required: false
- name: upsert
field_type: String
comments: 'This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which combine both key and value fields of the Kafka message. TODO: Currently, `Option<bool>` can not be parsed here.'
Expand Down
Loading