Skip to content

Commit

Permalink
feat: allow customize pulsar subscription name (#18379)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Sep 11, 2024
1 parent 8e7b099 commit 23410f0
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 3 deletions.
3 changes: 2 additions & 1 deletion integration_tests/twitter-pulsar/pb/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CREATE SOURCE twitter WITH (
connector = 'pulsar',
pulsar.topic = 'twitter',
pulsar.service.url = 'pulsar://message_queue:6650'
pulsar.service.url = 'pulsar://message_queue:6650',
subscription.name.prefix = 'custom_prefix'
) ROW FORMAT PROTOBUF MESSAGE 'twitter.schema.Event' ROW SCHEMA LOCATION 'http://file_server:8080/schema';
10 changes: 10 additions & 0 deletions src/connector/src/source/pulsar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ pub struct PulsarProperties {
#[serde(rename = "iceberg.bucket", default)]
pub iceberg_bucket: Option<String>,

/// Specify a custom consumer group id prefix for the source.
/// Defaults to `rw-consumer`.
///
/// Notes:
/// - Each job (materialized view) will have multiple subscriptions and
/// contains a generated suffix in the subscription name.
/// The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`.
#[serde(rename = "subscription.name.prefix")]
pub subscription_name_prefix: Option<String>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
10 changes: 8 additions & 2 deletions src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use crate::source::{
SplitMetaData, SplitReader,
};

const PULSAR_DEFAULT_SUBSCRIPTION_PREFIX: &str = "rw-consumer";

pub enum PulsarSplitReader {
Broker(PulsarBrokerReader),
Iceberg(PulsarIcebergReader),
Expand Down Expand Up @@ -174,8 +176,12 @@ impl SplitReader for PulsarBrokerReader {
.with_topic(&topic)
.with_subscription_type(SubType::Exclusive)
.with_subscription(format!(
"rw-consumer-{}-{}",
source_ctx.fragment_id, source_ctx.actor_id
"{}-{}-{}",
props
.subscription_name_prefix
.unwrap_or(PULSAR_DEFAULT_SUBSCRIPTION_PREFIX.to_string()),
source_ctx.fragment_id,
source_ctx.actor_id
));

let builder = match split.start_offset.clone() {
Expand Down
11 changes: 11 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,17 @@ PulsarProperties:
field_type: String
required: false
default: Default::default
- name: subscription.name.prefix
field_type: String
comments: |-
Specify a custom consumer group id prefix for the source.
Defaults to `rw-consumer`.
Notes:
- Each job (materialized view) will have multiple subscriptions and
contains a generated suffix in the subscription name.
The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`.
required: false
S3Properties:
fields:
- name: s3.region_name
Expand Down

0 comments on commit 23410f0

Please sign in to comment.