From 48dbf64d44f10c543b7a8e079f9e0ba689ccb58c Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Sun, 17 Sep 2023 23:19:08 +0800 Subject: [PATCH] implement pulsar connection validation --- src/connector/src/sink/pulsar.rs | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 462688ec1f512..a4171bcc5dd13 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -57,6 +57,22 @@ const fn _default_retry_backoff() -> Duration { Duration::from_millis(100) } +fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError { + SinkError::Pulsar(anyhow!(e)) +} + +async fn build_pulsar_producer( + pulsar: &Pulsar, + config: &PulsarConfig, +) -> Result> { + pulsar + .producer() + .with_topic(&config.common.topic) + .build() + .map_err(pulsar_to_sink_err) + .await +} + #[serde_as] #[derive(Debug, Clone, Deserialize)] pub struct PulsarConfig { @@ -137,7 +153,9 @@ impl Sink for PulsarSink { ))); } - // TODO: validate pulsar connection + // Validate pulsar connection. + let pulsar = self.config.common.build_client().await?; + build_pulsar_producer(&pulsar, &self.config).await?; Ok(()) } @@ -161,12 +179,7 @@ impl PulsarSinkWriter { is_append_only: bool, ) -> Result { let pulsar = config.common.build_client().await?; - let producer = pulsar - .producer() - .with_topic(&config.common.topic) - .build() - .map_err(|e| SinkError::Pulsar(anyhow!("Pulsar sink error: {}", e))) - .await?; + let producer = build_pulsar_producer(&pulsar, &config).await?; Ok(Self { pulsar, producer,