From d38837ed5018b11825b471eb87181766178f255a Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 28 Sep 2023 16:54:29 +0800
Subject: [PATCH] sink: extract common delivery future management logic

---
 src/connector/src/sink/kafka.rs     | 214 ++++------------------------
 src/connector/src/sink/log_store.rs | 185 +++++++++++++++++++++++-
 2 files changed, 212 insertions(+), 187 deletions(-)

diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index af86e793605a..2172d7da5bb8 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -12,18 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::fmt::Debug;
-use std::future::poll_fn;
 use std::pin::pin;
 use std::sync::Arc;
-use std::task::Poll;
 use std::time::Duration;
 
 use anyhow::anyhow;
 use futures::future::{select, Either};
 use futures::{Future, FutureExt};
-use rdkafka::error::{KafkaError, KafkaResult};
+use rdkafka::error::KafkaError;
 use rdkafka::message::ToBytes;
 use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
 use rdkafka::types::RDKafkaErrorCode;
@@ -33,6 +31,7 @@ use risingwave_common::util::drop_either_future;
 use serde_derive::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 use strum_macros::{Display, EnumString};
+use tonic::codegen::futures_core::TryFuture;
 
 use super::{
     Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
@@ -40,7 +39,9 @@ use super::{
 };
 use crate::common::KafkaCommon;
 use crate::sink::formatter::SinkFormatterImpl;
-use crate::sink::log_store::{ChunkId, LogReader, LogStoreReadItem, TruncateOffset};
+use crate::sink::log_store::{
+    DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem,
+};
 use crate::sink::writer::FormattedSink;
 use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, SinkWriterParam};
 use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
@@ -346,27 +347,7 @@ impl Sink for KafkaSink {
     }
 }
 
-trait DeliveryFutureTrait = Future<Output = <DeliveryFuture as Future>::Output> + Unpin + 'static;
-
-enum KafkaDeliveryFutureManagerItem<F> {
-    Chunk {
-        chunk_id: ChunkId,
-        // earlier future at the front
-        futures: VecDeque<F>,
-    },
-    Barrier,
-}
-
-struct KafkaDeliveryFutureManager<F> {
-    future_count: usize,
-    max_future_count: usize,
-    // earlier items at the front
-    items: VecDeque<(u64, KafkaDeliveryFutureManagerItem<F>)>,
-}
-
-fn map_future_result(
-    delivery_future_result: <DeliveryFuture as Future>::Output,
-) -> KafkaResult<()> {
+fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
     match delivery_future_result {
         // Successfully sent the record
         // Will return the partition and offset of the message (i32, i64)
@@ -377,168 +358,18 @@ fn map_future_result(
         // i.e., (KafkaError, OwnedMessage)
         // We will just stop the loop, and return the error
         // The sink executor will back to the latest checkpoint
-        Ok(Err((k_err, _msg))) => Err(k_err),
+        Ok(Err((k_err, _msg))) => Err(k_err.into()),
         // This represents the producer is dropped
         // before the delivery status is received
        // Return `KafkaError::Canceled`
-        Err(_) => Err(KafkaError::Canceled),
+        Err(_) => Err(KafkaError::Canceled.into()),
     }
 }
 
-impl<F> KafkaDeliveryFutureManager<F> {
-    fn new(max_future_count: usize) -> Self {
-        Self {
-            future_count: 0,
-            max_future_count,
-            items: Default::default(),
-        }
-    }
+type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
 
-    fn add_barrier(&mut self, epoch: u64) {
-        if let Some((item_epoch, last_item)) = self.items.back() {
-            match last_item {
-                KafkaDeliveryFutureManagerItem::Chunk { .. } => {
-                    assert_eq!(*item_epoch, epoch)
-                }
-                KafkaDeliveryFutureManagerItem::Barrier => {
-                    assert!(
-                        epoch > *item_epoch,
-                        "new barrier epoch {} should be greater than prev barrier {}",
-                        epoch,
-                        item_epoch
-                    );
-                }
-            }
-        }
-        self.items
-            .push_back((epoch, KafkaDeliveryFutureManagerItem::Barrier));
-    }
-
-    fn start_write_chunk(
-        &mut self,
-        epoch: u64,
-        chunk_id: ChunkId,
-    ) -> KafkaDeliveryFutureManagerAddFuture<'_, F> {
-        if let Some((item_epoch, item)) = self.items.back() {
-            match item {
-                KafkaDeliveryFutureManagerItem::Chunk {
-                    chunk_id: item_chunk_id,
-                    ..
-                } => {
-                    assert_eq!(epoch, *item_epoch);
-                    assert!(
-                        chunk_id > *item_chunk_id,
-                        "new chunk id {} should be greater than prev chunk id {}",
-                        chunk_id,
-                        item_chunk_id
-                    );
-                }
-                KafkaDeliveryFutureManagerItem::Barrier => {
-                    assert!(
-                        epoch > *item_epoch,
-                        "new chunk epoch {} should be greater than prev barrier: {}",
-                        epoch,
-                        item_epoch
-                    );
-                }
-            }
-        }
-        self.items.push_back((
-            epoch,
-            KafkaDeliveryFutureManagerItem::Chunk {
-                chunk_id,
-                futures: VecDeque::new(),
-            },
-        ));
-        KafkaDeliveryFutureManagerAddFuture(self)
-    }
-}
-
-struct KafkaDeliveryFutureManagerAddFuture<'a, F>(&'a mut KafkaDeliveryFutureManager<F>);
-
-impl<'a, F: DeliveryFutureTrait> KafkaDeliveryFutureManagerAddFuture<'a, F> {
-    async fn add_future(&mut self, future: F) -> Result<()> {
-        while self.0.future_count >= self.0.max_future_count {
-            tracing::warn!(
-                "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
-                This indicates the default value of queue.buffering.max.messages has changed.",
-                self.0.future_count,
-                self.0.max_future_count,
-            );
-            self.await_one_delivery().await?;
-        }
-        match self.0.items.back_mut() {
-            Some((_, KafkaDeliveryFutureManagerItem::Chunk { futures, .. })) => {
-                futures.push_back(future);
-                self.0.future_count += 1;
-                Ok(())
-            }
-            _ => unreachable!("should add future only after add a new chunk"),
-        }
-    }
-
-    async fn await_one_delivery(&mut self) -> Result<()> {
-        for (_, item) in self.0.items.iter_mut().rev() {
-            if let KafkaDeliveryFutureManagerItem::Chunk {futures, ..} = item && !futures.is_empty() {
-                let delivery_future = futures.pop_front().expect("have checked non-empty");
-                self.0.future_count -= 1;
-                return map_future_result(delivery_future.await).map_err(SinkError::Kafka);
-            } else {
-                continue;
-            }
-        }
-        Ok(())
-    }
-}
-
-impl<F: DeliveryFutureTrait> KafkaDeliveryFutureManager<F> {
-    fn next_truncate_offset(&mut self) -> impl Future<Output = Result<TruncateOffset>> + '_ {
-        poll_fn(move |cx| {
-            let mut latest_offset: Option<TruncateOffset> = None;
-            'outer: loop {
-                if let Some((epoch, item)) = self.items.front_mut() {
-                    match item {
-                        KafkaDeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
-                            while let Some(future) = futures.front_mut() {
-                                match future.poll_unpin(cx) {
-                                    Poll::Ready(result) => match map_future_result(result) {
-                                        Ok(()) => {
-                                            self.future_count -= 1;
-                                            futures.pop_front();
-                                        }
-                                        Err(result) => {
-                                            return Poll::Ready(Err(SinkError::Kafka(result)));
-                                        }
-                                    },
-                                    Poll::Pending => {
-                                        break 'outer;
-                                    }
-                                }
-                            }
-
-                            // when we reach here, there must not be any pending or error future.
-                            // Which means all futures of this stream chunk have been finished
-                            assert!(futures.is_empty());
-                            latest_offset = Some(TruncateOffset::Chunk {
-                                epoch: *epoch,
-                                chunk_id: *chunk_id,
-                            });
-                            self.items.pop_front().expect("items not empty");
-                        }
-                        KafkaDeliveryFutureManagerItem::Barrier => {
-                            // Barrier will be yielded anyway
-                            return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch }));
-                        }
-                    }
-                }
-            }
-            if let Some(offset) = latest_offset {
-                Poll::Ready(Ok(offset))
-            } else {
-                Poll::Pending
-            }
-        })
-    }
+fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
+    future.map(map_future_result)
 }
 
 /// When the `DeliveryFuture` the current `future_delivery_buffer`
@@ -552,14 +383,14 @@ const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
 
 struct KafkaPayloadWriter<'a> {
     inner: &'a FutureProducer<PrivateLinkProducerContext>,
-    add_future: KafkaDeliveryFutureManagerAddFuture<'a, DeliveryFuture>,
+    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
     config: &'a KafkaConfig,
 }
 
 pub struct KafkaLogSinker {
     formatter: SinkFormatterImpl,
     inner: FutureProducer<PrivateLinkProducerContext>,
-    future_manager: KafkaDeliveryFutureManager<DeliveryFuture>,
+    future_manager: DeliveryFutureManager<KafkaSinkDeliveryFuture>,
     config: KafkaConfig,
 }
@@ -602,7 +433,7 @@ impl KafkaLogSinker {
             formatter,
             inner,
             config: config.clone(),
-            future_manager: KafkaDeliveryFutureManager::new(max_delivery_buffer_size),
+            future_manager: DeliveryFutureManager::new(max_delivery_buffer_size),
         })
     }
 }
@@ -622,7 +453,18 @@ impl<'w> KafkaPayloadWriter<'w> {
         for i in 0..self.config.max_retry_num {
             match self.inner.send_result(record) {
                 Ok(delivery_future) => {
-                    self.add_future.add_future(delivery_future).await?;
+                    if self
+                        .add_future
+                        .add_future_may_await(map_delivery_future(delivery_future))
+                        .await?
+                    {
+                        tracing::warn!(
+                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
+                            This indicates the default value of queue.buffering.max.messages has changed.",
+                            self.add_future.future_count(),
+                            self.add_future.max_future_count()
+                        );
+                    }
                     success_flag = true;
                     break;
                 }
@@ -640,7 +482,7 @@ impl<'w> KafkaPayloadWriter<'w> {
                 KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                     tracing::warn!(
                        "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
-                        self.add_future.0.future_count,
+                        self.add_future.future_count(),
                         i
                     );
                     self.add_future.await_one_delivery().await?;
diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs
index d8dae1db0ce1..8ff02e726812 100644
--- a/src/connector/src/sink/log_store.rs
+++ b/src/connector/src/sink/log_store.rs
@@ -13,11 +13,14 @@
 // limitations under the License.
 
 use std::cmp::Ordering;
+use std::collections::VecDeque;
 use std::fmt::Debug;
-use std::future::Future;
+use std::future::{poll_fn, Future};
 use std::sync::Arc;
+use std::task::Poll;
 
 use anyhow::anyhow;
+use futures::{TryFuture, TryFutureExt};
 use risingwave_common::array::StreamChunk;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::util::epoch::EpochPair;
@@ -192,6 +195,186 @@ where
     }
 }
 
+enum DeliveryFutureManagerItem<F> {
+    Chunk {
+        chunk_id: ChunkId,
+        // earlier future at the front
+        futures: VecDeque<F>,
+    },
+    Barrier,
+}
+
+pub struct DeliveryFutureManager<F> {
+    future_count: usize,
+    max_future_count: usize,
+    // earlier items at the front
+    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
+}
+
+impl<F> DeliveryFutureManager<F> {
+    pub fn new(max_future_count: usize) -> Self {
+        Self {
+            future_count: 0,
+            max_future_count,
+            items: Default::default(),
+        }
+    }
+
+    pub fn add_barrier(&mut self, epoch: u64) {
+        if let Some((item_epoch, last_item)) = self.items.back() {
+            match last_item {
+                DeliveryFutureManagerItem::Chunk { .. } => {
+                    assert_eq!(*item_epoch, epoch)
+                }
+                DeliveryFutureManagerItem::Barrier => {
+                    assert!(
+                        epoch > *item_epoch,
+                        "new barrier epoch {} should be greater than prev barrier {}",
+                        epoch,
+                        item_epoch
+                    );
+                }
+            }
+        }
+        self.items
+            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
+    }
+
+    pub fn start_write_chunk(
+        &mut self,
+        epoch: u64,
+        chunk_id: ChunkId,
+    ) -> DeliveryFutureManagerAddFuture<'_, F> {
+        if let Some((item_epoch, item)) = self.items.back() {
+            match item {
+                DeliveryFutureManagerItem::Chunk {
+                    chunk_id: item_chunk_id,
+                    ..
+                } => {
+                    assert_eq!(epoch, *item_epoch);
+                    assert!(
+                        chunk_id > *item_chunk_id,
+                        "new chunk id {} should be greater than prev chunk id {}",
+                        chunk_id,
+                        item_chunk_id
+                    );
+                }
+                DeliveryFutureManagerItem::Barrier => {
+                    assert!(
+                        epoch > *item_epoch,
+                        "new chunk epoch {} should be greater than prev barrier: {}",
+                        epoch,
+                        item_epoch
+                    );
+                }
+            }
+        }
+        self.items.push_back((
+            epoch,
+            DeliveryFutureManagerItem::Chunk {
+                chunk_id,
+                futures: VecDeque::new(),
+            },
+        ));
+        DeliveryFutureManagerAddFuture(self)
+    }
+}
+
+pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
+
+impl<'a, F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'a, F> {
+    /// Add a new future to the most recently started chunk.
+    /// The returned bool indicates whether we have awaited on any previous futures.
+    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
+        let mut has_await = false;
+        while self.0.future_count >= self.0.max_future_count {
+            self.await_one_delivery().await?;
+            has_await = true;
+        }
+        match self.0.items.back_mut() {
+            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
+                futures.push_back(future);
+                self.0.future_count += 1;
+                Ok(has_await)
+            }
+            _ => unreachable!("should add future only after adding a new chunk"),
+        }
+    }
+
+    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
+        for (_, item) in self.0.items.iter_mut().rev() {
+            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item && !futures.is_empty() {
+                let mut delivery_future = futures.pop_front().expect("have checked non-empty");
+                self.0.future_count -= 1;
+                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
+            } else {
+                continue;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn future_count(&self) -> usize {
+        self.0.future_count
+    }
+
+    pub fn max_future_count(&self) -> usize {
+        self.0.max_future_count
+    }
+}
+
+impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
+    pub fn next_truncate_offset(
+        &mut self,
+    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
+        poll_fn(move |cx| {
+            let mut latest_offset: Option<TruncateOffset> = None;
+            'outer: loop {
+                if let Some((epoch, item)) = self.items.front_mut() {
+                    match item {
+                        DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
+                            while let Some(future) = futures.front_mut() {
+                                match future.try_poll_unpin(cx) {
+                                    Poll::Ready(result) => match result {
+                                        Ok(()) => {
+                                            self.future_count -= 1;
+                                            futures.pop_front();
+                                        }
+                                        Err(e) => {
+                                            return Poll::Ready(Err(e));
+                                        }
+                                    },
+                                    Poll::Pending => {
+                                        break 'outer;
+                                    }
+                                }
+                            }
+
+                            // When we reach here, there must be no pending or errored future,
+                            // which means all futures of this stream chunk have finished.
+                            assert!(futures.is_empty());
+                            latest_offset = Some(TruncateOffset::Chunk {
+                                epoch: *epoch,
+                                chunk_id: *chunk_id,
+                            });
+                            self.items.pop_front().expect("items not empty");
+                        }
+                        DeliveryFutureManagerItem::Barrier => {
+                            // Barrier will be yielded anyway
+                            return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch }));
+                        }
+                    }
+                }
+            }
+            if let Some(offset) = latest_offset {
+                Poll::Ready(Ok(offset))
+            } else {
+                Poll::Pending
+            }
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::sink::log_store::TruncateOffset;
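--
Usage sketch (editorial addition, not part of the commit): the extracted
`DeliveryFutureManager` separates enqueueing per-record delivery futures from
computing how far the log store may be truncated. A sink registers each chunk
with `start_write_chunk`, attaches one `TryFuture<Ok = ()>` per outgoing
record, seals the epoch with `add_barrier`, and polls `next_truncate_offset`
for offsets whose deliveries have all completed. The snippet below is a
minimal sketch assuming the crate-internal paths
`risingwave_connector::sink::{log_store, SinkError}` are visible to the
caller; the trivially-ready future stands in for a real mapped rdkafka
`DeliveryFuture`.

    use futures::executor::block_on;
    use futures::future::{ready, Ready};
    use risingwave_connector::sink::log_store::{DeliveryFutureManager, TruncateOffset};
    use risingwave_connector::sink::SinkError;

    // Stand-in for a real delivery future such as `KafkaSinkDeliveryFuture`.
    type MockDeliveryFuture = Ready<Result<(), SinkError>>;

    async fn example() -> Result<(), SinkError> {
        // Bound the number of in-flight delivery futures, as the Kafka sink
        // does with its producer queue size.
        let mut manager: DeliveryFutureManager<MockDeliveryFuture> =
            DeliveryFutureManager::new(10);

        // Start chunk 0 of epoch 1, then attach one future per record. The
        // returned bool reports whether earlier futures had to be awaited
        // to stay under the bound.
        let mut add_future = manager.start_write_chunk(1, 0);
        let awaited_previous = add_future.add_future_may_await(ready(Ok(()))).await?;
        assert!(!awaited_previous);

        // Seal epoch 1 with a barrier.
        manager.add_barrier(1);

        // The chunk's future is already complete, so the manager yields the
        // barrier offset (which subsumes the chunk offset) as safe to
        // truncate from the log store.
        let offset = manager.next_truncate_offset().await?;
        assert!(matches!(offset, TruncateOffset::Barrier { epoch: 1 }));
        Ok(())
    }

    fn main() -> Result<(), SinkError> {
        block_on(example())
    }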