From 572b017eec26098dff40f2e82087e1f01aff3f9e Mon Sep 17 00:00:00 2001
From: William Wen
Date: Fri, 1 Nov 2024 13:51:06 +0800
Subject: [PATCH 1/9] feat(stream): wait committed epoch in state table
 init_epoch

Make `StateTable::init_epoch` async for all state tables, so that
initialization waits for the previous epoch to be committed in the state
store, instead of doing so only for replicated state tables.

With this, recovery no longer needs to ship a target hummock version id
around: `version_id` is dropped from
`StreamingControlStreamRequest::InitRequest` and from
`HummockEvent::Clear`, and `HummockEventHandler::handle_clear` no longer
waits for version updates before clearing the shared buffer.

Because `init_epoch` now waits on the state store, executors yield the
first barrier before initializing their state tables, so that the wait
cannot block collection of that barrier. The executor changes below all
follow the same reordering:

    // before
    self.state_table.init_epoch(first_barrier.epoch);
    yield Message::Barrier(first_barrier);

    // after
    let first_epoch = first_barrier.epoch;
    yield Message::Barrier(first_barrier);
    self.state_table.init_epoch(first_epoch).await?;
---
 proto/stream_service.proto                    |   1 -
 src/compute/tests/integration_tests.rs        |   2 +-
 src/ctl/src/cmd_impl/bench.rs                 |   2 +-
 src/meta/src/barrier/rpc.rs                   |   6 +-
 src/rpc_client/src/stream_client.rs           |   3 -
 .../hummock_test/src/state_store_tests.rs     |   4 +-
 .../event_handler/hummock_event_handler.rs    | 196 +-----------------
 src/storage/src/hummock/event_handler/mod.rs  |   6 +-
 .../src/hummock/event_handler/refiller.rs     |  15 --
 .../src/hummock/store/hummock_storage.rs      |   4 +-
 .../hummock/store/local_hummock_storage.rs    |   4 +-
 src/stream/benches/bench_state_table.rs       |   4 +-
 src/stream/spill_test/src/test_mem_table.rs   |   4 +-
 .../common/log_store_impl/kv_log_store/mod.rs |  25 +--
 src/stream/src/common/table/state_table.rs    |  30 +--
 .../src/common/table/test_state_table.rs      |  28 +--
 .../src/common/table/test_storage_table.rs    |  10 +-
 .../src/executor/aggregation/distinct.rs      |  12 +-
 src/stream/src/executor/aggregation/minput.rs |  18 +-
 .../approx_percentile/global_state.rs         |   4 +-
 src/stream/src/executor/asof_join.rs          |  11 +-
 .../executor/backfill/arrangement_backfill.rs |  10 +-
 .../src/executor/backfill/cdc/cdc_backfill.rs |   8 +-
 src/stream/src/executor/backfill/cdc/state.rs |   4 +-
 .../executor/backfill/no_shuffle_backfill.rs  |   9 +-
 .../src/executor/dedup/append_only_dedup.rs   |   4 +-
 src/stream/src/executor/dynamic_filter.rs     |  10 +-
 src/stream/src/executor/hash_agg.rs           |   8 +-
 src/stream/src/executor/hash_join.rs          |  11 +-
 src/stream/src/executor/join/hash_join.rs     |   7 +-
 src/stream/src/executor/mview/materialize.rs  |   4 +-
 src/stream/src/executor/mview/test_utils.rs   |   2 +-
 src/stream/src/executor/now.rs                |  31 ++-
 src/stream/src/executor/over_window/eowc.rs   |   4 +-
 .../src/executor/over_window/general.rs       |   4 +-
 src/stream/src/executor/simple_agg.rs         |  11 +-
 src/stream/src/executor/sort.rs               |   3 +-
 .../src/executor/source/fetch_executor.rs     |   9 +-
 .../src/executor/source/fs_source_executor.rs |  23 +-
 .../source/source_backfill_executor.rs        |  19 +-
 .../source/source_backfill_state_table.rs     |   4 +-
 .../src/executor/source/source_executor.rs    |  33 +--
 .../executor/source/state_table_handler.rs    |   8 +-
 src/stream/src/executor/temporal_join.rs      |  15 +-
 src/stream/src/executor/top_n/group_top_n.rs  |   5 +-
 .../executor/top_n/group_top_n_appendonly.rs  |   5 +-
 .../src/executor/top_n/top_n_appendonly.rs    |   4 +-
 src/stream/src/executor/top_n/top_n_plain.rs  |   4 +-
 src/stream/src/executor/top_n/top_n_state.rs  |  16 +-
 src/stream/src/executor/top_n/utils.rs        |   9 +-
 src/stream/src/executor/watermark_filter.rs   |   5 +-
 src/stream/src/task/barrier_manager.rs        |   5 +-
 src/stream/src/task/stream_manager.rs         |   5 +-
 53 files changed, 245 insertions(+), 443 deletions(-)

diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 011c06284397a..ba07ae61b07ff 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -64,7 +64,6 @@ message WaitEpochCommitResponse {
 
 message StreamingControlStreamRequest {
   message InitRequest {
-    uint64 version_id = 1;
     repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
   }
 
diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs
index f36cd6cf8164f..798e0120ee3d5 100644
--- a/src/compute/tests/integration_tests.rs
+++ b/src/compute/tests/integration_tests.rs
@@ -471,7 +471,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
     );
 
     let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await?;
     state.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
         Some(4_i32.into()),
diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs
index dce4a21115d6a..3ea966ea9c63a 100644
--- a/src/ctl/src/cmd_impl/bench.rs
+++ b/src/ctl/src/cmd_impl/bench.rs
@@ -107,7 +107,7 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
                 tracing::info!(thread = i, "starting scan");
                 let state_table = {
                     let mut tb = make_state_table(hummock, &table).await;
-                    tb.init_epoch(EpochPair::new_test_epoch(u64::MAX));
+                    tb.init_epoch(EpochPair::new_test_epoch(u64::MAX)).await?;
                     tb
                 };
                 loop {
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 1fbb38f42b267..481b1bed15cff 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -437,16 +437,12 @@ impl GlobalBarrierWorkerContextImpl {
         node: &WorkerNode,
         mv_depended_subscriptions: &HashMap>,
     ) -> MetaResult {
-        let initial_version_id = self
-            .hummock_manager
-            .on_current_version(|version| version.id)
-            .await;
         let handle = self
             .env
             .stream_client_pool()
             .get(node)
             .await?
-            .start_streaming_control(initial_version_id, mv_depended_subscriptions)
+            .start_streaming_control(mv_depended_subscriptions)
             .await?;
         Ok(handle)
     }
diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs
index 42eeaa780d099..6ed4608493334 100644
--- a/src/rpc_client/src/stream_client.rs
+++ b/src/rpc_client/src/stream_client.rs
@@ -23,7 +23,6 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
 use risingwave_common::monitor::{EndpointExt, TcpConfig};
 use risingwave_common::util::addr::HostAddr;
-use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
 use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
 use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
@@ -89,13 +88,11 @@ pub type StreamingControlHandle =
 impl StreamClient {
     pub async fn start_streaming_control(
         &self,
-        version_id: HummockVersionId,
         mv_depended_subscriptions: &HashMap>,
     ) -> Result {
         let first_request = StreamingControlStreamRequest {
             request: Some(streaming_control_stream_request::Request::Init(
                 InitRequest {
-                    version_id: version_id.to_u64(),
                     subscriptions: mv_depended_subscriptions
                         .iter()
                         .flat_map(|(table_id, subscriptions)| {
diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs
index 6734235225654..0893e44e9a1b5 100644
--- a/src/storage/hummock_test/src/state_store_tests.rs
+++ b/src/storage/hummock_test/src/state_store_tests.rs
@@ -1357,9 +1357,7 @@ async fn test_clear_shared_buffer() {
 
     drop(local_hummock_storage);
 
-    hummock_storage
-        .clear_shared_buffer(hummock_storage.get_pinned_version().id())
-        .await;
+    hummock_storage.clear_shared_buffer().await;
 }
 
 /// Test the following behaviours:
diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
index 7a33ed81b4373..74c0de8a9e7a1 100644
--- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs
+++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -28,7 +28,7 @@ use prometheus::core::{AtomicU64, GenericGauge};
 use prometheus::{Histogram, IntGauge};
 use risingwave_common::catalog::TableId;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
-use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId, SyncResult};
+use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
 use tokio::spawn;
 use tokio::sync::mpsc::error::SendError;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@@ -465,67 +465,14 @@ impl HummockEventHandler {
             .start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids);
     }
 
-    async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, version_id: HummockVersionId) {
+    fn handle_clear(&mut self, notifier: oneshot::Sender<()>) {
         info!(
-            ?version_id,
             current_version_id = ?self.uploader.hummock_version().id(),
             "handle clear event"
         );
 
         self.uploader.clear();
 
-        let current_version = self.uploader.hummock_version();
-
-        if current_version.id < version_id {
-            let mut latest_version = if let Some(CacheRefillerEvent {
-                pinned_version,
-                new_pinned_version,
-            }) = self.refiller.clear()
-            {
-                assert_eq!(
-                    current_version.id(),
-                    pinned_version.id(),
-                    "refiller earliest version {:?} not equal to current version {:?}",
-                    *pinned_version,
-                    **current_version
-                );
-
-                info!(?version_id, "refiller is clear in recovery");
-
-                Some(new_pinned_version)
-            } else {
-                None
-            };
-
-            while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version)
-                && latest_version_ref.id < version_id
-            {
-                let version_update = self
-                    .version_update_rx
-                    .recv()
-                    .await
-                    .expect("should not be empty");
-                let prev_version_id = latest_version_ref.id();
-                if let Some(new_version) = Self::resolve_version_update_info(
-                    latest_version_ref.clone(),
-                    version_update,
-                    None,
-                ) {
-                    info!(
-                        ?prev_version_id,
-                        new_version_id = ?new_version.id(),
-                        "recv new version"
-                    );
-                    latest_version = Some(new_version);
-                }
-            }
-
-            self.apply_version_update(
-                current_version.clone(),
-                latest_version.expect("must have some version update to raise the mce"),
-            );
-        }
-
         assert!(
             self.local_read_version_mapping.is_empty(),
             "read version mapping not empty when clear. remaining tables: {:?}",
@@ -540,7 +487,7 @@ impl HummockEventHandler {
             error!("failed to notify completion of clear event: {:?}", e);
         });
 
-        info!(?version_id, "clear finished");
+        info!("clear finished");
     }
 
     fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
@@ -646,9 +593,6 @@ impl HummockEventHandler {
                 event = pin!(self.hummock_event_rx.recv()) => {
                     let Some(event) = event else { break };
                     match event {
-                        HummockEvent::Clear(notifier, version_id) => {
-                            self.handle_clear(notifier, version_id).await
-                        },
                         HummockEvent::Shutdown => {
                             info!("event handler shutdown");
                             return;
@@ -690,8 +634,8 @@ impl HummockEventHandler {
             } => {
                 self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids);
             }
-            HummockEvent::Clear(_, _) => {
-                unreachable!("clear is handled in separated async context")
+            HummockEvent::Clear(notifier) => {
+                self.handle_clear(notifier);
             }
             HummockEvent::Shutdown => {
                 unreachable!("shutdown is handled specially")
@@ -886,7 +830,7 @@ impl SyncedData {
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
-    use std::future::{poll_fn, Future};
+    use std::future::poll_fn;
     use std::sync::Arc;
     use std::task::Poll;
@@ -905,7 +849,7 @@ mod tests {
     use crate::hummock::event_handler::refiller::CacheRefiller;
     use crate::hummock::event_handler::uploader::test_utils::{gen_imm, TEST_TABLE_ID};
     use crate::hummock::event_handler::uploader::UploadTaskOutput;
-    use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, HummockVersionUpdate};
+    use crate::hummock::event_handler::{HummockEvent, HummockEventHandler};
     use crate::hummock::iterator::test_utils::mock_sstable_store;
     use crate::hummock::local_version::pinned_version::PinnedVersion;
     use crate::hummock::store::version::{StagingData, VersionUpdate};
@@ -914,132 +858,6 @@ mod tests {
     use crate::monitor::HummockStateStoreMetrics;
     use crate::store::SealCurrentEpochOptions;
 
-    #[tokio::test]
-    async fn test_clear_shared_buffer() {
-        let mut next_version_id = 1;
-        let mut make_new_version = || {
-            let id = next_version_id;
-            next_version_id += 1;
-            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
-                id,
-                ..Default::default()
-            })
-        };
-
-        let initial_version = PinnedVersion::new(make_new_version(), unbounded_channel().0);
-
-        let (version_update_tx, version_update_rx) = unbounded_channel();
-        let (refill_task_tx, mut refill_task_rx) = unbounded_channel();
-
-        let refill_task_tx_clone = refill_task_tx.clone();
-
-        let event_handler = HummockEventHandler::new_inner(
-            version_update_rx,
-            initial_version.clone(),
-            mock_sstable_store().await,
-            Arc::new(HummockStateStoreMetrics::unused()),
-            &default_opts_for_test(),
-            Arc::new(|_, _| unreachable!("should not spawn upload task")),
-            Arc::new(move |_, _, old_version, new_version| {
-                let (tx, rx) = oneshot::channel();
-                refill_task_tx_clone
-                    .send((old_version, new_version, tx))
-                    .unwrap();
-                spawn(async move {
-                    let _ = rx.await;
-                })
-            }),
-        );
-
-        let event_tx = event_handler.event_sender();
-        let latest_version = event_handler.recent_versions.clone();
-        let latest_version_update_tx = event_handler.version_update_notifier_tx.clone();
-
-        let send_clear = |version_id| {
-            let (tx, rx) = oneshot::channel();
-            event_tx.send(HummockEvent::Clear(tx, version_id)).unwrap();
-            rx
-        };
-
-        let _join_handle = spawn(event_handler.start_hummock_event_handler_worker());
-
-        // test normal recovery
-        send_clear(initial_version.id()).await.unwrap();
-
-        // test normal refill finish
-        let version1 = make_new_version();
-        {
-            version_update_tx
-                .send(HummockVersionUpdate::PinnedVersion(Box::new(
-                    version1.clone(),
-                )))
-                .unwrap();
-            let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap();
-            assert_eq!(*old_version, *initial_version);
-            assert_eq!(*new_version, version1);
-            assert_eq!(**latest_version.load().latest_version(), *initial_version);
-
-            let mut changed = latest_version_update_tx.subscribe();
-            refill_finish_tx.send(()).unwrap();
-            changed.changed().await.unwrap();
-            assert_eq!(**latest_version.load().latest_version(), version1);
-        }
-
-        // test recovery with pending refill task
-        let version2 = make_new_version();
-        let version3 = make_new_version();
-        {
-            version_update_tx
-                .send(HummockVersionUpdate::PinnedVersion(Box::new(
-                    version2.clone(),
-                )))
-                .unwrap();
-            version_update_tx
-                .send(HummockVersionUpdate::PinnedVersion(Box::new(
-                    version3.clone(),
-                )))
-                .unwrap();
-            let (old_version2, new_version2, _refill_finish_tx2) =
-                refill_task_rx.recv().await.unwrap();
-            assert_eq!(*old_version2, version1);
-            assert_eq!(*new_version2, version2);
-            let (old_version3, new_version3, _refill_finish_tx3) =
-                refill_task_rx.recv().await.unwrap();
-            assert_eq!(*old_version3, version2);
-            assert_eq!(*new_version3, version3);
-            assert_eq!(**latest_version.load().latest_version(), version1);
-
-            let rx = send_clear(version3.id);
-            rx.await.unwrap();
-            assert_eq!(**latest_version.load().latest_version(), version3);
-        }
-
-        async fn assert_pending(fut: &mut (impl Future + Unpin)) {
-            assert!(poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await);
-        }
-
-        // test recovery with later arriving version update
-        let version4 = make_new_version();
-        let version5 = make_new_version();
-        {
-            let mut rx = send_clear(version5.id);
-            assert_pending(&mut rx).await;
-            version_update_tx
-                .send(HummockVersionUpdate::PinnedVersion(Box::new(
-                    version4.clone(),
-                )))
-                .unwrap();
-            assert_pending(&mut rx).await;
-            version_update_tx
-                .send(HummockVersionUpdate::PinnedVersion(Box::new(
-                    version5.clone(),
-                )))
-                .unwrap();
-            rx.await.unwrap();
-            assert_eq!(**latest_version.load().latest_version(), version5);
-        }
-    }
-
     #[tokio::test]
     async fn test_old_epoch_sync_fail() {
         let epoch0 = test_epoch(233);
diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs
index f0a4b2a899874..910c567e5d4da 100644
--- a/src/storage/src/hummock/event_handler/mod.rs
+++ b/src/storage/src/hummock/event_handler/mod.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 use parking_lot::{RwLock, RwLockReadGuard};
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::TableId;
-use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, HummockVersionId};
+use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot;
 
@@ -65,7 +65,7 @@ pub enum HummockEvent {
     },
 
     /// Clear shared buffer and reset all states
-    Clear(oneshot::Sender<()>, HummockVersionId),
+    Clear(oneshot::Sender<()>),
 
     Shutdown,
 
@@ -122,7 +122,7 @@ impl HummockEvent {
                 table_ids,
             } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids),
 
-            HummockEvent::Clear(_, version_id) => format!("Clear {}", version_id),
+            HummockEvent::Clear(_) => "Clear".to_string(),
 
             HummockEvent::Shutdown => "Shutdown".to_string(),
 
diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs
index 09526f6b83c17..dcf5c7a565eb4 100644
--- a/src/storage/src/hummock/event_handler/refiller.rs
+++ b/src/storage/src/hummock/event_handler/refiller.rs
@@ -287,21 +287,6 @@ impl CacheRefiller {
     pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
         self.queue.back().map(|item| &item.event.new_pinned_version)
     }
-
-    /// Clear the queue for cache refill and return an event that merges all pending cache refill events
-    /// into a single event that takes the earliest and latest version.
-    pub(crate) fn clear(&mut self) -> Option {
-        let last_item = self.queue.pop_back()?;
-        let mut event = last_item.event;
-        while let Some(item) = self.queue.pop_back() {
-            assert_eq!(
-                event.pinned_version.id(),
-                item.event.new_pinned_version.id()
-            );
-            event.pinned_version = item.event.pinned_version;
-        }
-        Some(event)
-    }
 }
 
 impl CacheRefiller {
diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs
index 665b064181687..ffc41893208c9 100644
--- a/src/storage/src/hummock/store/hummock_storage.rs
+++ b/src/storage/src/hummock/store/hummock_storage.rs
@@ -511,10 +511,10 @@ impl HummockStorage {
         )
     }
 
-    pub async fn clear_shared_buffer(&self, version_id: HummockVersionId) {
+    pub async fn clear_shared_buffer(&self) {
         let (tx, rx) = oneshot::channel();
         self.hummock_event_sender
-            .send(HummockEvent::Clear(tx, version_id))
+            .send(HummockEvent::Clear(tx))
             .expect("should send success");
         rx.await.expect("should wait success");
     }
diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs
index d0082f21b31f9..8279530d60fb7 100644
--- a/src/storage/src/hummock/store/local_hummock_storage.rs
+++ b/src/storage/src/hummock/store/local_hummock_storage.rs
@@ -482,9 +482,7 @@ impl LocalStateStore for LocalHummockStorage {
 
     async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
         let epoch = options.epoch;
-        if self.is_replicated {
-            self.wait_for_epoch(epoch.prev).await?;
-        }
+        self.wait_for_epoch(epoch.prev).await?;
         assert!(
             self.epoch.replace(epoch.curr).is_none(),
             "local state store of table id {:?} is init for more than once",
diff --git a/src/stream/benches/bench_state_table.rs b/src/stream/benches/bench_state_table.rs
index ceaabdec5b637..445c5f7ddc321 100644
--- a/src/stream/benches/bench_state_table.rs
+++ b/src/stream/benches/bench_state_table.rs
@@ -113,7 +113,7 @@ async fn run_bench_state_table_inserts(
     rows: Vec,
 ) {
     let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
     for row in rows {
         state_table.insert(row);
     }
@@ -173,7 +173,7 @@ async fn run_bench_state_table_chunks(
     chunks: Vec,
 ) {
     let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
     for chunk in chunks {
         state_table.write_chunk(chunk);
     }
diff --git a/src/stream/spill_test/src/test_mem_table.rs b/src/stream/spill_test/src/test_mem_table.rs
index 75407feeaa286..a24d998ac88f3 100644
--- a/src/stream/spill_test/src/test_mem_table.rs
+++ b/src/stream/spill_test/src/test_mem_table.rs
@@ -65,7 +65,7 @@ async fn test_mem_table_spill_in_streaming() {
     .await;
 
     let epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -195,7 +195,7 @@ async fn test_mem_table_spill_in_streaming_multiple_times() {
     .await;
 
     let epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
index 49ca6bbc17d5e..490c5bf9abca3 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
@@ -696,10 +696,7 @@ mod tests {
         drop(writer);
 
         // Recovery
-        test_env
-            .storage
-            .clear_shared_buffer(test_env.manager.get_current_version().await.id)
-            .await;
+        test_env.storage.clear_shared_buffer().await;
 
         // Rebuild log reader and writer in recovery
         let factory = KvLogStoreFactory::new(
@@ -915,10 +912,7 @@ mod tests {
         drop(writer);
 
         // Recovery
-        test_env
-            .storage
-            .clear_shared_buffer(test_env.manager.get_current_version().await.id)
-            .await;
+        test_env.storage.clear_shared_buffer().await;
 
         // Rebuild log reader and writer in recovery
         let factory = KvLogStoreFactory::new(
@@ -1155,10 +1149,7 @@ mod tests {
         drop(writer2);
 
         // Recovery
-        test_env
-            .storage
-            .clear_shared_buffer(test_env.manager.get_current_version().await.id)
-            .await;
+        test_env.storage.clear_shared_buffer().await;
 
         let vnodes = build_bitmap(0..VirtualNode::COUNT_FOR_TEST);
         let factory = KvLogStoreFactory::new(
@@ -1779,10 +1770,7 @@ mod tests {
         drop(writer);
 
         // Recovery
-        test_env
-            .storage
-            .clear_shared_buffer(test_env.manager.get_current_version().await.id)
-            .await;
+        test_env.storage.clear_shared_buffer().await;
 
        // Rebuild log reader and writer in recovery
         let factory = KvLogStoreFactory::new(
@@ -1846,10 +1834,7 @@ mod tests {
         drop(writer);
 
         // Recovery
-        test_env
-            .storage
-            .clear_shared_buffer(test_env.manager.get_current_version().await.id)
-            .await;
+        test_env.storage.clear_shared_buffer().await;
 
         // Rebuild log reader and writer in recovery
         let factory = KvLogStoreFactory::new(
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index 0ecb0eac90718..7eb8e4cd7f23d 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use bytes::{BufMut, Bytes, BytesMut};
 use either::Either;
 use foyer::CacheContext;
-use futures::{pin_mut, FutureExt, Stream, StreamExt, TryStreamExt};
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use futures_async_stream::for_await;
 use itertools::{izip, Itertools};
 use risingwave_common::array::stream_record::Record;
@@ -44,7 +44,7 @@ use risingwave_hummock_sdk::key::{
 };
 use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
 use risingwave_pb::catalog::Table;
-use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
+use risingwave_storage::error::{ErrorKind, StorageError};
 use risingwave_storage::hummock::CachePolicy;
 use risingwave_storage::mem_table::MemTableError;
 use risingwave_storage::row_serde::find_columns_by_ids;
@@ -170,33 +170,17 @@ pub type WatermarkCacheParameterizedStateTable;
 
 // initialize
-impl StateTableInner
-where
-    S: StateStore,
-    SD: ValueRowSerde,
-{
-    /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called
-    /// async interface only used for replicated state table,
-    /// as it needs to wait for prev epoch to be committed.
-    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StorageResult<()> {
-        self.local_store.init(InitOptions::new(epoch)).await
-    }
-}
-
-// initialize
-impl StateTableInner
+impl
+    StateTableInner
 where
     S: StateStore,
     SD: ValueRowSerde,
 {
     /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called
     /// No need to `wait_for_epoch`, so it should complete immediately.
-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.local_store
-            .init(InitOptions::new(epoch))
-            .now_or_never()
-            .expect("non-replicated state store should start immediately.")
-            .expect("non-replicated state store should not wait_for_epoch, and fail because of it.")
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.local_store.init(InitOptions::new(epoch)).await?;
+        Ok(())
     }
 
     pub fn state_store(&self) -> &S {
diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs
index 99036e968f046..7b8959bbf2c93 100644
--- a/src/stream/src/common/table/test_state_table.rs
+++ b/src/stream/src/common/table/test_state_table.rs
@@ -65,7 +65,7 @@ async fn test_state_table_update_insert() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(6_i32.into()),
@@ -258,7 +258,7 @@ async fn test_state_table_iter_with_prefix() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -393,7 +393,7 @@ async fn test_state_table_iter_with_pk_range() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -532,7 +532,7 @@ async fn test_mem_table_assertion() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
         Some(11_i32.into()),
@@ -578,7 +578,7 @@ async fn test_state_table_iter_with_value_indices() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -751,7 +751,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -998,7 +998,7 @@ async fn test_state_table_write_chunk() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     let chunk = StreamChunk::from_rows(
         &[
@@ -1130,7 +1130,7 @@ async fn test_state_table_write_chunk_visibility() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     let chunk = StreamChunk::from_rows(
         &[
@@ -1257,7 +1257,7 @@ async fn test_state_table_write_chunk_value_indices() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     let chunk = StreamChunk::from_rows(
         &[
@@ -1354,7 +1354,7 @@ async fn test_state_table_watermark_cache_ignore_null() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     let rows = vec![
         (
@@ -1480,7 +1480,7 @@ async fn test_state_table_watermark_cache_write_chunk() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     let cache = state_table.get_watermark_cache();
     assert_eq!(cache.len(), 0);
@@ -1655,7 +1655,7 @@ async fn test_state_table_watermark_cache_refill() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     let rows = vec![
         OwnedRow::new(vec![
@@ -1751,7 +1751,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
 
     state_table.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -1952,7 +1952,7 @@ async fn test_replicated_state_table_replication() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state_table.init_epoch(epoch);
+    state_table.init_epoch(epoch).await.unwrap();
     replicated_state_table.init_epoch(epoch).await.unwrap();
 
     // Insert first record into base state table
diff --git a/src/stream/src/common/table/test_storage_table.rs b/src/stream/src/common/table/test_storage_table.rs
index be09fa1e91833..bf1dd0098feb9 100644
--- a/src/stream/src/common/table/test_storage_table.rs
+++ b/src/stream/src/common/table/test_storage_table.rs
@@ -82,7 +82,7 @@ async fn test_storage_table_value_indices() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await.unwrap();
 
     state.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -208,7 +208,7 @@ async fn test_shuffled_column_id_for_storage_table_get_row() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await.unwrap();
 
     let table = StorageTable::for_test(
         test_env.storage.clone(),
@@ -327,7 +327,7 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await.unwrap();
     state.insert(OwnedRow::new(vec![Some(1_i32.into()), None, None]));
     state.insert(OwnedRow::new(vec![
@@ -438,7 +438,7 @@ async fn test_batch_scan_with_value_indices() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await.unwrap();
 
     state.insert(OwnedRow::new(vec![
         Some(1_i32.into()),
@@ -542,7 +542,7 @@ async fn test_batch_scan_chunk_with_value_indices() {
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await.unwrap();
 
     let gen_row = |i: i32, is_update: bool| {
         let scale = if is_update { 10 } else { 1 };
diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs
index 3ec5aae2e97bb..4025bddcae8c8 100644
--- a/src/stream/src/executor/aggregation/distinct.rs
+++ b/src/stream/src/executor/aggregation/distinct.rs
@@ -375,9 +375,9 @@ mod tests {
         let store = MemoryStateStore::new();
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
         let mut dedup_tables = infer_dedup_tables(&agg_calls, &[], store).await;
-        dedup_tables
-            .values_mut()
-            .for_each(|table| table.init_epoch(epoch));
+        for table in dedup_tables.values_mut() {
+            table.init_epoch(epoch).await.unwrap()
+        }
 
         let mut deduplicater = DistinctDeduplicater::new(
             &agg_calls,
@@ -555,9 +555,9 @@ mod tests {
         let store = MemoryStateStore::new();
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
         let mut dedup_tables = infer_dedup_tables(&agg_calls, &group_key_types, store).await;
-        dedup_tables
-            .values_mut()
-            .for_each(|table| table.init_epoch(epoch));
+        for table in dedup_tables.values_mut() {
+            table.init_epoch(epoch).await.unwrap()
+        }
 
         let mut deduplicater = DistinctDeduplicater::new(
             &agg_calls,
diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs
index 4405411867204..fe88faf4a2bd7 100644
--- a/src/stream/src/executor/aggregation/minput.rs
+++ b/src/stream/src/executor/aggregation/minput.rs
@@ -410,7 +410,7 @@ mod tests {
         .unwrap();
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         {
             let chunk = create_chunk(
@@ -516,7 +516,7 @@ mod tests {
         .unwrap();
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         {
             let chunk = create_chunk(
@@ -619,8 +619,8 @@ mod tests {
         .await;
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table_1.init_epoch(epoch);
-        table_2.init_epoch(epoch);
+        table_1.init_epoch(epoch).await.unwrap();
+        table_2.init_epoch(epoch).await.unwrap();
 
         let order_columns_1 = vec![
             ColumnOrder::new(0, OrderType::ascending()), // a ASC for AggKind::Min
@@ -741,7 +741,7 @@ mod tests {
         .unwrap();
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         {
             let chunk = create_chunk(
@@ -830,7 +830,7 @@ mod tests {
         .await;
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         let order_columns = vec![
             ColumnOrder::new(0, OrderType::ascending()), // a ASC for AggKind::Min
@@ -956,7 +956,7 @@ mod tests {
         .unwrap();
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         {
             let chunk = create_chunk(
@@ -1075,7 +1075,7 @@ mod tests {
         .unwrap();
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         {
             let chunk = create_chunk(
@@ -1164,7 +1164,7 @@ mod tests {
         .unwrap();
 
         let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-        table.init_epoch(epoch);
+        table.init_epoch(epoch).await.unwrap();
 
         {
             let chunk = create_chunk(
                 " T i i I
diff --git a/src/stream/src/executor/approx_percentile/global_state.rs b/src/stream/src/executor/approx_percentile/global_state.rs
index 58011a8450c88..3f95a57e22613 100644
--- a/src/stream/src/executor/approx_percentile/global_state.rs
+++ b/src/stream/src/executor/approx_percentile/global_state.rs
@@ -60,8 +60,8 @@ impl GlobalApproxPercentileState {
 
     pub async fn init(&mut self, init_epoch: EpochPair) -> StreamExecutorResult<()> {
         // Init state tables.
-        self.count_state_table.init_epoch(init_epoch);
-        self.bucket_state_table.init_epoch(init_epoch);
+        self.count_state_table.init_epoch(init_epoch).await?;
+        self.bucket_state_table.init_epoch(init_epoch).await?;
 
         // Refill row_count
         let row_count_state = self.get_row_count_state().await?;
diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs
index 74e55deca75fa..f2a002e4f8115 100644
--- a/src/stream/src/executor/asof_join.rs
+++ b/src/stream/src/executor/asof_join.rs
@@ -100,8 +100,8 @@ impl JoinSide {
         // self.ht.clear();
     }
 
-    pub fn init(&mut self, epoch: EpochPair) {
-        self.ht.init(epoch);
+    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.ht.init(epoch).await
     }
 }
 
@@ -340,11 +340,12 @@ impl AsOfJoinExecutor
         pin_mut!(aligned_stream);
 
         let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
-        self.side_l.init(barrier.epoch);
-        self.side_r.init(barrier.epoch);
-
+        let first_epoch = barrier.epoch;
         // The first barrier message should be propagated.
         yield Message::Barrier(barrier);
+        self.side_l.init(first_epoch).await?;
+        self.side_r.init(first_epoch).await?;
+
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 540ffe1a020fc..e2d28bce04bc0 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -137,7 +137,11 @@ where
         let first_barrier = expect_first_barrier(&mut upstream).await?;
         let mut paused = first_barrier.is_pause_on_startup();
         let first_epoch = first_barrier.epoch;
-        self.state_table.init_epoch(first_barrier.epoch);
+        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
+        // The first barrier message should be propagated.
+        yield Message::Barrier(first_barrier);
+
+        self.state_table.init_epoch(first_epoch).await?;
 
         let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;
 
@@ -148,11 +152,9 @@ where
             )
         });
         if is_completely_finished {
-            assert!(!first_barrier.is_newly_added(self.actor_id));
+            assert!(!is_newly_added);
         }
 
-        // The first barrier message should be propagated.
-        yield Message::Barrier(first_barrier);
         upstream_table.init_epoch(first_epoch).await?;
 
         let mut backfill_state: BackfillState = progress_per_vnode.into();
diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index 066dc86ba551c..e4cd24f58d4dd 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -159,6 +159,9 @@ impl CdcBackfillExecutor {
         let first_barrier = expect_first_barrier(&mut upstream).await?;
 
         let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
+        let first_barrier_epoch = first_barrier.epoch;
+        // The first barrier message should be propagated.
+        yield Message::Barrier(first_barrier);
         let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
 
         // Check whether this parallelism has been assigned splits,
@@ -169,7 +172,7 @@ impl CdcBackfillExecutor {
             .boxed()
             .peekable();
 
-        state_impl.init_epoch(first_barrier.epoch);
+        state_impl.init_epoch(first_barrier_epoch).await?;
 
         // restore backfill state
         let state = state_impl.restore_state().await?;
@@ -177,9 +180,6 @@ impl CdcBackfillExecutor {
 
         let to_backfill = !self.options.disable_backfill && !state.is_finished;
 
-        // The first barrier message should be propagated.
-        yield Message::Barrier(first_barrier);
-
         // Keep track of rows from the snapshot.
         let mut total_snapshot_row_count = state.row_count as u64;
diff --git a/src/stream/src/executor/backfill/cdc/state.rs b/src/stream/src/executor/backfill/cdc/state.rs
index 9fda6c811de13..b36e90e2a2b0b 100644
--- a/src/stream/src/executor/backfill/cdc/state.rs
+++ b/src/stream/src/executor/backfill/cdc/state.rs
@@ -50,8 +50,8 @@ impl CdcBackfillState {
         }
     }
 
-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_table.init_epoch(epoch)
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_table.init_epoch(epoch).await
     }
 
     /// Restore the backfill state from storage
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index d8de07375d721..9d6d95ff7e7cb 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -140,9 +140,13 @@ where
         // Poll the upstream to get the first barrier.
         let first_barrier = expect_first_barrier(&mut upstream).await?;
         let mut paused = first_barrier.is_pause_on_startup();
+        let first_epoch = first_barrier.epoch;
         let init_epoch = first_barrier.epoch.prev;
+        // The first barrier message should be propagated.
+        yield Message::Barrier(first_barrier);
+
         if let Some(state_table) = self.state_table.as_mut() {
-            state_table.init_epoch(first_barrier.epoch);
+            state_table.init_epoch(first_epoch).await?;
         }
 
         let BackfillState {
@@ -162,9 +166,6 @@ where
         // which will then be persisted.
         let mut current_state: Vec = vec![None; state_len];
 
-        // The first barrier message should be propagated.
-        yield Message::Barrier(first_barrier);
-
         // If no need backfill, but state was still "unfinished" we need to finish it.
         // So we just update the state + progress to meta at the next barrier to finish progress,
         // and forward other messages.
diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs
index 0a38c82f294fd..7bd4620649989 100644
--- a/src/stream/src/executor/dedup/append_only_dedup.rs
+++ b/src/stream/src/executor/dedup/append_only_dedup.rs
@@ -59,10 +59,10 @@ impl AppendOnlyDedupExecutor {
 
         // Consume the first barrier message and initialize state table.
         let barrier = expect_first_barrier(&mut input).await?;
-        self.state_table.init_epoch(barrier.epoch);
-
+        let first_epoch = barrier.epoch;
         // The first barrier message should be propagated.
         yield Message::Barrier(barrier);
+        self.state_table.init_epoch(first_epoch).await?;
 
         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs
index d34de5c009ddb..8df058b9903a2 100644
--- a/src/stream/src/executor/dynamic_filter.rs
+++ b/src/stream/src/executor/dynamic_filter.rs
@@ -321,8 +321,11 @@ impl DynamicFilterExecutor
 DynamicFilterExecutor
 HashAggExecutor {
         // First barrier
         let mut input = input.execute();
         let barrier = expect_first_barrier(&mut input).await?;
-        this.all_state_tables_mut().for_each(|table| {
-            table.init_epoch(barrier.epoch);
-        });
-
+        let first_epoch = barrier.epoch;
         yield Message::Barrier(barrier);
+        for table in this.all_state_tables_mut() {
+            table.init_epoch(first_epoch).await?;
+        }
 
         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 6c3e51f5ee55f..74db1d267b1ac 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -115,8 +115,8 @@ impl JoinSide {
         // self.ht.clear();
     }
 
-    pub fn init(&mut self, epoch: EpochPair) {
-        self.ht.init(epoch);
+    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.ht.init(epoch).await
     }
 }
 
@@ -465,11 +465,12 @@ impl HashJoinExecutor
 JoinHashMap {
         }
     }
 
-    pub fn init(&mut self, epoch: EpochPair) {
-        self.state.table.init_epoch(epoch);
+    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state.table.init_epoch(epoch).await?;
         if let Some(degree_state) = &mut self.degree_state {
-            degree_state.table.init_epoch(epoch);
+            degree_state.table.init_epoch(epoch).await?;
         }
+        Ok(())
     }
 
     /// Update the vnode bitmap and manipulate the cache if necessary.
diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs
index a5dc24d5cd74b..faa63b51649c7 100644
--- a/src/stream/src/executor/mview/materialize.rs
+++ b/src/stream/src/executor/mview/materialize.rs
@@ -170,10 +170,10 @@ impl MaterializeExecutor {
         let mut input = self.input.execute();
 
         let barrier = expect_first_barrier(&mut input).await?;
-        self.state_table.init_epoch(barrier.epoch);
-
+        let first_epoch = barrier.epoch;
         // The first barrier message should be propagated.
         yield Message::Barrier(barrier);
+        self.state_table.init_epoch(first_epoch).await?;
 
         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/mview/test_utils.rs b/src/stream/src/executor/mview/test_utils.rs
index c194142c0a49f..b6dfcdcda1d3e 100644
--- a/src/stream/src/executor/mview/test_utils.rs
+++ b/src/stream/src/executor/mview/test_utils.rs
@@ -55,7 +55,7 @@ pub async fn gen_basic_table(row_count: usize) -> StorageTable
         vec![0, 1, 2],
     );
     let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state.init_epoch(epoch);
+    state.init_epoch(epoch).await.unwrap();
 
     for idx in 0..row_count {
         let idx = idx as i32;
diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs
index df77c459a8784..0ee7069e2aef9 100644
--- a/src/stream/src/executor/now.rs
+++ b/src/stream/src/executor/now.rs
@@ -127,9 +127,23 @@ impl NowExecutor {
             );
         }
         for barrier in barriers {
+            let new_timestamp = Some(barrier.get_curr_epoch().as_scalar());
+            let pause_mutation =
+                barrier
+                    .mutation
+                    .as_deref()
+                    .and_then(|mutation| match mutation {
+                        Mutation::Pause => Some(true),
+                        Mutation::Resume => Some(false),
+                        _ => None,
+                    });
+
             if !initialized {
+                let first_epoch = barrier.epoch;
+                let is_pause_on_startup = barrier.is_pause_on_startup();
+                yield Message::Barrier(barrier);
                 // Handle the initial barrier.
-                state_table.init_epoch(barrier.epoch);
+                state_table.init_epoch(first_epoch).await?;
                 let state_row = {
                     let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded);
@@ -144,25 +158,20 @@ impl NowExecutor {
                    }
                 };
                 last_timestamp = state_row.and_then(|row| row[0].clone());
-                paused = barrier.is_pause_on_startup();
+                paused = is_pause_on_startup;
                 initialized = true;
             } else {
                 state_table.commit(barrier.epoch).await?;
+                yield Message::Barrier(barrier);
             }
 
             // Extract timestamp from the current epoch.
-            curr_timestamp = Some(barrier.get_curr_epoch().as_scalar());
+            curr_timestamp = new_timestamp;
 
             // Update paused state.
-            if let Some(mutation) = barrier.mutation.as_deref() {
-                match mutation {
-                    Mutation::Pause => paused = true,
-                    Mutation::Resume => paused = false,
-                    _ => {}
-                }
+            if let Some(pause_mutation) = pause_mutation {
+                paused = pause_mutation;
             }
-
-            yield Message::Barrier(barrier);
         }
 
         // Do not yield any messages if paused.
diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs
index 263b578d5d832..3974947389ee4 100644
--- a/src/stream/src/executor/over_window/eowc.rs
+++ b/src/stream/src/executor/over_window/eowc.rs
@@ -347,9 +347,9 @@ impl EowcOverWindowExecutor {
         let mut input = input.execute();
 
         let barrier = expect_first_barrier(&mut input).await?;
-        this.state_table.init_epoch(barrier.epoch);
-
+        let first_epoch = barrier.epoch;
         yield Message::Barrier(barrier);
+        this.state_table.init_epoch(first_epoch).await?;
 
         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs
index 16bc8065f2a00..fa17d5f5a78f0 100644
--- a/src/stream/src/executor/over_window/general.rs
+++ b/src/stream/src/executor/over_window/general.rs
@@ -634,9 +634,9 @@ impl OverWindowExecutor {
         let mut input = input.execute();
 
         let barrier = expect_first_barrier(&mut input).await?;
-        this.state_table.init_epoch(barrier.epoch);
-
+        let first_epoch = barrier.epoch;
         yield Message::Barrier(barrier);
+        this.state_table.init_epoch(first_epoch).await?;
 
         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs
index 6f85266615896..5bbd925dea722 100644
--- a/src/stream/src/executor/simple_agg.rs
+++ b/src/stream/src/executor/simple_agg.rs
@@ -241,9 +241,12 @@ impl SimpleAggExecutor {
         let mut input = input.execute();
         let barrier = expect_first_barrier(&mut input).await?;
-        this.all_state_tables_mut().for_each(|table| {
-            table.init_epoch(barrier.epoch);
-        });
+        let first_epoch = barrier.epoch;
+        yield Message::Barrier(barrier);
+
+        for table in this.all_state_tables_mut() {
+            table.init_epoch(first_epoch).await?;
+        }
 
         let distinct_dedup = DistinctDeduplicater::new(
             &this.agg_calls,
@@ -252,8 +255,6 @@ impl SimpleAggExecutor {
             &this.actor_ctx,
         );
 
-        yield Message::Barrier(barrier);
-
         // This will fetch previous agg states from the intermediate state table.
         let mut vars = ExecutionVars {
             agg_group: AggGroup::create(
diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs
index 1556ebf419aaf..98f840b8349ec 100644
--- a/src/stream/src/executor/sort.rs
+++ b/src/stream/src/executor/sort.rs
@@ -76,8 +76,9 @@ impl SortExecutor {
         let mut input = input.execute();
 
         let barrier = expect_first_barrier(&mut input).await?;
-        this.buffer_table.init_epoch(barrier.epoch);
+        let first_epoch = barrier.epoch;
         yield Message::Barrier(barrier);
+        this.buffer_table.init_epoch(first_epoch).await?;
 
         let mut vars = ExecutionVars {
             buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table),
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index 766c42a5e2c87..ecea95a0484cf 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -193,6 +193,9 @@ impl FsFetchExecutor {
     async fn into_stream(mut self) {
         let mut upstream = self.upstream.take().unwrap().execute();
         let barrier = expect_first_barrier(&mut upstream).await?;
+        let first_epoch = barrier.epoch;
+        let is_pause_on_startup = barrier.is_pause_on_startup();
+        yield Message::Barrier(barrier);
 
         let mut core = self.stream_source_core.take().unwrap();
         let mut state_store_handler = core.split_state_store;
@@ -209,13 +212,13 @@ impl FsFetchExecutor {
             unreachable!("Partition and offset columns must be set.");
         };
 
         // Initialize state table.
-        state_store_handler.init_epoch(barrier.epoch);
+        state_store_handler.init_epoch(first_epoch).await?;
 
         let mut splits_on_fetch: usize = 0;
         let mut stream =
             StreamReaderWithPause::::new(upstream, stream::pending().boxed());
 
-        if barrier.is_pause_on_startup() {
+        if is_pause_on_startup {
             stream.pause_stream();
         }
 
@@ -233,8 +236,6 @@ impl FsFetchExecutor {
             )
             .await?;
 
-        yield Message::Barrier(barrier);
-
         while let Some(msg) = stream.next().await {
             match msg {
                 Err(e) => {
diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs
index 70f0ce5f4f24b..0a3af51d4bff4 100644
--- a/src/stream/src/executor/source/fs_source_executor.rs
+++ b/src/stream/src/executor/source/fs_source_executor.rs
@@ -299,6 +299,16 @@ impl FsSourceExecutor {
                 self.stream_source_core.source_id
             )
         })?;
+        // If the first barrier requires us to pause on startup, pause the stream.
+        let start_with_paused = barrier.is_pause_on_startup();
+        let first_epoch = barrier.epoch;
+        let mut boot_state = Vec::default();
+        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
+            boot_state = splits.to_vec();
+        }
+        let boot_state = boot_state;
+
+        yield Message::Barrier(barrier);
 
         let source_desc_builder: SourceDescBuilder =
             self.stream_source_core.source_desc_builder.take().unwrap();
@@ -312,17 +322,10 @@ impl FsSourceExecutor {
             unreachable!("Partition and offset columns must be set.");
         };
 
-        // If the first barrier requires us to pause on startup, pause the stream.
-        let start_with_paused = barrier.is_pause_on_startup();
-
-        let mut boot_state = Vec::default();
-        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
-            boot_state = splits.to_vec();
-        }
-
         self.stream_source_core
             .split_state_store
-            .init_epoch(barrier.epoch);
+            .init_epoch(first_epoch)
+            .await?;
 
         let all_completed: HashSet = self
             .stream_source_core
@@ -368,8 +371,6 @@ impl FsSourceExecutor {
             stream.pause_stream();
         }
 
-        yield Message::Barrier(barrier);
-
         // We allow data to flow for 5 * `expected_barrier_latency_ms` milliseconds, considering
         // some other latencies like network and cost in Meta.
         let mut max_wait_barrier_time_ms =
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 34f9eb12d692a..604d1983b1a14 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -326,6 +326,13 @@ impl SourceBackfillExecutorInner {
         // Poll the upstream to get the first barrier.
         let barrier = expect_first_barrier(&mut input).await?;
+        let first_epoch = barrier.epoch;
+        let owned_splits = barrier
+            .initial_split_assignment(self.actor_ctx.id)
+            .unwrap_or(&[])
+            .to_vec();
+        let is_pause_on_startup = barrier.is_pause_on_startup();
+        yield Message::Barrier(barrier);
 
         let mut core = self.stream_source_core;
@@ -338,12 +345,7 @@ impl SourceBackfillExecutorInner {
             unreachable!("Partition and offset columns must be set.");
         };
 
-        let mut owned_splits = Vec::default();
-        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
-            owned_splits = splits.to_vec();
-        }
-
-        self.backfill_state_store.init_epoch(barrier.epoch);
+        self.backfill_state_store.init_epoch(first_epoch).await?;
 
         let mut backfill_states: BackfillStates = HashMap::new();
         for split in &owned_splits {
@@ -421,7 +423,7 @@ impl SourceBackfillExecutorInner {
         }
 
         // If the first barrier requires us to pause on startup, pause the stream.
-        if barrier.is_pause_on_startup() {
+        if is_pause_on_startup {
             pause_reader!();
         }
 
@@ -431,7 +433,7 @@ impl SourceBackfillExecutorInner {
         tokio::spawn(async move {
             // This is for self.backfill_finished() to be safe.
             // We wait for 1st epoch's curr, i.e., the 2nd epoch's prev.
-            let epoch = barrier.epoch.curr;
+            let epoch = first_epoch.curr;
             tracing::info!("waiting for epoch: {}", epoch);
             state_store
                 .try_wait_epoch(
@@ -443,7 +445,6 @@ impl SourceBackfillExecutorInner {
             STATE_TABLE_INITIALIZED.call_once(|| ());
             tracing::info!("finished waiting for epoch: {}", epoch);
         });
-        yield Message::Barrier(barrier);
 
         {
             let source_backfill_row_count = self
diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs
index b0ca8d363b9d7..384a86211fdc2 100644
--- a/src/stream/src/executor/source/source_backfill_state_table.rs
+++ b/src/stream/src/executor/source/source_backfill_state_table.rs
@@ -40,8 +40,8 @@ impl BackfillStateTableHandler {
         }
     }
 
-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_store.init_epoch(epoch);
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_store.init_epoch(epoch).await
     }
 
     fn string_to_scalar(rhs: impl Into) -> ScalarImpl {
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 6d5cf710d3bb0..d5bff79b1cd83 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -430,6 +430,17 @@ impl SourceExecutor {
                 self.stream_source_core.as_ref().unwrap().source_id
             )
         })?;
+        let first_epoch = barrier.epoch;
+        let mut boot_state =
+            if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
+                tracing::debug!(?splits, "boot with splits");
+                splits.to_vec()
+            } else {
+                Vec::default()
+            };
+        let is_pause_on_startup = barrier.is_pause_on_startup();
+
+        yield Message::Barrier(barrier);
 
         let mut core = self.stream_source_core.unwrap();
 
@@ -447,13 +458,7 @@ impl SourceExecutor {
             unreachable!("Partition and offset columns must be set.");
         };
 
-        let mut boot_state = Vec::default();
-        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
-            tracing::debug!(?splits, "boot with splits");
-            boot_state = splits.to_vec();
-        }
-
-        core.split_state_store.init_epoch(barrier.epoch);
+        core.split_state_store.init_epoch(first_epoch).await?;
 
         for ele in &mut boot_state {
             if let Some(recover_state) = core
@@ -506,7 +511,7 @@ impl SourceExecutor {
         // - For shared source, pause until there's a MV.
         // - If the first barrier requires us to pause on startup, pause the stream.
-        if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() {
+        if (self.is_shared && is_uninitialized) || is_pause_on_startup {
             tracing::info!(
                 is_shared = self.is_shared,
                 is_uninitialized = is_uninitialized,
@@ -515,8 +520,6 @@ impl SourceExecutor {
             stream.pause_stream();
         }
 
-        yield Message::Barrier(barrier);
-
         // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
         // milliseconds, considering some other latencies like network and cost in Meta.
         let mut max_wait_barrier_time_ms =
@@ -1098,7 +1101,10 @@ mod tests {
         )
         .await;
         // there must exist state for new add partition
-        source_state_handler.init_epoch(EpochPair::new_test_epoch(test_epoch(2)));
+        source_state_handler
+            .init_epoch(EpochPair::new_test_epoch(test_epoch(2)))
+            .await
+            .unwrap();
         source_state_handler
             .get(new_assignment[1].id())
             .await
@@ -1137,7 +1143,10 @@ mod tests {
         )
         .await;
 
-        source_state_handler.init_epoch(EpochPair::new_test_epoch(5 * test_epoch(1)));
+        source_state_handler
+            .init_epoch(EpochPair::new_test_epoch(5 * test_epoch(1)))
+            .await
+            .unwrap();
 
         assert!(source_state_handler
             .try_recover_from_state_store(&prev_assignment[0])
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index 1c9615c542bfe..dfc12fcd710d7 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -71,8 +71,8 @@ impl SourceStateTableHandler {
         }
     }
 
-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_table.init_epoch(epoch);
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_table.init_epoch(epoch).await
     }
 
     fn string_to_scalar(rhs: impl Into) -> ScalarImpl {
@@ -283,7 +283,7 @@ pub(crate) mod tests {
         let init_epoch = EpochPair::new_test_epoch(init_epoch_num);
         let next_epoch = EpochPair::new_test_epoch(init_epoch_num + test_epoch(1));
 
-        state_table.init_epoch(init_epoch);
+        state_table.init_epoch(init_epoch).await.unwrap();
         state_table.insert(OwnedRow::new(vec![a.clone(), b.clone()]));
         state_table.commit(next_epoch).await.unwrap();
 
@@ -308,7 +308,7 @@ pub(crate) mod tests {
         let epoch_2 = EpochPair::new_test_epoch(test_epoch(2));
         let epoch_3 = EpochPair::new_test_epoch(test_epoch(3));
 
-        state_table_handler.init_epoch(epoch_1);
+        state_table_handler.init_epoch(epoch_1).await?;
         state_table_handler
             .set_states(vec![split_impl.clone()])
             .await?;
diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs
index 8a994acb5e8db..b3fc61aa8a24f 100644
--- a/src/stream/src/executor/temporal_join.rs
+++ b/src/stream/src/executor/temporal_join.rs
@@ -811,19 +811,27 @@ impl
 {
+                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
+                    let barrier_epoch = barrier.epoch;
                     if !APPEND_ONLY {
                         if wait_first_barrier {
                             wait_first_barrier = false;
-                            self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch);
+                            yield Message::Barrier(barrier);
+                            self.memo_table
+                                .as_mut()
+                                .unwrap()
+                                .init_epoch(barrier_epoch)
+                                .await?;
                         } else {
                             self.memo_table
                                 .as_mut()
                                 .unwrap()
                                 .commit(barrier.epoch)
                                 .await?;
+                            yield Message::Barrier(barrier);
                         }
                     }
-                    if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
+                    if let Some(vnodes) = update_vnode_bitmap {
                         let prev_vnodes =
                             self.right_table.source.update_vnode_bitmap(vnodes.clone());
                         if cache_may_stale(&prev_vnodes, &vnodes) {
@@ -835,8 +843,7 @@ impl
                     }
 
                     self.right_table.cache.evict();
-
-                    yield Message::Barrier(barrier)
                 }
                 InternalMessage::WaterMark(watermark) => {
diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs
--- a/src/stream/src/executor/top_n/group_top_n.rs
+++ b/src/stream/src/executor/top_n/group_top_n.rs
@@ -183,9 +183,8 @@ where
         self.caches.evict()
     }
 
-    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
-        Ok(())
+    async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.managed_state.init_epoch(epoch).await
     }
 
     async fn handle_watermark(&mut self, watermark: Watermark) -> Option {
diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index 2f9bd2d547025..597c43d8ff940 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -201,9 +201,8 @@ where
         self.caches.evict()
     }
 
-    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
-        Ok(())
+    async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.managed_state.init_epoch(epoch).await
    }
 
     async fn handle_watermark(&mut self, watermark: Watermark) -> Option {
diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs
index 1b45c82c9c83a..8a195af8685dc 100644
--- a/src/stream/src/executor/top_n/top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/top_n_appendonly.rs
@@ -135,8 +135,8 @@ where
         self.managed_state.try_flush().await
     }
 
-    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
+    async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.managed_state.init_epoch(epoch).await?;
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs
index 5c9370a9f35db..4f4835584e47e 100644
--- a/src/stream/src/executor/top_n/top_n_plain.rs
+++ b/src/stream/src/executor/top_n/top_n_plain.rs
@@ -169,8 +169,8 @@ where
         self.managed_state.try_flush().await
     }
 
-    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
+    async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.managed_state.init_epoch(epoch).await?;
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs
index 52e5fd176a97f..919197f48ed32 100644
--- a/src/stream/src/executor/top_n/top_n_state.rs
+++ b/src/stream/src/executor/top_n/top_n_state.rs
@@ -68,8 +68,8 @@ impl ManagedTopNState {
     }
 
     /// Init epoch for the managed state table.
-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_table.init_epoch(epoch)
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_table.init_epoch(epoch).await
     }
 
     /// Update vnode bitmap of state table, returning `cache_may_stale`.
@@ -352,7 +352,9 @@ mod tests {
             &[0, 1],
         )
         .await;
-        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)));
+        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
+            .await
+            .unwrap();
         tb
     };
 
@@ -432,7 +434,9 @@ mod tests {
             &[0, 1],
         )
         .await;
-        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)));
+        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
+            .await
+            .unwrap();
         tb
     };
 
@@ -479,7 +483,9 @@ mod tests {
             &[0, 1],
         )
         .await;
-        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)));
+        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
+            .await
+            .unwrap();
         tb
     };
 
diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs
index 207c6a6d9366a..2c9a04532da67 100644
--- a/src/stream/src/executor/top_n/utils.rs
+++ b/src/stream/src/executor/top_n/utils.rs
@@ -50,7 +50,10 @@ pub trait TopNExecutorBase: Send + 'static {
 
     fn evict(&mut self) {}
 
-    fn init(&mut self, epoch: EpochPair) -> impl Future> + Send;
+    fn init_after_yield_barrier(
+        &mut self,
+        epoch: EpochPair,
+    ) -> impl Future> + Send;
 
     /// Handle incoming watermarks
     fn handle_watermark(
@@ -87,9 +90,9 @@ where
         let mut input = self.input.execute();
 
         let barrier = expect_first_barrier(&mut input).await?;
-        self.inner.init(barrier.epoch).await?;
-
+        let barrier_epoch = barrier.epoch;
         yield Message::Barrier(barrier);
+        self.inner.init_after_yield_barrier(barrier_epoch).await?;
 
         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index 01497c37fdab5..2364d0dfa6592 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -98,10 +98,11 @@ impl WatermarkFilterExecutor {
         let mut input = input.execute();
 
         let first_barrier = expect_first_barrier(&mut input).await?;
-        let prev_epoch = first_barrier.epoch.prev;
-        table.init_epoch(first_barrier.epoch);
+        let first_epoch = first_barrier.epoch;
         // The first barrier message should be propagated.
         yield Message::Barrier(first_barrier);
+        let prev_epoch = first_epoch.prev;
+        table.init_epoch(first_epoch).await?;
 
         // Initiate and yield the first watermark.
let mut current_watermark = Self::get_global_max_watermark( diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 139f58a696f42..14d4526eeb3a8 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -49,7 +49,7 @@ pub use progress::CreateMviewProgressReporter; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; -use risingwave_hummock_sdk::{HummockVersionId, LocalSstableInfo, SyncResult}; +use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; use risingwave_pb::stream_service::streaming_control_stream_response::{ @@ -334,7 +334,7 @@ impl LocalBarrierWorker { match actor_op { LocalActorOperation::NewControlStream { handle, init_request } => { self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one")); - self.reset(HummockVersionId::new(init_request.version_id)).await; + self.reset().await; self.state.add_subscriptions(init_request.subscriptions); self.control_stream_handle = handle; self.control_stream_handle.send_response(StreamingControlStreamResponse { @@ -916,7 +916,6 @@ pub(crate) mod barrier_test_utils { UnboundedReceiverStream::new(request_rx).boxed(), ), init_request: InitRequest { - version_id: 0, subscriptions: vec![], }, }); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 799f98fc1a848..361c5d9582d64 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -28,7 +28,6 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; use risingwave_common::config::MetricLevel; use risingwave_common::{bail, must_match}; -use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::common::ActorInfo; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan; @@ -249,7 +248,7 @@ impl LocalStreamManager { impl LocalBarrierWorker { /// Force stop all actors on this worker, and then drop their resources. 
- pub(super) async fn reset(&mut self, version_id: HummockVersionId) { + pub(super) async fn reset(&mut self) { self.state.abort_actors().await; if let Some(m) = self.actor_manager.await_tree_reg.as_ref() { m.clear(); @@ -257,7 +256,7 @@ impl LocalBarrierWorker { if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() { hummock - .clear_shared_buffer(version_id) + .clear_shared_buffer() .verbose_instrument_await("store_clear_shared_buffer") .await } From e2f66a33a9fa6680c30a80ff88f7eea65af247c8 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 1 Nov 2024 14:05:19 +0800 Subject: [PATCH 2/9] fix --- src/stream/src/executor/approx_percentile/global.rs | 5 +++-- src/stream/src/executor/approx_percentile/global_state.rs | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/approx_percentile/global.rs b/src/stream/src/executor/approx_percentile/global.rs index 2ccff36c47390..c7366d3e67a4f 100644 --- a/src/stream/src/executor/approx_percentile/global.rs +++ b/src/stream/src/executor/approx_percentile/global.rs @@ -52,9 +52,10 @@ impl GlobalApproxPercentileExecutor { // Initialize state let mut input_stream = self.input.execute(); let first_barrier = expect_first_barrier(&mut input_stream).await?; - let mut state = self.state; - state.init(first_barrier.epoch).await?; + let first_epoch = first_barrier.epoch; yield Message::Barrier(first_barrier); + let mut state = self.state; + state.init_after_yield_barrier(first_epoch).await?; // Get row count state, and row_count. #[for_await] diff --git a/src/stream/src/executor/approx_percentile/global_state.rs b/src/stream/src/executor/approx_percentile/global_state.rs index 3f95a57e22613..ca53cd4c067ce 100644 --- a/src/stream/src/executor/approx_percentile/global_state.rs +++ b/src/stream/src/executor/approx_percentile/global_state.rs @@ -58,7 +58,10 @@ impl GlobalApproxPercentileState { } } - pub async fn init(&mut self, init_epoch: EpochPair) -> StreamExecutorResult<()> { + pub async fn init_after_yield_barrier( + &mut self, + init_epoch: EpochPair, + ) -> StreamExecutorResult<()> { // Init state tables. 
self.count_state_table.init_epoch(init_epoch).await?; self.bucket_state_table.init_epoch(init_epoch).await?; From 4f3653689716a9ea6cce1f0dfec38c600c8ee023 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 1 Nov 2024 16:21:53 +0800 Subject: [PATCH 3/9] fix ut --- src/connector/src/sink/log_store.rs | 8 +-- .../hummock_test/src/compactor_tests.rs | 37 +++++++++++-- .../hummock_test/src/hummock_storage_tests.rs | 52 ++++++++++++++++--- .../src/local_state_store_test_utils.rs | 8 +++ .../hummock_test/src/state_store_tests.rs | 3 +- .../src/hummock/store/hummock_storage.rs | 7 ++- .../src/common/log_store_impl/in_mem.rs | 4 +- .../common/log_store_impl/kv_log_store/mod.rs | 30 +++++------ .../log_store_impl/kv_log_store/reader.rs | 4 +- .../log_store_impl/kv_log_store/writer.rs | 2 +- src/stream/src/executor/sink.rs | 9 ++-- src/stream/src/executor/temporal_join.rs | 2 + 12 files changed, 125 insertions(+), 41 deletions(-) diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 3c10f0639c47a..d7240bb4a4dd4 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -119,7 +119,7 @@ pub enum LogStoreReadItem { pub trait LogWriter: Send { /// Initialize the log writer with an epoch - fn init( + fn init_after_yield_barrier( &mut self, epoch: EpochPair, pause_read_on_bootstrap: bool, @@ -354,7 +354,7 @@ pub struct LogWriterMetrics { } impl LogWriter for MonitoredLogWriter { - async fn init( + async fn init_after_yield_barrier( &mut self, epoch: EpochPair, pause_read_on_bootstrap: bool, @@ -365,7 +365,9 @@ impl LogWriter for MonitoredLogWriter { self.metrics .log_store_latest_write_epoch .set(epoch.curr as _); - self.inner.init(epoch, pause_read_on_bootstrap).await + self.inner + .init_after_yield_barrier(epoch, pause_read_on_bootstrap) + .await } async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 3b7054a51d864..d0ba14195ed2a 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -724,7 +724,11 @@ pub(crate) mod tests { let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value let kv_count = 11; - // let base_epoch = Epoch(0); + let prev_epoch: u64 = hummock_manager_ref + .get_current_version() + .await + .table_committed_epoch(existing_table_id.into()) + .unwrap(); let base_epoch = Epoch::now(); let mut epoch: u64 = base_epoch.0; let millisec_interval_epoch: u64 = (1 << 16) * 100; @@ -741,7 +745,10 @@ pub(crate) mod tests { let next_epoch = epoch + millisec_interval_epoch; storage.start_epoch(next_epoch, table_id_set.clone()); if i == 0 { - local.init_for_test(epoch).await.unwrap(); + local + .init_for_test_with_prev_epoch(epoch, prev_epoch) + .await + .unwrap(); } epoch_set.insert(epoch); let mut prefix = BytesMut::default(); @@ -935,6 +942,11 @@ pub(crate) mod tests { // 1. 
add sstables let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value let kv_count = 11; + let prev_epoch: u64 = hummock_manager_ref + .get_current_version() + .await + .table_committed_epoch(existing_table_id.into()) + .unwrap(); // let base_epoch = Epoch(0); let base_epoch = Epoch::now(); let mut epoch: u64 = base_epoch.0; @@ -948,7 +960,10 @@ pub(crate) mod tests { storage.start_epoch(epoch, table_id_set.clone()); for i in 0..kv_count { if i == 0 { - local.init_for_test(epoch).await.unwrap(); + local + .init_for_test_with_prev_epoch(epoch, prev_epoch) + .await + .unwrap(); } let next_epoch = epoch + millisec_interval_epoch; storage.start_epoch(next_epoch, table_id_set.clone()); @@ -1912,11 +1927,23 @@ pub(crate) mod tests { let table_id_set = HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter()); + let version = hummock_meta_client.get_current_version().await.unwrap(); + storage.start_epoch(*epoch, table_id_set.clone()); for i in 0..kv_count { if i == 0 && *is_init { - local_1.0.init_for_test(*epoch).await.unwrap(); - local_2.0.init_for_test(*epoch).await.unwrap(); + let prev_epoch_1 = version.table_committed_epoch(local_1.0.table_id()).unwrap(); + local_1 + .0 + .init_for_test_with_prev_epoch(*epoch, prev_epoch_1) + .await + .unwrap(); + let prev_epoch_2 = version.table_committed_epoch(local_2.0.table_id()).unwrap(); + local_2 + .0 + .init_for_test_with_prev_epoch(*epoch, prev_epoch_2) + .await + .unwrap(); *is_init = false; } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 4e6ab26a539c6..49cfff9854d7b 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -476,7 +476,7 @@ async fn test_state_store_sync() { .committed() .table_committed_epoch(TEST_TABLE_ID) .unwrap(); - let epoch1 = test_epoch(base_epoch.next_epoch()); + let epoch1 = base_epoch.next_epoch(); test_env .storage .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); @@ -1133,6 +1133,13 @@ async fn test_iter_with_min_epoch() { .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) .await; + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + let epoch1 = (31 * 1000) << 16; test_env .storage @@ -1149,7 +1156,10 @@ async fn test_iter_with_min_epoch() { .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) .collect(); - hummock_storage.init_for_test(epoch1).await.unwrap(); + hummock_storage + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); hummock_storage .ingest_batch( @@ -1422,7 +1432,16 @@ async fn test_hummock_version_reader() { .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) .collect(); { - hummock_storage.init_for_test(epoch1).await.unwrap(); + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + hummock_storage + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); hummock_storage .ingest_batch( batch_epoch1, @@ -1852,7 +1871,16 @@ async fn test_get_with_min_epoch() { test_env .storage .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); - hummock_storage.init_for_test(epoch1).await.unwrap(); + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + hummock_storage + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + 
.unwrap(); let gen_key = |index: usize| -> TableKey { gen_key_from_str(VirtualNode::ZERO, format!("key_{}", index).as_str()) @@ -2125,9 +2153,21 @@ async fn test_table_watermark() { test_env .storage .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); - local1.init_for_test(epoch1).await.unwrap(); + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + local1 + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); local1.update_vnode_bitmap(vnode_bitmap1.clone()); - local2.init_for_test(epoch1).await.unwrap(); + local2 + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); local2.update_vnode_bitmap(vnode_bitmap2.clone()); fn gen_inner_key(index: usize) -> Bytes { diff --git a/src/storage/hummock_test/src/local_state_store_test_utils.rs b/src/storage/hummock_test/src/local_state_store_test_utils.rs index fed253a0488df..650c55774f42b 100644 --- a/src/storage/hummock_test/src/local_state_store_test_utils.rs +++ b/src/storage/hummock_test/src/local_state_store_test_utils.rs @@ -22,5 +22,13 @@ pub trait LocalStateStoreTestExt: LocalStateStore { fn init_for_test(&mut self, epoch: u64) -> impl Future> + Send + '_ { self.init(InitOptions::new(EpochPair::new_test_epoch(epoch))) } + + fn init_for_test_with_prev_epoch( + &mut self, + epoch: u64, + prev_epoch: u64, + ) -> impl Future> + Send + '_ { + self.init(InitOptions::new(EpochPair::new(epoch, prev_epoch))) + } } impl LocalStateStoreTestExt for T {} diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 0893e44e9a1b5..914517ff83336 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1478,7 +1478,8 @@ async fn test_replicated_local_hummock_storage() { .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) .await; - local_hummock_storage_2.init_for_test(epoch2).await.unwrap(); + local_hummock_storage_2.init_for_test(epoch1).await.unwrap(); + local_hummock_storage_2.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // ingest 16B batch let mut batch2 = vec![ diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index f1e41f2b1a621..3b4a143d0ad53 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -30,7 +30,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, HummockVersionId}; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -787,7 +787,10 @@ impl HummockStorage { &self.hummock_version_reader } - pub async fn wait_version_update(&self, old_id: HummockVersionId) -> HummockVersionId { + pub async fn wait_version_update( + &self, + old_id: risingwave_hummock_sdk::HummockVersionId, + ) -> risingwave_hummock_sdk::HummockVersionId { use tokio::task::yield_now; loop { let cur_id = self.recent_versions.load().latest_version().id(); diff --git a/src/stream/src/common/log_store_impl/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs index 16de320aa7edd..16a8c89bbeac1 100644 
--- a/src/stream/src/common/log_store_impl/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -245,7 +245,7 @@ impl LogReader for BoundedInMemLogStoreReader { } impl LogWriter for BoundedInMemLogStoreWriter { - async fn init( + async fn init_after_yield_barrier( &mut self, epoch: EpochPair, _pause_read_on_bootstrap: bool, @@ -361,7 +361,7 @@ mod tests { let mut join_handle = tokio::spawn(async move { writer - .init(EpochPair::new_test_epoch(init_epoch), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(init_epoch), false) .await .unwrap(); writer diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 490c5bf9abca3..73267b6399ac7 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -514,7 +514,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -624,7 +624,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -713,7 +713,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch3), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch3), false) .await .unwrap(); reader.init().await.unwrap(); @@ -814,7 +814,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1_1.clone()).await.unwrap(); @@ -930,7 +930,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch3), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch3), false) .await .unwrap(); @@ -1043,11 +1043,11 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer1 - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer2 - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); reader1.init().await.unwrap(); @@ -1166,7 +1166,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new(epoch3, epoch2), false) + .init_after_yield_barrier(EpochPair::new(epoch3, epoch2), false) .await .unwrap(); reader.init().await.unwrap(); @@ -1238,7 +1238,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -1378,7 +1378,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch1), false) + 
.init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -1487,7 +1487,7 @@ mod tests { .storage .start_epoch(epoch4, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new(epoch4, epoch3), false) + .init_after_yield_barrier(EpochPair::new(epoch4, epoch3), false) .await .unwrap(); @@ -1548,7 +1548,7 @@ mod tests { let (mut reader, mut writer) = factory.build().await; writer - .init(EpochPair::new(epoch4, epoch3), false) + .init_after_yield_barrier(EpochPair::new(epoch4, epoch3), false) .await .unwrap(); writer.write_chunk(stream_chunk4.clone()).await.unwrap(); @@ -1718,7 +1718,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch1), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -1787,7 +1787,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch3), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch3), false) .await .unwrap(); reader.init().await.unwrap(); @@ -1851,7 +1851,7 @@ mod tests { .storage .start_epoch(epoch4, HashSet::from_iter([TableId::new(table.id)])); writer - .init(EpochPair::new_test_epoch(epoch4), false) + .init_after_yield_barrier(EpochPair::new_test_epoch(epoch4), false) .await .unwrap(); reader.init().await.unwrap(); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index c84db97002b02..b2b19c45bd0a5 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -110,7 +110,9 @@ impl RewindDelay { self.rewind_count.inc(); if let Some(delay) = self.backoff_policy.next() { self.rewind_delay.observe(delay.as_secs_f64()); - sleep(delay).await; + if !cfg!(test) { + sleep(delay).await; + } } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 93e4c1211fe61..d23f6bdba0d8d 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -78,7 +78,7 @@ impl KvLogStoreWriter { } impl LogWriter for KvLogStoreWriter { - async fn init( + async fn init_after_yield_barrier( &mut self, epoch: EpochPair, pause_read_on_bootstrap: bool, diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 206b8c4989817..1f06affda6d11 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -278,18 +278,17 @@ impl SinkExecutor { ) { pin_mut!(input); let barrier = expect_first_barrier(&mut input).await?; - let epoch_pair = barrier.epoch; + let is_pause_on_startup = barrier.is_pause_on_startup(); + // Propagate the first barrier + yield Message::Barrier(barrier); log_writer - .init(epoch_pair, barrier.is_pause_on_startup()) + .init_after_yield_barrier(epoch_pair, is_pause_on_startup) .await?; let mut is_paused = false; - // Propagate the first barrier - yield Message::Barrier(barrier); - #[for_await] for msg in input { match msg? 
{ diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index b3fc61aa8a24f..c1d5cbc0486d7 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -830,6 +830,8 @@ impl Date: Mon, 4 Nov 2024 11:45:37 +0800 Subject: [PATCH 4/9] fix bench --- src/ctl/src/cmd_impl/bench.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs index 3ea966ea9c63a..eeba749a14836 100644 --- a/src/ctl/src/cmd_impl/bench.rs +++ b/src/ctl/src/cmd_impl/bench.rs @@ -18,7 +18,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; -use anyhow::Result; +use anyhow::{anyhow, Result}; use clap::Subcommand; use futures::future::try_join_all; use futures::{pin_mut, Future, StreamExt}; @@ -97,6 +97,11 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { )?) .await?; let table = get_table_catalog(meta.clone(), mv_name).await?; + let committed_epoch = hummock + .inner() + .get_pinned_version() + .table_committed_epoch(table.id) + .ok_or_else(|| anyhow!("table id {} not exist", table.id))?; let mut handlers = vec![]; for i in 0..threads { let table = table.clone(); @@ -107,7 +112,8 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { tracing::info!(thread = i, "starting scan"); let state_table = { let mut tb = make_state_table(hummock, &table).await; - tb.init_epoch(EpochPair::new_test_epoch(u64::MAX)).await?; + tb.init_epoch(EpochPair::new(u64::MAX, committed_epoch)) + .await?; tb }; loop { From ac3825c9f5befb6ea5ea79b6172e740c011ba41c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 4 Nov 2024 17:27:54 +0800 Subject: [PATCH 5/9] fix --- src/connector/src/parser/mysql.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index e9a8eeba70cb3..9970ad50003c1 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -75,6 +75,14 @@ pub fn mysql_datum_to_rw_datum( ) -> Result { match rw_data_type { DataType::Boolean => { + // TinyInt(1) is used to represent boolean in MySQL + // This handles backwards compatibility, + // before https://github.com/risingwavelabs/risingwave/pull/19071 + // we permit boolean and tinyint(1) to be equivalent to boolean in RW. 
+            if let Some(Ok(val)) = mysql_row.get_opt::<Option<bool>, _>(mysql_datum_index) {
+                let _ = mysql_row.take::(mysql_datum_index);
+                return Ok(val.map(ScalarImpl::from));
+            }
             // Bit(1)
             match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
                 None => bail!(

From 1e9d0ab30052de8990ec78866bb65261ff304747 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Mon, 4 Nov 2024 17:38:31 +0800
Subject: [PATCH 6/9] test

---
 e2e_test/source_legacy/cdc/cdc.share_stream.slt | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt
index 53148c66836e8..63eebe5e36375 100644
--- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt
+++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt
@@ -232,10 +232,10 @@ SELECT order_id,order_date,customer_name,product_id FROM orders_test order by or
 10003 2020-07-30 12:00:30 Edward 106

 query IIIIITTTTTTTTT
-SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint;
+SELECT c_boolean, c_bit, c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint;
 ----
--128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
-NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL
+t, t, -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
+f, f, NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL

 statement ok
 create secret pg_pwd with (

From d9894c8695ee27f3188f5c42f8a360f875e71a3d Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 4 Nov 2024 18:05:38 +0800
Subject: [PATCH 7/9] remove comma

---
 e2e_test/source_legacy/cdc/cdc.share_stream.slt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt
index 63eebe5e36375..cf1000957b6fb 100644
--- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt
+++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt
@@ -234,8 +234,8 @@ SELECT order_id,order_date,customer_name,product_id FROM orders_test order by or
 query IIIIITTTTTTTTT
 SELECT c_boolean, c_bit, c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint;
 ----
-t, t, -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
-f, f, NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL
+t t -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
+f f NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL

 statement ok
 create secret pg_pwd with (

From d4cb8b5326a0063aa069efddbd1a631b0e9a353e Mon Sep 17 00:00:00 2001
From: William Wen
Date: Tue, 12 Nov 2024 14:09:02 +0800
Subject: [PATCH 8/9] address comment

---
src/connector/src/sink/log_store.rs | 8 ++--- .../src/common/log_store_impl/in_mem.rs | 4 +-- .../common/log_store_impl/kv_log_store/mod.rs | 30 +++++++++---------- .../log_store_impl/kv_log_store/writer.rs | 2 +- src/stream/src/common/table/state_table.rs | 2 -- .../src/executor/approx_percentile/global.rs | 2 +- .../approx_percentile/global_state.rs | 5 +--- src/stream/src/executor/sink.rs | 4 +-- src/stream/src/executor/top_n/group_top_n.rs | 2 +- .../executor/top_n/group_top_n_appendonly.rs | 2 +- .../src/executor/top_n/top_n_appendonly.rs | 2 +- src/stream/src/executor/top_n/top_n_plain.rs | 2 +- src/stream/src/executor/top_n/utils.rs | 7 ++--- 13 files changed, 30 insertions(+), 42 deletions(-) diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index d7240bb4a4dd4..3c10f0639c47a 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -119,7 +119,7 @@ pub enum LogStoreReadItem { pub trait LogWriter: Send { /// Initialize the log writer with an epoch - fn init_after_yield_barrier( + fn init( &mut self, epoch: EpochPair, pause_read_on_bootstrap: bool, @@ -354,7 +354,7 @@ pub struct LogWriterMetrics { } impl LogWriter for MonitoredLogWriter { - async fn init_after_yield_barrier( + async fn init( &mut self, epoch: EpochPair, pause_read_on_bootstrap: bool, @@ -365,9 +365,7 @@ impl LogWriter for MonitoredLogWriter { self.metrics .log_store_latest_write_epoch .set(epoch.curr as _); - self.inner - .init_after_yield_barrier(epoch, pause_read_on_bootstrap) - .await + self.inner.init(epoch, pause_read_on_bootstrap).await } async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { diff --git a/src/stream/src/common/log_store_impl/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs index 16a8c89bbeac1..16de320aa7edd 100644 --- a/src/stream/src/common/log_store_impl/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -245,7 +245,7 @@ impl LogReader for BoundedInMemLogStoreReader { } impl LogWriter for BoundedInMemLogStoreWriter { - async fn init_after_yield_barrier( + async fn init( &mut self, epoch: EpochPair, _pause_read_on_bootstrap: bool, @@ -361,7 +361,7 @@ mod tests { let mut join_handle = tokio::spawn(async move { writer - .init_after_yield_barrier(EpochPair::new_test_epoch(init_epoch), false) + .init(EpochPair::new_test_epoch(init_epoch), false) .await .unwrap(); writer diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 73267b6399ac7..490c5bf9abca3 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -514,7 +514,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -624,7 +624,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -713,7 +713,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch3), false) + 
.init(EpochPair::new_test_epoch(epoch3), false) .await .unwrap(); reader.init().await.unwrap(); @@ -814,7 +814,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1_1.clone()).await.unwrap(); @@ -930,7 +930,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch3), false) + .init(EpochPair::new_test_epoch(epoch3), false) .await .unwrap(); @@ -1043,11 +1043,11 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer1 - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer2 - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); reader1.init().await.unwrap(); @@ -1166,7 +1166,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new(epoch3, epoch2), false) + .init(EpochPair::new(epoch3, epoch2), false) .await .unwrap(); reader.init().await.unwrap(); @@ -1238,7 +1238,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -1378,7 +1378,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -1487,7 +1487,7 @@ mod tests { .storage .start_epoch(epoch4, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new(epoch4, epoch3), false) + .init(EpochPair::new(epoch4, epoch3), false) .await .unwrap(); @@ -1548,7 +1548,7 @@ mod tests { let (mut reader, mut writer) = factory.build().await; writer - .init_after_yield_barrier(EpochPair::new(epoch4, epoch3), false) + .init(EpochPair::new(epoch4, epoch3), false) .await .unwrap(); writer.write_chunk(stream_chunk4.clone()).await.unwrap(); @@ -1718,7 +1718,7 @@ mod tests { .storage .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch1), false) + .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); @@ -1787,7 +1787,7 @@ mod tests { .storage .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch3), false) + .init(EpochPair::new_test_epoch(epoch3), false) .await .unwrap(); reader.init().await.unwrap(); @@ -1851,7 +1851,7 @@ mod tests { .storage .start_epoch(epoch4, HashSet::from_iter([TableId::new(table.id)])); writer - .init_after_yield_barrier(EpochPair::new_test_epoch(epoch4), false) + .init(EpochPair::new_test_epoch(epoch4), false) .await .unwrap(); reader.init().await.unwrap(); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs 
b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index d23f6bdba0d8d..93e4c1211fe61 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -78,7 +78,7 @@ impl KvLogStoreWriter { } impl LogWriter for KvLogStoreWriter { - async fn init_after_yield_barrier( + async fn init( &mut self, epoch: EpochPair, pause_read_on_bootstrap: bool, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 6b7fdb3a398a2..d513626cb6267 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -176,8 +176,6 @@ where S: StateStore, SD: ValueRowSerde, { - /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called - /// No need to `wait_for_epoch`, so it should complete immediately. pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.local_store.init(InitOptions::new(epoch)).await?; Ok(()) diff --git a/src/stream/src/executor/approx_percentile/global.rs b/src/stream/src/executor/approx_percentile/global.rs index c7366d3e67a4f..f6c70ba80fb69 100644 --- a/src/stream/src/executor/approx_percentile/global.rs +++ b/src/stream/src/executor/approx_percentile/global.rs @@ -55,7 +55,7 @@ impl GlobalApproxPercentileExecutor { let first_epoch = first_barrier.epoch; yield Message::Barrier(first_barrier); let mut state = self.state; - state.init_after_yield_barrier(first_epoch).await?; + state.init(first_epoch).await?; // Get row count state, and row_count. #[for_await] diff --git a/src/stream/src/executor/approx_percentile/global_state.rs b/src/stream/src/executor/approx_percentile/global_state.rs index ca53cd4c067ce..3f95a57e22613 100644 --- a/src/stream/src/executor/approx_percentile/global_state.rs +++ b/src/stream/src/executor/approx_percentile/global_state.rs @@ -58,10 +58,7 @@ impl GlobalApproxPercentileState { } } - pub async fn init_after_yield_barrier( - &mut self, - init_epoch: EpochPair, - ) -> StreamExecutorResult<()> { + pub async fn init(&mut self, init_epoch: EpochPair) -> StreamExecutorResult<()> { // Init state tables. 
self.count_state_table.init_epoch(init_epoch).await?; self.bucket_state_table.init_epoch(init_epoch).await?; diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 1f06affda6d11..328bf07bf795e 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -283,9 +283,7 @@ impl SinkExecutor { // Propagate the first barrier yield Message::Barrier(barrier); - log_writer - .init_after_yield_barrier(epoch_pair, is_pause_on_startup) - .await?; + log_writer.init(epoch_pair, is_pause_on_startup).await?; let mut is_paused = false; diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 6de2edd156ff8..9c7d037c40acb 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -232,7 +232,7 @@ where self.caches.evict() } - async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.managed_state.init_epoch(epoch).await } diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index 597c43d8ff940..2cf1741169ced 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -201,7 +201,7 @@ where self.caches.evict() } - async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.managed_state.init_epoch(epoch).await } diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index 8a195af8685dc..2dcf36b2250b4 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -135,7 +135,7 @@ where self.managed_state.try_flush().await } - async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.managed_state.init_epoch(epoch).await?; self.managed_state .init_topn_cache(NO_GROUP_KEY, &mut self.cache) diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index 4f4835584e47e..ebddd801a579c 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -169,7 +169,7 @@ where self.managed_state.try_flush().await } - async fn init_after_yield_barrier(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.managed_state.init_epoch(epoch).await?; self.managed_state .init_topn_cache(NO_GROUP_KEY, &mut self.cache) diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index 2c9a04532da67..761556319e6b8 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -50,10 +50,7 @@ pub trait TopNExecutorBase: Send + 'static { fn evict(&mut self) {} - fn init_after_yield_barrier( - &mut self, - epoch: EpochPair, - ) -> impl Future> + Send; + fn init(&mut self, epoch: EpochPair) -> impl Future> + Send; /// Handle incoming watermarks fn handle_watermark( @@ -92,7 +89,7 @@ where let barrier = expect_first_barrier(&mut input).await?; let barrier_epoch = barrier.epoch; yield Message::Barrier(barrier); - 
self.inner.init_after_yield_barrier(barrier_epoch).await?;
+        self.inner.init(barrier_epoch).await?;

         #[for_await]
         for msg in input {

From 9d98b9caabb7fc6294ab805678ae2957ba121e2c Mon Sep 17 00:00:00 2001
From: William Wen
Date: Tue, 12 Nov 2024 14:56:23 +0800
Subject: [PATCH 9/9] add comment

---
 src/stream/src/common/table/state_table.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index d513626cb6267..7979bc9f4354f 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -176,6 +176,8 @@ where
     S: StateStore,
     SD: ValueRowSerde,
 {
+    /// In streaming executors, this method must be called **after** receiving and yielding the first barrier;
+    /// otherwise, a deadlock is likely to happen.
     pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
         self.local_store.init(InitOptions::new(epoch)).await?;
         Ok(())
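
The doc comment above captures the contract this series establishes: `init_epoch` is now async (it may wait for the committed epoch) and must run only after the executor has yielded its first barrier downstream. A minimal sketch of the resulting pattern, mirroring the executor changes above — `MyExecutor` and its fields are hypothetical, while `expect_first_barrier`, `Message::Barrier`, and `StateTable::init_epoch` are the real APIs touched in this series:

impl MyExecutor {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut input = self.input.execute();

        // Receive the first barrier and remember its epoch before handing
        // the barrier itself downstream.
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = first_barrier.epoch;

        // Yield the barrier first, so that it can be collected and the
        // corresponding epoch can eventually be committed.
        yield Message::Barrier(first_barrier);

        // Only now initialize the state table: `init_epoch` may wait for
        // the committed epoch, and calling it while still holding the first
        // barrier is exactly the deadlock the doc comment warns about.
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            // ... regular message/barrier handling ...
        }
    }
}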