feat: support custom kafka group id prefix (#18115) #18154

Merged
40 changes: 40 additions & 0 deletions e2e_test/source_inline/kafka/consumer_group.slt
@@ -20,9 +20,23 @@ WITH(
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON;

# custom group id prefix
statement ok
CREATE SOURCE s2(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_consumer_group',
scan.startup.mode = 'earliest',
group.id.prefix = 'my_group'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE MATERIALIZED VIEW mv AS SELECT * from s;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT * from s2;

query ?
SELECT * FROM s order by x;
----
@@ -87,9 +101,35 @@ d
e
f


query ?
SELECT * FROM mv2 order by x;
----
a
b
c
d
e
f


statement ok
DROP SOURCE s CASCADE;

statement ok
DROP SOURCE s2 CASCADE;

## fragment id is not deterministic so comment out
# system ok
# rpk group list
# ---
# BROKER GROUP STATE
# 0 my_group-8 Empty
# 0 rw-consumer-3 Empty
# 0 rw-consumer-4294967295 Empty
# 0 rw-consumer-7 Empty


system ok
pkill rpk

1 change: 1 addition & 0 deletions src/connector/src/sink/kafka.rs
@@ -280,6 +280,7 @@ impl From<KafkaConfig> for KafkaProperties {
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
aws_auth_props: val.aws_auth_props,
group_id_prefix: None,
unknown_fields: Default::default(),
}
}
14 changes: 14 additions & 0 deletions src/connector/src/source/kafka/mod.rs
@@ -120,6 +120,20 @@ pub struct KafkaProperties {
)]
pub time_offset: Option<String>,

/// Specify a custom consumer group id prefix for the source.
/// Defaults to `rw-consumer`.
///
/// Notes:
/// - Each job (materialized view) will have a separate consumer group, with
/// a generated suffix in the group id.
/// The consumer group will be `{group_id_prefix}-{fragment_id}`.
/// - The consumer group is solely for monitoring progress in some external
/// Kafka tools, and for authorization. RisingWave does not rely on committed
/// offsets, and does not join the consumer group. It just reports offsets
/// to the group.
#[serde(rename = "group.id.prefix")]
pub group_id_prefix: Option<String>,

/// This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which
/// combine both key and value fields of the Kafka message.
/// TODO: Currently, `Option<bool>` can not be parsed here.
6 changes: 5 additions & 1 deletion src/connector/src/source/kafka/source/reader.rs
@@ -73,9 +73,13 @@ impl SplitReader for KafkaSplitReader {
properties.common.set_security_properties(&mut config);
properties.set_client(&mut config);

let group_id_prefix = properties
.group_id_prefix
.as_deref()
.unwrap_or("rw-consumer");
config.set(
"group.id",
format!("rw-consumer-{}", source_ctx.fragment_id),
format!("{}-{}", group_id_prefix, source_ctx.fragment_id),
);

let ctx_common = KafkaContextCommon::new(
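Taken together, the doc comment in `mod.rs` and the change in `reader.rs` mean the effective consumer group id is simply the prefix (defaulting to `rw-consumer`) joined to the fragment id. Below is a minimal, self-contained sketch of that resolution logic; the function name and test values are illustrative assumptions, not the actual `KafkaSplitReader` code.

```rust
// Sketch: resolve the consumer group id from an optional `group.id.prefix`
// and the fragment id, falling back to the documented default `rw-consumer`.
fn resolve_group_id(group_id_prefix: Option<&str>, fragment_id: u32) -> String {
    format!("{}-{}", group_id_prefix.unwrap_or("rw-consumer"), fragment_id)
}

fn main() {
    // With the custom prefix used in the e2e test above.
    assert_eq!(resolve_group_id(Some("my_group"), 8), "my_group-8");
    // Without the option, the previous naming is preserved.
    assert_eq!(resolve_group_id(None, 3), "rw-consumer-3");
}
```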
6 changes: 3 additions & 3 deletions src/connector/src/with_options_test.rs
@@ -170,7 +170,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String {

// Generate the output
format!(
"# THIS FILE IS AUTO_GENERATED. DO NOT EDIT\n\n{}",
"# THIS FILE IS AUTO_GENERATED. DO NOT EDIT\n# UPDATE WITH: ./risedev generate-with-options\n\n{}",
serde_yaml::to_string(&struct_infos).unwrap()
)
}
@@ -238,14 +238,14 @@ fn extract_comments(attrs: &[Attribute]) -> String {
if let Ok(Meta::NameValue(mnv)) = attr.parse_meta() {
if mnv.path.is_ident("doc") {
if let syn::Lit::Str(lit_str) = mnv.lit {
return Some(lit_str.value());
return Some(lit_str.value().trim().to_string());
}
}
}
None
})
.collect::<Vec<_>>()
.join(" ")
.join("\n")
.trim()
.to_string()
}
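The `extract_comments` change is why the regenerated YAML below switches from long single-line `comments:` values to multi-line block scalars (`|-`): each rustdoc line is now trimmed and the lines are joined with newlines rather than spaces. A small sketch of that joining behavior is shown here; the helper name and inputs are assumptions, not the actual `with_options_test` code.

```rust
// Sketch: trim each extracted `#[doc = " ..."]` line and join with newlines
// so that multi-line comments survive into the generated YAML.
fn join_doc_lines(doc_lines: &[&str]) -> String {
    doc_lines
        .iter()
        .map(|line| line.trim())
        .collect::<Vec<_>>()
        .join("\n") // previously joined with a single space
        .trim()
        .to_string()
}

fn main() {
    let comment = join_doc_lines(&[
        " Security protocol used for RisingWave to communicate with Kafka brokers. Could be",
        " PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.",
    ]);
    // A YAML emitter can render this two-line string as a `|-` block scalar.
    assert_eq!(comment.lines().count(), 2);
}
```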
123 changes: 100 additions & 23 deletions src/connector/with_options_sink.yaml
@@ -1,4 +1,5 @@
# THIS FILE IS AUTO_GENERATED. DO NOT EDIT
# UPDATE WITH: ./risedev generate-with-options

BigQueryConfig:
fields:
@@ -224,11 +225,17 @@ GooglePubSubConfig:
required: true
- name: pubsub.emulator_host
field_type: String
comments: use the connector with a pubsub emulator <https://cloud.google.com/pubsub/docs/emulator>
comments: |-
use the connector with a pubsub emulator
<https://cloud.google.com/pubsub/docs/emulator>
required: false
- name: pubsub.credentials
field_type: String
comments: A JSON string containing the service account credentials for authorization, see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide. The provided account credential must have the `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles)
comments: |-
A JSON string containing the service account credentials for authorization,
see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide.
The provided account credential must have the
`pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles)
required: false
IcebergConfig:
fields:
@@ -301,7 +308,9 @@ KafkaConfig:
default: 'Duration :: from_secs (5)'
- name: properties.security.protocol
field_type: String
comments: Security protocol used for RisingWave to communicate with Kafka brokers. Could be PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
comments: |-
Security protocol used for RisingWave to communicate with Kafka brokers. Could be
PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
required: false
- name: properties.ssl.endpoint.identification.algorithm
field_type: String
@@ -368,15 +377,26 @@ KafkaConfig:
default: 'Duration :: from_millis (100)'
- name: primary_key
field_type: String
comments: We have parsed the primary key for an upsert kafka sink into a `usize` vector representing the indices of the pk columns in the frontend, so we simply store the primary key here as a string.
comments: |-
We have parsed the primary key for an upsert kafka sink into a `usize` vector representing
the indices of the pk columns in the frontend, so we simply store the primary key here
as a string.
required: false
- name: properties.message.max.bytes
field_type: usize
comments: Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the the topic's max.message.bytes limit
comments: |-
Maximum Kafka protocol request message size. Due to differing framing overhead between
protocol versions the producer is unable to reliably enforce a strict max message limit at
produce time and may exceed the maximum size by one message in protocol ProduceRequests,
the broker will enforce the the topic's max.message.bytes limit
required: false
- name: properties.receive.message.max.bytes
field_type: usize
comments: Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
comments: |-
Maximum Kafka protocol response message size. This serves as a safety precaution to avoid
memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes
+ 512 to allow for protocol overhead; the value is adjusted automatically unless the
configuration property is explicitly set.
required: false
- name: properties.statistics.interval.ms
field_type: usize
@@ -394,19 +414,33 @@ KafkaConfig:
required: false
- name: properties.queue.buffering.max.messages
field_type: usize
comments: Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. A value of 0 disables this limit.
comments: |-
Maximum number of messages allowed on the producer queue. This queue is shared by all
topics and partitions. A value of 0 disables this limit.
required: false
- name: properties.queue.buffering.max.kbytes
field_type: usize
comments: Maximum total message size sum allowed on the producer queue. This queue is shared by all topics and partitions. This property has higher priority than queue.buffering.max.messages.
comments: |-
Maximum total message size sum allowed on the producer queue. This queue is shared by all
topics and partitions. This property has higher priority than queue.buffering.max.messages.
required: false
- name: properties.queue.buffering.max.ms
field_type: f64
comments: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
comments: |-
Delay in milliseconds to wait for messages in the producer queue to accumulate before
constructing message batches (MessageSets) to transmit to brokers. A higher value allows
larger and more effective (less overhead, improved compression) batches of messages to
accumulate at the expense of increased message delivery latency.
required: false
- name: properties.enable.idempotence
field_type: bool
comments: 'When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer will fail if user-supplied configuration is incompatible.'
comments: |-
When set to true, the producer will ensure that messages are successfully produced exactly
once and in the original produce order. The following configuration properties are adjusted
automatically (if not modified by the user) when idempotence is enabled:
max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer
will fail if user-supplied configuration is incompatible.
required: false
- name: properties.message.send.max.retries
field_type: usize
@@ -422,15 +456,22 @@ KafkaConfig:
required: false
- name: properties.batch.size
field_type: usize
comments: Maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead. This limit is applied after the first message has been added to the batch, regardless of the first message's size, this is to ensure that messages that exceed batch.size are produced.
comments: |-
Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
framing overhead. This limit is applied after the first message has been added to the
batch, regardless of the first message's size, this is to ensure that messages that exceed
batch.size are produced.
required: false
- name: properties.compression.codec
field_type: CompressionCodec
comments: Compression codec to use for compressing message sets.
required: false
- name: properties.message.timeout.ms
field_type: usize
comments: Produce message timeout. This value is used to limits the time a produced message waits for successful delivery (including retries).
comments: |-
Produce message timeout.
This value is used to limits the time a produced message waits for
successful delivery (including retries).
required: false
default: '5000'
- name: properties.max.in.flight.requests.per.connection
@@ -538,18 +579,27 @@ MongodbConfig:
required: true
- name: collection.name
field_type: String
comments: The collection name where data should be written to or read from. For sinks, the format is `db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field` for more information.
comments: |-
The collection name where data should be written to or read from. For sinks, the format is
`db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field`
for more information.
required: true
- name: r#type
field_type: String
required: true
- name: collection.name.field
field_type: String
comments: The dynamic collection name where data should be sunk to. If specified, the field value will be used as the collection name. The collection name format is same as `collection.name`. If the field value is null or an empty string, then the `collection.name` will be used as a fallback destination.
comments: |-
The dynamic collection name where data should be sunk to. If specified, the field value will be used
as the collection name. The collection name format is same as `collection.name`. If the field value is
null or an empty string, then the `collection.name` will be used as a fallback destination.
required: false
- name: collection.name.field.drop
field_type: bool
comments: Controls whether the field value of `collection.name.field` should be dropped when sinking. Set this option to true to avoid the duplicate values of `collection.name.field` being written to the result collection.
comments: |-
Controls whether the field value of `collection.name.field` should be dropped when sinking.
Set this option to true to avoid the duplicate values of `collection.name.field` being written to the
result collection.
required: false
default: Default::default
- name: mongodb.bulk_write.max_entries
@@ -561,11 +611,17 @@ MqttConfig:
fields:
- name: url
field_type: String
comments: The url of the broker to connect to. e.g. tcp://localhost. Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, to denote the protocol for establishing a connection with the broker. `mqtts://`, `ssl://` will use the native certificates if no ca is specified
comments: |-
The url of the broker to connect to. e.g. tcp://localhost.
Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`,
to denote the protocol for establishing a connection with the broker.
`mqtts://`, `ssl://` will use the native certificates if no ca is specified
required: true
- name: qos
field_type: QualityOfService
comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once
comments: |-
The quality of service to use when publishing messages. Defaults to at_most_once.
Could be at_most_once, at_least_once or exactly_once
required: false
- name: username
field_type: String
@@ -577,11 +633,19 @@ MqttConfig:
required: false
- name: client_prefix
field_type: String
comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave
comments: |-
Prefix for the mqtt client id.
The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave
required: false
- name: clean_start
field_type: bool
comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.'
comments: |-
`clean_start = true` removes all the state from queues & instructs the broker
to clean all the client state when client disconnects.

When set `false`, broker will hold the client state and performs pending
operations on the client when reconnection with same `client_id`
happens. Local queue state is also held to retransmit packets after reconnection.
required: false
default: Default::default
- name: inflight_messages
@@ -594,11 +658,15 @@ MqttConfig:
required: false
- name: tls.client_cert
field_type: String
comments: Path to client's certificate file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the certificate content.
comments: |-
Path to client's certificate file (PEM). Required for client authentication.
Can be a file path under fs:// or a string with the certificate content.
required: false
- name: tls.client_key
field_type: String
comments: Path to client's private key file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the private key content.
comments: |-
Path to client's private key file (PEM). Required for client authentication.
Can be a file path under fs:// or a string with the private key content.
required: false
- name: topic
field_type: String
@@ -757,7 +825,11 @@ SnowflakeConfig:
- s3.bucket_name
- name: snowflake.s3_path
field_type: String
comments: The optional s3 path to be specified the actual file location would be `s3://<s3_bucket>/<s3_path>/<rw_auto_gen_intermediate_file_name>` if this field is specified by user(s) otherwise it would be `s3://<s3_bucket>/<rw_auto_gen_intermediate_file_name>`
comments: |-
The optional s3 path to be specified
the actual file location would be `s3://<s3_bucket>/<s3_path>/<rw_auto_gen_intermediate_file_name>`
if this field is specified by user(s)
otherwise it would be `s3://<s3_bucket>/<rw_auto_gen_intermediate_file_name>`
required: false
alias:
- s3.path
@@ -847,7 +919,12 @@ StarrocksConfig:
default: 30 * 1000
- name: commit_checkpoint_interval
field_type: u64
comments: Set this option to a positive integer n, RisingWave will try to commit data to Starrocks at every n checkpoints by leveraging the [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), also, in this time, the `sink_decouple` option should be enabled as well. Defaults to 1 if commit_checkpoint_interval <= 0
comments: |-
Set this option to a positive integer n, RisingWave will try to commit data
to Starrocks at every n checkpoints by leveraging the
[StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/),
also, in this time, the `sink_decouple` option should be enabled as well.
Defaults to 1 if commit_checkpoint_interval <= 0
required: false
default: Default::default
- name: starrocks.partial_update