diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index 4e2142fda0ad..a0be76d7a837 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -14,21 +14,28 @@
 use std::borrow::Cow;
 use std::collections::HashMap;
+use std::io::Write;
 use std::time::Duration;
 
-use anyhow::Ok;
+use anyhow::{anyhow, Ok};
 use async_nats::jetstream::consumer::DeliverPolicy;
 use async_nats::jetstream::{self};
 use aws_sdk_kinesis::Client as KinesisClient;
 use clickhouse::Client;
+use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
+use pulsar::{Authentication, Pulsar, TokioExecutor};
 use rdkafka::ClientConfig;
-use risingwave_common::error::anyhow_error;
+use risingwave_common::error::ErrorCode::InvalidParameterValue;
+use risingwave_common::error::{anyhow_error, RwError};
 use serde_derive::{Deserialize, Serialize};
 use serde_with::json::JsonString;
 use serde_with::{serde_as, DisplayFromStr};
+use tempfile::NamedTempFile;
 use time::OffsetDateTime;
+use url::Url;
 
 use crate::aws_auth::AwsAuthProps;
+use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::deserialize_duration_from_string;
 use crate::sink::SinkError;
 use crate::source::nats::source::NatsOffset;
@@ -245,6 +252,106 @@ impl KafkaCommon {
     }
 }
 
+#[derive(Clone, Debug, Deserialize)]
+pub struct PulsarCommon {
+    #[serde(rename = "topic", alias = "pulsar.topic")]
+    pub topic: String,
+
+    #[serde(rename = "service.url", alias = "pulsar.service.url")]
+    pub service_url: String,
+
+    #[serde(rename = "auth.token")]
+    pub auth_token: Option<String>,
+
+    #[serde(flatten)]
+    pub oauth: Option<PulsarOauthCommon>,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct PulsarOauthCommon {
+    #[serde(rename = "oauth.issuer.url")]
+    pub issuer_url: String,
+
+    #[serde(rename = "oauth.credentials.url")]
+    pub credentials_url: String,
+
+    #[serde(rename = "oauth.audience")]
+    pub audience: String,
+
+    #[serde(rename = "oauth.scope")]
+    pub scope: Option<String>,
+
+    #[serde(flatten)]
+    /// required keys refer to [`crate::aws_utils::AWS_DEFAULT_CONFIG`]
+    pub s3_credentials: HashMap<String, String>,
+}
+
+impl PulsarCommon {
+    pub(crate) async fn build_client(&self) -> anyhow::Result<Pulsar<TokioExecutor>> {
+        let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor);
+        let mut temp_file = None;
+        if let Some(oauth) = &self.oauth {
+            let url = Url::parse(&oauth.credentials_url)?;
+            match url.scheme() {
+                "s3" => {
+                    let credentials = load_file_descriptor_from_s3(
+                        &url,
+                        &AwsAuthProps::from_pairs(
+                            oauth
+                                .s3_credentials
+                                .iter()
+                                .map(|(k, v)| (k.as_str(), v.as_str())),
+                        ),
+                    )
+                    .await?;
+                    let mut f = NamedTempFile::new()?;
+                    f.write_all(&credentials)?;
+                    f.as_file().sync_all()?;
+                    temp_file = Some(f);
+                }
+                "file" => {}
+                _ => {
+                    return Err(RwError::from(InvalidParameterValue(String::from(
+                        "invalid credentials_url, only file url and s3 url are supported",
+                    )))
+                    .into());
+                }
+            }
+
+            let auth_params = OAuth2Params {
+                issuer_url: oauth.issuer_url.clone(),
+                credentials_url: if temp_file.is_none() {
+                    oauth.credentials_url.clone()
+                } else {
+                    let mut raw_path = temp_file
+                        .as_ref()
+                        .unwrap()
+                        .path()
+                        .to_str()
+                        .unwrap()
+                        .to_string();
+                    raw_path.insert_str(0, "file://");
+                    raw_path
+                },
+                audience: Some(oauth.audience.clone()),
+                scope: oauth.scope.clone(),
+            };
+
+            pulsar_builder = pulsar_builder
+                .with_auth_provider(OAuth2Authentication::client_credentials(auth_params));
+        } else if let Some(auth_token) = &self.auth_token {
+            pulsar_builder = pulsar_builder.with_auth(Authentication {
+                name: "token".to_string(),
+                data: Vec::from(auth_token.as_str()),
+            });
+        }
+
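+        // Sketch of the properties each branch above expects (names are the
+        // serde renames in `PulsarCommon`/`PulsarOauthCommon`; values are
+        // hypothetical):
+        //   token auth: auth.token = 'eyJhbGciOiJIUzI1NiJ9...'
+        //   OAuth2:     oauth.issuer.url = 'https://auth.example.com'
+        //               oauth.credentials.url = 's3://my-bucket/creds.json' (or file://...)
+        //               oauth.audience = 'urn:pulsar:cluster'
+        // With neither configured, the client connects unauthenticated.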
+        let res = pulsar_builder.build().await.map_err(|e| anyhow!(e))?;
+        // The temp credentials file must stay alive until the build above completes.
+        drop(temp_file);
+        Ok(res)
+    }
+}
+
 #[derive(Deserialize, Serialize, Debug, Clone)]
 pub struct KinesisCommon {
     #[serde(rename = "stream", alias = "kinesis.stream.name")]
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index b8fba4681319..9588e00aa655 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -22,6 +22,7 @@ pub mod iceberg;
 pub mod kafka;
 pub mod kinesis;
 pub mod nats;
+pub mod pulsar;
 pub mod redis;
 pub mod remote;
 #[cfg(any(test, madsim))]
@@ -51,6 +52,7 @@ use self::clickhouse::{ClickHouseConfig, ClickHouseSink};
 use self::encoder::SerTo;
 use self::formatter::SinkFormatter;
 use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK};
+use self::pulsar::{PulsarConfig, PulsarSink};
 use crate::sink::boxed::BoxSink;
 use crate::sink::catalog::{SinkCatalog, SinkId};
 use crate::sink::clickhouse::CLICKHOUSE_SINK;
@@ -58,6 +60,7 @@ use crate::sink::iceberg::{IcebergConfig, RemoteIcebergConfig, RemoteIcebergSink
 use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
 use crate::sink::kinesis::{KinesisSink, KinesisSinkConfig, KINESIS_SINK};
 use crate::sink::nats::{NatsConfig, NatsSink, NATS_SINK};
+use crate::sink::pulsar::PULSAR_SINK;
 use crate::sink::redis::{RedisConfig, RedisSink};
 use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig, RemoteSink};
 #[cfg(any(test, madsim))]
@@ -321,6 +324,7 @@ pub enum SinkConfig {
     Kinesis(Box<KinesisSinkConfig>),
     Iceberg(IcebergConfig),
     RemoteIceberg(RemoteIcebergConfig),
+    Pulsar(PulsarConfig),
     BlackHole,
     ClickHouse(Box<ClickHouseConfig>),
     Nats(NatsConfig),
@@ -386,6 +390,7 @@ impl SinkConfig {
                 ClickHouseConfig::from_hashmap(properties)?,
             ))),
             BLACKHOLE_SINK => Ok(SinkConfig::BlackHole),
+            PULSAR_SINK => Ok(SinkConfig::Pulsar(PulsarConfig::from_hashmap(properties)?)),
             REMOTE_ICEBERG_SINK => Ok(SinkConfig::RemoteIceberg(
                 RemoteIcebergConfig::from_hashmap(properties)?,
             )),
@@ -411,6 +416,7 @@ pub enum SinkImpl {
     Redis(RedisSink),
     Kafka(KafkaSink),
     Remote(RemoteSink),
+    Pulsar(PulsarSink),
     BlackHole(BlackHoleSink),
     Kinesis(KinesisSink),
     ClickHouse(ClickHouseSink),
@@ -426,6 +432,7 @@ impl SinkImpl {
             SinkImpl::Kafka(_) => "kafka",
             SinkImpl::Redis(_) => "redis",
             SinkImpl::Remote(_) => "remote",
+            SinkImpl::Pulsar(_) => "pulsar",
             SinkImpl::BlackHole(_) => "blackhole",
             SinkImpl::Kinesis(_) => "kinesis",
             SinkImpl::ClickHouse(_) => "clickhouse",
@@ -446,6 +453,7 @@ macro_rules! dispatch_sink {
             SinkImpl::Redis($sink) => $body,
             SinkImpl::Kafka($sink) => $body,
             SinkImpl::Remote($sink) => $body,
+            SinkImpl::Pulsar($sink) => $body,
             SinkImpl::BlackHole($sink) => $body,
             SinkImpl::Kinesis($sink) => $body,
             SinkImpl::ClickHouse($sink) => $body,
@@ -464,6 +472,7 @@ impl SinkImpl {
             SinkConfig::Kafka(cfg) => SinkImpl::Kafka(KafkaSink::new(*cfg, param)),
             SinkConfig::Kinesis(cfg) => SinkImpl::Kinesis(KinesisSink::new(*cfg, param)),
             SinkConfig::Remote(cfg) => SinkImpl::Remote(RemoteSink::new(cfg, param)),
+            SinkConfig::Pulsar(cfg) => SinkImpl::Pulsar(PulsarSink::new(cfg, param)),
             SinkConfig::BlackHole => SinkImpl::BlackHole(BlackHoleSink),
             SinkConfig::ClickHouse(cfg) => SinkImpl::ClickHouse(ClickHouseSink::new(
                 *cfg,
@@ -508,6 +517,8 @@ pub enum SinkError {
     ClickHouse(String),
     #[error("Nats error: {0}")]
     Nats(anyhow::Error),
+    #[error("Pulsar error: {0}")]
+    Pulsar(anyhow::Error),
 }
 
 impl From<RpcError> for SinkError {
diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs
new file mode 100644
index 000000000000..306fc2046b71
--- /dev/null
+++ b/src/connector/src/sink/pulsar.rs
@@ -0,0 +1,354 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, VecDeque};
+use std::fmt::Debug;
+use std::time::Duration;
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use futures::future::try_join_all;
+use futures::TryFutureExt;
+use pulsar::producer::{Message, SendFuture};
+use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
+use risingwave_common::array::StreamChunk;
+use risingwave_common::catalog::Schema;
+use risingwave_rpc_client::ConnectorClient;
+use serde::Deserialize;
+use serde_with::{serde_as, DisplayFromStr};
+
+use super::encoder::{JsonEncoder, TimestampHandlingMode};
+use super::formatter::{AppendOnlyFormatter, UpsertFormatter};
+use super::{
+    FormattedSink, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY,
+    SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+};
+use crate::common::PulsarCommon;
+use crate::deserialize_duration_from_string;
+use crate::sink::{DummySinkCommitCoordinator, Result};
+
+pub const PULSAR_SINK: &str = "pulsar";
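+
+// A minimal usage sketch (hypothetical object names; `type` accepts
+// "append-only" or "upsert", as validated in `PulsarConfig::from_hashmap`):
+//
+//   CREATE SINK pulsar_sink FROM mv WITH (
+//       connector = 'pulsar',
+//       topic = 'persistent://public/default/topic0',
+//       service.url = 'pulsar://localhost:6650',
+//       type = 'append-only'
+//   );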
+
+/// The maximum size of the delivery buffer queue.
+/// When the number of `SendFuture`s buffered in `send_future_buffer`
+/// reaches this size, delivery receipts are awaited before more
+/// messages are sent.
+const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
+
+const fn _default_max_retries() -> u32 {
+    3
+}
+
+const fn _default_retry_backoff() -> Duration {
+    Duration::from_millis(100)
+}
+
+const fn _default_batch_size() -> u32 {
+    10000
+}
+
+const fn _default_batch_byte_size() -> usize {
+    1 << 20
+}
+
+fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
+    SinkError::Pulsar(anyhow!(e))
+}
+
+async fn build_pulsar_producer(
+    pulsar: &Pulsar<TokioExecutor>,
+    config: &PulsarConfig,
+) -> Result<Producer<TokioExecutor>> {
+    pulsar
+        .producer()
+        .with_options(ProducerOptions {
+            batch_size: Some(config.producer_properties.batch_size),
+            batch_byte_size: Some(config.producer_properties.batch_byte_size),
+            ..Default::default()
+        })
+        .with_topic(&config.common.topic)
+        .build()
+        .map_err(pulsar_to_sink_err)
+        .await
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Deserialize)]
+pub struct PulsarPropertiesProducer {
+    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
+    #[serde_as(as = "DisplayFromStr")]
+    batch_size: u32,
+
+    #[serde(
+        rename = "properties.batch.byte.size",
+        default = "_default_batch_byte_size"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    batch_byte_size: usize,
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Deserialize)]
+pub struct PulsarConfig {
+    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_retry_num: u32,
+
+    #[serde(
+        rename = "properties.retry.interval",
+        default = "_default_retry_backoff",
+        deserialize_with = "deserialize_duration_from_string"
+    )]
+    pub retry_interval: Duration,
+
+    #[serde(flatten)]
+    pub common: PulsarCommon,
+
+    #[serde(flatten)]
+    pub producer_properties: PulsarPropertiesProducer,
+
+    pub r#type: String, // accept "append-only" or "upsert"
+}
+
+impl PulsarConfig {
+    pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
+        let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
+            .map_err(|e| SinkError::Config(anyhow!(e)))?;
+
+        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
+            return Err(SinkError::Config(anyhow!(
+                "`{}` must be {}, or {}",
+                SINK_TYPE_OPTION,
+                SINK_TYPE_APPEND_ONLY,
+                SINK_TYPE_UPSERT
+            )));
+        }
+        Ok(config)
+    }
+}
+
+#[derive(Debug)]
+pub struct PulsarSink {
+    pub config: PulsarConfig,
+    schema: Schema,
+    downstream_pk: Vec<usize>,
+    is_append_only: bool,
+}
+
+impl PulsarSink {
+    pub fn new(config: PulsarConfig, param: SinkParam) -> Self {
+        Self {
+            config,
+            schema: param.schema(),
+            downstream_pk: param.downstream_pk,
+            is_append_only: param.sink_type.is_append_only(),
+        }
+    }
+}
+
+#[async_trait]
+impl Sink for PulsarSink {
+    type Coordinator = DummySinkCommitCoordinator;
+    type Writer = PulsarSinkWriter;
+
+    async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result<Self::Writer> {
+        PulsarSinkWriter::new(
+            self.config.clone(),
+            self.schema.clone(),
+            self.downstream_pk.clone(),
+            self.is_append_only,
+        )
+        .await
+    }
+
+    async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+        // For upsert Pulsar sink, the primary key must be defined.
+        if !self.is_append_only && self.downstream_pk.is_empty() {
+            return Err(SinkError::Config(anyhow!(
+                "primary key not defined for {} pulsar sink (please define in `primary_key` field)",
+                self.config.r#type
+            )));
+        }
+
+        // Validate pulsar connection.
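+        // Building a client and a producer exercises the service URL, the
+        // auth settings, and the topic, so misconfiguration fails at CREATE
+        // SINK time instead of at the first write.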
+        let pulsar = self.config.common.build_client().await?;
+        build_pulsar_producer(&pulsar, &self.config).await?;
+
+        Ok(())
+    }
+}
+
+pub struct PulsarSinkWriter {
+    pulsar: Pulsar<TokioExecutor>,
+    producer: Producer<TokioExecutor>,
+    config: PulsarConfig,
+    schema: Schema,
+    downstream_pk: Vec<usize>,
+    is_append_only: bool,
+    send_future_buffer: VecDeque<SendFuture>,
+}
+
+impl PulsarSinkWriter {
+    pub async fn new(
+        config: PulsarConfig,
+        schema: Schema,
+        downstream_pk: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        let pulsar = config.common.build_client().await?;
+        let producer = build_pulsar_producer(&pulsar, &config).await?;
+        Ok(Self {
+            pulsar,
+            producer,
+            config,
+            schema,
+            downstream_pk,
+            is_append_only,
+            send_future_buffer: VecDeque::new(),
+        })
+    }
+
+    async fn send_message(&mut self, message: Message) -> Result<()> {
+        let mut success_flag = false;
+        let mut connection_err = None;
+
+        for _ in 0..self.config.max_retry_num {
+            match self.producer.send(message.clone()).await {
+                // A successful send returns a `SendFuture` that later resolves
+                // to the broker's receipt (or a delivery error).
+                Ok(send_future) => {
+                    // If the receipt buffer is full, await the oldest receipts
+                    // until it drops below the limit.
+                    while self.send_future_buffer.len() >= PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE {
+                        self.send_future_buffer
+                            .pop_front()
+                            .expect("Expect the SendFuture not to be None")
+                            .map_err(|e| SinkError::Pulsar(anyhow!(e)))
+                            .await?;
+                    }
+
+                    success_flag = true;
+                    self.send_future_buffer.push_back(send_future);
+                    break;
+                }
+                // The send itself failed: retry transient connection errors
+                // after a backoff, and bail out on anything else.
+                Err(e) => match e {
+                    pulsar::Error::Connection(_)
+                    | pulsar::Error::Producer(_)
+                    | pulsar::Error::Consumer(_) => {
+                        connection_err = Some(e);
+                        tokio::time::sleep(self.config.retry_interval).await;
+                        continue;
+                    }
+                    _ => return Err(SinkError::Pulsar(anyhow!(e))),
+                },
+            }
+        }
+
+        if !success_flag {
+            Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
+        } else {
+            Ok(())
+        }
+    }
+
+    async fn write_inner(
+        &mut self,
+        event_key_object: Option<String>,
+        event_object: Option<Vec<u8>>,
+    ) -> Result<()> {
+        let message = Message {
+            partition_key: event_key_object,
+            payload: event_object.unwrap_or_default(),
+            ..Default::default()
+        };
+
+        self.send_message(message).await?;
+        Ok(())
+    }
+
+    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        // TODO: Remove the clones here, only to satisfy borrow checker at present
+        let schema = self.schema.clone();
+        let downstream_pk = self.downstream_pk.clone();
+        let key_encoder =
+            JsonEncoder::new(&schema, Some(&downstream_pk), TimestampHandlingMode::Milli);
+        let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);
+
+        // Initialize the append-only formatter
+        let f = AppendOnlyFormatter::new(key_encoder, val_encoder);
+
+        self.write_chunk(chunk, f).await
+    }
+
+    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
+        // TODO: Remove the clones here, only to satisfy borrow checker at present
+        let schema = self.schema.clone();
+        let downstream_pk = self.downstream_pk.clone();
+        let key_encoder =
+            JsonEncoder::new(&schema, Some(&downstream_pk), TimestampHandlingMode::Milli);
+        let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);
+
+        // Initialize the upsert formatter
+        let f = UpsertFormatter::new(key_encoder, val_encoder);
+
+        self.write_chunk(chunk, f).await
+    }
+
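+    /// Flush the producer's pending batch and await every buffered delivery
+    /// receipt. Called on checkpoint barriers, so a delivery failure surfaces
+    /// before the checkpoint completes, giving at-least-once delivery.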
+    async fn commit_inner(&mut self) -> Result<()> {
+        self.producer
+            .send_batch()
+            .map_err(pulsar_to_sink_err)
+            .await?;
+        try_join_all(
+            self.send_future_buffer
+                .drain(..)
+                .map(|send_future| send_future.map_err(|e| SinkError::Pulsar(anyhow!(e)))),
+        )
+        .await?;
+
+        Ok(())
+    }
+}
+
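+// Keys become Pulsar partition keys and values the message payload; a `None`
+// value (e.g. a tombstone emitted by the upsert formatter) is sent as an
+// empty payload.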
+impl FormattedSink for PulsarSinkWriter {
+    type K = String;
+    type V = Vec<u8>;
+
+    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
+        self.write_inner(k, v).await
+    }
+}
+
+#[async_trait]
+impl SinkWriter for PulsarSinkWriter {
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.is_append_only {
+            self.append_only(chunk).await
+        } else {
+            self.upsert(chunk).await
+        }
+    }
+
+    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
+        Ok(())
+    }
+
+    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
+        if is_checkpoint {
+            self.commit_inner().await?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs
index e7ec559cf91b..32362fb156f4 100644
--- a/src/connector/src/source/pulsar/enumerator/client.rs
+++ b/src/connector/src/source/pulsar/enumerator/client.rs
@@ -46,8 +46,8 @@ impl SplitEnumerator for PulsarSplitEnumerator {
         properties: PulsarProperties,
         _context: SourceEnumeratorContextRef,
     ) -> Result<Self> {
-        let pulsar = properties.build_pulsar_client().await?;
-        let topic = properties.topic;
+        let pulsar = properties.common.build_client().await?;
+        let topic = properties.common.topic;
         let parsed_topic = parse_topic(&topic)?;
 
         let mut scan_start_offset = match properties
diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs
index 1dbcdf2e7bfb..544d1b7fb3ed 100644
--- a/src/connector/src/source/pulsar/mod.rs
+++ b/src/connector/src/source/pulsar/mod.rs
@@ -17,137 +17,32 @@ pub mod source;
 pub mod split;
 pub mod topic;
 
-use std::collections::HashMap;
-use std::io::Write;
-
-use anyhow::{anyhow, Result};
 pub use enumerator::*;
-use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
-use pulsar::{Authentication, Pulsar, TokioExecutor};
-use risingwave_common::error::ErrorCode::InvalidParameterValue;
-use risingwave_common::error::RwError;
 use serde::Deserialize;
 pub use split::*;
-use tempfile::NamedTempFile;
-use url::Url;
 
-use crate::aws_auth::AwsAuthProps;
-use crate::aws_utils::load_file_descriptor_from_s3;
+use crate::common::PulsarCommon;
 use crate::source::pulsar::source::reader::PulsarSplitReader;
 use crate::source::SourceProperties;
 
 pub const PULSAR_CONNECTOR: &str = "pulsar";
 
-#[derive(Clone, Debug, Deserialize)]
-pub struct PulsarOauth {
-    #[serde(rename = "oauth.issuer.url")]
-    pub issuer_url: String,
-
-    #[serde(rename = "oauth.credentials.url")]
-    pub credentials_url: String,
-
-    #[serde(rename = "oauth.audience")]
-    pub audience: String,
-
-    #[serde(rename = "oauth.scope")]
-    pub scope: Option<String>,
+impl SourceProperties for PulsarProperties {
+    type Split = PulsarSplit;
+    type SplitEnumerator = PulsarSplitEnumerator;
+    type SplitReader = PulsarSplitReader;
 
-    #[serde(flatten)]
-    /// required keys refer to [`crate::aws_utils::AWS_DEFAULT_CONFIG`]
-    pub s3_credentials: HashMap<String, String>,
+    const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
 }
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct PulsarProperties {
-    #[serde(rename = "topic", alias = "pulsar.topic")]
-    pub topic: String,
-
-    #[serde(rename = "service.url", alias = "pulsar.service.url")]
-    pub service_url: String,
-
     #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
     pub scan_startup_mode: Option<String>,
 
     #[serde(rename = "scan.startup.timestamp_millis", alias = "pulsar.time.offset")]
    pub time_offset: Option<String>,
 
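-    #[serde(rename = "auth.token")]
-    pub auth_token: Option<String>,
-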
#[serde(rename = "auth.token")] - pub auth_token: Option, - #[serde(flatten)] - pub oauth: Option, -} - -impl SourceProperties for PulsarProperties { - type Split = PulsarSplit; - type SplitEnumerator = PulsarSplitEnumerator; - type SplitReader = PulsarSplitReader; - - const SOURCE_NAME: &'static str = PULSAR_CONNECTOR; -} - -impl PulsarProperties { - pub async fn build_pulsar_client(&self) -> Result> { - let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor); - let mut temp_file = None; - if let Some(oauth) = &self.oauth { - let url = Url::parse(&oauth.credentials_url)?; - match url.scheme() { - "s3" => { - let credentials = load_file_descriptor_from_s3( - &url, - &AwsAuthProps::from_pairs( - oauth - .s3_credentials - .iter() - .map(|(k, v)| (k.as_str(), v.as_str())), - ), - ) - .await?; - let mut f = NamedTempFile::new()?; - f.write_all(&credentials)?; - f.as_file().sync_all()?; - temp_file = Some(f); - } - "file" => {} - _ => { - return Err(RwError::from(InvalidParameterValue(String::from( - "invalid credentials_url, only file url and s3 url are supported", - ))) - .into()); - } - } - - let auth_params = OAuth2Params { - issuer_url: oauth.issuer_url.clone(), - credentials_url: if temp_file.is_none() { - oauth.credentials_url.clone() - } else { - let mut raw_path = temp_file - .as_ref() - .unwrap() - .path() - .to_str() - .unwrap() - .to_string(); - raw_path.insert_str(0, "file://"); - raw_path - }, - audience: Some(oauth.audience.clone()), - scope: oauth.scope.clone(), - }; - - pulsar_builder = pulsar_builder - .with_auth_provider(OAuth2Authentication::client_credentials(auth_params)); - } else if let Some(auth_token) = &self.auth_token { - pulsar_builder = pulsar_builder.with_auth(Authentication { - name: "token".to_string(), - data: Vec::from(auth_token.as_str()), - }); - } - - let res = pulsar_builder.build().await.map_err(|e| anyhow!(e))?; - drop(temp_file); - Ok(res) - } + pub common: PulsarCommon, } diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index db6ccfedd726..85d85a8d1871 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -97,7 +97,7 @@ impl SplitReader for PulsarSplitReader { ) -> Result { ensure!(splits.len() == 1, "only support single split"); let split = splits.into_iter().next().unwrap(); - let pulsar = props.build_pulsar_client().await?; + let pulsar = props.common.build_client().await?; let topic = split.topic.to_string(); tracing::debug!("creating consumer for pulsar split topic {}", topic,);