Skip to content

Commit

Permalink
feat(sink): implement pulsar sink (#12286)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 authored Sep 19, 2023
1 parent a789c61 commit aa5e798
Show file tree
Hide file tree
Showing 6 changed files with 484 additions and 117 deletions.
111 changes: 109 additions & 2 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,28 @@

use std::borrow::Cow;
use std::collections::HashMap;
use std::io::Write;
use std::time::Duration;

use anyhow::Ok;
use anyhow::{anyhow, Ok};
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::{self};
use aws_sdk_kinesis::Client as KinesisClient;
use clickhouse::Client;
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
use pulsar::{Authentication, Pulsar, TokioExecutor};
use rdkafka::ClientConfig;
use risingwave_common::error::anyhow_error;
use risingwave_common::error::ErrorCode::InvalidParameterValue;
use risingwave_common::error::{anyhow_error, RwError};
use serde_derive::{Deserialize, Serialize};
use serde_with::json::JsonString;
use serde_with::{serde_as, DisplayFromStr};
use tempfile::NamedTempFile;
use time::OffsetDateTime;
use url::Url;

use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::deserialize_duration_from_string;
use crate::sink::SinkError;
use crate::source::nats::source::NatsOffset;
Expand Down Expand Up @@ -245,6 +252,106 @@ impl KafkaCommon {
}
}

/// Pulsar connection options, deserialized from string-keyed properties.
///
/// Consumed by [`PulsarCommon::build_client`] to create the Pulsar client.
#[derive(Clone, Debug, Deserialize)]
pub struct PulsarCommon {
/// Topic name. Read from the `topic` key; `pulsar.topic` is accepted as an alias.
#[serde(rename = "topic", alias = "pulsar.topic")]
pub topic: String,

/// Service URL handed to the Pulsar client builder. Read from the `service.url` key;
/// `pulsar.service.url` is accepted as an alias.
#[serde(rename = "service.url", alias = "pulsar.service.url")]
pub service_url: String,

/// Optional authentication token, read from the `auth.token` key.
/// It is only used when no OAuth2 configuration (`oauth`) is present.
#[serde(rename = "auth.token")]
pub auth_token: Option<String>,

/// Optional OAuth2 configuration. Flattened, so its `oauth.*` keys are read from the
/// same property map as the fields above. Takes precedence over `auth_token`.
#[serde(flatten)]
pub oauth: Option<PulsarOauthCommon>,
}

/// OAuth2 settings for authenticating against Pulsar.
///
/// `issuer_url`, `credentials_url` and `audience` are mandatory; `scope` is optional.
#[derive(Clone, Debug, Deserialize)]
pub struct PulsarOauthCommon {
/// OAuth2 issuer URL, read from the `oauth.issuer.url` key.
#[serde(rename = "oauth.issuer.url")]
pub issuer_url: String,

/// Location of the credentials file, read from the `oauth.credentials.url` key.
/// `PulsarCommon::build_client` accepts only the `file` and `s3` URL schemes.
#[serde(rename = "oauth.credentials.url")]
pub credentials_url: String,

/// OAuth2 audience, read from the `oauth.audience` key.
#[serde(rename = "oauth.audience")]
pub audience: String,

/// Optional OAuth2 scope, read from the `oauth.scope` key.
#[serde(rename = "oauth.scope")]
pub scope: Option<String>,

#[serde(flatten)]
/// Flattened string key/value options used to build the AWS auth properties when
/// `credentials_url` uses the `s3` scheme.
/// For the required keys refer to [`crate::aws_utils::AWS_DEFAULT_CONFIG`].
pub s3_credentials: HashMap<String, String>,
}

impl PulsarCommon {
pub(crate) async fn build_client(&self) -> anyhow::Result<Pulsar<TokioExecutor>> {
let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor);
let mut temp_file = None;
if let Some(oauth) = &self.oauth {
let url = Url::parse(&oauth.credentials_url)?;
match url.scheme() {
"s3" => {
let credentials = load_file_descriptor_from_s3(
&url,
&AwsAuthProps::from_pairs(
oauth
.s3_credentials
.iter()
.map(|(k, v)| (k.as_str(), v.as_str())),
),
)
.await?;
let mut f = NamedTempFile::new()?;
f.write_all(&credentials)?;
f.as_file().sync_all()?;
temp_file = Some(f);
}
"file" => {}
_ => {
return Err(RwError::from(InvalidParameterValue(String::from(
"invalid credentials_url, only file url and s3 url are supported",
)))
.into());
}
}

let auth_params = OAuth2Params {
issuer_url: oauth.issuer_url.clone(),
credentials_url: if temp_file.is_none() {
oauth.credentials_url.clone()
} else {
let mut raw_path = temp_file
.as_ref()
.unwrap()
.path()
.to_str()
.unwrap()
.to_string();
raw_path.insert_str(0, "file://");
raw_path
},
audience: Some(oauth.audience.clone()),
scope: oauth.scope.clone(),
};

pulsar_builder = pulsar_builder
.with_auth_provider(OAuth2Authentication::client_credentials(auth_params));
} else if let Some(auth_token) = &self.auth_token {
pulsar_builder = pulsar_builder.with_auth(Authentication {
name: "token".to_string(),
data: Vec::from(auth_token.as_str()),
});
}

let res = pulsar_builder.build().await.map_err(|e| anyhow!(e))?;
drop(temp_file);
Ok(res)
}
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct KinesisCommon {
#[serde(rename = "stream", alias = "kinesis.stream.name")]
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod iceberg;
pub mod kafka;
pub mod kinesis;
pub mod nats;
pub mod pulsar;
pub mod redis;
pub mod remote;
#[cfg(any(test, madsim))]
Expand Down Expand Up @@ -51,13 +52,15 @@ use self::clickhouse::{ClickHouseConfig, ClickHouseSink};
use self::encoder::SerTo;
use self::formatter::SinkFormatter;
use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK};
use self::pulsar::{PulsarConfig, PulsarSink};
use crate::sink::boxed::BoxSink;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::clickhouse::CLICKHOUSE_SINK;
use crate::sink::iceberg::{IcebergConfig, RemoteIcebergConfig, RemoteIcebergSink};
use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
use crate::sink::kinesis::{KinesisSink, KinesisSinkConfig, KINESIS_SINK};
use crate::sink::nats::{NatsConfig, NatsSink, NATS_SINK};
use crate::sink::pulsar::PULSAR_SINK;
use crate::sink::redis::{RedisConfig, RedisSink};
use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig, RemoteSink};
#[cfg(any(test, madsim))]
Expand Down Expand Up @@ -321,6 +324,7 @@ pub enum SinkConfig {
Kinesis(Box<KinesisSinkConfig>),
Iceberg(IcebergConfig),
RemoteIceberg(RemoteIcebergConfig),
Pulsar(PulsarConfig),
BlackHole,
ClickHouse(Box<ClickHouseConfig>),
Nats(NatsConfig),
Expand Down Expand Up @@ -386,6 +390,7 @@ impl SinkConfig {
ClickHouseConfig::from_hashmap(properties)?,
))),
BLACKHOLE_SINK => Ok(SinkConfig::BlackHole),
PULSAR_SINK => Ok(SinkConfig::Pulsar(PulsarConfig::from_hashmap(properties)?)),
REMOTE_ICEBERG_SINK => Ok(SinkConfig::RemoteIceberg(
RemoteIcebergConfig::from_hashmap(properties)?,
)),
Expand All @@ -411,6 +416,7 @@ pub enum SinkImpl {
Redis(RedisSink),
Kafka(KafkaSink),
Remote(RemoteSink),
Pulsar(PulsarSink),
BlackHole(BlackHoleSink),
Kinesis(KinesisSink),
ClickHouse(ClickHouseSink),
Expand All @@ -426,6 +432,7 @@ impl SinkImpl {
SinkImpl::Kafka(_) => "kafka",
SinkImpl::Redis(_) => "redis",
SinkImpl::Remote(_) => "remote",
SinkImpl::Pulsar(_) => "pulsar",
SinkImpl::BlackHole(_) => "blackhole",
SinkImpl::Kinesis(_) => "kinesis",
SinkImpl::ClickHouse(_) => "clickhouse",
Expand All @@ -446,6 +453,7 @@ macro_rules! dispatch_sink {
SinkImpl::Redis($sink) => $body,
SinkImpl::Kafka($sink) => $body,
SinkImpl::Remote($sink) => $body,
SinkImpl::Pulsar($sink) => $body,
SinkImpl::BlackHole($sink) => $body,
SinkImpl::Kinesis($sink) => $body,
SinkImpl::ClickHouse($sink) => $body,
Expand All @@ -464,6 +472,7 @@ impl SinkImpl {
SinkConfig::Kafka(cfg) => SinkImpl::Kafka(KafkaSink::new(*cfg, param)),
SinkConfig::Kinesis(cfg) => SinkImpl::Kinesis(KinesisSink::new(*cfg, param)),
SinkConfig::Remote(cfg) => SinkImpl::Remote(RemoteSink::new(cfg, param)),
SinkConfig::Pulsar(cfg) => SinkImpl::Pulsar(PulsarSink::new(cfg, param)),
SinkConfig::BlackHole => SinkImpl::BlackHole(BlackHoleSink),
SinkConfig::ClickHouse(cfg) => SinkImpl::ClickHouse(ClickHouseSink::new(
*cfg,
Expand Down Expand Up @@ -508,6 +517,8 @@ pub enum SinkError {
ClickHouse(String),
#[error("Nats error: {0}")]
Nats(anyhow::Error),
#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),
}

impl From<RpcError> for SinkError {
Expand Down
Loading

0 comments on commit aa5e798

Please sign in to comment.