From 2a7ef0d808f17a0e82ee72e6376e7f77c2ba07aa Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Mon, 9 Oct 2023 14:32:32 +0800 Subject: [PATCH] fix(log-store): handle unaligned epoch after recovery (#12407) --- .../common/log_store_impl/kv_log_store/mod.rs | 248 +++++++++-- .../log_store_impl/kv_log_store/reader.rs | 6 +- .../log_store_impl/kv_log_store/serde.rs | 392 ++++++++++++------ .../log_store_impl/kv_log_store/test_utils.rs | 160 +++++-- src/stream/src/common/table/test_utils.rs | 43 ++ 5 files changed, 660 insertions(+), 189 deletions(-) diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index a84850b04f069..c93cd2e36eae4 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -101,6 +101,10 @@ impl LogStoreFactory for KvLogStoreFactory { #[cfg(test)] mod tests { + use std::sync::Arc; + + use risingwave_common::buffer::{Bitmap, BitmapBuilder}; + use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::sink::log_store::{ LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, @@ -111,7 +115,8 @@ mod tests { use risingwave_storage::StateStore; use crate::common::log_store_impl::kv_log_store::test_utils::{ - gen_stream_chunk, gen_test_log_store_table, + calculate_vnode_bitmap, check_rows_eq, check_stream_chunk_eq, + gen_multi_vnode_stream_chunks, gen_stream_chunk, gen_test_log_store_table, }; use crate::common::log_store_impl::kv_log_store::KvLogStoreFactory; @@ -129,17 +134,18 @@ mod tests { test_env.register_table(table.clone()).await; + let stream_chunk1 = gen_stream_chunk(0); + let stream_chunk2 = gen_stream_chunk(10); + let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows())); + let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), - None, + Some(Arc::new(bitmap)), max_stream_chunk_count, ); let (mut reader, mut writer) = factory.build().await; - let stream_chunk1 = gen_stream_chunk(0); - let stream_chunk2 = gen_stream_chunk(10); - let epoch1 = test_env .storage .get_pinned_version() @@ -172,7 +178,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch1); - assert_eq!(stream_chunk1, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk)); } _ => unreachable!(), } @@ -192,7 +198,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch2); - assert_eq!(stream_chunk2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk)); } _ => unreachable!(), } @@ -219,17 +225,19 @@ mod tests { test_env.register_table(table.clone()).await; + let stream_chunk1 = gen_stream_chunk(0); + let stream_chunk2 = gen_stream_chunk(10); + let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows())); + let bitmap = Arc::new(bitmap); + let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), - None, + Some(bitmap.clone()), max_stream_chunk_count, ); let (mut reader, mut writer) = factory.build().await; - let stream_chunk1 = gen_stream_chunk(0); - let stream_chunk2 = gen_stream_chunk(10); - let epoch1 = test_env .storage .get_pinned_version() @@ -259,7 +267,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch1); - assert_eq!(stream_chunk1, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk)); } _ => unreachable!(), } @@ -279,7 +287,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch2); - assert_eq!(stream_chunk2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk)); } _ => unreachable!(), } @@ -310,7 +318,7 @@ mod tests { let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), - None, + Some(bitmap), max_stream_chunk_count, ); let (mut reader, mut writer) = factory.build().await; @@ -328,7 +336,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch1); - assert_eq!(stream_chunk1, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk)); } _ => unreachable!(), } @@ -348,7 +356,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch2); - assert_eq!(stream_chunk2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk)); } _ => unreachable!(), } @@ -375,18 +383,27 @@ mod tests { test_env.register_table(table.clone()).await; + let stream_chunk1_1 = gen_stream_chunk(0); + let stream_chunk1_2 = gen_stream_chunk(10); + let stream_chunk2 = gen_stream_chunk(20); + let stream_chunk3 = gen_stream_chunk(20); + let bitmap = calculate_vnode_bitmap( + stream_chunk1_1 + .rows() + .chain(stream_chunk1_2.rows()) + .chain(stream_chunk2.rows()) + .chain(stream_chunk3.rows()), + ); + let bitmap = Arc::new(bitmap); + let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), - None, + Some(bitmap.clone()), max_stream_chunk_count, ); let (mut reader, mut writer) = factory.build().await; - let stream_chunk1_1 = gen_stream_chunk(0); - let stream_chunk1_2 = gen_stream_chunk(10); - let stream_chunk2 = gen_stream_chunk(20); - let epoch1 = test_env .storage .get_pinned_version() @@ -415,7 +432,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch1); - assert_eq!(stream_chunk1_1, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk1_1, &read_stream_chunk)); chunk_id } _ => unreachable!(), @@ -429,7 +446,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch1); - assert_eq!(stream_chunk1_2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk1_2, &read_stream_chunk)); chunk_id } _ => unreachable!(), @@ -452,7 +469,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch2); - assert_eq!(stream_chunk2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk)); } _ => unreachable!(), } @@ -491,7 +508,7 @@ mod tests { let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), - None, + Some(bitmap), max_stream_chunk_count, ); let (mut reader, mut writer) = factory.build().await; @@ -500,7 +517,7 @@ mod tests { .init(EpochPair::new_test_epoch(epoch3)) .await .unwrap(); - let stream_chunk3 = gen_stream_chunk(30); + writer.write_chunk(stream_chunk3.clone()).await.unwrap(); reader.init().await.unwrap(); @@ -513,7 +530,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch1); - assert_eq!(stream_chunk1_2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk1_2, &read_stream_chunk)); } _ => unreachable!(), } @@ -533,7 +550,7 @@ mod tests { }, ) => { assert_eq!(epoch, epoch2); - assert_eq!(stream_chunk2, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk)); } _ => unreachable!(), } @@ -553,7 +570,180 @@ mod tests { }, ) => { assert_eq!(epoch, epoch3); - assert_eq!(stream_chunk3, read_stream_chunk); + assert!(check_stream_chunk_eq(&stream_chunk3, &read_stream_chunk)); + } + _ => unreachable!(), + } + } + + #[tokio::test] + async fn test_update_vnode_recover() { + let test_env = prepare_hummock_test_env().await; + + let table = gen_test_log_store_table(); + + test_env.register_table(table.clone()).await; + + fn build_bitmap(indexes: impl Iterator) -> Arc { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for i in indexes { + builder.set(i, true); + } + Arc::new(builder.finish()) + } + + let vnodes1 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 0)); + let vnodes2 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 1)); + + let factory1 = + KvLogStoreFactory::new(test_env.storage.clone(), table.clone(), Some(vnodes1), 10); + let factory2 = + KvLogStoreFactory::new(test_env.storage.clone(), table.clone(), Some(vnodes2), 10); + let (mut reader1, mut writer1) = factory1.build().await; + let (mut reader2, mut writer2) = factory2.build().await; + + let epoch1 = test_env + .storage + .get_pinned_version() + .version() + .max_committed_epoch + + 1; + writer1 + .init(EpochPair::new_test_epoch(epoch1)) + .await + .unwrap(); + writer2 + .init(EpochPair::new_test_epoch(epoch1)) + .await + .unwrap(); + reader1.init().await.unwrap(); + reader2.init().await.unwrap(); + let [chunk1_1, chunk1_2] = gen_multi_vnode_stream_chunks::<2>(0, 100); + writer1.write_chunk(chunk1_1.clone()).await.unwrap(); + writer2.write_chunk(chunk1_2.clone()).await.unwrap(); + let epoch2 = epoch1 + 1; + writer1.flush_current_epoch(epoch2, false).await.unwrap(); + writer2.flush_current_epoch(epoch2, false).await.unwrap(); + let [chunk2_1, chunk2_2] = gen_multi_vnode_stream_chunks::<2>(200, 100); + writer1.write_chunk(chunk2_1.clone()).await.unwrap(); + writer2.write_chunk(chunk2_2.clone()).await.unwrap(); + + match reader1.next_item().await.unwrap() { + (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => { + assert_eq!(epoch, epoch1); + assert!(check_stream_chunk_eq(&chunk1_1, &chunk)); + } + _ => unreachable!(), + }; + match reader1.next_item().await.unwrap() { + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => { + assert_eq!(epoch, epoch1); + assert!(!is_checkpoint); + } + _ => unreachable!(), + } + + match reader2.next_item().await.unwrap() { + (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => { + assert_eq!(epoch, epoch1); + assert!(check_stream_chunk_eq(&chunk1_2, &chunk)); + } + _ => unreachable!(), + } + match reader2.next_item().await.unwrap() { + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => { + assert_eq!(epoch, epoch1); + assert!(!is_checkpoint); + } + _ => unreachable!(), + } + + // Only reader1 will truncate + reader1 + .truncate(TruncateOffset::Barrier { epoch: epoch1 }) + .await + .unwrap(); + + match reader1.next_item().await.unwrap() { + (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => { + assert_eq!(epoch, epoch2); + assert!(check_stream_chunk_eq(&chunk2_1, &chunk)); + } + _ => unreachable!(), + } + match reader2.next_item().await.unwrap() { + (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => { + assert_eq!(epoch, epoch2); + assert!(check_stream_chunk_eq(&chunk2_2, &chunk)); + } + _ => unreachable!(), + } + + let epoch3 = epoch2 + 1; + writer1.flush_current_epoch(epoch3, true).await.unwrap(); + writer2.flush_current_epoch(epoch3, true).await.unwrap(); + + match reader1.next_item().await.unwrap() { + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => { + assert_eq!(epoch, epoch2); + assert!(is_checkpoint); + } + _ => unreachable!(), + } + match reader2.next_item().await.unwrap() { + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => { + assert_eq!(epoch, epoch2); + assert!(is_checkpoint); + } + _ => unreachable!(), + } + + // Truncation of reader1 on epoch1 should work because it is before this sync + test_env.storage.seal_epoch(epoch1, false); + test_env.commit_epoch(epoch2).await; + test_env + .storage + .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .await + .unwrap(); + + // Recovery + test_env.storage.clear_shared_buffer().await.unwrap(); + + let vnodes = build_bitmap(0..VirtualNode::COUNT); + let factory = + KvLogStoreFactory::new(test_env.storage.clone(), table.clone(), Some(vnodes), 10); + let (mut reader, mut writer) = factory.build().await; + writer.init(EpochPair::new(epoch3, epoch2)).await.unwrap(); + reader.init().await.unwrap(); + match reader.next_item().await.unwrap() { + (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => { + assert_eq!(epoch, epoch1); + assert!(check_stream_chunk_eq(&chunk1_2, &chunk)); + } + _ => unreachable!(), + } + match reader.next_item().await.unwrap() { + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => { + assert_eq!(epoch, epoch1); + assert!(!is_checkpoint); + } + _ => unreachable!(), + } + match reader.next_item().await.unwrap() { + (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => { + assert_eq!(epoch, epoch2); + assert!(check_rows_eq( + chunk2_1.rows().chain(chunk2_2.rows()), + chunk.rows() + )); + } + _ => unreachable!(), + } + match reader.next_item().await.unwrap() { + (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => { + assert_eq!(epoch, epoch2); + assert!(is_checkpoint); } _ => unreachable!(), } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 4336c3d961626..f8dfa5850af9a 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -35,7 +35,7 @@ use crate::common::log_store_impl::kv_log_store::buffer::{ LogStoreBufferItem, LogStoreBufferReceiver, }; use crate::common::log_store_impl::kv_log_store::serde::{ - new_log_store_item_stream, KvLogStoreItem, LogStoreItemStream, LogStoreRowSerde, + merge_log_store_item_stream, KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde, }; pub struct KvLogStoreReader { @@ -51,7 +51,7 @@ pub struct KvLogStoreReader { first_write_epoch: Option, /// `Some` means consuming historical log data - state_store_stream: Option>>>, + state_store_stream: Option>>>, latest_offset: TruncateOffset, @@ -108,7 +108,7 @@ impl LogReader for KvLogStoreReader { "should not init twice" ); // TODO: set chunk size by config - self.state_store_stream = Some(Box::pin(new_log_store_item_stream( + self.state_store_stream = Some(Box::pin(merge_log_store_item_stream( streams, self.serde.clone(), 1024, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 2aa01c42af196..a875206fa29e6 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use bytes::Bytes; -use futures::stream::{FuturesUnordered, StreamFuture}; +use futures::stream::{FuturesUnordered, Peekable, StreamFuture}; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; @@ -41,6 +41,7 @@ use risingwave_common::util::value_encoding::{ use risingwave_connector::sink::log_store::LogStoreResult; use risingwave_hummock_sdk::key::{next_key, TableKey}; use risingwave_pb::catalog::Table; +use risingwave_storage::error::StorageError; use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::StateStoreReadIterStream; @@ -371,14 +372,18 @@ pub(crate) enum KvLogStoreItem { Barrier { is_checkpoint: bool }, } +type BoxPeekableLogStoreItemStream = Pin>>>; + struct LogStoreRowOpStream { serde: LogStoreRowSerde, /// Streams that have not reached a barrier - row_streams: FuturesUnordered>>>, + row_streams: FuturesUnordered>>, /// Streams that have reached a barrier - barrier_streams: Vec>>, + barrier_streams: Vec>, + + not_started_streams: Vec<(u64, BoxPeekableLogStoreItemStream)>, stream_state: StreamState, } @@ -387,46 +392,17 @@ impl LogStoreRowOpStream { pub(crate) fn new(streams: Vec, serde: LogStoreRowSerde) -> Self { assert!(!streams.is_empty()); Self { - serde, - barrier_streams: Vec::with_capacity(streams.len()), - row_streams: streams + serde: serde.clone(), + barrier_streams: streams .into_iter() - .map(|s| Box::pin(s).into_future()) + .map(|s| Box::pin(deserialize_stream(s, serde.clone()).peekable())) .collect(), + row_streams: FuturesUnordered::new(), + not_started_streams: Vec::new(), stream_state: StreamState::Uninitialized, } } - fn check_epoch(&self, epoch: u64) -> LogStoreResult<()> { - match &self.stream_state { - StreamState::Uninitialized => Ok(()), - StreamState::AllConsumingRow { curr_epoch } - | StreamState::BarrierAligning { curr_epoch, .. } => { - if *curr_epoch != epoch { - Err(anyhow!( - "epoch {} does not match with current epoch {}", - epoch, - curr_epoch - )) - } else { - Ok(()) - } - } - - StreamState::BarrierEmitted { prev_epoch } => { - if *prev_epoch >= epoch { - Err(anyhow!( - "epoch {} should be greater than prev epoch {}", - epoch, - prev_epoch - )) - } else { - Ok(()) - } - } - } - } - fn check_is_checkpoint(&self, is_checkpoint: bool) -> LogStoreResult<()> { if let StreamState::BarrierAligning { is_checkpoint: curr_is_checkpoint, @@ -448,11 +424,16 @@ impl LogStoreRowOpStream { } #[try_stream(ok = (u64, KvLogStoreItem), error = anyhow::Error)] - async fn into_log_store_item_stream(self, chunk_size: usize) { + async fn into_log_store_item_stream(mut self, chunk_size: usize) { let mut ops = Vec::with_capacity(chunk_size); let mut data_chunk_builder = DataChunkBuilder::new(self.serde.payload_schema.clone(), chunk_size); + if !self.init().await? { + // no data in all stream + return Ok(()); + } + let this = self; pin_mut!(this); @@ -483,37 +464,145 @@ impl LogStoreRowOpStream { } } -pub(crate) type LogStoreItemStream = impl Stream>; -pub(crate) fn new_log_store_item_stream( +pub(crate) type LogStoreItemMergeStream = + impl Stream>; +pub(crate) fn merge_log_store_item_stream( streams: Vec, serde: LogStoreRowSerde, chunk_size: usize, -) -> LogStoreItemStream { +) -> LogStoreItemMergeStream { LogStoreRowOpStream::new(streams, serde).into_log_store_item_stream(chunk_size) } +type LogStoreItemStream = + impl Stream> + Send; +fn deserialize_stream( + stream: S, + serde: LogStoreRowSerde, +) -> LogStoreItemStream { + stream.map( + move |result: Result<_, StorageError>| -> LogStoreResult<(u64, LogStoreRowOp)> { + match result { + Ok((_key, value)) => serde.deserialize(value), + Err(e) => Err(e.into()), + } + }, + ) +} + impl LogStoreRowOpStream { + // Return Ok(false) means all streams have reach the end. + async fn init(&mut self) -> LogStoreResult { + match &self.stream_state { + StreamState::Uninitialized => {} + _ => unreachable!("cannot call init for twice"), + }; + + // before init, all streams are in `barrier_streams` + assert!( + self.row_streams.is_empty(), + "when uninitialized, row_streams should be empty" + ); + assert!(self.not_started_streams.is_empty()); + assert!(!self.barrier_streams.is_empty()); + + for mut stream in self.barrier_streams.drain(..) { + match stream.as_mut().peek().await { + Some(Ok((epoch, _))) => { + self.not_started_streams.push((*epoch, stream)); + } + Some(Err(_)) => match stream.next().await { + Some(Err(e)) => { + return Err(e); + } + _ => unreachable!("on peek we have checked it's Some(Err(_))"), + }, + None => { + continue; + } + } + } + + if self.not_started_streams.is_empty() { + // No stream has data + return Ok(false); + } + + // sorted by epoch descending. Earlier epoch at the end + self.not_started_streams + .sort_by_key(|(epoch, _)| u64::MAX - *epoch); + + let (epoch, stream) = self + .not_started_streams + .pop() + .expect("have check non-empty"); + self.row_streams.push(stream.into_future()); + while let Some((stream_epoch, _)) = self.not_started_streams.last() && *stream_epoch == epoch { + let (_, stream) = self.not_started_streams.pop().expect("should not be empty"); + self.row_streams.push(stream.into_future()); + } + self.stream_state = StreamState::AllConsumingRow { curr_epoch: epoch }; + Ok(true) + } + + fn may_init_epoch(&mut self, epoch: u64) -> LogStoreResult<()> { + let prev_epoch = match &self.stream_state { + StreamState::Uninitialized => unreachable!("should have init"), + StreamState::BarrierEmitted { prev_epoch } => *prev_epoch, + StreamState::AllConsumingRow { curr_epoch } + | StreamState::BarrierAligning { curr_epoch, .. } => { + return if *curr_epoch != epoch { + Err(anyhow!( + "epoch {} does not match with current epoch {}", + epoch, + curr_epoch + )) + } else { + Ok(()) + }; + } + }; + + if prev_epoch >= epoch { + return Err(anyhow!( + "epoch {} should be greater than prev epoch {}", + epoch, + prev_epoch + )); + } + + while let Some((stream_epoch, _)) = self.not_started_streams.last() { + if *stream_epoch > epoch { + // Current epoch has not reached the first epoch of + // the stream. Later streams must also have greater epoch, so break here. + break; + } + if *stream_epoch < epoch { + return Err(anyhow!( + "current epoch {} has exceed epoch {} of stream not started", + epoch, + stream_epoch + )); + } + let (_, stream) = self.not_started_streams.pop().expect("should not be empty"); + self.row_streams.push(stream.into_future()); + } + + self.stream_state = StreamState::AllConsumingRow { curr_epoch: epoch }; + Ok(()) + } + async fn next_op(&mut self) -> LogStoreResult> { - assert!(!self.row_streams.is_empty()); while let (Some(result), stream) = self .row_streams .next() .await .expect("row stream should not be empty when polled") { - let (_key, value): (_, Bytes) = result?; - let (decoded_epoch, op) = self.serde.deserialize(value)?; - self.check_epoch(decoded_epoch)?; + let (decoded_epoch, op) = result?; + self.may_init_epoch(decoded_epoch)?; match op { LogStoreRowOp::Row { op, row } => { - match &self.stream_state { - StreamState::Uninitialized | StreamState::BarrierEmitted { .. } => { - self.stream_state = StreamState::AllConsumingRow { - curr_epoch: decoded_epoch, - } - } - _ => {} - }; self.row_streams.push(stream.into_future()); return Ok(Some((decoded_epoch, LogStoreRowOp::Row { op, row }))); } @@ -545,19 +634,23 @@ impl LogStoreRowOpStream { } // End of stream match &self.stream_state { - StreamState::BarrierEmitted { .. } | StreamState::Uninitialized => {} - s => { - return Err(anyhow!( - "when any of the stream reaches the end, it should be right \ - after emitting an barrier. Current state: {:?}", + StreamState::BarrierEmitted { .. } => {}, + s => return Err( + anyhow!( + "when any of the stream reaches the end, it should be right after emitting an barrier. Current state: {:?}", s - )); - } + ) + ), } assert!( self.barrier_streams.is_empty(), "should not have any pending barrier received stream after barrier emit" ); + if !self.not_started_streams.is_empty() { + return Err(anyhow!( + "a stream has reached the end but some other stream has not started yet" + )); + } if cfg!(debug_assertion) { while let Some((opt, _stream)) = self.row_streams.next().await { if let Some(result) = opt { @@ -573,15 +666,20 @@ impl LogStoreRowOpStream { #[cfg(test)] mod tests { + use std::cmp::min; use std::future::poll_fn; + use std::sync::Arc; use std::task::Poll; + use bytes::Bytes; use futures::stream::empty; use futures::{pin_mut, stream, StreamExt, TryStreamExt}; use itertools::Itertools; use rand::prelude::SliceRandom; use rand::thread_rng; use risingwave_common::array::{Op, StreamChunk}; + use risingwave_common::buffer::Bitmap; + use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::DataType; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -592,22 +690,23 @@ mod tests { use tokio::sync::oneshot::Sender; use crate::common::log_store_impl::kv_log_store::serde::{ - new_log_store_item_stream, KvLogStoreItem, LogStoreRowOp, LogStoreRowOpStream, + merge_log_store_item_stream, KvLogStoreItem, LogStoreRowOp, LogStoreRowOpStream, LogStoreRowSerde, }; use crate::common::log_store_impl::kv_log_store::test_utils::{ - gen_test_data, gen_test_log_store_table, TEST_TABLE_ID, + check_rows_eq, gen_test_data, gen_test_log_store_table, TEST_TABLE_ID, }; use crate::common::log_store_impl::kv_log_store::SeqIdType; - const EPOCH1: u64 = 233; + const EPOCH0: u64 = 233; + const EPOCH1: u64 = EPOCH0 + 1; const EPOCH2: u64 = EPOCH1 + 1; #[test] fn test_serde() { let table = gen_test_log_store_table(); - let serde = LogStoreRowSerde::new(&table, None); + let serde = LogStoreRowSerde::new(&table, Some(Arc::new(Bitmap::ones(VirtualNode::COUNT)))); let (ops, rows) = gen_test_data(0); @@ -623,12 +722,17 @@ mod tests { let mut serialized_keys = vec![]; let mut seq_id = 1; - let delete_range_right1 = - serde.serialize_truncation_offset_watermark(DEFAULT_VNODE, (epoch, None)); + fn remove_vnode_prefix(key: &Bytes) -> Bytes { + key.slice(VirtualNode::SIZE..) + } + let delete_range_right1 = remove_vnode_prefix( + &serde.serialize_truncation_offset_watermark(DEFAULT_VNODE, (epoch, None)), + ); for (op, row) in stream_chunk.rows() { let (_, key, value) = serde.serialize_data_row(epoch, seq_id, op, row); - assert!(key.as_ref() < delete_range_right1); + let key = remove_vnode_prefix(&key.0); + assert!(key < delete_range_right1); serialized_keys.push(key); let (decoded_epoch, row_op) = serde.deserialize(value).unwrap(); assert_eq!(decoded_epoch, epoch); @@ -646,6 +750,7 @@ mod tests { } let (key, encoded_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, false); + let key = remove_vnode_prefix(&key.0); match serde.deserialize(encoded_barrier).unwrap() { (decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => { assert!(!is_checkpoint); @@ -659,13 +764,15 @@ mod tests { seq_id = 1; epoch += 1; - let delete_range_right2 = - serde.serialize_truncation_offset_watermark(DEFAULT_VNODE, (epoch, None)); + let delete_range_right2 = remove_vnode_prefix( + &serde.serialize_truncation_offset_watermark(DEFAULT_VNODE, (epoch, None)), + ); for (op, row) in stream_chunk.rows() { let (_, key, value) = serde.serialize_data_row(epoch, seq_id, op, row); - assert!(key.as_ref() >= delete_range_right1); - assert!(key.as_ref() < delete_range_right2); + let key = remove_vnode_prefix(&key.0); + assert!(key >= delete_range_right1); + assert!(key < delete_range_right2); serialized_keys.push(key); let (decoded_epoch, row_op) = serde.deserialize(value).unwrap(); assert_eq!(decoded_epoch, epoch); @@ -683,6 +790,7 @@ mod tests { } let (key, encoded_checkpoint_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, true); + let key = remove_vnode_prefix(&key.0); match serde.deserialize(encoded_checkpoint_barrier).unwrap() { (decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => { assert_eq!(decoded_epoch, epoch); @@ -728,8 +836,7 @@ mod tests { #[tokio::test] async fn test_deserialize_stream_chunk() { let table = gen_test_log_store_table(); - let serde = LogStoreRowSerde::new(&table, None); - + let serde = LogStoreRowSerde::new(&table, Some(Arc::new(Bitmap::ones(VirtualNode::COUNT)))); let (ops, rows) = gen_test_data(0); let mut seq_id = 1; @@ -789,25 +896,34 @@ mod tests { impl StateStoreReadIterStream, oneshot::Sender<()>, oneshot::Sender<()>, + Vec, + Vec, ) { let (ops, rows) = gen_test_data(base); + let first_barrier = { + let (key, value) = serde.serialize_barrier(EPOCH0, DEFAULT_VNODE, true); + Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH0), value)) + }; + let stream = stream::once(async move { first_barrier }); let (row_stream, tx1) = gen_row_stream(serde.clone(), ops.clone(), rows.clone(), EPOCH1, seq_id); - let stream = row_stream.chain(stream::once({ + let stream = stream.chain(row_stream); + let stream = stream.chain(stream::once({ let serde = serde.clone(); async move { let (key, value) = serde.serialize_barrier(EPOCH1, DEFAULT_VNODE, false); Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH1), value)) } })); - let (row_stream, tx2) = gen_row_stream(serde.clone(), ops, rows, EPOCH2, seq_id); + let (row_stream, tx2) = + gen_row_stream(serde.clone(), ops.clone(), rows.clone(), EPOCH2, seq_id); let stream = stream.chain(row_stream).chain(stream::once({ async move { let (key, value) = serde.serialize_barrier(EPOCH2, DEFAULT_VNODE, true); Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH2), value)) } })); - (stream, tx1, tx2) + (stream, tx1, tx2, ops, rows) } #[allow(clippy::type_complexity)] @@ -825,18 +941,20 @@ mod tests { let mut streams = Vec::new(); let mut tx1 = Vec::new(); let mut tx2 = Vec::new(); + let mut ops = Vec::new(); + let mut rows = Vec::new(); for i in 0..size { - let (s, t1, t2) = gen_single_test_stream(serde.clone(), &mut seq_id, (100 * i) as _); + let (s, t1, t2, op_list, row_list) = + gen_single_test_stream(serde.clone(), &mut seq_id, (100 * i) as _); streams.push(s); tx1.push(Some(t1)); tx2.push(Some(t2)); + ops.push(op_list); + rows.push(row_list); } let stream = LogStoreRowOpStream::new(streams, serde); - let mut ops = Vec::new(); - let mut rows = Vec::new(); - for i in 0..size { let (o, r) = gen_test_data((100 * i) as _); ops.push(o); @@ -850,14 +968,26 @@ mod tests { async fn test_row_stream_basic() { let table = gen_test_log_store_table(); - let serde = LogStoreRowSerde::new(&table, None); + let serde = LogStoreRowSerde::new(&table, Some(Arc::new(Bitmap::ones(VirtualNode::COUNT)))); const MERGE_SIZE: usize = 10; - let (stream, mut tx1, mut tx2, ops, rows) = gen_multi_test_stream(serde, MERGE_SIZE); + let (mut stream, mut tx1, mut tx2, ops, rows) = gen_multi_test_stream(serde, MERGE_SIZE); + + stream.init().await.unwrap(); pin_mut!(stream); + assert_eq!( + ( + EPOCH0, + LogStoreRowOp::Barrier { + is_checkpoint: true + } + ), + stream.next_op().await.unwrap().unwrap() + ); + let mut index = (0..MERGE_SIZE).collect_vec(); index.shuffle(&mut thread_rng()); @@ -923,48 +1053,51 @@ mod tests { async fn test_log_store_stream_basic() { let table = gen_test_log_store_table(); - let serde = LogStoreRowSerde::new(&table, None); + let serde = LogStoreRowSerde::new(&table, Some(Arc::new(Bitmap::ones(VirtualNode::COUNT)))); let mut seq_id = 1; - let (stream, tx1, tx2) = gen_single_test_stream(serde.clone(), &mut seq_id, 0); - let (ops, rows) = gen_test_data(0); + let (stream, tx1, tx2, ops, rows) = gen_single_test_stream(serde.clone(), &mut seq_id, 0); const CHUNK_SIZE: usize = 3; - let stream = new_log_store_item_stream(vec![stream], serde, CHUNK_SIZE); + let stream = merge_log_store_item_stream(vec![stream], serde, CHUNK_SIZE); pin_mut!(stream); + let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); + assert_eq!(EPOCH0, epoch); + match item { + KvLogStoreItem::StreamChunk(_) => unreachable!(), + KvLogStoreItem::Barrier { is_checkpoint } => { + assert!(is_checkpoint); + } + } + assert!(poll_fn(|cx| Poll::Ready(stream.poll_next_unpin(cx))) .await .is_pending()); tx1.send(()).unwrap(); - let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); - assert_eq!(EPOCH1, epoch); - match item { - KvLogStoreItem::StreamChunk(chunk) => { - assert_eq!(chunk.cardinality(), CHUNK_SIZE); - for (i, (op, row)) in chunk.rows().enumerate() { - assert_eq!(op, ops[i]); - assert_eq!(row.to_owned_row(), rows[i]); - } - } - _ => unreachable!(), - } - - let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); - assert_eq!(EPOCH1, epoch); - match item { - KvLogStoreItem::StreamChunk(chunk) => { - assert_eq!(chunk.cardinality(), ops.len() - CHUNK_SIZE); - for (i, (op, row)) in chunk.rows().skip(CHUNK_SIZE).enumerate() { - assert_eq!(op, ops[i + CHUNK_SIZE]); - assert_eq!(row.to_owned_row(), rows[i + CHUNK_SIZE]); + { + let mut remain = ops.len(); + while remain > 0 { + let size = min(remain, CHUNK_SIZE); + let start_index = ops.len() - remain; + remain -= size; + let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); + assert_eq!(EPOCH1, epoch); + match item { + KvLogStoreItem::StreamChunk(chunk) => { + assert_eq!(chunk.cardinality(), size); + assert!(check_rows_eq( + chunk.rows(), + (start_index..(start_index + size)).map(|i| (ops[i], &rows[i])) + )); + } + _ => unreachable!(), } } - _ => unreachable!(), } let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); @@ -982,30 +1115,25 @@ mod tests { tx2.send(()).unwrap(); - let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); - assert_eq!(EPOCH2, epoch); - match item { - KvLogStoreItem::StreamChunk(chunk) => { - assert_eq!(chunk.cardinality(), CHUNK_SIZE); - for (i, (op, row)) in chunk.rows().enumerate() { - assert_eq!(op, ops[i]); - assert_eq!(row.to_owned_row(), rows[i]); - } - } - _ => unreachable!(), - } - - let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); - assert_eq!(EPOCH2, epoch); - match item { - KvLogStoreItem::StreamChunk(chunk) => { - assert_eq!(chunk.cardinality(), ops.len() - CHUNK_SIZE); - for (i, (op, row)) in chunk.rows().skip(CHUNK_SIZE).enumerate() { - assert_eq!(op, ops[i + CHUNK_SIZE]); - assert_eq!(row.to_owned_row(), rows[i + CHUNK_SIZE]); + { + let mut remain = ops.len(); + while remain > 0 { + let size = min(remain, CHUNK_SIZE); + let start_index = ops.len() - remain; + remain -= size; + let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); + assert_eq!(EPOCH2, epoch); + match item { + KvLogStoreItem::StreamChunk(chunk) => { + assert_eq!(chunk.cardinality(), size); + assert!(check_rows_eq( + chunk.rows(), + (start_index..(start_index + size)).map(|i| (ops[i], &rows[i])) + )); + } + _ => unreachable!(), } } - _ => unreachable!(), } let (epoch, item): (_, KvLogStoreItem) = stream.try_next().await.unwrap().unwrap(); @@ -1024,11 +1152,11 @@ mod tests { async fn test_empty_stream() { let table = gen_test_log_store_table(); - let serde = LogStoreRowSerde::new(&table, None); + let serde = LogStoreRowSerde::new(&table, Some(Arc::new(Bitmap::ones(VirtualNode::COUNT)))); const CHUNK_SIZE: usize = 3; - let stream = new_log_store_item_stream(vec![empty(), empty()], serde, CHUNK_SIZE); + let stream = merge_log_store_item_stream(vec![empty(), empty()], serde, CHUNK_SIZE); pin_mut!(stream); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs index 8eb3a82fb742d..829f52a8a9d6a 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs @@ -12,46 +12,77 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; -use risingwave_common::array::{Op, StreamChunk}; +use itertools::{zip_eq, Itertools}; +use rand::RngCore; +use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; -use risingwave_common::row::OwnedRow; +use risingwave_common::constants::log_store::KV_LOG_STORE_PREDEFINED_COLUMNS; +use risingwave_common::hash::VirtualNode; +use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, ScalarImpl, ScalarRef}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::catalog::PbTable; -use crate::common::table::test_utils::gen_prost_table; +use crate::common::table::test_utils::gen_prost_table_with_dist_key; pub(crate) const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; pub(crate) fn gen_test_data(base: i64) -> (Vec, Vec) { - let ops = vec![Op::Insert, Op::Delete, Op::UpdateDelete, Op::UpdateInsert]; - let rows = vec![ - OwnedRow::new(vec![ - Some(ScalarImpl::Int64(1 + base)), - Some(ScalarImpl::Utf8("name1".to_owned_scalar())), - ]), - OwnedRow::new(vec![ - Some(ScalarImpl::Int64(2 + base)), - Some(ScalarImpl::Utf8("name2".to_owned_scalar())), - ]), - OwnedRow::new(vec![ - Some(ScalarImpl::Int64(3 + base)), - Some(ScalarImpl::Utf8("name3".to_owned_scalar())), - ]), - OwnedRow::new(vec![ - Some(ScalarImpl::Int64(3 + base)), - Some(ScalarImpl::Utf8("name4".to_owned_scalar())), - ]), - ]; + gen_sized_test_data(base, 10) +} + +pub(crate) fn gen_sized_test_data(base: i64, max_count: usize) -> (Vec, Vec) { + let mut ops = Vec::new(); + let mut rows = Vec::new(); + while ops.len() < max_count - 1 { + let index = ops.len() as i64; + match rand::thread_rng().next_u32() % 3 { + 0 => { + ops.push(Op::Insert); + rows.push(OwnedRow::new(vec![ + Some(ScalarImpl::Int64(index + base)), + Some(ScalarImpl::Utf8( + format!("name{}", index).as_str().to_owned_scalar(), + )), + ])); + } + 1 => { + ops.push(Op::Delete); + rows.push(OwnedRow::new(vec![ + Some(ScalarImpl::Int64(index + base)), + Some(ScalarImpl::Utf8( + format!("name{}", index).as_str().to_owned_scalar(), + )), + ])); + } + 2 => { + ops.push(Op::UpdateDelete); + rows.push(OwnedRow::new(vec![ + Some(ScalarImpl::Int64(index + base)), + Some(ScalarImpl::Utf8( + format!("name{}", index).as_str().to_owned_scalar(), + )), + ])); + ops.push(Op::UpdateInsert); + rows.push(OwnedRow::new(vec![ + Some(ScalarImpl::Int64(index + base)), + Some(ScalarImpl::Utf8( + format!("name{}", index + 1).as_str().to_owned_scalar(), + )), + ])); + } + _ => unreachable!(), + } + } (ops, rows) } pub(crate) fn test_payload_schema() -> Vec { vec![ ColumnDesc::unnamed(ColumnId::from(3), DataType::Int64), // id - ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar), // name + ColumnDesc::unnamed(ColumnId::from(4), DataType::Varchar), // name ] } @@ -81,16 +112,95 @@ pub(crate) fn gen_stream_chunk(base: i64) -> StreamChunk { StreamChunk::from_parts(ops, data_chunk) } +pub(crate) fn gen_multi_vnode_stream_chunks( + base: i64, + max_count: usize, +) -> [StreamChunk; MOD_COUNT] { + let mut data_builder = (0..MOD_COUNT) + .map(|_| { + ( + Vec::new() as Vec, + DataChunkBuilder::new( + test_payload_schema() + .iter() + .map(|col| col.data_type.clone()) + .collect_vec(), + max_count, + ), + ) + }) + .collect_vec(); + let (ops, rows) = gen_sized_test_data(base, max_count); + for (op, row) in zip_eq(ops, rows) { + let vnode = VirtualNode::compute_row(&row, &[TEST_SCHEMA_DIST_KEY_INDEX]); + let (ops, builder) = &mut data_builder[vnode.to_index() % MOD_COUNT]; + ops.push(op); + assert!(builder.append_one_row(row).is_none()); + } + + data_builder + .into_iter() + .map(|(ops, mut builder)| StreamChunk::from_parts(ops, builder.consume_all().unwrap())) + .collect_vec() + .try_into() + .unwrap() +} + +pub(crate) const TEST_SCHEMA_DIST_KEY_INDEX: usize = 0; + pub(crate) fn gen_test_log_store_table() -> PbTable { let schema = test_log_store_table_schema(); let order_types = vec![OrderType::ascending(), OrderType::ascending_nulls_last()]; let pk_index = vec![0_usize, 1_usize]; let read_prefix_len_hint = 0; - gen_prost_table( + gen_prost_table_with_dist_key( TEST_TABLE_ID, schema, order_types, pk_index, read_prefix_len_hint, + vec![TEST_SCHEMA_DIST_KEY_INDEX + KV_LOG_STORE_PREDEFINED_COLUMNS.len()], // id field ) } + +pub(crate) fn calculate_vnode_bitmap<'a>( + test_data: impl Iterator)>, +) -> Bitmap { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for vnode in + test_data.map(|(_, row)| VirtualNode::compute_row(row, &[TEST_SCHEMA_DIST_KEY_INDEX])) + { + builder.set(vnode.to_index(), true); + } + builder.finish() +} + +pub(crate) fn check_rows_eq( + first: impl Iterator, + second: impl Iterator, +) -> bool { + for ((op1, row1), (op2, row2)) in zip_eq( + first.sorted_by_key(|(_, row)| { + row.datum_at(TEST_SCHEMA_DIST_KEY_INDEX) + .unwrap() + .into_int64() + }), + second.sorted_by_key(|(_, row)| { + row.datum_at(TEST_SCHEMA_DIST_KEY_INDEX) + .unwrap() + .into_int64() + }), + ) { + if op1 != op2 { + return false; + } + if row1.to_owned_row() != row2.to_owned_row() { + return false; + } + } + true +} + +pub(crate) fn check_stream_chunk_eq(first: &StreamChunk, second: &StreamChunk) -> bool { + check_rows_eq(first.rows(), second.rows()) +} diff --git a/src/stream/src/common/table/test_utils.rs b/src/stream/src/common/table/test_utils.rs index 526f6864b3a99..90e7886df26bf 100644 --- a/src/stream/src/common/table/test_utils.rs +++ b/src/stream/src/common/table/test_utils.rs @@ -38,6 +38,26 @@ pub(crate) fn gen_prost_table( ) } +pub(crate) fn gen_prost_table_with_dist_key( + table_id: TableId, + column_descs: Vec, + order_types: Vec, + pk_index: Vec, + read_prefix_len_hint: u32, + distribution_key: Vec, +) -> PbTable { + let col_len = column_descs.len() as i32; + gen_prost_table_inner( + table_id, + column_descs, + order_types, + pk_index, + read_prefix_len_hint, + (0..col_len).collect_vec(), + distribution_key, + ) +} + pub(crate) fn gen_prost_table_with_value_indices( table_id: TableId, column_descs: Vec, @@ -45,6 +65,26 @@ pub(crate) fn gen_prost_table_with_value_indices( pk_index: Vec, read_prefix_len_hint: u32, value_indices: Vec, +) -> PbTable { + gen_prost_table_inner( + table_id, + column_descs, + order_types, + pk_index, + read_prefix_len_hint, + value_indices, + Vec::default(), + ) +} + +pub(crate) fn gen_prost_table_inner( + table_id: TableId, + column_descs: Vec, + order_types: Vec, + pk_index: Vec, + read_prefix_len_hint: u32, + value_indices: Vec, + distribution_key: Vec, ) -> PbTable { let prost_pk = pk_index .iter() @@ -62,12 +102,15 @@ pub(crate) fn gen_prost_table_with_value_indices( }) .collect(); + let distribution_key = distribution_key.into_iter().map(|i| i as i32).collect_vec(); + PbTable { id: table_id.table_id(), columns: prost_columns, pk: prost_pk, read_prefix_len_hint, value_indices, + distribution_key, ..Default::default() } }