Skip to content

Commit

Permalink
implement pulsar connection validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Sep 17, 2023
1 parent 7cbcb35 commit 48dbf64
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions src/connector/src/sink/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}

fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
SinkError::Pulsar(anyhow!(e))
}

async fn build_pulsar_producer(
pulsar: &Pulsar<TokioExecutor>,
config: &PulsarConfig,
) -> Result<Producer<TokioExecutor>> {
pulsar
.producer()
.with_topic(&config.common.topic)
.build()
.map_err(pulsar_to_sink_err)
.await
}

#[serde_as]
#[derive(Debug, Clone, Deserialize)]
pub struct PulsarConfig {
Expand Down Expand Up @@ -137,7 +153,9 @@ impl Sink for PulsarSink {
)));
}

// TODO: validate pulsar connection
// Validate pulsar connection.
let pulsar = self.config.common.build_client().await?;
build_pulsar_producer(&pulsar, &self.config).await?;

Ok(())
}
Expand All @@ -161,12 +179,7 @@ impl PulsarSinkWriter {
is_append_only: bool,
) -> Result<Self> {
let pulsar = config.common.build_client().await?;
let producer = pulsar
.producer()
.with_topic(&config.common.topic)
.build()
.map_err(|e| SinkError::Pulsar(anyhow!("Pulsar sink error: {}", e)))
.await?;
let producer = build_pulsar_producer(&pulsar, &config).await?;
Ok(Self {
pulsar,
producer,
Expand Down

0 comments on commit 48dbf64

Please sign in to comment.