diff --git a/integration_tests/twitter-pulsar/pb/create_source.sql b/integration_tests/twitter-pulsar/pb/create_source.sql index bf41939b40d91..22c4927ab3bb9 100644 --- a/integration_tests/twitter-pulsar/pb/create_source.sql +++ b/integration_tests/twitter-pulsar/pb/create_source.sql @@ -1,5 +1,6 @@ CREATE SOURCE twitter WITH ( connector = 'pulsar', pulsar.topic = 'twitter', - pulsar.service.url = 'pulsar://message_queue:6650' + pulsar.service.url = 'pulsar://message_queue:6650', + subscription.name.prefix = 'custom_prefix' ) ROW FORMAT PROTOBUF MESSAGE 'twitter.schema.Event' ROW SCHEMA LOCATION 'http://file_server:8080/schema'; \ No newline at end of file diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 5d6d111b13bff..ffbc3be495bf9 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -74,6 +74,16 @@ pub struct PulsarProperties { #[serde(rename = "iceberg.bucket", default)] pub iceberg_bucket: Option, + /// Specify a custom consumer group id prefix for the source. + /// Defaults to `rw-consumer`. + /// + /// Notes: + /// - Each job (materialized view) will have multiple subscriptions and + /// contains a generated suffix in the subscription name. + /// The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`. + #[serde(rename = "subscription.name.prefix")] + pub subscription_name_prefix: Option, + #[serde(flatten)] pub unknown_fields: HashMap, } diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 212c459388b25..20f6872474e88 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -42,6 +42,8 @@ use crate::source::{ SplitMetaData, SplitReader, }; +const PULSAR_DEFAULT_SUBSCRIPTION_PREFIX: &str = "rw-consumer"; + pub enum PulsarSplitReader { Broker(PulsarBrokerReader), Iceberg(PulsarIcebergReader), @@ -174,8 +176,12 @@ impl SplitReader for PulsarBrokerReader { .with_topic(&topic) .with_subscription_type(SubType::Exclusive) .with_subscription(format!( - "rw-consumer-{}-{}", - source_ctx.fragment_id, source_ctx.actor_id + "{}-{}-{}", + props + .subscription_name_prefix + .unwrap_or(PULSAR_DEFAULT_SUBSCRIPTION_PREFIX.to_string()), + source_ctx.fragment_id, + source_ctx.actor_id )); let builder = match split.start_offset.clone() { diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 695a2aeaa1c14..c54dce97ad1cd 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -1016,6 +1016,17 @@ PulsarProperties: field_type: String required: false default: Default::default + - name: subscription.name.prefix + field_type: String + comments: |- + Specify a custom consumer group id prefix for the source. + Defaults to `rw-consumer`. + + Notes: + - Each job (materialized view) will have multiple subscriptions and + contains a generated suffix in the subscription name. + The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`. + required: false S3Properties: fields: - name: s3.region_name