From da89875fe63e48c6e150059be56d8f5fdc2a1450 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 6 Sep 2023 18:27:52 +0800 Subject: [PATCH] feat(log-store): refine log store trait and add update vnode bitmap in reader (#11959) --- src/connector/src/sink/boxed.rs | 3 +- src/connector/src/sink/clickhouse.rs | 9 - src/connector/src/sink/coordinate.rs | 4 +- src/connector/src/sink/iceberg.rs | 6 - src/connector/src/sink/kinesis.rs | 9 - src/connector/src/sink/mod.rs | 21 +- src/connector/src/sink/nats.rs | 9 - src/connector/src/sink/redis.rs | 9 - src/connector/src/sink/remote.rs | 10 - src/stream/src/common/log_store/in_mem.rs | 247 ++++++++--------- .../src/common/log_store/kv_log_store/mod.rs | 53 ++-- .../common/log_store/kv_log_store/reader.rs | 251 ++++++++---------- .../common/log_store/kv_log_store/serde.rs | 10 +- .../common/log_store/kv_log_store/writer.rs | 137 +++++----- src/stream/src/common/log_store/mod.rs | 47 ++-- src/stream/src/executor/sink.rs | 14 + src/stream/src/lib.rs | 1 + .../tests/integration_tests/sink/basic.rs | 11 - 18 files changed, 371 insertions(+), 480 deletions(-) diff --git a/src/connector/src/sink/boxed.rs b/src/connector/src/sink/boxed.rs index 4b3176dec7282..0a7082961a00e 100644 --- a/src/connector/src/sink/boxed.rs +++ b/src/connector/src/sink/boxed.rs @@ -14,6 +14,7 @@ use std::fmt::{Debug, Formatter}; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use async_trait::async_trait; use risingwave_common::array::StreamChunk; @@ -54,7 +55,7 @@ impl SinkWriter for BoxWriter { self.deref_mut().abort().await } - async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> crate::sink::Result<()> { + async fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) -> crate::sink::Result<()> { self.deref_mut().update_vnode_bitmap(vnode_bitmap).await } } diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index f6ff3c9bbc998..db6a714c8922f 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -19,7 +19,6 @@ use anyhow::anyhow; use clickhouse::{Client, Row as ClickHouseRow}; use itertools::Itertools; use risingwave_common::array::{Op, RowRef, StreamChunk}; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarRefImpl, Serial}; @@ -439,14 +438,6 @@ impl SinkWriter for ClickHouseSinkWriter { async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { Ok(()) } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - Ok(()) - } } #[derive(ClickHouseRow, Deserialize)] diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs index f81a2fe1bac3b..bbcc7b636b17c 100644 --- a/src/connector/src/sink/coordinate.rs +++ b/src/connector/src/sink/coordinate.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use anyhow::anyhow; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; @@ -81,7 +83,7 @@ impl>> SinkWriter for Coordi self.inner.abort().await } - async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> Result<()> { + async fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) -> Result<()> { self.inner.update_vnode_bitmap(vnode_bitmap).await } } diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index b71af7c4bf214..451c8b2686ec7 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -411,12 +411,6 @@ impl SinkWriter for IcebergWriter { // TODO: abort should clean up all the data written in this epoch. Ok(()) } - - /// Update the vnode bitmap of current sink writer - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - // Just skip it. - Ok(()) - } } pub struct IcebergSinkCommitter { diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 4a6f3a789bf14..4d76a3235e381 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -22,7 +22,6 @@ use aws_sdk_kinesis::primitives::Blob; use aws_sdk_kinesis::Client as KinesisClient; use futures_async_stream::for_await; use risingwave_common::array::StreamChunk; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; use serde_derive::Deserialize; @@ -272,14 +271,6 @@ impl SinkWriter for KinesisSinkWriter { async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { Ok(()) } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - Ok(()) - } } #[macro_export] diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 4976b94cf504a..fd1de194671b3 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -27,6 +27,7 @@ pub mod test_sink; pub mod utils; use std::collections::HashMap; +use std::sync::Arc; use ::clickhouse::error::Error as ClickHouseError; use anyhow::anyhow; @@ -173,10 +174,14 @@ pub trait SinkWriter: Send + 'static { async fn barrier(&mut self, is_checkpoint: bool) -> Result; /// Clean up - async fn abort(&mut self) -> Result<()>; + async fn abort(&mut self) -> Result<()> { + Ok(()) + } /// Update the vnode bitmap of current sink writer - async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> Result<()>; + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } } #[async_trait] @@ -240,10 +245,6 @@ impl SinkWriter for SinkWriterV1Adapter { async fn abort(&mut self) -> Result<()> { self.inner.abort().await } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - Ok(()) - } } #[async_trait] @@ -314,17 +315,9 @@ impl SinkWriter for BlackHoleSink { Ok(()) } - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { Ok(()) } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - Ok(()) - } } impl SinkConfig { diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 76df9486bc39a..c2408acbdab9d 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; use anyhow::anyhow; use async_nats::jetstream::context::Context; use risingwave_common::array::StreamChunk; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; use risingwave_rpc_client::ConnectorClient; @@ -155,12 +154,4 @@ impl SinkWriter for NatsSinkWriter { async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { Ok(()) } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - Ok(()) - } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 1c059a1fcb16e..e5afa88c38b2f 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -14,7 +14,6 @@ use async_trait::async_trait; use risingwave_common::array::StreamChunk; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; @@ -58,15 +57,7 @@ impl SinkWriter for RedisSinkWriter { todo!() } - async fn abort(&mut self) -> Result<()> { - todo!() - } - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { todo!() } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - todo!() - } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index dcaaeb0389d95..1d62577a88ad8 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -20,7 +20,6 @@ use async_trait::async_trait; use itertools::Itertools; use prost::Message; use risingwave_common::array::StreamChunk; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; @@ -409,15 +408,6 @@ where Ok(::non_checkpoint_return_value()) } } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> { - // TODO: handle scaling - Ok(()) - } } pub struct RemoteCoordinator { diff --git a/src/stream/src/common/log_store/in_mem.rs b/src/stream/src/common/log_store/in_mem.rs index ce218d781f6c3..f387d971e8ffe 100644 --- a/src/stream/src/common/log_store/in_mem.rs +++ b/src/stream/src/common/log_store/in_mem.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; use std::sync::Arc; use anyhow::anyhow; @@ -35,6 +34,7 @@ enum InMemLogStoreItem { next_epoch: u64, is_checkpoint: bool, }, + UpdateVnodeBitmap(Arc), } /// An in-memory log store that can buffer a bounded amount of stream chunk in memory via bounded @@ -95,162 +95,149 @@ impl LogStoreFactory for BoundedInMemLogStoreFactory { type Reader = BoundedInMemLogStoreReader; type Writer = BoundedInMemLogStoreWriter; - type BuildFuture = impl Future; - - fn build(self) -> Self::BuildFuture { - async move { - let (init_epoch_tx, init_epoch_rx) = oneshot::channel(); - let (item_tx, item_rx) = channel(self.bound); - let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel(); - let reader = BoundedInMemLogStoreReader { - epoch_progress: UNINITIALIZED, - init_epoch_rx: Some(init_epoch_rx), - item_rx, - truncated_epoch_tx, - }; - let writer = BoundedInMemLogStoreWriter { - curr_epoch: None, - init_epoch_tx: Some(init_epoch_tx), - item_tx, - truncated_epoch_rx, - }; - (reader, writer) - } + #[expect(clippy::unused_async)] + async fn build(self) -> (Self::Reader, Self::Writer) { + let (init_epoch_tx, init_epoch_rx) = oneshot::channel(); + let (item_tx, item_rx) = channel(self.bound); + let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel(); + let reader = BoundedInMemLogStoreReader { + epoch_progress: UNINITIALIZED, + init_epoch_rx: Some(init_epoch_rx), + item_rx, + truncated_epoch_tx, + }; + let writer = BoundedInMemLogStoreWriter { + curr_epoch: None, + init_epoch_tx: Some(init_epoch_tx), + item_tx, + truncated_epoch_rx, + }; + (reader, writer) } } impl LogReader for BoundedInMemLogStoreReader { - type InitFuture<'a> = impl Future> + 'a; - type NextItemFuture<'a> = impl Future> + 'a; - type TruncateFuture<'a> = impl Future> + 'a; - - fn init(&mut self) -> Self::InitFuture<'_> { - async { - let init_epoch_rx = self - .init_epoch_rx - .take() - .expect("should not init for twice"); - let epoch = init_epoch_rx - .await - .map_err(|e| anyhow!("unable to get init epoch: {:?}", e))?; - assert_eq!(self.epoch_progress, UNINITIALIZED); - self.epoch_progress = LogReaderEpochProgress::Consuming(epoch); - Ok(()) - } + async fn init(&mut self) -> LogStoreResult<()> { + let init_epoch_rx = self + .init_epoch_rx + .take() + .expect("should not init for twice"); + let epoch = init_epoch_rx + .await + .map_err(|e| anyhow!("unable to get init epoch: {:?}", e))?; + assert_eq!(self.epoch_progress, UNINITIALIZED); + self.epoch_progress = LogReaderEpochProgress::Consuming(epoch); + Ok(()) } - fn next_item(&mut self) -> Self::NextItemFuture<'_> { - async { - match self.item_rx.recv().await { - Some(item) => match self.epoch_progress { - Consuming(current_epoch) => match item { - InMemLogStoreItem::StreamChunk(chunk) => { - Ok((current_epoch, LogStoreReadItem::StreamChunk(chunk))) - } - InMemLogStoreItem::Barrier { - is_checkpoint, - next_epoch, - } => { - if is_checkpoint { - self.epoch_progress = AwaitingTruncate { - next_epoch, - sealed_epoch: current_epoch, - }; - } else { - self.epoch_progress = Consuming(next_epoch); - } - Ok((current_epoch, LogStoreReadItem::Barrier { is_checkpoint })) + async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> { + match self.item_rx.recv().await { + Some(item) => match self.epoch_progress { + Consuming(current_epoch) => match item { + InMemLogStoreItem::StreamChunk(chunk) => { + Ok((current_epoch, LogStoreReadItem::StreamChunk(chunk))) + } + InMemLogStoreItem::Barrier { + is_checkpoint, + next_epoch, + } => { + if is_checkpoint { + self.epoch_progress = AwaitingTruncate { + next_epoch, + sealed_epoch: current_epoch, + }; + } else { + self.epoch_progress = Consuming(next_epoch); } - }, - AwaitingTruncate { .. } => { - unreachable!("should not be awaiting for when barrier comes") + Ok((current_epoch, LogStoreReadItem::Barrier { is_checkpoint })) } + InMemLogStoreItem::UpdateVnodeBitmap(vnode_bitmap) => Ok(( + current_epoch, + LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap), + )), }, - None => Err(LogStoreError::EndOfLogStream), - } + AwaitingTruncate { .. } => { + unreachable!("should not be awaiting for when barrier comes") + } + }, + None => Err(LogStoreError::EndOfLogStream), } } - fn truncate(&mut self) -> Self::TruncateFuture<'_> { - async move { - let sealed_epoch = match self.epoch_progress { - Consuming(_) => unreachable!("should be awaiting truncate"), - AwaitingTruncate { - sealed_epoch, - next_epoch, - } => { - self.epoch_progress = Consuming(next_epoch); - sealed_epoch - } - }; - self.truncated_epoch_tx - .send(sealed_epoch) - .map_err(|_| anyhow!("unable to send sealed epoch"))?; - Ok(()) - } + #[expect(clippy::unused_async)] + async fn truncate(&mut self) -> LogStoreResult<()> { + let sealed_epoch = match self.epoch_progress { + Consuming(_) => unreachable!("should be awaiting truncate"), + AwaitingTruncate { + sealed_epoch, + next_epoch, + } => { + self.epoch_progress = Consuming(next_epoch); + sealed_epoch + } + }; + self.truncated_epoch_tx + .send(sealed_epoch) + .map_err(|_| anyhow!("unable to send sealed epoch"))?; + Ok(()) } } impl LogWriter for BoundedInMemLogStoreWriter { - type FlushCurrentEpoch<'a> = impl Future> + 'a; - type InitFuture<'a> = impl Future> + 'a; - type WriteChunkFuture<'a> = impl Future> + 'a; - - fn init(&mut self, epoch: EpochPair) -> Self::InitFuture<'_> { - async move { - let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice"); - init_epoch_tx - .send(epoch.curr) - .map_err(|_| anyhow!("unable to send init epoch"))?; - self.curr_epoch = Some(epoch.curr); - Ok(()) - } + #[expect(clippy::unused_async)] + async fn init(&mut self, epoch: EpochPair) -> LogStoreResult<()> { + let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice"); + init_epoch_tx + .send(epoch.curr) + .map_err(|_| anyhow!("unable to send init epoch"))?; + self.curr_epoch = Some(epoch.curr); + Ok(()) } - fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_> { - async { - self.item_tx - .send(InMemLogStoreItem::StreamChunk(chunk)) - .await - .map_err(|_| anyhow!("unable to send stream chunk"))?; - Ok(()) - } + async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { + self.item_tx + .send(InMemLogStoreItem::StreamChunk(chunk)) + .await + .map_err(|_| anyhow!("unable to send stream chunk"))?; + Ok(()) } - fn flush_current_epoch( + async fn flush_current_epoch( &mut self, next_epoch: u64, is_checkpoint: bool, - ) -> Self::FlushCurrentEpoch<'_> { - async move { - self.item_tx - .send(InMemLogStoreItem::Barrier { - next_epoch, - is_checkpoint, - }) + ) -> LogStoreResult<()> { + self.item_tx + .send(InMemLogStoreItem::Barrier { + next_epoch, + is_checkpoint, + }) + .await + .map_err(|_| anyhow!("unable to send barrier"))?; + + let prev_epoch = self + .curr_epoch + .replace(next_epoch) + .expect("should have epoch"); + + if is_checkpoint { + let truncated_epoch = self + .truncated_epoch_rx + .recv() .await - .map_err(|_| anyhow!("unable to send barrier"))?; - - let prev_epoch = self - .curr_epoch - .replace(next_epoch) - .expect("should have epoch"); - - if is_checkpoint { - let truncated_epoch = self - .truncated_epoch_rx - .recv() - .await - .ok_or_else(|| anyhow!("cannot get truncated epoch"))?; - assert_eq!(truncated_epoch, prev_epoch); - } - - Ok(()) + .ok_or_else(|| anyhow!("cannot get truncated epoch"))?; + assert_eq!(truncated_epoch, prev_epoch); } + + Ok(()) } - fn update_vnode_bitmap(&mut self, _new_vnodes: Arc) { - // Since this is in memory, we don't need to handle the vnode bitmap + async fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { + Ok(self + .item_tx + .send(InMemLogStoreItem::UpdateVnodeBitmap(new_vnodes)) + .await + .map_err(|_| anyhow!("unable to send vnode bitmap"))?) } } diff --git a/src/stream/src/common/log_store/kv_log_store/mod.rs b/src/stream/src/common/log_store/kv_log_store/mod.rs index d7c62dbf5c33d..f54ef44c49141 100644 --- a/src/stream/src/common/log_store/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store/kv_log_store/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; use std::sync::Arc; use risingwave_common::buffer::Bitmap; @@ -72,34 +71,30 @@ impl LogStoreFactory for KvLogStoreFactory { type Reader = KvLogStoreReader; type Writer = KvLogStoreWriter; - type BuildFuture = impl Future; - - fn build(self) -> Self::BuildFuture { - async move { - let table_id = TableId::new(self.table_catalog.id); - let serde = LogStoreRowSerde::new(&self.table_catalog, self.vnodes); - let local_state_store = self - .state_store - .new_local(NewLocalOptions { - table_id: TableId { - table_id: self.table_catalog.id, - }, - is_consistent_op: false, - table_option: TableOption { - retention_seconds: None, - }, - is_replicated: false, - }) - .await; - - let (tx, rx) = new_log_store_buffer(self.max_stream_chunk_count); - - let reader = KvLogStoreReader::new(table_id, self.state_store, serde.clone(), rx); - - let writer = KvLogStoreWriter::new(table_id, local_state_store, serde, tx); - - (reader, writer) - } + async fn build(self) -> (Self::Reader, Self::Writer) { + let table_id = TableId::new(self.table_catalog.id); + let serde = LogStoreRowSerde::new(&self.table_catalog, self.vnodes); + let local_state_store = self + .state_store + .new_local(NewLocalOptions { + table_id: TableId { + table_id: self.table_catalog.id, + }, + is_consistent_op: false, + table_option: TableOption { + retention_seconds: None, + }, + is_replicated: false, + }) + .await; + + let (tx, rx) = new_log_store_buffer(self.max_stream_chunk_count); + + let reader = KvLogStoreReader::new(table_id, self.state_store, serde.clone(), rx); + + let writer = KvLogStoreWriter::new(table_id, local_state_store, serde, tx); + + (reader, writer) } } diff --git a/src/stream/src/common/log_store/kv_log_store/reader.rs b/src/stream/src/common/log_store/kv_log_store/reader.rs index 96c1c8db3aff1..ffed40d9ce47b 100644 --- a/src/stream/src/common/log_store/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store/kv_log_store/reader.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; use std::ops::Bound::{Excluded, Included}; use std::pin::Pin; @@ -75,148 +74,130 @@ impl KvLogStoreReader { } impl LogReader for KvLogStoreReader { - type InitFuture<'a> = impl Future> + 'a; - type NextItemFuture<'a> = impl Future> + 'a; - type TruncateFuture<'a> = impl Future> + 'a; + async fn init(&mut self) -> LogStoreResult<()> { + let first_write_epoch = self.rx.init().await; + let streams = try_join_all(self.serde.vnodes().iter_vnodes().map(|vnode| { + let range_start = Bytes::from(Vec::from(vnode.to_be_bytes())); + let range_end = self.serde.serialize_epoch(vnode, first_write_epoch); + let table_id = self.table_id; + let state_store = self.state_store.clone(); + async move { + state_store + .iter( + (Included(range_start), Excluded(range_end)), + u64::MAX, + ReadOptions { + prefetch_options: PrefetchOptions::new_for_exhaust_iter(), + cache_policy: CachePolicy::Fill(CachePriority::Low), + table_id, + ..Default::default() + }, + ) + .await + } + })) + .await?; + // TODO: set chunk size by config + let state_store_stream = + Box::pin(new_log_store_item_stream(streams, self.serde.clone(), 1024)); + self.reader_state = ReaderState::ConsumingStateStore { + first_write_epoch, + state_store_stream, + }; + Ok(()) + } - fn init(&mut self) -> Self::InitFuture<'_> { - async move { - let first_write_epoch = self.rx.init().await; - let streams = try_join_all(self.serde.vnodes().iter_vnodes().map(|vnode| { - let range_start = Bytes::from(Vec::from(vnode.to_be_bytes())); - let range_end = self.serde.serialize_epoch(vnode, first_write_epoch); - let table_id = self.table_id; - let state_store = self.state_store.clone(); - async move { - state_store - .iter( - (Included(range_start), Excluded(range_end)), - u64::MAX, - ReadOptions { - prefetch_options: PrefetchOptions::new_for_exhaust_iter(), - cache_policy: CachePolicy::Fill(CachePriority::Low), - table_id, - ..Default::default() - }, - ) - .await - } - })) - .await?; - // TODO: set chunk size by config - let state_store_stream = - Box::pin(new_log_store_item_stream(streams, self.serde.clone(), 1024)); - self.reader_state = ReaderState::ConsumingStateStore { + async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> { + let epoch = match &mut self.reader_state { + ReaderState::Uninitialized => unreachable!("should be initialized"), + ReaderState::ConsumingStateStore { first_write_epoch, state_store_stream, - }; - Ok(()) - } - } - - fn next_item(&mut self) -> Self::NextItemFuture<'_> { - async move { - let epoch = match &mut self.reader_state { - ReaderState::Uninitialized => unreachable!("should be initialized"), - ReaderState::ConsumingStateStore { - first_write_epoch, - state_store_stream, - } => { - match state_store_stream.try_next().await? { - Some((epoch, item)) => { - return Ok((epoch, item)); - } - None => { - let first_write_epoch = *first_write_epoch; - // all consumed - self.reader_state = ReaderState::ConsumingStream { - epoch: first_write_epoch, - }; - first_write_epoch - } - } - } - ReaderState::ConsumingStream { epoch } => *epoch, - }; - loop { - let (item_epoch, item) = self.rx.next_item().await; - assert_eq!(epoch, item_epoch); - match item { - LogStoreBufferItem::StreamChunk { chunk, .. } => { - return Ok((epoch, LogStoreReadItem::StreamChunk(chunk))); - } - LogStoreBufferItem::Flushed { - vnode_bitmap, - start_seq_id, - end_seq_id, - } => { - let streams = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| { - let range_start = - self.serde - .serialize_log_store_pk(vnode, epoch, start_seq_id); - let range_end = - self.serde.serialize_log_store_pk(vnode, epoch, end_seq_id); - let state_store = self.state_store.clone(); - let table_id = self.table_id; - // Use u64::MAX here because the epoch to consume may be below the safe - // epoch - async move { - Ok::<_, LogStoreError>(Box::pin( - state_store - .iter( - (Included(range_start), Included(range_end)), - u64::MAX, - ReadOptions { - prefetch_options: - PrefetchOptions::new_for_exhaust_iter(), - cache_policy: CachePolicy::Fill(CachePriority::Low), - table_id, - ..Default::default() - }, - ) - .await?, - )) - } - })) - .await?; - let combined_stream = select_all(streams); - let stream_chunk = self - .serde - .deserialize_stream_chunk( - combined_stream, - start_seq_id, - end_seq_id, - epoch, - ) - .await?; - return Ok((epoch, LogStoreReadItem::StreamChunk(stream_chunk))); + } => { + match state_store_stream.try_next().await? { + Some((epoch, item)) => { + return Ok((epoch, item)); } - LogStoreBufferItem::Barrier { - is_checkpoint, - next_epoch, - } => { - assert!( - epoch < next_epoch, - "next epoch {} should be greater than current epoch {}", - next_epoch, - epoch - ); - self.reader_state = ReaderState::ConsumingStream { epoch: next_epoch }; - return Ok((epoch, LogStoreReadItem::Barrier { is_checkpoint })); - } - LogStoreBufferItem::UpdateVnodes(bitmap) => { - self.serde.update_vnode_bitmap(bitmap); - continue; + None => { + let first_write_epoch = *first_write_epoch; + // all consumed + self.reader_state = ReaderState::ConsumingStream { + epoch: first_write_epoch, + }; + first_write_epoch } } } - } + ReaderState::ConsumingStream { epoch } => *epoch, + }; + let (item_epoch, item) = self.rx.next_item().await; + assert_eq!(epoch, item_epoch); + Ok(match item { + LogStoreBufferItem::StreamChunk { chunk, .. } => { + (epoch, LogStoreReadItem::StreamChunk(chunk)) + } + LogStoreBufferItem::Flushed { + vnode_bitmap, + start_seq_id, + end_seq_id, + } => { + let streams = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| { + let range_start = self + .serde + .serialize_log_store_pk(vnode, epoch, start_seq_id); + let range_end = self.serde.serialize_log_store_pk(vnode, epoch, end_seq_id); + let state_store = self.state_store.clone(); + let table_id = self.table_id; + // Use u64::MAX here because the epoch to consume may be below the safe + // epoch + async move { + Ok::<_, LogStoreError>(Box::pin( + state_store + .iter( + (Included(range_start), Included(range_end)), + u64::MAX, + ReadOptions { + prefetch_options: PrefetchOptions::new_for_exhaust_iter(), + cache_policy: CachePolicy::Fill(CachePriority::Low), + table_id, + ..Default::default() + }, + ) + .await?, + )) + } + })) + .await?; + let combined_stream = select_all(streams); + let stream_chunk = self + .serde + .deserialize_stream_chunk(combined_stream, start_seq_id, end_seq_id, epoch) + .await?; + (epoch, LogStoreReadItem::StreamChunk(stream_chunk)) + } + LogStoreBufferItem::Barrier { + is_checkpoint, + next_epoch, + } => { + assert!( + epoch < next_epoch, + "next epoch {} should be greater than current epoch {}", + next_epoch, + epoch + ); + self.reader_state = ReaderState::ConsumingStream { epoch: next_epoch }; + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) + } + LogStoreBufferItem::UpdateVnodes(bitmap) => { + self.serde.update_vnode_bitmap(bitmap.clone()); + (epoch, LogStoreReadItem::UpdateVnodeBitmap(bitmap)) + } + }) } - fn truncate(&mut self) -> Self::TruncateFuture<'_> { - async move { - self.rx.truncate(); - Ok(()) - } + #[expect(clippy::unused_async)] + async fn truncate(&mut self) -> LogStoreResult<()> { + self.rx.truncate(); + Ok(()) } } diff --git a/src/stream/src/common/log_store/kv_log_store/serde.rs b/src/stream/src/common/log_store/kv_log_store/serde.rs index 11b290ed700bc..472451e007475 100644 --- a/src/stream/src/common/log_store/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store/kv_log_store/serde.rs @@ -944,7 +944,7 @@ mod tests { assert_eq!(row.to_owned_row(), rows[i]); } } - LogStoreReadItem::Barrier { .. } => unreachable!(), + _ => unreachable!(), } let (epoch, item): (_, LogStoreReadItem) = stream.try_next().await.unwrap().unwrap(); @@ -957,7 +957,7 @@ mod tests { assert_eq!(row.to_owned_row(), rows[i + CHUNK_SIZE]); } } - LogStoreReadItem::Barrier { .. } => unreachable!(), + _ => unreachable!(), } let (epoch, item): (_, LogStoreReadItem) = stream.try_next().await.unwrap().unwrap(); @@ -967,6 +967,7 @@ mod tests { LogStoreReadItem::Barrier { is_checkpoint } => { assert!(!is_checkpoint); } + _ => unreachable!(), } assert!(poll_fn(|cx| Poll::Ready(stream.poll_next_unpin(cx))) @@ -985,7 +986,7 @@ mod tests { assert_eq!(row.to_owned_row(), rows[i]); } } - LogStoreReadItem::Barrier { .. } => unreachable!(), + _ => unreachable!(), } let (epoch, item): (_, LogStoreReadItem) = stream.try_next().await.unwrap().unwrap(); @@ -998,7 +999,7 @@ mod tests { assert_eq!(row.to_owned_row(), rows[i + CHUNK_SIZE]); } } - LogStoreReadItem::Barrier { .. } => unreachable!(), + _ => unreachable!(), } let (epoch, item): (_, LogStoreReadItem) = stream.try_next().await.unwrap().unwrap(); @@ -1008,6 +1009,7 @@ mod tests { LogStoreReadItem::Barrier { is_checkpoint } => { assert!(is_checkpoint); } + _ => unreachable!(), } assert!(stream.next().await.is_none()); diff --git a/src/stream/src/common/log_store/kv_log_store/writer.rs b/src/stream/src/common/log_store/kv_log_store/writer.rs index aa624800f44a7..2b46177be3985 100644 --- a/src/stream/src/common/log_store/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store/kv_log_store/writer.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; use std::ops::Bound::{Excluded, Included}; use std::sync::Arc; @@ -59,93 +58,85 @@ impl KvLogStoreWriter { } impl LogWriter for KvLogStoreWriter { - type FlushCurrentEpoch<'a> = impl Future> + 'a; - type InitFuture<'a> = impl Future> + 'a; - type WriteChunkFuture<'a> = impl Future> + 'a; - - fn init(&mut self, epoch: EpochPair) -> Self::InitFuture<'_> { - async move { - self.state_store - .init(InitOptions::new_with_epoch(epoch)) - .await?; - self.seq_id = FIRST_SEQ_ID; - self.tx.init(epoch.curr); - Ok(()) - } + async fn init(&mut self, epoch: EpochPair) -> LogStoreResult<()> { + self.state_store + .init(InitOptions::new_with_epoch(epoch)) + .await?; + self.seq_id = FIRST_SEQ_ID; + self.tx.init(epoch.curr); + Ok(()) } - fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_> { - async move { - assert!(chunk.cardinality() > 0); - let epoch = self.state_store.epoch(); - let start_seq_id = self.seq_id; - self.seq_id += chunk.cardinality() as SeqIdType; - let end_seq_id = self.seq_id - 1; - if let Some(chunk) = - self.tx - .try_add_stream_chunk(epoch, chunk, start_seq_id, end_seq_id) - { - // When enter this branch, the chunk cannot be added directly, and should be add to - // state store and flush - let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for (i, (op, row)) in chunk.rows().enumerate() { - let seq_id = start_seq_id + (i as SeqIdType); - assert!(seq_id <= end_seq_id); - let (vnode, key, value) = self.serde.serialize_data_row(epoch, seq_id, op, row); - vnode_bitmap_builder.set(vnode.to_index(), true); - self.state_store.insert(key, value, None)?; - } - self.state_store.flush(Vec::new()).await?; - - let vnode_bitmap = vnode_bitmap_builder.finish(); - self.tx - .add_flushed(epoch, start_seq_id, end_seq_id, vnode_bitmap); + async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { + assert!(chunk.cardinality() > 0); + let epoch = self.state_store.epoch(); + let start_seq_id = self.seq_id; + self.seq_id += chunk.cardinality() as SeqIdType; + let end_seq_id = self.seq_id - 1; + if let Some(chunk) = self + .tx + .try_add_stream_chunk(epoch, chunk, start_seq_id, end_seq_id) + { + // When enter this branch, the chunk cannot be added directly, and should be add to + // state store and flush + let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for (i, (op, row)) in chunk.rows().enumerate() { + let seq_id = start_seq_id + (i as SeqIdType); + assert!(seq_id <= end_seq_id); + let (vnode, key, value) = self.serde.serialize_data_row(epoch, seq_id, op, row); + vnode_bitmap_builder.set(vnode.to_index(), true); + self.state_store.insert(key, value, None)?; } - Ok(()) + self.state_store.flush(Vec::new()).await?; + + let vnode_bitmap = vnode_bitmap_builder.finish(); + self.tx + .add_flushed(epoch, start_seq_id, end_seq_id, vnode_bitmap); } + Ok(()) } - fn flush_current_epoch( + async fn flush_current_epoch( &mut self, next_epoch: u64, is_checkpoint: bool, - ) -> Self::FlushCurrentEpoch<'_> { - async move { - let epoch = self.state_store.epoch(); - for vnode in self.serde.vnodes().iter_vnodes() { - let (key, value) = self.serde.serialize_barrier(epoch, vnode, is_checkpoint); - self.state_store.insert(key, value, None)?; - } - self.tx - .flush_all_unflushed(|chunk, epoch, start_seq_id, end_seq_id| { - for (i, (op, row)) in chunk.rows().enumerate() { - let seq_id = start_seq_id + (i as SeqIdType); - assert!(seq_id <= end_seq_id); - let (_, key, value) = self.serde.serialize_data_row(epoch, seq_id, op, row); - self.state_store.insert(key, value, None)?; - } - Ok(()) - })?; - let mut delete_range = Vec::with_capacity(self.serde.vnodes().count_ones()); - if let Some(truncation_offset) = self.tx.pop_truncation() { - for vnode in self.serde.vnodes().iter_vnodes() { - let range_begin = Bytes::from(vnode.to_be_bytes().to_vec()); - let range_end = self - .serde - .serialize_truncation_offset_watermark(vnode, truncation_offset); - delete_range.push((Included(range_begin), Excluded(range_end))); + ) -> LogStoreResult<()> { + let epoch = self.state_store.epoch(); + for vnode in self.serde.vnodes().iter_vnodes() { + let (key, value) = self.serde.serialize_barrier(epoch, vnode, is_checkpoint); + self.state_store.insert(key, value, None)?; + } + self.tx + .flush_all_unflushed(|chunk, epoch, start_seq_id, end_seq_id| { + for (i, (op, row)) in chunk.rows().enumerate() { + let seq_id = start_seq_id + (i as SeqIdType); + assert!(seq_id <= end_seq_id); + let (_, key, value) = self.serde.serialize_data_row(epoch, seq_id, op, row); + self.state_store.insert(key, value, None)?; } + Ok(()) + })?; + let mut delete_range = Vec::with_capacity(self.serde.vnodes().count_ones()); + if let Some(truncation_offset) = self.tx.pop_truncation() { + for vnode in self.serde.vnodes().iter_vnodes() { + let range_begin = Bytes::from(vnode.to_be_bytes().to_vec()); + let range_end = self + .serde + .serialize_truncation_offset_watermark(vnode, truncation_offset); + delete_range.push((Included(range_begin), Excluded(range_end))); } - self.state_store.flush(delete_range).await?; - self.state_store.seal_current_epoch(next_epoch); - self.tx.barrier(epoch, is_checkpoint, next_epoch); - self.seq_id = FIRST_SEQ_ID; - Ok(()) } + self.state_store.flush(delete_range).await?; + self.state_store.seal_current_epoch(next_epoch); + self.tx.barrier(epoch, is_checkpoint, next_epoch); + self.seq_id = FIRST_SEQ_ID; + Ok(()) } - fn update_vnode_bitmap(&mut self, new_vnodes: Arc) { + #[expect(clippy::unused_async)] + async fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { self.serde.update_vnode_bitmap(new_vnodes.clone()); self.tx.update_vnode(self.state_store.epoch(), new_vnodes); + Ok(()) } } diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index dc517edb4a1d4..f343cfdfc8f03 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -46,63 +46,50 @@ pub type LogStoreResult = Result; pub enum LogStoreReadItem { StreamChunk(StreamChunk), Barrier { is_checkpoint: bool }, + UpdateVnodeBitmap(Arc), } pub trait LogWriter { - type InitFuture<'a>: Future> + Send + 'a - where - Self: 'a; - type WriteChunkFuture<'a>: Future> + Send + 'a - where - Self: 'a; - type FlushCurrentEpoch<'a>: Future> + Send + 'a - where - Self: 'a; - /// Initialize the log writer with an epoch - fn init(&mut self, epoch: EpochPair) -> Self::InitFuture<'_>; + fn init(&mut self, epoch: EpochPair) -> impl Future> + Send + '_; /// Write a stream chunk to the log writer - fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_>; + fn write_chunk( + &mut self, + chunk: StreamChunk, + ) -> impl Future> + Send + '_; /// Mark current epoch as finished and sealed, and flush the unconsumed log data. fn flush_current_epoch( &mut self, next_epoch: u64, is_checkpoint: bool, - ) -> Self::FlushCurrentEpoch<'_>; + ) -> impl Future> + Send + '_; /// Update the vnode bitmap of the log writer - fn update_vnode_bitmap(&mut self, new_vnodes: Arc); + fn update_vnode_bitmap( + &mut self, + new_vnodes: Arc, + ) -> impl Future> + Send + '_; } pub trait LogReader { - type InitFuture<'a>: Future> + Send + 'a - where - Self: 'a; - type NextItemFuture<'a>: Future> + Send + 'a - where - Self: 'a; - type TruncateFuture<'a>: Future> + Send + 'a - where - Self: 'a; - /// Initialize the log reader. Usually function as waiting for log writer to be initialized. - fn init(&mut self) -> Self::InitFuture<'_>; + fn init(&mut self) -> impl Future> + Send + '_; /// Emit the next item. - fn next_item(&mut self) -> Self::NextItemFuture<'_>; + fn next_item( + &mut self, + ) -> impl Future> + Send + '_; /// Mark that all items emitted so far have been consumed and it is safe to truncate the log /// from the current offset. - fn truncate(&mut self) -> Self::TruncateFuture<'_>; + fn truncate(&mut self) -> impl Future> + Send + '_; } pub trait LogStoreFactory: 'static { type Reader: LogReader + Send + 'static; type Writer: LogWriter + Send + 'static; - type BuildFuture: Future + Send; - - fn build(self) -> Self::BuildFuture; + fn build(self) -> impl Future + Send; } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 97979b8527f04..375c3587a8650 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -206,6 +206,7 @@ impl SinkExecutor { .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx)) .collect_vec(); + #[derive(Debug)] enum LogConsumerState { /// Mark that the log consumer is not initialized yet Uninitialized, @@ -221,6 +222,16 @@ impl SinkExecutor { loop { let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?; + if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { + match &state { + LogConsumerState::BarrierReceived { .. } => {} + _ => unreachable!( + "update vnode bitmap can be accepted only right after \ + barrier, but current state is {:?}", + state + ), + } + } // begin_epoch when not previously began state = match state { LogConsumerState::Uninitialized => { @@ -278,6 +289,9 @@ impl SinkExecutor { }; state = LogConsumerState::BarrierReceived { prev_epoch } } + LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => { + sink_writer.update_vnode_bitmap(vnode_bitmap).await?; + } } } } diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 7636252d2f066..e57a87d059710 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -45,6 +45,7 @@ #![feature(is_sorted)] #![feature(btree_cursors)] #![feature(assert_matches)] +#![feature(async_fn_in_trait)] #[macro_use] extern crate tracing; diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index d15e3a68ea2b6..a12bc3643b542 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -57,17 +57,6 @@ impl SinkWriter for TestWriter { sleep(Duration::from_millis(100)).await; Ok(()) } - - async fn abort(&mut self) -> risingwave_connector::sink::Result<()> { - Ok(()) - } - - async fn update_vnode_bitmap( - &mut self, - _vnode_bitmap: Bitmap, - ) -> risingwave_connector::sink::Result<()> { - Ok(()) - } } impl Drop for TestWriter {