Skip to content

Commit

Permalink
feat(sink): implement upsert pulsar sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Sep 15, 2023
1 parent 621ec6d commit c438327
Showing 1 changed file with 147 additions and 15 deletions.
162 changes: 147 additions & 15 deletions src/connector/src/sink/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::try_join_all;
use futures::TryFutureExt;
use serde::Deserialize;
use async_trait::async_trait;
use futures_async_stream::for_await;
use pulsar::{Pulsar, TokioExecutor, Producer};
use pulsar::producer::{SendFuture, Message};
use risingwave_common::catalog::Schema;
use risingwave_common::array::StreamChunk;
use risingwave_rpc_client::ConnectorClient;
Expand All @@ -29,19 +32,49 @@ use super::{
Sink, SinkError, SinkWriterParam, SinkWriter, SinkParam,
SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT,
};
use super::encoder::{JsonEncoder, TimestampHandlingMode};
use crate::{deserialize_u32_from_string, deserialize_duration_from_string};
use crate::sink::{Result, DummySinkCommitCoordinator};
use crate::sink::utils::{
gen_append_only_message_stream,
AppendOnlyAdapterOpts,
gen_append_only_message_stream, gen_upsert_message_stream,
AppendOnlyAdapterOpts, UpsertAdapterOpts,
};
use crate::common::PulsarCommon;

pub const PULSAR_SINK: &str = "pulsar";

/// The delivery buffer queue size
/// When the `SendFuture` the current `send_future_buffer`
/// is buffering is greater than this size, then enforcing commit once
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

const fn _default_max_retries() -> u32 {
3
}

const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}

#[derive(Debug, Clone, Deserialize)]
pub struct PulsarConfig {
#[serde(
rename = "properties.retry.max",
default = "_default_max_retries",
deserialize_with = "deserialize_u32_from_string"
)]
pub max_retry_num: u32,

#[serde(
rename = "properties.retry.interval",
default = "_default_retry_backoff",
deserialize_with = "deserialize_duration_from_string"
)]
pub retry_interval: Duration,

#[serde(flatten)]
pub common: PulsarCommon,

pub r#type: String, // accept "append-only" or "upsert"
}

Expand Down Expand Up @@ -113,9 +146,11 @@ impl Sink for PulsarSink {
pub struct PulsarSinkWriter {
pulsar: Pulsar<TokioExecutor>,
producer: Producer<TokioExecutor>,
config: PulsarConfig,
schema: Schema,
downstream_pk: Vec<usize>,
is_append_only: bool,
send_future_buffer: VecDeque<SendFuture>,
}

impl PulsarSinkWriter {
Expand All @@ -135,36 +170,129 @@ impl PulsarSinkWriter {
Ok(Self {
pulsar,
producer,
config,
schema,
downstream_pk,
is_append_only,
send_future_buffer: VecDeque::new(),
})
}

async fn send_message(&mut self, message: Message) -> Result<()> {
let mut success_flag = false;
let mut connection_err = None;

for _ in 0..self.config.max_retry_num {
match self.producer.send(message.clone()).await {
// If the message is sent successfully,
// a SendFuture holding the message receipt
// or error after sending is returned
Ok(send_future) => {
// Check if send_future_buffer is greater than the preset limit
while self.send_future_buffer.len() >= PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE {
self.send_future_buffer
.pop_front()
.expect("Expect the SendFuture not to be None")
.map_err(|e| SinkError::Pulsar(anyhow!(e)))
.await?;
}

success_flag = true;
self.send_future_buffer.push_back(send_future);
break;
}
// error upon sending
Err(e) => {
match e {
pulsar::Error::Connection(e) => {
connection_err = Some(e);
tokio::time::sleep(self.config.retry_interval).await;
continue;
},
_ => return Err(SinkError::Pulsar(anyhow!(e)))
}
}
}
}

if !success_flag {
Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
} else {
Ok(())
}
}

async fn write_json_objects(
&mut self,
event_key_object: Option<serde_json::Value>,
event_object: Option<serde_json::Value>,
) -> Result<()> {
let message = Message {
partition_key: event_key_object.map(|key| key.to_string()),
payload: serde_json::to_vec(&event_object)
.map_err(|e| SinkError::Pulsar(anyhow!(e)))?,
..Default::default()
};

self.send_message(message).await?;
Ok(())
}

async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let downstream_pk = self.downstream_pk.clone();
let key_encoder =
JsonEncoder::new(&schema, Some(&downstream_pk), TimestampHandlingMode::Milli);
let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);

let append_only_stream = gen_append_only_message_stream(
&self.schema,
&self.downstream_pk,
chunk,
AppendOnlyAdapterOpts::default(),
key_encoder,
val_encoder,
);

#[for_await]
for msg in append_only_stream {
let (event_key_object, event_object) = msg?;
tracing::warn!("pulsar sink: {:?} {:?} {:?}", self.downstream_pk,
event_key_object.as_ref().unwrap().to_string(), event_object.as_ref().unwrap().to_string());
self.producer.send(pulsar::producer::Message {
payload: serde_json::to_vec(&event_object).
map_err(|e| SinkError::Pulsar(anyhow!("Pulsar sink error: {}", e)))?,
..Default::default()
}).map_err(|e| SinkError::Pulsar(anyhow!("Pulsar sink error: {}", e))).await?;
self.write_json_objects(event_key_object, event_object).await?;
}
Ok(())
}

async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let downstream_pk = self.downstream_pk.clone();
let key_encoder =
JsonEncoder::new(&schema, Some(&downstream_pk), TimestampHandlingMode::Milli);
let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);

let upsert_stream = gen_upsert_message_stream(
chunk,
UpsertAdapterOpts::default(),
key_encoder,
val_encoder,
);

#[for_await]
for msg in upsert_stream {
let (event_key_object, event_object) = msg?;
self.write_json_objects(event_key_object, event_object).await?;
}
Ok(())
}

async fn upsert(&self, _chunk: StreamChunk) -> Result<()> {
// TODO
async fn commit_inner(&mut self) -> Result<()> {
try_join_all(
self.send_future_buffer
.drain(..)
.map(|send_future| send_future
.map_err(|e| SinkError::Pulsar(anyhow!(e)))
),
).await?;

Ok(())
}
}
Expand All @@ -183,7 +311,11 @@ impl SinkWriter for PulsarSinkWriter {
Ok(())
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<Self::CommitMetadata> {
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
if is_checkpoint {
self.commit_inner().await?;
}

Ok(())
}
}

0 comments on commit c438327

Please sign in to comment.