diff --git a/src/connector/src/sink/blackhole.rs b/src/connector/src/sink/blackhole.rs index 1f1ace3b0d104..60b0506604c97 100644 --- a/src/connector/src/sink/blackhole.rs +++ b/src/connector/src/sink/blackhole.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; + use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam, @@ -45,6 +47,7 @@ impl Sink for BlackHoleSink { } } +#[async_trait] impl LogSinker for BlackHoleSink { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { log_reader.init().await?; diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 2bddf8026216f..f4fdf9b761f38 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -29,7 +29,11 @@ use serde_derive::Deserialize; use serde_with::serde_as; use super::{DummySinkCommitCoordinator, SinkWriterParam}; -use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; +use crate::sink::catalog::desc::SinkDesc; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -243,10 +247,14 @@ impl ClickHouseSink { } impl Sink for ClickHouseSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = CLICKHOUSE_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn validate(&self) -> Result<()> { // For upsert clickhouse sink, the primary key must be defined. if !self.is_append_only && self.pk_indices.is_empty() { @@ -277,7 +285,7 @@ impl Sink for ClickHouseSink { Ok(()) } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(ClickHouseSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -285,7 +293,7 @@ impl Sink for ClickHouseSink { self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(usize::MAX)) } } pub struct ClickHouseSinkWriter { @@ -496,24 +504,18 @@ impl ClickHouseSinkWriter { } } -#[async_trait::async_trait] -impl SinkWriter for ClickHouseSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for ClickHouseSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { if self.is_append_only { self.append_only(chunk).await } else { self.upsert(chunk).await } } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - // clickhouse no transactional guarantees, so we do nothing here. - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) - } } #[derive(ClickHouseRow, Deserialize, Clone)] diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index a204a8d121706..f77b2b0a88c36 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -14,20 +14,18 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::pin::pin; use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use futures::future::{select, Either}; use futures::{Future, FutureExt, TryFuture}; use rdkafka::error::KafkaError; use rdkafka::message::ToBytes; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; +use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::util::drop_either_future; use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; @@ -37,11 +35,11 @@ use super::{Sink, SinkError, SinkParam}; use crate::common::KafkaCommon; use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::log_store::{ - DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem, +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, }; -use crate::sink::writer::FormattedSink; -use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, SinkWriterParam}; +use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam}; use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext}; use crate::source::{SourceEnumeratorContext, SplitEnumerator}; use crate::{ @@ -299,7 +297,7 @@ impl TryFrom for KafkaSink { impl Sink for KafkaSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = KafkaLogSinker; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = KAFKA_SINK; @@ -316,7 +314,18 @@ impl Sink for KafkaSink { self.sink_from_name.clone(), ) .await?; - KafkaLogSinker::new(self.config.clone(), formatter).await + let max_delivery_buffer_size = (self + .config + .rdkafka_properties + .queue_buffering_max_messages + .as_ref() + .cloned() + .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32 + * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize; + + Ok(KafkaSinkWriter::new(self.config.clone(), formatter) + .await? + .into_log_sinker(max_delivery_buffer_size)) } async fn validate(&self) -> Result<()> { @@ -370,16 +379,15 @@ struct KafkaPayloadWriter<'a> { config: &'a KafkaConfig, } -type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; +pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; -pub struct KafkaLogSinker { +pub struct KafkaSinkWriter { formatter: SinkFormatterImpl, inner: FutureProducer, - future_manager: DeliveryFutureManager, config: KafkaConfig, } -impl KafkaLogSinker { +impl KafkaSinkWriter { async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result { let inner: FutureProducer = { let mut c = ClientConfig::new(); @@ -403,19 +411,29 @@ impl KafkaLogSinker { c.create_with_context(producer_ctx).await? }; - let max_delivery_buffer_size = (config - .rdkafka_properties - .queue_buffering_max_messages - .as_ref() - .cloned() - .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32 - * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize; - - Ok(KafkaLogSinker { + Ok(KafkaSinkWriter { formatter, inner, config: config.clone(), - future_manager: DeliveryFutureManager::new(max_delivery_buffer_size), + }) + } +} + +impl AsyncTruncateSinkWriter for KafkaSinkWriter { + type DeliveryFuture = KafkaSinkDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + let mut payload_writer = KafkaPayloadWriter { + inner: &mut self.inner, + add_future, + config: &self.config, + }; + dispatch_sink_formatter_impl!(&self.formatter, formatter, { + payload_writer.write_chunk(chunk, formatter).await }) } } @@ -537,50 +555,6 @@ impl<'a> FormattedSink for KafkaPayloadWriter<'a> { } } -impl LogSinker for KafkaLogSinker { - async fn consume_log_and_sink(mut self, mut log_reader: impl LogReader) -> Result<()> { - log_reader.init().await?; - loop { - let select_result = drop_either_future( - select( - pin!(log_reader.next_item()), - pin!(self.future_manager.next_truncate_offset()), - ) - .await, - ); - match select_result { - Either::Left(item_result) => { - let (epoch, item) = item_result?; - match item { - LogStoreReadItem::StreamChunk { chunk_id, chunk } => { - dispatch_sink_formatter_impl!(&self.formatter, formatter, { - let mut writer = KafkaPayloadWriter { - inner: &self.inner, - add_future: self - .future_manager - .start_write_chunk(epoch, chunk_id), - config: &self.config, - }; - writer.write_chunk(chunk, formatter).await?; - }) - } - LogStoreReadItem::Barrier { - is_checkpoint: _is_checkpoint, - } => { - self.future_manager.add_barrier(epoch); - } - LogStoreReadItem::UpdateVnodeBitmap(_) => {} - } - } - Either::Right(offset_result) => { - let offset = offset_result?; - log_reader.truncate(offset).await?; - } - } - } - } -} - #[cfg(test)] mod test { use maplit::hashmap; @@ -748,7 +722,7 @@ mod test { let kafka_config = KafkaConfig::from_hashmap(properties)?; // Create the actual sink writer to Kafka - let mut sink = KafkaLogSinker::new( + let sink = KafkaSinkWriter::new( kafka_config.clone(), SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new( // We do not specify primary key for this schema @@ -759,12 +733,16 @@ mod test { .await .unwrap(); + use crate::sink::log_store::DeliveryFutureManager; + + let mut future_manager = DeliveryFutureManager::new(usize::MAX); + for i in 0..10 { println!("epoch: {}", i); for j in 0..100 { let mut writer = KafkaPayloadWriter { inner: &sink.inner, - add_future: sink.future_manager.start_write_chunk(i, j), + add_future: future_manager.start_write_chunk(i, j), config: &sink.config, }; match writer diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index dd8518af39948..605edde3b1eb0 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -30,8 +30,12 @@ use super::catalog::SinkFormatDesc; use super::SinkParam; use crate::common::KinesisCommon; use crate::dispatch_sink_formatter_impl; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, +}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam}; pub const KINESIS_SINK: &str = "kinesis"; @@ -67,10 +71,14 @@ impl TryFrom for KinesisSink { impl Sink for KinesisSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = KINESIS_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn validate(&self) -> Result<()> { // Kinesis requires partition key. There is no builtin support for round-robin as in kafka/pulsar. // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey @@ -103,7 +111,7 @@ impl Sink for KinesisSink { Ok(()) } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(KinesisSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -113,7 +121,7 @@ impl Sink for KinesisSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(usize::MAX)) } } @@ -214,20 +222,16 @@ impl FormattedSink for KinesisSinkPayloadWriter { } } -#[async_trait::async_trait] -impl SinkWriter for KinesisSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - dispatch_sink_formatter_impl!(&self.formatter, formatter, { +impl AsyncTruncateSinkWriter for KinesisSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + dispatch_sink_formatter_impl!( + &self.formatter, + formatter, self.payload_writer.write_chunk(chunk, formatter).await - }) - } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - // Kinesis offers no transactional guarantees, so we do nothing here. - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) + ) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7769a87f4e715..6afd08778cd96 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -34,7 +34,6 @@ pub mod utils; pub mod writer; use std::collections::HashMap; -use std::future::Future; use ::clickhouse::error::Error as ClickHouseError; use ::redis::RedisError; @@ -278,11 +277,9 @@ pub trait Sink: TryFrom { } } -pub trait LogSinker: Send + 'static { - fn consume_log_and_sink( - self, - log_reader: impl LogReader, - ) -> impl Future> + Send + 'static; +#[async_trait] +pub trait LogSinker: 'static { + async fn consume_log_and_sink(self, log_reader: impl LogReader) -> Result<()>; } #[async_trait] diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 8e3f3e2c18022..2f810eed786a9 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -25,10 +25,14 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use super::utils::chunk_to_json; -use super::{DummySinkCommitCoordinator, SinkWriter, SinkWriterParam}; +use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::common::NatsCommon; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; -use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY}; pub const NATS_SINK: &str = "nats"; @@ -88,10 +92,14 @@ impl TryFrom for NatsSink { impl Sink for NatsSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = NATS_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Nats(anyhow!( @@ -110,11 +118,11 @@ impl Sink for NatsSink { Ok(()) } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok( NatsSinkWriter::new(self.config.clone(), self.schema.clone()) .await? - .into_log_sinker(writer_param.sink_metrics), + .into_log_sinker(usize::MAX), ) } } @@ -153,17 +161,12 @@ impl NatsSinkWriter { } } -#[async_trait::async_trait] -impl SinkWriter for NatsSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for NatsSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { self.append_only(chunk).await } - - async fn begin_epoch(&mut self, _epoch_id: u64) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) - } } diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index f980b2ad9f9b1..9eb57c1ae0771 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt::Debug; use std::time::Duration; use anyhow::anyhow; -use async_trait::async_trait; -use futures::future::try_join_all; -use futures::TryFutureExt; +use futures::{FutureExt, TryFuture, TryFutureExt}; use pulsar::producer::{Message, SendFuture}; use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor}; use risingwave_common::array::StreamChunk; @@ -28,10 +26,15 @@ use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use super::catalog::{SinkFormat, SinkFormatDesc}; -use super::{Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam}; +use super::{Sink, SinkError, SinkParam, SinkWriterParam}; use crate::common::PulsarCommon; -use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriterExt}; +use crate::sink::catalog::desc::SinkDesc; +use crate::sink::encoder::SerTo; +use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, +}; use crate::sink::{DummySinkCommitCoordinator, Result}; use crate::{deserialize_duration_from_string, dispatch_sink_formatter_impl}; @@ -155,11 +158,15 @@ impl TryFrom for PulsarSink { impl Sink for PulsarSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = PULSAR_SINK; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(PulsarSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -169,7 +176,7 @@ impl Sink for PulsarSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE)) } async fn validate(&self) -> Result<()> { @@ -199,15 +206,26 @@ impl Sink for PulsarSink { } pub struct PulsarSinkWriter { - payload_writer: PulsarPayloadWriter, formatter: SinkFormatterImpl, -} - -struct PulsarPayloadWriter { pulsar: Pulsar, producer: Producer, config: PulsarConfig, - send_future_buffer: VecDeque, +} + +struct PulsarPayloadWriter<'w> { + producer: &'w mut Producer, + config: &'w PulsarConfig, + add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>, +} + +pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; + +fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { + future.map(|result| { + result + .map(|_| ()) + .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) + }) } impl PulsarSinkWriter { @@ -226,17 +244,14 @@ impl PulsarSinkWriter { let producer = build_pulsar_producer(&pulsar, &config).await?; Ok(Self { formatter, - payload_writer: PulsarPayloadWriter { - pulsar, - producer, - config, - send_future_buffer: VecDeque::new(), - }, + pulsar, + producer, + config, }) } } -impl PulsarPayloadWriter { +impl<'w> PulsarPayloadWriter<'w> { async fn send_message(&mut self, message: Message) -> Result<()> { let mut success_flag = false; let mut connection_err = None; @@ -247,17 +262,10 @@ impl PulsarPayloadWriter { // a SendFuture holding the message receipt // or error after sending is returned Ok(send_future) => { - // Check if send_future_buffer is greater than the preset limit - while self.send_future_buffer.len() >= PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE { - self.send_future_buffer - .pop_front() - .expect("Expect the SendFuture not to be None") - .map_err(|e| SinkError::Pulsar(anyhow!(e))) - .await?; - } - + self.add_future + .add_future_may_await(may_delivery_future(send_future)) + .await?; success_flag = true; - self.send_future_buffer.push_back(send_future); break; } // error upon sending @@ -295,24 +303,9 @@ impl PulsarPayloadWriter { self.send_message(message).await?; Ok(()) } - - async fn commit_inner(&mut self) -> Result<()> { - self.producer - .send_batch() - .map_err(pulsar_to_sink_err) - .await?; - try_join_all( - self.send_future_buffer - .drain(..) - .map(|send_future| send_future.map_err(|e| SinkError::Pulsar(anyhow!(e)))), - ) - .await?; - - Ok(()) - } } -impl FormattedSink for PulsarPayloadWriter { +impl<'w> FormattedSink for PulsarPayloadWriter<'w> { type K = String; type V = Vec; @@ -321,23 +314,33 @@ impl FormattedSink for PulsarPayloadWriter { } } -#[async_trait] -impl SinkWriter for PulsarSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for PulsarSinkWriter { + type DeliveryFuture = PulsarDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { dispatch_sink_formatter_impl!(&self.formatter, formatter, { - self.payload_writer.write_chunk(chunk, formatter).await + let mut payload_writer = PulsarPayloadWriter { + producer: &mut self.producer, + add_future, + config: &self.config, + }; + // TODO: we can call `payload_writer.write_chunk(chunk, formatter)`, + // but for an unknown reason, this will greatly increase the compile time, + // by nearly 4x. May investigate it later. + for r in formatter.format_chunk(&chunk) { + let (key, value) = r?; + payload_writer + .write_inner( + key.map(SerTo::ser_to).transpose()?, + value.map(SerTo::ser_to).transpose()?, + ) + .await?; + } + Ok(()) }) } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result { - if is_checkpoint { - self.payload_writer.commit_inner().await?; - } - - Ok(()) - } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 6120075a049df..af3ec3b981620 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -29,8 +29,11 @@ use super::formatter::SinkFormatterImpl; use super::writer::FormattedSink; use super::{SinkError, SinkParam}; use crate::dispatch_sink_formatter_impl; -use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; -use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam}; pub const REDIS_SINK: &str = "redis"; pub const KEY_FORMAT: &str = "key_format"; @@ -99,11 +102,11 @@ impl TryFrom for RedisSink { impl Sink for RedisSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = "redis"; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(RedisSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -113,7 +116,7 @@ impl Sink for RedisSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(usize::MAX)) } async fn validate(&self) -> Result<()> { @@ -257,25 +260,16 @@ impl RedisSinkWriter { } } -#[async_trait] -impl SinkWriter for RedisSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for RedisSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { dispatch_sink_formatter_impl!(&self.formatter, formatter, { self.payload_writer.write_chunk(chunk, formatter).await }) } - - async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { - self.epoch = epoch; - Ok(()) - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - self.payload_writer.commit().await?; - } - Ok(()) - } } #[cfg(test)] @@ -290,6 +284,7 @@ mod test { use super::*; use crate::sink::catalog::{SinkEncode, SinkFormat}; + use crate::sink::log_store::DeliveryFutureManager; #[tokio::test] async fn test_write() { @@ -326,8 +321,10 @@ mod test { ], ); + let mut manager = DeliveryFutureManager::new(0); + redis_sink_writer - .write_batch(chunk_a) + .write_chunk(chunk_a, manager.start_write_chunk(0, 0)) .await .expect("failed to write batch"); let expected_a = @@ -383,6 +380,8 @@ mod test { .await .unwrap(); + let mut future_manager = DeliveryFutureManager::new(0); + let chunk_a = StreamChunk::new( vec![Op::Insert, Op::Insert, Op::Insert], vec![ @@ -392,7 +391,7 @@ mod test { ); redis_sink_writer - .write_batch(chunk_a) + .write_chunk(chunk_a, future_manager.start_write_chunk(0, 0)) .await .expect("failed to write batch"); let expected_a = vec![ diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 310213262b2ad..3c52cb720dbd4 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -227,6 +227,7 @@ async fn await_future_with_monitor_receiver_err> } } +#[async_trait] impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { // Note: this is a total copy of the implementation of LogSinkerOf, diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index 37ad452831b2e..64261bb42ab48 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -12,17 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::{Future, Ready}; +use std::pin::pin; use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; +use futures::future::{select, Either}; +use futures::TryFuture; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; +use risingwave_common::util::drop_either_future; use crate::sink::encoder::SerTo; use crate::sink::formatter::SinkFormatter; -use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; -use crate::sink::{LogSinker, Result, SinkMetrics}; +use crate::sink::log_store::{ + DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem, + TruncateOffset, +}; +use crate::sink::{LogSinker, Result, SinkError, SinkMetrics}; #[async_trait] pub trait SinkWriter: Send + 'static { @@ -48,22 +56,17 @@ pub trait SinkWriter: Send + 'static { } } -// TODO: remove this trait after KafkaSinkWriter implements SinkWriter -#[async_trait] -// An old version of SinkWriter for backward compatibility -pub trait SinkWriterV1: Send + 'static { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>; - - // the following interface is for transactions, if not supported, return Ok(()) - // start a transaction with epoch number. Note that epoch number should be increasing. - async fn begin_epoch(&mut self, epoch: u64) -> Result<()>; +pub type DummyDeliveryFuture = Ready>; - // commits the current transaction and marks all messages in the transaction success. - async fn commit(&mut self) -> Result<()>; +pub trait AsyncTruncateSinkWriter: Send + 'static { + type DeliveryFuture: TryFuture + Unpin + Send + 'static = + DummyDeliveryFuture; - // aborts the current transaction because some error happens. we should rollback to the last - // commit point. - async fn abort(&mut self) -> Result<()>; + fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> impl Future> + Send + 'a; } /// A free-form sink that may output in multiple formats and encodings. Examples include kafka, @@ -104,12 +107,12 @@ pub trait FormattedSink { } } -pub struct LogSinkerOf> { +pub struct LogSinkerOf { writer: W, sink_metrics: SinkMetrics, } -impl> LogSinkerOf { +impl LogSinkerOf { pub fn new(writer: W, sink_metrics: SinkMetrics) -> Self { LogSinkerOf { writer, @@ -118,6 +121,7 @@ impl> LogSinkerOf { } } +#[async_trait] impl> LogSinker for LogSinkerOf { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { let mut sink_writer = self.writer; @@ -222,3 +226,64 @@ where } } } + +pub struct AsyncTruncateLogSinkerOf { + writer: W, + future_manager: DeliveryFutureManager, +} + +impl AsyncTruncateLogSinkerOf { + pub fn new(writer: W, max_future_count: usize) -> Self { + AsyncTruncateLogSinkerOf { + writer, + future_manager: DeliveryFutureManager::new(max_future_count), + } + } +} + +#[async_trait] +impl LogSinker for AsyncTruncateLogSinkerOf { + async fn consume_log_and_sink(mut self, mut log_reader: impl LogReader) -> Result<()> { + log_reader.init().await?; + loop { + let select_result = drop_either_future( + select( + pin!(log_reader.next_item()), + pin!(self.future_manager.next_truncate_offset()), + ) + .await, + ); + match select_result { + Either::Left(item_result) => { + let (epoch, item) = item_result?; + match item { + LogStoreReadItem::StreamChunk { chunk_id, chunk } => { + let add_future = self.future_manager.start_write_chunk(epoch, chunk_id); + self.writer.write_chunk(chunk, add_future).await?; + } + LogStoreReadItem::Barrier { + is_checkpoint: _is_checkpoint, + } => { + self.future_manager.add_barrier(epoch); + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} + } + } + Either::Right(offset_result) => { + let offset = offset_result?; + log_reader.truncate(offset).await?; + } + } + } + } +} + +#[easy_ext::ext(AsyncTruncateSinkWriterExt)] +impl T +where + T: AsyncTruncateSinkWriter + Sized, +{ + pub fn into_log_sinker(self, max_future_count: usize) -> AsyncTruncateLogSinkerOf { + AsyncTruncateLogSinkerOf::new(self, max_future_count) + } +}