Commit d6bd676

feat(sink): async truncate for kinesis, pulsar, nats, redis and clickhouse sink (#12930)

wenym1 authored Oct 25, 2023
1 parent 4fcde99 commit d6bd676
Showing 10 changed files with 284 additions and 229 deletions.
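The theme of the commit: sinks that previously implemented the epoch/barrier-driven SinkWriter trait now implement AsyncTruncateSinkWriter, and the hand-rolled Kafka delivery loop (deleted below) is replaced by the generic AsyncTruncateLogSinkerOf wrapper. That wrapper races new log items against completing delivery futures and truncates the log store as soon as deliveries finish, instead of waiting for checkpoint barriers. A minimal, self-contained sketch of the truncation bookkeeping (hypothetical types, not part of this commit; the real ones live in src/connector/src/sink/log_store.rs):

// Sketch only: deliveries complete in order, so the front of the pending
// queue is always the next offset that becomes safe to truncate.
use std::collections::VecDeque;

#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: usize },
    Barrier { epoch: u64 },
}

struct PendingDelivery {
    offset: TruncateOffset,
    done: tokio::sync::oneshot::Receiver<()>, // resolves when the sink acks
}

async fn next_truncate_offset(
    pending: &mut VecDeque<PendingDelivery>,
) -> Option<TruncateOffset> {
    let front = pending.pop_front()?;
    front.done.await.ok()?; // wait for the oldest delivery to finish
    Some(front.offset)
}

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel();
    let mut pending = VecDeque::from([PendingDelivery {
        offset: TruncateOffset::Chunk { epoch: 1, chunk_id: 0 },
        done: rx,
    }]);
    tx.send(()).unwrap(); // simulate the broker acknowledging the write
    let offset = next_truncate_offset(&mut pending).await.unwrap();
    println!("safe to truncate up to {offset:?}");
}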
3 changes: 3 additions & 0 deletions src/connector/src/sink/blackhole.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;

use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset};
use crate::sink::{
DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam,
@@ -45,6 +47,7 @@ impl Sink for BlackHoleSink {
}
}

#[async_trait]
impl LogSinker for BlackHoleSink {
async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> {
log_reader.init().await?;
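The only functional change in this file is the new #[async_trait] attribute: LogSinker's consume_log_and_sink is an async trait method, and on the stable Rust of the time such methods needed the async_trait macro to box their futures. A simplified, self-contained illustration (the real trait takes a LogReader, as the hunk above shows):

// Sketch only: a stripped-down LogSinker showing why #[async_trait] is needed.
use async_trait::async_trait;

#[async_trait]
trait LogSinker {
    async fn consume_log_and_sink(self) -> Result<(), String>;
}

struct BlackHoleSink;

#[async_trait]
impl LogSinker for BlackHoleSink {
    async fn consume_log_and_sink(self) -> Result<(), String> {
        Ok(()) // a blackhole sink simply discards everything it reads
    }
}

#[tokio::main]
async fn main() {
    BlackHoleSink.consume_log_and_sink().await.unwrap();
}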
34 changes: 18 additions & 16 deletions src/connector/src/sink/clickhouse.rs
@@ -29,7 +29,11 @@ use serde_derive::Deserialize;
use serde_with::serde_as;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
};
use crate::sink::{
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
@@ -243,10 +247,14 @@ impl ClickHouseSink {
}
impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = LogSinkerOf<ClickHouseSinkWriter>;
type LogSinker = AsyncTruncateLogSinkerOf<ClickHouseSinkWriter>;

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
}

async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
@@ -277,15 +285,15 @@ impl Sink for ClickHouseSink {
Ok(())
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(ClickHouseSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(usize::MAX))
}
}
pub struct ClickHouseSinkWriter {
@@ -496,24 +504,18 @@ impl ClickHouseSinkWriter {
}
}

#[async_trait::async_trait]
impl SinkWriter for ClickHouseSinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
impl AsyncTruncateSinkWriter for ClickHouseSinkWriter {
async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
if self.is_append_only {
self.append_only(chunk).await
} else {
self.upsert(chunk).await
}
}

async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
// ClickHouse has no transactional guarantees, so we do nothing here.
Ok(())
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
Ok(())
}
}

#[derive(ClickHouseRow, Deserialize, Clone)]
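Two details of the ClickHouse port are worth noting. default_sink_decouple opts append-only ClickHouse sinks into sink decoupling by default, and because the writer flushes synchronously inside write_chunk and never registers a delivery future, the cap passed to into_log_sinker (the maximum number of outstanding delivery futures) can safely be usize::MAX. A toy model of what that cap controls (a simplified stand-in, not the real DeliveryFutureManager):

// Sketch only: the cap is backpressure on *unfinished* delivery futures; a
// writer that completes delivery inside write_chunk never buffers any.
struct ToyFutureManager {
    max_future_count: usize,
    pending: usize, // registered-but-unfinished delivery futures
}

impl ToyFutureManager {
    fn new(max_future_count: usize) -> Self {
        Self { max_future_count, pending: 0 }
    }

    // Once true, the caller must await completions before adding more;
    // with usize::MAX it never becomes true.
    fn should_wait(&self) -> bool {
        self.pending >= self.max_future_count
    }
}

fn main() {
    let mgr = ToyFutureManager::new(usize::MAX);
    assert!(!mgr.should_wait()); // a future-less writer is never throttled
}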
116 changes: 47 additions & 69 deletions src/connector/src/sink/kafka.rs
@@ -14,20 +14,18 @@

use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{select, Either};
use futures::{Future, FutureExt, TryFuture};
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::util::drop_either_future;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
@@ -37,11 +35,11 @@ use super::{Sink, SinkError, SinkParam};
use crate::common::KafkaCommon;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::{
DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem,
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::writer::FormattedSink;
use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, SinkWriterParam};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
@@ -299,7 +297,7 @@ impl TryFrom<SinkParam> for KafkaSink {

impl Sink for KafkaSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = KafkaLogSinker;
type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

const SINK_NAME: &'static str = KAFKA_SINK;

@@ -316,7 +314,18 @@ impl Sink for KafkaSink {
self.sink_from_name.clone(),
)
.await?;
KafkaLogSinker::new(self.config.clone(), formatter).await
let max_delivery_buffer_size = (self
.config
.rdkafka_properties
.queue_buffering_max_messages
.as_ref()
.cloned()
.unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
* KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
.await?
.into_log_sinker(max_delivery_buffer_size))
}

async fn validate(&self) -> Result<()> {
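The buffer cap for in-flight deliveries is now computed in new_log_sinker and derived from rdkafka's queue.buffering.max.messages, keeping the number of buffered delivery futures proportional to the producer queue. A worked version of the arithmetic (the constant values below are placeholders for illustration; the real KAFKA_WRITER_MAX_QUEUE_SIZE and KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO are defined elsewhere in this file):

// Placeholder values, for illustration only; see kafka.rs for the real ones.
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100_000;
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 0.8;

fn max_delivery_buffer_size(queue_buffering_max_messages: Option<usize>) -> usize {
    (queue_buffering_max_messages.unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
        * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize
}

fn main() {
    // With the placeholder constants, the cap scales with the configured queue.
    assert_eq!(max_delivery_buffer_size(None), 80_000);
    assert_eq!(max_delivery_buffer_size(Some(10_000)), 8_000);
}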
@@ -370,16 +379,15 @@ struct KafkaPayloadWriter<'a> {
config: &'a KafkaConfig,
}

type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

pub struct KafkaLogSinker {
pub struct KafkaSinkWriter {
formatter: SinkFormatterImpl,
inner: FutureProducer<PrivateLinkProducerContext>,
future_manager: DeliveryFutureManager<KafkaSinkDeliveryFuture>,
config: KafkaConfig,
}

impl KafkaLogSinker {
impl KafkaSinkWriter {
async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
let inner: FutureProducer<PrivateLinkProducerContext> = {
let mut c = ClientConfig::new();
@@ -403,19 +411,29 @@ impl KafkaLogSinker {
c.create_with_context(producer_ctx).await?
};

let max_delivery_buffer_size = (config
.rdkafka_properties
.queue_buffering_max_messages
.as_ref()
.cloned()
.unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
* KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

Ok(KafkaLogSinker {
Ok(KafkaSinkWriter {
formatter,
inner,
config: config.clone(),
future_manager: DeliveryFutureManager::new(max_delivery_buffer_size),
})
}
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
type DeliveryFuture = KafkaSinkDeliveryFuture;

async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
let mut payload_writer = KafkaPayloadWriter {
inner: &mut self.inner,
add_future,
config: &self.config,
};
dispatch_sink_formatter_impl!(&self.formatter, formatter, {
payload_writer.write_chunk(chunk, formatter).await
})
}
}
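KafkaSinkDeliveryFuture is now pub because the writer's associated type must be nameable from into_log_sinker's return type. The `= impl TryFuture<...>` alias relies on the type_alias_impl_trait nightly feature (RisingWave builds on nightly), which lets a module export a name for an otherwise anonymous future type. A standalone sketch of the same trick, for a nightly toolchain of that era:

// Sketch only: naming an unnameable future type via type_alias_impl_trait.
#![feature(type_alias_impl_trait)]

use std::future::Future;

pub type DeliveryFuture = impl Future<Output = Result<(), String>> + Unpin;

// The defining use: the compiler infers DeliveryFuture's concrete type here.
pub fn deliver(ok: bool) -> DeliveryFuture {
    Box::pin(async move {
        if ok {
            Ok(())
        } else {
            Err("delivery failed".to_string())
        }
    })
}

fn main() {
    // Callers can store the futures by name, as DeliveryFutureManager must.
    let _pending: Vec<DeliveryFuture> = vec![deliver(true), deliver(false)];
}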
@@ -537,50 +555,6 @@ impl<'a> FormattedSink for KafkaPayloadWriter<'a> {
}
}

impl LogSinker for KafkaLogSinker {
async fn consume_log_and_sink(mut self, mut log_reader: impl LogReader) -> Result<()> {
log_reader.init().await?;
loop {
let select_result = drop_either_future(
select(
pin!(log_reader.next_item()),
pin!(self.future_manager.next_truncate_offset()),
)
.await,
);
match select_result {
Either::Left(item_result) => {
let (epoch, item) = item_result?;
match item {
LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
dispatch_sink_formatter_impl!(&self.formatter, formatter, {
let mut writer = KafkaPayloadWriter {
inner: &self.inner,
add_future: self
.future_manager
.start_write_chunk(epoch, chunk_id),
config: &self.config,
};
writer.write_chunk(chunk, formatter).await?;
})
}
LogStoreReadItem::Barrier {
is_checkpoint: _is_checkpoint,
} => {
self.future_manager.add_barrier(epoch);
}
LogStoreReadItem::UpdateVnodeBitmap(_) => {}
}
}
Either::Right(offset_result) => {
let offset = offset_result?;
log_reader.truncate(offset).await?;
}
}
}
}
}
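The roughly forty deleted lines above are the loop that AsyncTruncateLogSinkerOf now provides generically for every AsyncTruncateSinkWriter: select over "next log item" and "next completed delivery", truncating the log as acks arrive. A self-contained toy of that control flow, with mpsc channels standing in for the log reader and the delivery-future manager (all names hypothetical):

// Sketch only: race log items against delivery acks, truncating as acks land.
use tokio::sync::mpsc;

#[derive(Debug)]
enum LogItem {
    Chunk { chunk_id: usize },
    Barrier,
}

#[tokio::main]
async fn main() {
    let (item_tx, mut item_rx) = mpsc::channel::<LogItem>(8);
    let (ack_tx, mut ack_rx) = mpsc::channel::<usize>(8);

    // Upstream: two chunks then a barrier; broker acks trail each write.
    tokio::spawn(async move {
        for chunk_id in 0..2 {
            item_tx.send(LogItem::Chunk { chunk_id }).await.unwrap();
            ack_tx.send(chunk_id).await.unwrap();
        }
        item_tx.send(LogItem::Barrier).await.unwrap();
    });

    loop {
        tokio::select! {
            Some(item) = item_rx.recv() => match item {
                LogItem::Chunk { chunk_id } => {
                    println!("chunk {chunk_id} handed to producer, delivery pending");
                }
                LogItem::Barrier => break, // end the demo at the barrier
            },
            Some(chunk_id) = ack_rx.recv() => {
                println!("delivery acked, truncate log up to chunk {chunk_id}");
            }
        }
    }
    // Drain acks that were still in flight when the barrier arrived.
    while let Ok(chunk_id) = ack_rx.try_recv() {
        println!("late ack, truncate log up to chunk {chunk_id}");
    }
}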

#[cfg(test)]
mod test {
use maplit::hashmap;
@@ -748,7 +722,7 @@ mod test {
let kafka_config = KafkaConfig::from_hashmap(properties)?;

// Create the actual sink writer to Kafka
let mut sink = KafkaLogSinker::new(
let sink = KafkaSinkWriter::new(
kafka_config.clone(),
SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
// We do not specify primary key for this schema
@@ -759,12 +733,16 @@
.await
.unwrap();

use crate::sink::log_store::DeliveryFutureManager;

let mut future_manager = DeliveryFutureManager::new(usize::MAX);

for i in 0..10 {
println!("epoch: {}", i);
for j in 0..100 {
let mut writer = KafkaPayloadWriter {
inner: &sink.inner,
add_future: sink.future_manager.start_write_chunk(i, j),
add_future: future_manager.start_write_chunk(i, j),
config: &sink.config,
};
match writer
40 changes: 22 additions & 18 deletions src/connector/src/sink/kinesis.rs
@@ -30,8 +30,12 @@ use super::catalog::SinkFormatDesc;
use super::SinkParam;
use crate::common::KinesisCommon;
use crate::dispatch_sink_formatter_impl;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};

pub const KINESIS_SINK: &str = "kinesis";
@@ -67,10 +71,14 @@

impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = LogSinkerOf<KinesisSinkWriter>;
type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;

const SINK_NAME: &'static str = KINESIS_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
}

async fn validate(&self) -> Result<()> {
// Kinesis requires partition key. There is no builtin support for round-robin as in kafka/pulsar.
// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey
@@ -103,7 +111,7 @@ impl Sink for KinesisSink {
Ok(())
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(KinesisSinkWriter::new(
self.config.clone(),
self.schema.clone(),
Expand All @@ -113,7 +121,7 @@ impl Sink for KinesisSink {
self.sink_from_name.clone(),
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(usize::MAX))
}
}

@@ -214,20 +222,16 @@ impl FormattedSink for KinesisSinkPayloadWriter {
}
}

#[async_trait::async_trait]
impl SinkWriter for KinesisSinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
dispatch_sink_formatter_impl!(&self.formatter, formatter, {
impl AsyncTruncateSinkWriter for KinesisSinkWriter {
async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
dispatch_sink_formatter_impl!(
&self.formatter,
formatter,
self.payload_writer.write_chunk(chunk, formatter).await
})
}

async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
// Kinesis offers no transactional guarantees, so we do nothing here.
Ok(())
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
Ok(())
)
}
}
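Kinesis follows the same recipe as ClickHouse: the payload writer completes delivery inside write_chunk, the _add_future handle goes unused, and the future buffer is capped at usize::MAX. Presumably the redis and nats hunks not shown here follow the same synchronous shape, while pulsar, like kafka, registers real delivery futures. A condensed sketch of this synchronous-writer shape (simplified types, not the real trait):

// Sketch only: the shape shared by the synchronous sinks in this commit.
use async_trait::async_trait;

struct StreamChunk; // stand-in for risingwave_common::array::StreamChunk
struct AddFuture;   // stand-in for DeliveryFutureManagerAddFuture<'a, F>

#[async_trait]
trait AsyncTruncateSinkWriter: Send {
    async fn write_chunk(&mut self, chunk: StreamChunk, add_future: AddFuture)
        -> Result<(), String>;
}

struct SyncSinkWriter;

#[async_trait]
impl AsyncTruncateSinkWriter for SyncSinkWriter {
    async fn write_chunk(&mut self, _chunk: StreamChunk, _add_future: AddFuture)
        -> Result<(), String> {
        // An awaited SDK call would go here; once it returns, the chunk is
        // durable, so the log store may truncate past it immediately.
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    SyncSinkWriter.write_chunk(StreamChunk, AddFuture).await.unwrap();
}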