feat: allow customize pulsar subscription name #18379

Merged
merged 4 commits into from
Sep 11, 2024
3 changes: 2 additions & 1 deletion integration_tests/twitter-pulsar/pb/create_source.sql
@@ -1,5 +1,6 @@
 CREATE SOURCE twitter WITH (
     connector = 'pulsar',
     pulsar.topic = 'twitter',
-    pulsar.service.url = 'pulsar://message_queue:6650'
+    pulsar.service.url = 'pulsar://message_queue:6650',
+    subscription.name.prefix = 'custom_prefix'
 ) ROW FORMAT PROTOBUF MESSAGE 'twitter.schema.Event' ROW SCHEMA LOCATION 'http://file_server:8080/schema';
10 changes: 10 additions & 0 deletions src/connector/src/source/pulsar/mod.rs
@@ -74,6 +74,16 @@ pub struct PulsarProperties {
     #[serde(rename = "iceberg.bucket", default)]
     pub iceberg_bucket: Option<String>,

+    /// Specify a custom consumer group id prefix for the source.
+    /// Defaults to `rw-consumer`.
+    ///
+    /// Notes:
+    /// - Each job (materialized view) will have multiple subscriptions and
+    ///   contains a generated suffix in the subscription name.
+    ///   The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`.
+    #[serde(rename = "subscription.name.prefix")]
+    pub subscription_name_prefix: Option<String>,
+
     #[serde(flatten)]
     pub unknown_fields: HashMap<String, String>,
 }
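
The naming rule in the doc comment above can be sketched in isolation. This is a minimal illustration and not code from the PR: `subscription_name` is a hypothetical helper, `DEFAULT_PREFIX` mirrors the `rw-consumer` default named in the doc comment, and the `u32` id types are an assumption.

```rust
/// Default prefix, mirroring the `rw-consumer` default in the doc comment.
const DEFAULT_PREFIX: &str = "rw-consumer";

/// Hypothetical helper (not part of the PR): builds
/// `{prefix}-{fragment_id}-{actor_id}`, falling back to the default prefix
/// when none is configured. The `u32` id types are an assumption.
fn subscription_name(prefix: Option<&str>, fragment_id: u32, actor_id: u32) -> String {
    format!(
        "{}-{}-{}",
        prefix.unwrap_or(DEFAULT_PREFIX),
        fragment_id,
        actor_id
    )
}
```

Under these assumptions, fragment 6 and actor 1001 give `rw-consumer-6-1001` when no prefix is set, and `custom_prefix-6-1001` when `subscription.name.prefix = 'custom_prefix'`.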
10 changes: 8 additions & 2 deletions src/connector/src/source/pulsar/source/reader.rs
@@ -42,6 +42,8 @@ use crate::source::{
     SplitMetaData, SplitReader,
 };

+const PULSAR_DEFAULT_SUBSCRIPTION_PREFIX: &str = "rw-consumer";
+
 pub enum PulsarSplitReader {
     Broker(PulsarBrokerReader),
     Iceberg(PulsarIcebergReader),
@@ -174,8 +176,12 @@ impl SplitReader for PulsarBrokerReader {
     .with_topic(&topic)
     .with_subscription_type(SubType::Exclusive)
     .with_subscription(format!(
-        "rw-consumer-{}-{}",
-        source_ctx.fragment_id, source_ctx.actor_id
+        "{}-{}-{}",
+        props
+            .subscription_name_prefix
+            .unwrap_or(PULSAR_DEFAULT_SUBSCRIPTION_PREFIX.to_string()),
+        source_ctx.fragment_id,
+        source_ctx.actor_id
Member

Is actor_id needed?

Contributor Author

We use an exclusive subscription here, which means a partition can only be consumed by one subscription. So actor_id is essential to avoid collisions.

Member

1 pulsar split corresponds to 1 topic. In one source (i.e., one fragment), each actor will be assigned to different topics. So actor_id is not necessary. There's no collision.

Contributor Author

> 1 pulsar split corresponds to 1 topic. In one source (i.e., one fragment), each actor will be assigned to different topics.

I don't get the logic.

Member

@xxchan, Sep 10, 2024

The assignment looks like this. Where's the collision? Correct me if I'm wrong

  • fragment 6 (topic t)
    • actor 1001 - topic t-partition-1, subscription rw-consumer-6
    • actor 1002 - topic t-partition-2, subscription rw-consumer-6
    • actor 1003 - topic t-partition-3, subscription rw-consumer-6
    • ...

Contributor Author

I am afraid things do not work this way.

> The exclusive type is a subscription type that only allows a single consumer to attach to the subscription. If multiple consumers subscribe to a topic using the same subscription, an error occurs. Note that if the topic is partitioned, all partitions will be consumed by the single consumer allowed to be connected to the subscription. (source)

We cannot attach multiple consumers to one subscription in exclusive mode. Since we want to manage the data parallelism (i.e., partitions) in RisingWave, we have to put the consumer of each parallel unit into its own subscription.

Member

I believe each subscription belongs to a different (sub)topic... They are not "one subscription"

Member

In the example above, the three rw-consumer-6 entries are 3 different subscriptions, one per partition topic.

Member

We already separate the consumers by subscribing them to different sub-topics, not by separating them into different subscriptions.
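
The disagreement in this thread comes down to what identifies a subscription. The toy model below is only a sketch of the Member's reading, and it does not use the Pulsar client API. It assumes that a subscription is scoped to a topic, that each partition is addressed as its own topic, and that an exclusive subscription accepts a single consumer. `ToyBroker` and `subscribe` are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;

/// Toy model of exclusive subscriptions (hypothetical, not the Pulsar API).
/// Assumption: a subscription is identified by (topic, subscription name),
/// and each partition of a partitioned topic is addressed as its own topic.
#[derive(Default)]
struct ToyBroker {
    /// (topic, subscription name) -> id of the single attached consumer.
    exclusive: HashMap<(String, String), u32>,
}

impl ToyBroker {
    /// Attach `consumer_id` to the subscription. Fails if that subscription
    /// on that topic already has a consumer, as exclusive mode requires.
    fn subscribe(
        &mut self,
        topic: &str,
        subscription: &str,
        consumer_id: u32,
    ) -> Result<(), String> {
        let key = (topic.to_string(), subscription.to_string());
        if let Some(existing) = self.exclusive.get(&key) {
            return Err(format!(
                "subscription {} on {} is already held by consumer {}",
                subscription, topic, existing
            ));
        }
        self.exclusive.insert(key, consumer_id);
        Ok(())
    }
}
```

In this model, actors 1001 and 1002 can both attach with the name `rw-consumer-6` as long as they target `t-partition-1` and `t-partition-2` respectively, while a second consumer on `t-partition-1` under the same name is rejected. The model does not settle the thread: the merged code keeps `actor_id` in the subscription name.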

     ));

     let builder = match split.start_offset.clone() {
11 changes: 11 additions & 0 deletions src/connector/with_options_source.yaml
@@ -1000,6 +1000,17 @@ PulsarProperties:
     field_type: String
     required: false
     default: Default::default
+  - name: subscription.name.prefix
+    field_type: String
+    comments: |-
+      Specify a custom consumer group id prefix for the source.
+      Defaults to `rw-consumer`.
+
+      Notes:
+      - Each job (materialized view) will have multiple subscriptions and
+        contains a generated suffix in the subscription name.
+        The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`.
+    required: false
 S3Properties:
   fields:
   - name: s3.region_name