feat(source): use fragment id only as Kafka consumer group id (#16111)
Signed-off-by: xxchan <[email protected]>
xxchan authored Apr 19, 2024
1 parent 4628706 commit efa1fda
Showing 5 changed files with 15 additions and 22 deletions.
19 changes: 8 additions & 11 deletions e2e_test/source_inline/kafka/consumer_group.slt
@@ -41,7 +41,7 @@ b
c


# There are 4 consumer groups, 1 for batch query (not listed below), 3 for MV.
# There are 2 consumer groups, 1 for batch query (not listed below), 1 for MV.
# All of them are in the "Empty" state with 0 members, because we manually `assign` partitions to them.
# At the beginning, the MV's consumer group will not appear yet. It will be created after committing offsets to Kafka.
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
@@ -50,29 +50,23 @@ sleep 5s
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0,0,0
0


# The lag for the batch query's group is 0, and each MV partition's group is 2 (1 of 3 consumed).
# The lag for MV's group is 0.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
2,2,2
0


# We try to interfere by creating consumers that subscribe to the topic with RW's group id.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -I {} sh -c "timeout 40s rpk topic consume test_consumer_group -g {}" &
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -I {} sh -c "rpk topic consume test_consumer_group -g {}" &

# Wait a while for them to subscribe to the topic.
sleep 15s

# The lag is changed to 0
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
0,0,0


system ok
cat <<EOF | rpk topic produce test_consumer_group -f "%p %v\\n" -p 0
@@ -97,5 +91,8 @@ f
statement ok
DROP SOURCE s CASCADE;

system ok
pkill rpk

system ok
rpk topic delete test_consumer_group
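
For context on the group states checked above: the reader manually `assign`s partitions instead of `subscribe`-ing to the topic, so the consumer never performs a group join. The group therefore stays in the "Empty" state with 0 members and only materializes on the broker once an offset is committed (auto-commit, every ~5s by default). A minimal rdkafka sketch of that pattern, with hypothetical broker/topic/group names, not the actual RisingWave reader code:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::{Offset, TopicPartitionList};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical broker and group id, for illustration only.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "rw-consumer-42")
        // Auto-commit is what eventually makes the group visible on the broker.
        .set("enable.auto.commit", "true")
        .create()?;

    // Manually `assign` a partition rather than `subscribe` to the topic:
    // no group join happens, so the group reports "Empty" with 0 members.
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset("test_consumer_group", 0, Offset::Beginning)?;
    consumer.assign(&tpl)?;

    Ok(())
}
```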
1 change: 0 additions & 1 deletion src/connector/src/source/base.rs
@@ -168,7 +168,6 @@ pub struct SourceEnumeratorInfo {
pub struct SourceContext {
pub actor_id: u32,
pub source_id: TableId,
// There should be a 1-1 mapping between `source_id` & `fragment_id`
pub fragment_id: u32,
pub source_name: String,
pub metrics: Arc<SourceMetrics>,
8 changes: 5 additions & 3 deletions src/connector/src/source/kafka/mod.rs
@@ -82,9 +82,11 @@ pub struct RdKafkaPropertiesConsumer {
#[serde_as(as = "Option<DisplayFromStr>")]
pub fetch_max_bytes: Option<usize>,

/// Automatically and periodically commit offsets in the background.
/// Note: setting this to false does not prevent the consumer from fetching previously committed start offsets.
/// To circumvent this behaviour set specific start offsets per partition in the call to assign().
/// Whether to automatically and periodically commit offsets in the background.
///
/// Note that RisingWave does NOT rely on committed offsets. Committing offsets is only for exposing
/// consumption progress for monitoring. Setting this to false can avoid creating consumer groups.
///
/// default: true
#[serde(rename = "properties.enable.auto.commit")]
#[serde_as(as = "Option<DisplayFromStr>")]
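
As an aside on the attribute pattern in the hunk above: connector WITH options reach the struct as strings, which is why the field pairs `#[serde(rename = "properties.enable.auto.commit")]` with `#[serde_as(as = "Option<DisplayFromStr>")]`. A stripped-down sketch of that mechanism (hypothetical struct and JSON payload, assuming the `serde`, `serde_with`, and `serde_json` crates), not the real `RdKafkaPropertiesConsumer`:

```rust
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Sketch only: a single renamed field parsed from its string form.
#[serde_as]
#[derive(Debug, Deserialize)]
struct ConsumerOptsSketch {
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    enable_auto_commit: Option<bool>,
}

fn main() {
    // WITH options represented as JSON for the sketch; the value is a string, not a bool.
    let with_options = r#"{ "properties.enable.auto.commit": "false" }"#;
    let opts: ConsumerOptsSketch = serde_json::from_str(with_options).unwrap();
    assert_eq!(opts.enable_auto_commit, Some(false));
}
```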
7 changes: 1 addition & 6 deletions src/connector/src/source/kafka/source/reader.rs
@@ -66,8 +66,6 @@ impl SplitReader for KafkaSplitReader {

// disable partition eof
config.set("enable.partition.eof", "false");
// change to `RdKafkaPropertiesConsumer::enable_auto_commit` to enable auto commit
// config.set("enable.auto.commit", "false");
config.set("auto.offset.reset", "smallest");
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
config.set("bootstrap.servers", bootstrap_servers);
@@ -77,10 +75,7 @@

config.set(
"group.id",
format!(
"rw-consumer-{}-{}",
source_ctx.fragment_id, source_ctx.actor_id
),
format!("rw-consumer-{}", source_ctx.fragment_id),
);

let client_ctx = PrivateLinkConsumerContext::new(
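
The gist of the hunk above: the consumer group id now encodes only the fragment id, so all actors of a source fragment report offsets under one shared group instead of one group per actor, which is why the e2e test now expects a single MV group. A standalone sketch of the two formats, with hypothetical ids:

```rust
// Before: one consumer group per actor of the fragment.
fn group_id_old(fragment_id: u32, actor_id: u32) -> String {
    format!("rw-consumer-{}-{}", fragment_id, actor_id)
}

// After this commit: one consumer group per fragment, shared by all of its actors.
fn group_id_new(fragment_id: u32) -> String {
    format!("rw-consumer-{}", fragment_id)
}

fn main() {
    // With 3 actors in fragment 7, the old scheme yields 3 groups; the new one yields a single group.
    let old: Vec<String> = (1..=3).map(|actor| group_id_old(7, actor)).collect();
    assert_eq!(old, ["rw-consumer-7-1", "rw-consumer-7-2", "rw-consumer-7-3"]);
    assert_eq!(group_id_new(7), "rw-consumer-7");
}
```

Under the new scheme, the per-group lag reported by `list-lags` reflects a whole fragment rather than a single actor, matching the updated expectations in the e2e test.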
2 changes: 1 addition & 1 deletion src/connector/with_options_source.yaml
@@ -208,7 +208,7 @@ KafkaProperties:
required: false
- name: properties.enable.auto.commit
field_type: bool
comments: 'Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). default: true'
comments: 'Whether to automatically and periodically commit offsets in the background. Note that RisingWave does NOT rely on committed offsets. Committing offsets is only for exposing consumption progress for monitoring. Setting this to false can avoid creating consumer groups. default: true'
required: false
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>