extract common logic
wenym1 committed Sep 28, 2023
1 parent 53789be commit d38837e
Showing 2 changed files with 212 additions and 187 deletions.
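This commit moves the Kafka-specific `KafkaDeliveryFutureManager` out of `kafka.rs` into the shared `log_store` module as a generic `DeliveryFutureManager`, with the Kafka sink plugging its own futures in through the new `KafkaSinkDeliveryFuture` alias. The standalone toy below (invented names and a plain `String` error; the semantics are inferred from the call sites in this diff, not copied from `log_store.rs`) sketches the buffering discipline the manager implements: queue delivery futures, and once the buffer is full, await the oldest delivery before admitting a new one.

use std::collections::VecDeque;

use futures::future::{ready, Ready};

// Toy sketch, not RisingWave code: earlier futures sit at the front of the queue.
struct ToyFutureManager<F> {
    max: usize,
    buffered: VecDeque<F>,
}

impl<F: std::future::Future<Output = Result<(), String>> + Unpin> ToyFutureManager<F> {
    fn new(max: usize) -> Self {
        Self { max, buffered: VecDeque::new() }
    }

    /// Queue `fut`; returns `Ok(true)` if the buffer was full and an older
    /// delivery had to be awaited first (mirroring how `add_future_may_await`
    /// is used in this diff).
    async fn add_future_may_await(&mut self, fut: F) -> Result<bool, String> {
        let mut awaited = false;
        while self.buffered.len() >= self.max {
            awaited = true;
            self.await_one_delivery().await?;
        }
        self.buffered.push_back(fut);
        Ok(awaited)
    }

    /// Await the oldest buffered delivery, if any.
    async fn await_one_delivery(&mut self) -> Result<(), String> {
        match self.buffered.pop_front() {
            Some(fut) => fut.await,
            None => Ok(()),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let mut mgr: ToyFutureManager<Ready<Result<(), String>>> = ToyFutureManager::new(2);
    for i in 0..5 {
        let had_to_await = mgr.add_future_may_await(ready(Ok(()))).await?;
        println!("buffered delivery {i}, had_to_await = {had_to_await}");
    }
    Ok(())
}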
214 changes: 28 additions & 186 deletions src/connector/src/sink/kafka.rs
@@ -12,18 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::poll_fn;
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{select, Either};
use futures::{Future, FutureExt};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
@@ -33,14 +31,17 @@ use risingwave_common::util::drop_either_future;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
use tonic::codegen::futures_core::TryFuture;

use super::{
Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};
use crate::common::KafkaCommon;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::{ChunkId, LogReader, LogStoreReadItem, TruncateOffset};
use crate::sink::log_store::{
DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem,
};
use crate::sink::writer::FormattedSink;
use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, SinkWriterParam};
use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
@@ -346,27 +347,7 @@ impl Sink for KafkaSink {
}
}

trait DeliveryFutureTrait = Future<Output = <DeliveryFuture as Future>::Output> + Unpin + 'static;

enum KafkaDeliveryFutureManagerItem<F> {
Chunk {
chunk_id: ChunkId,
// earlier future at the front
futures: VecDeque<F>,
},
Barrier,
}

struct KafkaDeliveryFutureManager<F> {
future_count: usize,
max_future_count: usize,
// earlier items at the front
items: VecDeque<(u64, KafkaDeliveryFutureManagerItem<F>)>,
}

fn map_future_result(
delivery_future_result: <DeliveryFuture as Future>::Output,
) -> KafkaResult<()> {
fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
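// Note: `<DeliveryFuture as Future>::Output` is rdkafka's nested delivery
// result, `Result<Result<(i32, i64), (KafkaError, OwnedMessage)>, Canceled>`
// (shape per rdkafka's docs); the arms below flatten it into the sink's
// `Result<()>`.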
match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
@@ -377,168 +358,18 @@ fn map_future_result(
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Ok(Err((k_err, _msg))) => Err(k_err),
Ok(Err((k_err, _msg))) => Err(k_err.into()),
// This represents the producer is dropped
// before the delivery status is received
// Return `KafkaError::Canceled`
Err(_) => Err(KafkaError::Canceled),
Err(_) => Err(KafkaError::Canceled.into()),
}
}

impl<F: DeliveryFutureTrait> KafkaDeliveryFutureManager<F> {
fn new(max_future_count: usize) -> Self {
Self {
future_count: 0,
max_future_count,
items: Default::default(),
}
}
type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

fn add_barrier(&mut self, epoch: u64) {
if let Some((item_epoch, last_item)) = self.items.back() {
match last_item {
KafkaDeliveryFutureManagerItem::Chunk { .. } => {
assert_eq!(*item_epoch, epoch)
}
KafkaDeliveryFutureManagerItem::Barrier => {
assert!(
epoch > *item_epoch,
"new barrier epoch {} should be greater than prev barrier {}",
epoch,
item_epoch
);
}
}
}
self.items
.push_back((epoch, KafkaDeliveryFutureManagerItem::Barrier));
}

fn start_write_chunk(
&mut self,
epoch: u64,
chunk_id: ChunkId,
) -> KafkaDeliveryFutureManagerAddFuture<'_, F> {
if let Some((item_epoch, item)) = self.items.back() {
match item {
KafkaDeliveryFutureManagerItem::Chunk {
chunk_id: item_chunk_id,
..
} => {
assert_eq!(epoch, *item_epoch);
assert!(
chunk_id > *item_chunk_id,
"new chunk id {} should be greater than prev chunk id {}",
chunk_id,
item_chunk_id
);
}
KafkaDeliveryFutureManagerItem::Barrier => {
assert!(
epoch > *item_epoch,
"new chunk epoch {} should be greater than prev barrier: {}",
epoch,
item_epoch
);
}
}
}
self.items.push_back((
epoch,
KafkaDeliveryFutureManagerItem::Chunk {
chunk_id,
futures: VecDeque::new(),
},
));
KafkaDeliveryFutureManagerAddFuture(self)
}
}

struct KafkaDeliveryFutureManagerAddFuture<'a, F>(&'a mut KafkaDeliveryFutureManager<F>);

impl<'a, F: DeliveryFutureTrait> KafkaDeliveryFutureManagerAddFuture<'a, F> {
async fn add_future(&mut self, future: F) -> Result<()> {
while self.0.future_count >= self.0.max_future_count {
tracing::warn!(
"Number of records being delivered ({}) >= expected kafka producer queue size ({}).
This indicates the default value of queue.buffering.max.messages has changed.",
self.0.future_count,
self.0.max_future_count,
);
self.await_one_delivery().await?;
}
match self.0.items.back_mut() {
Some((_, KafkaDeliveryFutureManagerItem::Chunk { futures, .. })) => {
futures.push_back(future);
self.0.future_count += 1;
Ok(())
}
_ => unreachable!("should add future only after add a new chunk"),
}
}

async fn await_one_delivery(&mut self) -> Result<()> {
for (_, item) in self.0.items.iter_mut().rev() {
if let KafkaDeliveryFutureManagerItem::Chunk {futures, ..} = item && !futures.is_empty() {
let delivery_future = futures.pop_front().expect("have checked non-empty");
self.0.future_count -= 1;
return map_future_result(delivery_future.await).map_err(SinkError::Kafka);
} else {
continue;
}
}
Ok(())
}
}

impl<F: DeliveryFutureTrait> KafkaDeliveryFutureManager<F> {
fn next_truncate_offset(&mut self) -> impl Future<Output = Result<TruncateOffset>> + '_ {
poll_fn(move |cx| {
let mut latest_offset: Option<TruncateOffset> = None;
'outer: loop {
if let Some((epoch, item)) = self.items.front_mut() {
match item {
KafkaDeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
while let Some(future) = futures.front_mut() {
match future.poll_unpin(cx) {
Poll::Ready(result) => match map_future_result(result) {
Ok(()) => {
self.future_count -= 1;
futures.pop_front();
}
Err(result) => {
return Poll::Ready(Err(SinkError::Kafka(result)));
}
},
Poll::Pending => {
break 'outer;
}
}
}

// when we reach here, there must not be any pending or error future.
// Which means all futures of this stream chunk have been finished
assert!(futures.is_empty());
latest_offset = Some(TruncateOffset::Chunk {
epoch: *epoch,
chunk_id: *chunk_id,
});
self.items.pop_front().expect("items not empty");
}
KafkaDeliveryFutureManagerItem::Barrier => {
// Barrier will be yielded anyway
return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch }));
}
}
}
}
if let Some(offset) = latest_offset {
Poll::Ready(Ok(offset))
} else {
Poll::Pending
}
})
}
fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
future.map(map_future_result)
}

/// When the `DeliveryFuture` the current `future_delivery_buffer`
@@ -552,14 +383,14 @@ const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

struct KafkaPayloadWriter<'a> {
inner: &'a FutureProducer<PrivateLinkProducerContext>,
add_future: KafkaDeliveryFutureManagerAddFuture<'a, DeliveryFuture>,
add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
config: &'a KafkaConfig,
}

pub struct KafkaLogSinker {
formatter: SinkFormatterImpl,
inner: FutureProducer<PrivateLinkProducerContext>,
future_manager: KafkaDeliveryFutureManager<DeliveryFuture>,
future_manager: DeliveryFutureManager<KafkaSinkDeliveryFuture>,
config: KafkaConfig,
}

Expand Down Expand Up @@ -602,7 +433,7 @@ impl KafkaLogSinker {
formatter,
inner,
config: config.clone(),
future_manager: KafkaDeliveryFutureManager::new(max_delivery_buffer_size),
future_manager: DeliveryFutureManager::new(max_delivery_buffer_size),
})
}
}
@@ -622,7 +453,18 @@ impl<'w> KafkaPayloadWriter<'w> {
for i in 0..self.config.max_retry_num {
match self.inner.send_result(record) {
Ok(delivery_future) => {
self.add_future.add_future(delivery_future).await?;
if self
.add_future
.add_future_may_await(map_delivery_future(delivery_future))
.await?
{
tracing::warn!(
"Number of records being delivered ({}) >= expected kafka producer queue size ({}).
This indicates the default value of queue.buffering.max.messages has changed.",
self.add_future.future_count(),
self.add_future.max_future_count()
);
}
success_flag = true;
break;
}
Expand All @@ -640,7 +482,7 @@ impl<'w> KafkaPayloadWriter<'w> {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tracing::warn!(
"Producer queue full. Delivery future buffer size={}. Await and retry #{}",
self.add_future.0.future_count,
self.add_future.future_count(),
i
);
self.add_future.await_one_delivery().await?;
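For reference, the `map_delivery_future` helper added in this commit uses `futures::FutureExt::map` to flatten rdkafka's nested delivery result into the sink's flat `Result`, which is what lets the adapted future satisfy the `KafkaSinkDeliveryFuture` alias. Below is a standalone sketch of that adapter pattern, with stand-in types rather than rdkafka's real ones:

use futures::FutureExt;

// Stand-in for rdkafka's nested delivery output (assumed shape, generics
// elided): the outer Err means the producer was dropped, the inner Err is a
// broker-reported failure.
type RawDeliveryOutput = Result<Result<(i32, i64), String>, ()>;

fn flatten(raw: RawDeliveryOutput) -> Result<(i32, i64), String> {
    match raw {
        Ok(Ok(partition_offset)) => Ok(partition_offset),
        Ok(Err(err)) => Err(err),
        Err(()) => Err("producer dropped before delivery status arrived".into()),
    }
}

#[tokio::main]
async fn main() {
    // `futures::future::ready` stands in for a real delivery future here.
    let adapted = futures::future::ready::<RawDeliveryOutput>(Ok(Ok((0, 42)))).map(flatten);
    assert_eq!(adapted.await, Ok((0, 42)));
}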
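The removed `next_truncate_offset` presumably moves into the generic manager as well (its truncate-offset bookkeeping is omitted here); its core trick is polling the oldest buffered future with `poll_unpin` inside a `poll_fn` closure, so completed deliveries are drained without blocking on ones still pending. A standalone sketch of that polling pattern, with invented names:

use std::collections::VecDeque;
use std::future::poll_fn;
use std::task::Poll;

use futures::future::BoxFuture;
use futures::FutureExt;

// Toy sketch, not RisingWave code: drain completed futures from the front of
// `queue`, resolving with the number of newly completed ones; stay `Pending`
// while the oldest future is unfinished.
async fn drain_ready(queue: &mut VecDeque<BoxFuture<'static, ()>>) -> usize {
    poll_fn(|cx| {
        let mut drained = 0;
        while let Some(front) = queue.front_mut() {
            match front.poll_unpin(cx) {
                Poll::Ready(()) => {
                    queue.pop_front();
                    drained += 1;
                }
                // The oldest future is not done; its waker is now registered.
                Poll::Pending => break,
            }
        }
        if drained > 0 || queue.is_empty() {
            Poll::Ready(drained)
        } else {
            Poll::Pending
        }
    })
    .await
}

#[tokio::main]
async fn main() {
    let mut queue: VecDeque<BoxFuture<'static, ()>> = VecDeque::new();
    queue.push_back(futures::future::ready(()).boxed());
    queue.push_back(futures::future::ready(()).boxed());
    assert_eq!(drain_ready(&mut queue).await, 2);
}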
