diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 19a75f366436..8b006626cd74 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -64,7 +64,6 @@ message WaitEpochCommitResponse { message StreamingControlStreamRequest { message InitRequest { - uint64 version_id = 1; repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2; } diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index f36cd6cf8164..798e0120ee3d 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -471,7 +471,7 @@ async fn test_row_seq_scan() -> StreamResult<()> { ); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - state.init_epoch(epoch); + state.init_epoch(epoch).await?; state.insert(OwnedRow::new(vec![ Some(1_i32.into()), Some(4_i32.into()), diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs index dce4a21115d6..eeba749a1483 100644 --- a/src/ctl/src/cmd_impl/bench.rs +++ b/src/ctl/src/cmd_impl/bench.rs @@ -18,7 +18,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; -use anyhow::Result; +use anyhow::{anyhow, Result}; use clap::Subcommand; use futures::future::try_join_all; use futures::{pin_mut, Future, StreamExt}; @@ -97,6 +97,11 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { )?) .await?; let table = get_table_catalog(meta.clone(), mv_name).await?; + let committed_epoch = hummock + .inner() + .get_pinned_version() + .table_committed_epoch(table.id) + .ok_or_else(|| anyhow!("table id {} not exist", table.id))?; let mut handlers = vec![]; for i in 0..threads { let table = table.clone(); @@ -107,7 +112,8 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { tracing::info!(thread = i, "starting scan"); let state_table = { let mut tb = make_state_table(hummock, &table).await; - tb.init_epoch(EpochPair::new_test_epoch(u64::MAX)); + tb.init_epoch(EpochPair::new(u64::MAX, committed_epoch)) + .await?; tb }; loop { diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index d1522de6ba0c..ea84625b1933 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -474,16 +474,12 @@ impl GlobalBarrierWorkerContextImpl { node: &WorkerNode, subscriptions: impl Iterator, ) -> MetaResult { - let initial_version_id = self - .hummock_manager - .on_current_version(|version| version.id) - .await; let handle = self .env .stream_client_pool() .get(node) .await? 
- .start_streaming_control(initial_version_id, subscriptions) + .start_streaming_control(subscriptions) .await?; Ok(handle) } diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 882e7c2830ca..6484adb1c921 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -21,7 +21,6 @@ use futures::TryStreamExt; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::monitor::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; -use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; use risingwave_pb::stream_service::stream_service_client::StreamServiceClient; use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; @@ -87,13 +86,11 @@ pub type StreamingControlHandle = impl StreamClient { pub async fn start_streaming_control( &self, - version_id: HummockVersionId, subscriptions: impl Iterator, ) -> Result { let first_request = StreamingControlStreamRequest { request: Some(streaming_control_stream_request::Request::Init( InitRequest { - version_id: version_id.to_u64(), subscriptions: subscriptions.collect(), }, )), diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 3b7054a51d86..d0ba14195ed2 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -724,7 +724,11 @@ pub(crate) mod tests { let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value let kv_count = 11; - // let base_epoch = Epoch(0); + let prev_epoch: u64 = hummock_manager_ref + .get_current_version() + .await + .table_committed_epoch(existing_table_id.into()) + .unwrap(); let base_epoch = Epoch::now(); let mut epoch: u64 = base_epoch.0; let millisec_interval_epoch: u64 = (1 << 16) * 100; @@ -741,7 +745,10 @@ pub(crate) mod tests { let next_epoch = epoch + millisec_interval_epoch; storage.start_epoch(next_epoch, table_id_set.clone()); if i == 0 { - local.init_for_test(epoch).await.unwrap(); + local + .init_for_test_with_prev_epoch(epoch, prev_epoch) + .await + .unwrap(); } epoch_set.insert(epoch); let mut prefix = BytesMut::default(); @@ -935,6 +942,11 @@ pub(crate) mod tests { // 1. 
add sstables let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value let kv_count = 11; + let prev_epoch: u64 = hummock_manager_ref + .get_current_version() + .await + .table_committed_epoch(existing_table_id.into()) + .unwrap(); // let base_epoch = Epoch(0); let base_epoch = Epoch::now(); let mut epoch: u64 = base_epoch.0; @@ -948,7 +960,10 @@ pub(crate) mod tests { storage.start_epoch(epoch, table_id_set.clone()); for i in 0..kv_count { if i == 0 { - local.init_for_test(epoch).await.unwrap(); + local + .init_for_test_with_prev_epoch(epoch, prev_epoch) + .await + .unwrap(); } let next_epoch = epoch + millisec_interval_epoch; storage.start_epoch(next_epoch, table_id_set.clone()); @@ -1912,11 +1927,23 @@ pub(crate) mod tests { let table_id_set = HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter()); + let version = hummock_meta_client.get_current_version().await.unwrap(); + storage.start_epoch(*epoch, table_id_set.clone()); for i in 0..kv_count { if i == 0 && *is_init { - local_1.0.init_for_test(*epoch).await.unwrap(); - local_2.0.init_for_test(*epoch).await.unwrap(); + let prev_epoch_1 = version.table_committed_epoch(local_1.0.table_id()).unwrap(); + local_1 + .0 + .init_for_test_with_prev_epoch(*epoch, prev_epoch_1) + .await + .unwrap(); + let prev_epoch_2 = version.table_committed_epoch(local_2.0.table_id()).unwrap(); + local_2 + .0 + .init_for_test_with_prev_epoch(*epoch, prev_epoch_2) + .await + .unwrap(); *is_init = false; } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 24ba1ca1cf77..7e847fc089aa 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -476,7 +476,7 @@ async fn test_state_store_sync() { .committed() .table_committed_epoch(TEST_TABLE_ID) .unwrap(); - let epoch1 = test_epoch(base_epoch.next_epoch()); + let epoch1 = base_epoch.next_epoch(); test_env .storage .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); @@ -1133,6 +1133,13 @@ async fn test_iter_with_min_epoch() { .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) .await; + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + let epoch1 = (31 * 1000) << 16; test_env .storage @@ -1149,7 +1156,10 @@ async fn test_iter_with_min_epoch() { .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) .collect(); - hummock_storage.init_for_test(epoch1).await.unwrap(); + hummock_storage + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); hummock_storage .ingest_batch( @@ -1422,7 +1432,16 @@ async fn test_hummock_version_reader() { .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) .collect(); { - hummock_storage.init_for_test(epoch1).await.unwrap(); + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + hummock_storage + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); hummock_storage .ingest_batch( batch_epoch1, @@ -1852,7 +1871,16 @@ async fn test_get_with_min_epoch() { test_env .storage .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); - hummock_storage.init_for_test(epoch1).await.unwrap(); + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + hummock_storage + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); 
let gen_key = |index: usize| -> TableKey { gen_key_from_str(VirtualNode::ZERO, format!("key_{}", index).as_str()) @@ -2125,9 +2153,21 @@ async fn test_table_watermark() { test_env .storage .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); - local1.init_for_test(epoch1).await.unwrap(); + let prev_epoch = test_env + .manager + .get_current_version() + .await + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); + local1 + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); local1.update_vnode_bitmap(vnode_bitmap1.clone()); - local2.init_for_test(epoch1).await.unwrap(); + local2 + .init_for_test_with_prev_epoch(epoch1, prev_epoch) + .await + .unwrap(); local2.update_vnode_bitmap(vnode_bitmap2.clone()); fn gen_inner_key(index: usize) -> Bytes { diff --git a/src/storage/hummock_test/src/local_state_store_test_utils.rs b/src/storage/hummock_test/src/local_state_store_test_utils.rs index fed253a0488d..650c55774f42 100644 --- a/src/storage/hummock_test/src/local_state_store_test_utils.rs +++ b/src/storage/hummock_test/src/local_state_store_test_utils.rs @@ -22,5 +22,13 @@ pub trait LocalStateStoreTestExt: LocalStateStore { fn init_for_test(&mut self, epoch: u64) -> impl Future> + Send + '_ { self.init(InitOptions::new(EpochPair::new_test_epoch(epoch))) } + + fn init_for_test_with_prev_epoch( + &mut self, + epoch: u64, + prev_epoch: u64, + ) -> impl Future> + Send + '_ { + self.init(InitOptions::new(EpochPair::new(epoch, prev_epoch))) + } } impl LocalStateStoreTestExt for T {} diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 673423522565..914517ff8333 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1357,9 +1357,7 @@ async fn test_clear_shared_buffer() { drop(local_hummock_storage); - hummock_storage - .clear_shared_buffer(hummock_storage.get_pinned_version().id()) - .await; + hummock_storage.clear_shared_buffer().await; } /// Test the following behaviours: @@ -1480,7 +1478,8 @@ async fn test_replicated_local_hummock_storage() { .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) .await; - local_hummock_storage_2.init_for_test(epoch2).await.unwrap(); + local_hummock_storage_2.init_for_test(epoch1).await.unwrap(); + local_hummock_storage_2.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // ingest 16B batch let mut batch2 = vec![ diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 908bb45a43fc..3e3acf4b48ef 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -28,7 +28,7 @@ use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{Histogram, IntGauge}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; -use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId, SyncResult}; +use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; use tokio::spawn; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -465,67 +465,14 @@ impl HummockEventHandler { .start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } - async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, version_id: HummockVersionId) { + fn handle_clear(&mut self, notifier: oneshot::Sender<()>) { info!( 
- ?version_id, current_version_id = ?self.uploader.hummock_version().id(), "handle clear event" ); self.uploader.clear(); - let current_version = self.uploader.hummock_version(); - - if current_version.id < version_id { - let mut latest_version = if let Some(CacheRefillerEvent { - pinned_version, - new_pinned_version, - }) = self.refiller.clear() - { - assert_eq!( - current_version.id(), - pinned_version.id(), - "refiller earliest version {:?} not equal to current version {:?}", - *pinned_version, - **current_version - ); - - info!(?version_id, "refiller is clear in recovery"); - - Some(new_pinned_version) - } else { - None - }; - - while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version) - && latest_version_ref.id < version_id - { - let version_update = self - .version_update_rx - .recv() - .await - .expect("should not be empty"); - let prev_version_id = latest_version_ref.id(); - if let Some(new_version) = Self::resolve_version_update_info( - latest_version_ref.clone(), - version_update, - None, - ) { - info!( - ?prev_version_id, - new_version_id = ?new_version.id(), - "recv new version" - ); - latest_version = Some(new_version); - } - } - - self.apply_version_update( - current_version.clone(), - latest_version.expect("must have some version update to raise the mce"), - ); - } - assert!( self.local_read_version_mapping.is_empty(), "read version mapping not empty when clear. remaining tables: {:?}", @@ -540,7 +487,7 @@ impl HummockEventHandler { error!("failed to notify completion of clear event: {:?}", e); }); - info!(?version_id, "clear finished"); + info!("clear finished"); } fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) { @@ -646,9 +593,6 @@ impl HummockEventHandler { event = pin!(self.hummock_event_rx.recv()) => { let Some(event) = event else { break }; match event { - HummockEvent::Clear(notifier, version_id) => { - self.handle_clear(notifier, version_id).await - }, HummockEvent::Shutdown => { info!("event handler shutdown"); return; @@ -690,8 +634,8 @@ impl HummockEventHandler { } => { self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } - HummockEvent::Clear(_, _) => { - unreachable!("clear is handled in separated async context") + HummockEvent::Clear(notifier) => { + self.handle_clear(notifier); } HummockEvent::Shutdown => { unreachable!("shutdown is handled specially") @@ -886,7 +830,7 @@ impl SyncedData { #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; - use std::future::{poll_fn, Future}; + use std::future::poll_fn; use std::sync::Arc; use std::task::Poll; @@ -905,7 +849,7 @@ mod tests { use crate::hummock::event_handler::refiller::CacheRefiller; use crate::hummock::event_handler::uploader::test_utils::{gen_imm, TEST_TABLE_ID}; use crate::hummock::event_handler::uploader::UploadTaskOutput; - use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, HummockVersionUpdate}; + use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::store::version::{StagingData, VersionUpdate}; @@ -914,132 +858,6 @@ mod tests { use crate::monitor::HummockStateStoreMetrics; use crate::store::SealCurrentEpochOptions; - #[tokio::test] - async fn test_clear_shared_buffer() { - let mut next_version_id = 1; - let mut make_new_version = || { - let id = next_version_id; - next_version_id += 1; - 
HummockVersion::from_rpc_protobuf(&PbHummockVersion { - id, - ..Default::default() - }) - }; - - let initial_version = PinnedVersion::new(make_new_version(), unbounded_channel().0); - - let (version_update_tx, version_update_rx) = unbounded_channel(); - let (refill_task_tx, mut refill_task_rx) = unbounded_channel(); - - let refill_task_tx_clone = refill_task_tx.clone(); - - let event_handler = HummockEventHandler::new_inner( - version_update_rx, - initial_version.clone(), - mock_sstable_store().await, - Arc::new(HummockStateStoreMetrics::unused()), - &default_opts_for_test(), - Arc::new(|_, _| unreachable!("should not spawn upload task")), - Arc::new(move |_, _, old_version, new_version| { - let (tx, rx) = oneshot::channel(); - refill_task_tx_clone - .send((old_version, new_version, tx)) - .unwrap(); - spawn(async move { - let _ = rx.await; - }) - }), - ); - - let event_tx = event_handler.event_sender(); - let latest_version = event_handler.recent_versions.clone(); - let latest_version_update_tx = event_handler.version_update_notifier_tx.clone(); - - let send_clear = |version_id| { - let (tx, rx) = oneshot::channel(); - event_tx.send(HummockEvent::Clear(tx, version_id)).unwrap(); - rx - }; - - let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); - - // test normal recovery - send_clear(initial_version.id()).await.unwrap(); - - // test normal refill finish - let version1 = make_new_version(); - { - version_update_tx - .send(HummockVersionUpdate::PinnedVersion(Box::new( - version1.clone(), - ))) - .unwrap(); - let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap(); - assert_eq!(*old_version, *initial_version); - assert_eq!(*new_version, version1); - assert_eq!(**latest_version.load().latest_version(), *initial_version); - - let mut changed = latest_version_update_tx.subscribe(); - refill_finish_tx.send(()).unwrap(); - changed.changed().await.unwrap(); - assert_eq!(**latest_version.load().latest_version(), version1); - } - - // test recovery with pending refill task - let version2 = make_new_version(); - let version3 = make_new_version(); - { - version_update_tx - .send(HummockVersionUpdate::PinnedVersion(Box::new( - version2.clone(), - ))) - .unwrap(); - version_update_tx - .send(HummockVersionUpdate::PinnedVersion(Box::new( - version3.clone(), - ))) - .unwrap(); - let (old_version2, new_version2, _refill_finish_tx2) = - refill_task_rx.recv().await.unwrap(); - assert_eq!(*old_version2, version1); - assert_eq!(*new_version2, version2); - let (old_version3, new_version3, _refill_finish_tx3) = - refill_task_rx.recv().await.unwrap(); - assert_eq!(*old_version3, version2); - assert_eq!(*new_version3, version3); - assert_eq!(**latest_version.load().latest_version(), version1); - - let rx = send_clear(version3.id); - rx.await.unwrap(); - assert_eq!(**latest_version.load().latest_version(), version3); - } - - async fn assert_pending(fut: &mut (impl Future + Unpin)) { - assert!(poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await); - } - - // test recovery with later arriving version update - let version4 = make_new_version(); - let version5 = make_new_version(); - { - let mut rx = send_clear(version5.id); - assert_pending(&mut rx).await; - version_update_tx - .send(HummockVersionUpdate::PinnedVersion(Box::new( - version4.clone(), - ))) - .unwrap(); - assert_pending(&mut rx).await; - version_update_tx - .send(HummockVersionUpdate::PinnedVersion(Box::new( - version5.clone(), - ))) - .unwrap(); - rx.await.unwrap(); - 
assert_eq!(**latest_version.load().latest_version(), version5); - } - } - #[tokio::test] async fn test_old_epoch_sync_fail() { let epoch0 = test_epoch(233); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index f0a4b2a89987..910c567e5d4d 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, HummockVersionId}; +use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId}; use thiserror_ext::AsReport; use tokio::sync::oneshot; @@ -65,7 +65,7 @@ pub enum HummockEvent { }, /// Clear shared buffer and reset all states - Clear(oneshot::Sender<()>, HummockVersionId), + Clear(oneshot::Sender<()>), Shutdown, @@ -122,7 +122,7 @@ impl HummockEvent { table_ids, } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids), - HummockEvent::Clear(_, version_id) => format!("Clear {}", version_id), + HummockEvent::Clear(_) => "Clear".to_string(), HummockEvent::Shutdown => "Shutdown".to_string(), diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index 09526f6b83c1..dcf5c7a565eb 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -287,21 +287,6 @@ impl CacheRefiller { pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } - - /// Clear the queue for cache refill and return an event that merges all pending cache refill events - /// into a single event that takes the earliest and latest version. 
- pub(crate) fn clear(&mut self) -> Option { - let last_item = self.queue.pop_back()?; - let mut event = last_item.event; - while let Some(item) = self.queue.pop_back() { - assert_eq!( - event.pinned_version.id(), - item.event.new_pinned_version.id() - ); - event.pinned_version = item.event.pinned_version; - } - Some(event) - } } impl CacheRefiller { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 073dd9e1dc1c..3b4a143d0ad5 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -30,7 +30,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, HummockVersionId}; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -510,10 +510,10 @@ impl HummockStorage { ) } - pub async fn clear_shared_buffer(&self, version_id: HummockVersionId) { + pub async fn clear_shared_buffer(&self) { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx, version_id)) + .send(HummockEvent::Clear(tx)) .expect("should send success"); rx.await.expect("should wait success"); } @@ -787,7 +787,10 @@ impl HummockStorage { &self.hummock_version_reader } - pub async fn wait_version_update(&self, old_id: HummockVersionId) -> HummockVersionId { + pub async fn wait_version_update( + &self, + old_id: risingwave_hummock_sdk::HummockVersionId, + ) -> risingwave_hummock_sdk::HummockVersionId { use tokio::task::yield_now; loop { let cur_id = self.recent_versions.load().latest_version().id(); diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index a2fcc5a2c786..b53dc6ee27c7 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -481,9 +481,7 @@ impl LocalStateStore for LocalHummockStorage { async fn init(&mut self, options: InitOptions) -> StorageResult<()> { let epoch = options.epoch; - if self.is_replicated { - self.wait_for_epoch(epoch.prev).await?; - } + self.wait_for_epoch(epoch.prev).await?; assert!( self.epoch.replace(epoch.curr).is_none(), "local state store of table id {:?} is init for more than once", diff --git a/src/stream/benches/bench_state_table.rs b/src/stream/benches/bench_state_table.rs index ceaabdec5b63..445c5f7ddc32 100644 --- a/src/stream/benches/bench_state_table.rs +++ b/src/stream/benches/bench_state_table.rs @@ -113,7 +113,7 @@ async fn run_bench_state_table_inserts( rows: Vec, ) { let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); for row in rows { state_table.insert(row); } @@ -173,7 +173,7 @@ async fn run_bench_state_table_chunks( chunks: Vec, ) { let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); for chunk in chunks { state_table.write_chunk(chunk); } diff --git a/src/stream/spill_test/src/test_mem_table.rs b/src/stream/spill_test/src/test_mem_table.rs index 75407feeaa28..a24d998ac88f 100644 --- 
a/src/stream/spill_test/src/test_mem_table.rs +++ b/src/stream/spill_test/src/test_mem_table.rs @@ -65,7 +65,7 @@ async fn test_mem_table_spill_in_streaming() { .await; let epoch = EpochPair::new_test_epoch(test_epoch(1)); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -195,7 +195,7 @@ async fn test_mem_table_spill_in_streaming_multiple_times() { .await; let epoch = EpochPair::new_test_epoch(test_epoch(1)); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 49ca6bbc17d5..490c5bf9abca 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -696,10 +696,7 @@ mod tests { drop(writer); // Recovery - test_env - .storage - .clear_shared_buffer(test_env.manager.get_current_version().await.id) - .await; + test_env.storage.clear_shared_buffer().await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -915,10 +912,7 @@ mod tests { drop(writer); // Recovery - test_env - .storage - .clear_shared_buffer(test_env.manager.get_current_version().await.id) - .await; + test_env.storage.clear_shared_buffer().await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -1155,10 +1149,7 @@ mod tests { drop(writer2); // Recovery - test_env - .storage - .clear_shared_buffer(test_env.manager.get_current_version().await.id) - .await; + test_env.storage.clear_shared_buffer().await; let vnodes = build_bitmap(0..VirtualNode::COUNT_FOR_TEST); let factory = KvLogStoreFactory::new( @@ -1779,10 +1770,7 @@ mod tests { drop(writer); // Recovery - test_env - .storage - .clear_shared_buffer(test_env.manager.get_current_version().await.id) - .await; + test_env.storage.clear_shared_buffer().await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -1846,10 +1834,7 @@ mod tests { drop(writer); // Recovery - test_env - .storage - .clear_shared_buffer(test_env.manager.get_current_version().await.id) - .await; + test_env.storage.clear_shared_buffer().await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index c84db97002b0..b2b19c45bd0a 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -110,7 +110,9 @@ impl RewindDelay { self.rewind_count.inc(); if let Some(delay) = self.backoff_policy.next() { self.rewind_delay.observe(delay.as_secs_f64()); - sleep(delay).await; + if !cfg!(test) { + sleep(delay).await; + } } } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 8bf2e2750f6d..7979bc9f4354 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; use either::Either; use foyer::CacheContext; -use futures::{pin_mut, FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; use 
risingwave_common::array::stream_record::Record; @@ -44,7 +44,7 @@ use risingwave_hummock_sdk::key::{ }; use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_pb::catalog::Table; -use risingwave_storage::error::{ErrorKind, StorageError, StorageResult}; +use risingwave_storage::error::{ErrorKind, StorageError}; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::mem_table::MemTableError; use risingwave_storage::row_serde::find_columns_by_ids; @@ -170,33 +170,17 @@ pub type WatermarkCacheParameterizedStateTable; // initialize -impl StateTableInner -where - S: StateStore, - SD: ValueRowSerde, -{ - /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called - /// async interface only used for replicated state table, - /// as it needs to wait for prev epoch to be committed. - pub async fn init_epoch(&mut self, epoch: EpochPair) -> StorageResult<()> { - self.local_store.init(InitOptions::new(epoch)).await - } -} - -// initialize -impl StateTableInner +impl + StateTableInner where S: StateStore, SD: ValueRowSerde, { - /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called - /// No need to `wait_for_epoch`, so it should complete immediately. - pub fn init_epoch(&mut self, epoch: EpochPair) { - self.local_store - .init(InitOptions::new(epoch)) - .now_or_never() - .expect("non-replicated state store should start immediately.") - .expect("non-replicated state store should not wait_for_epoch, and fail because of it.") + /// In streaming executors, this method must be called **after** receiving and yielding the first barrier; + /// otherwise, deadlock is likely to happen. + pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + self.local_store.init(InitOptions::new(epoch)).await?; + Ok(()) } pub fn state_store(&self) -> &S { diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 99036e968f04..7b8959bbf2c9 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -65,7 +65,7 @@ async fn test_state_table_update_insert() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(6_i32.into()), @@ -258,7 +258,7 @@ async fn test_state_table_iter_with_prefix() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -393,7 +393,7 @@ async fn test_state_table_iter_with_pk_range() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -532,7 +532,7 @@ async fn test_mem_table_assertion() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), Some(11_i32.into()), @@ -578,7 +578,7 @@ async fn test_state_table_iter_with_value_indices() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + 
state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -751,7 +751,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -998,7 +998,7 @@ async fn test_state_table_write_chunk() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); let chunk = StreamChunk::from_rows( &[ @@ -1130,7 +1130,7 @@ async fn test_state_table_write_chunk_visibility() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); let chunk = StreamChunk::from_rows( &[ @@ -1257,7 +1257,7 @@ async fn test_state_table_write_chunk_value_indices() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); let chunk = StreamChunk::from_rows( &[ @@ -1354,7 +1354,7 @@ async fn test_state_table_watermark_cache_ignore_null() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); let rows = vec![ ( @@ -1480,7 +1480,7 @@ async fn test_state_table_watermark_cache_write_chunk() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); let cache = state_table.get_watermark_cache(); assert_eq!(cache.len(), 0); @@ -1655,7 +1655,7 @@ async fn test_state_table_watermark_cache_refill() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); let rows = vec![ OwnedRow::new(vec![ @@ -1751,7 +1751,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -1952,7 +1952,7 @@ async fn test_replicated_state_table_replication() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state_table.init_epoch(epoch); + state_table.init_epoch(epoch).await.unwrap(); replicated_state_table.init_epoch(epoch).await.unwrap(); // Insert first record into base state table diff --git a/src/stream/src/common/table/test_storage_table.rs b/src/stream/src/common/table/test_storage_table.rs index be09fa1e9183..bf1dd0098feb 100644 --- a/src/stream/src/common/table/test_storage_table.rs +++ b/src/stream/src/common/table/test_storage_table.rs @@ -82,7 +82,7 @@ async fn test_storage_table_value_indices() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state.init_epoch(epoch); + state.init_epoch(epoch).await.unwrap(); state.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -208,7 +208,7 @@ async fn test_shuffled_column_id_for_storage_table_get_row() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state.init_epoch(epoch); + state.init_epoch(epoch).await.unwrap(); let table = StorageTable::for_test( test_env.storage.clone(), @@ -327,7 
+327,7 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state.init_epoch(epoch); + state.init_epoch(epoch).await.unwrap(); state.insert(OwnedRow::new(vec![Some(1_i32.into()), None, None])); state.insert(OwnedRow::new(vec![ @@ -438,7 +438,7 @@ async fn test_batch_scan_with_value_indices() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state.init_epoch(epoch); + state.init_epoch(epoch).await.unwrap(); state.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -542,7 +542,7 @@ async fn test_batch_scan_chunk_with_value_indices() { test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); - state.init_epoch(epoch); + state.init_epoch(epoch).await.unwrap(); let gen_row = |i: i32, is_update: bool| { let scale = if is_update { 10 } else { 1 }; diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs index 3ec5aae2e97b..4025bddcae8c 100644 --- a/src/stream/src/executor/aggregation/distinct.rs +++ b/src/stream/src/executor/aggregation/distinct.rs @@ -375,9 +375,9 @@ mod tests { let store = MemoryStateStore::new(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); let mut dedup_tables = infer_dedup_tables(&agg_calls, &[], store).await; - dedup_tables - .values_mut() - .for_each(|table| table.init_epoch(epoch)); + for table in dedup_tables.values_mut() { + table.init_epoch(epoch).await.unwrap() + } let mut deduplicater = DistinctDeduplicater::new( &agg_calls, @@ -555,9 +555,9 @@ mod tests { let store = MemoryStateStore::new(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); let mut dedup_tables = infer_dedup_tables(&agg_calls, &group_key_types, store).await; - dedup_tables - .values_mut() - .for_each(|table| table.init_epoch(epoch)); + for table in dedup_tables.values_mut() { + table.init_epoch(epoch).await.unwrap() + } let mut deduplicater = DistinctDeduplicater::new( &agg_calls, diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 440541186720..fe88faf4a2bd 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -410,7 +410,7 @@ mod tests { .unwrap(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); { let chunk = create_chunk( @@ -516,7 +516,7 @@ mod tests { .unwrap(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); { let chunk = create_chunk( @@ -619,8 +619,8 @@ mod tests { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table_1.init_epoch(epoch); - table_2.init_epoch(epoch); + table_1.init_epoch(epoch).await.unwrap(); + table_2.init_epoch(epoch).await.unwrap(); let order_columns_1 = vec![ ColumnOrder::new(0, OrderType::ascending()), // a ASC for AggKind::Min @@ -741,7 +741,7 @@ mod tests { .unwrap(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); { let chunk = create_chunk( @@ -830,7 +830,7 @@ mod tests { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); let order_columns = vec![ ColumnOrder::new(0, OrderType::ascending()), // a ASC for AggKind::Min @@ -956,7 +956,7 @@ mod tests { .unwrap(); let mut epoch = 
EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); { let chunk = create_chunk( @@ -1075,7 +1075,7 @@ mod tests { .unwrap(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); { let chunk = create_chunk( @@ -1164,7 +1164,7 @@ mod tests { .unwrap(); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - table.init_epoch(epoch); + table.init_epoch(epoch).await.unwrap(); { let chunk = create_chunk( " T i i I diff --git a/src/stream/src/executor/approx_percentile/global.rs b/src/stream/src/executor/approx_percentile/global.rs index 2ccff36c4739..f6c70ba80fb6 100644 --- a/src/stream/src/executor/approx_percentile/global.rs +++ b/src/stream/src/executor/approx_percentile/global.rs @@ -52,9 +52,10 @@ impl GlobalApproxPercentileExecutor { // Initialize state let mut input_stream = self.input.execute(); let first_barrier = expect_first_barrier(&mut input_stream).await?; - let mut state = self.state; - state.init(first_barrier.epoch).await?; + let first_epoch = first_barrier.epoch; yield Message::Barrier(first_barrier); + let mut state = self.state; + state.init(first_epoch).await?; // Get row count state, and row_count. #[for_await] diff --git a/src/stream/src/executor/approx_percentile/global_state.rs b/src/stream/src/executor/approx_percentile/global_state.rs index 58011a8450c8..3f95a57e2261 100644 --- a/src/stream/src/executor/approx_percentile/global_state.rs +++ b/src/stream/src/executor/approx_percentile/global_state.rs @@ -60,8 +60,8 @@ impl GlobalApproxPercentileState { pub async fn init(&mut self, init_epoch: EpochPair) -> StreamExecutorResult<()> { // Init state tables. - self.count_state_table.init_epoch(init_epoch); - self.bucket_state_table.init_epoch(init_epoch); + self.count_state_table.init_epoch(init_epoch).await?; + self.bucket_state_table.init_epoch(init_epoch).await?; // Refill row_count let row_count_state = self.get_row_count_state().await?; diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index 74e55deca75f..f2a002e4f811 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -100,8 +100,8 @@ impl JoinSide { // self.ht.clear(); } - pub fn init(&mut self, epoch: EpochPair) { - self.ht.init(epoch); + pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + self.ht.init(epoch).await } } @@ -340,11 +340,12 @@ impl AsOfJoinExecutor pin_mut!(aligned_stream); let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?; - self.side_l.init(barrier.epoch); - self.side_r.init(barrier.epoch); - + let first_epoch = barrier.epoch; // The first barrier message should be propagated. 
yield Message::Barrier(barrier); + self.side_l.init(first_epoch).await?; + self.side_r.init(first_epoch).await?; + let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.fragment_id.to_string(); diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 540ffe1a020f..e2d28bce04bc 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -137,7 +137,11 @@ where let first_barrier = expect_first_barrier(&mut upstream).await?; let mut paused = first_barrier.is_pause_on_startup(); let first_epoch = first_barrier.epoch; - self.state_table.init_epoch(first_barrier.epoch); + let is_newly_added = first_barrier.is_newly_added(self.actor_id); + // The first barrier message should be propagated. + yield Message::Barrier(first_barrier); + + self.state_table.init_epoch(first_epoch).await?; let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?; @@ -148,11 +152,9 @@ where ) }); if is_completely_finished { - assert!(!first_barrier.is_newly_added(self.actor_id)); + assert!(!is_newly_added); } - // The first barrier message should be propagated. - yield Message::Barrier(first_barrier); upstream_table.init_epoch(first_epoch).await?; let mut backfill_state: BackfillState = progress_per_vnode.into(); diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 066dc86ba551..e4cd24f58d4d 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -159,6 +159,9 @@ impl CdcBackfillExecutor { let first_barrier = expect_first_barrier(&mut upstream).await?; let mut is_snapshot_paused = first_barrier.is_pause_on_startup(); + let first_barrier_epoch = first_barrier.epoch; + // The first barrier message should be propagated. + yield Message::Barrier(first_barrier); let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0); // Check whether this parallelism has been assigned splits, @@ -169,7 +172,7 @@ impl CdcBackfillExecutor { .boxed() .peekable(); - state_impl.init_epoch(first_barrier.epoch); + state_impl.init_epoch(first_barrier_epoch).await?; // restore backfill state let state = state_impl.restore_state().await?; @@ -177,9 +180,6 @@ impl CdcBackfillExecutor { let to_backfill = !self.options.disable_backfill && !state.is_finished; - // The first barrier message should be propagated. - yield Message::Barrier(first_barrier); - // Keep track of rows from the snapshot. 
let mut total_snapshot_row_count = state.row_count as u64; diff --git a/src/stream/src/executor/backfill/cdc/state.rs b/src/stream/src/executor/backfill/cdc/state.rs index 9fda6c811de1..b36e90e2a2b0 100644 --- a/src/stream/src/executor/backfill/cdc/state.rs +++ b/src/stream/src/executor/backfill/cdc/state.rs @@ -50,8 +50,8 @@ impl CdcBackfillState { } } - pub fn init_epoch(&mut self, epoch: EpochPair) { - self.state_table.init_epoch(epoch) + pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + self.state_table.init_epoch(epoch).await } /// Restore the backfill state from storage diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index d8de07375d72..9d6d95ff7e7c 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -140,9 +140,13 @@ where // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; let mut paused = first_barrier.is_pause_on_startup(); + let first_epoch = first_barrier.epoch; let init_epoch = first_barrier.epoch.prev; + // The first barrier message should be propagated. + yield Message::Barrier(first_barrier); + if let Some(state_table) = self.state_table.as_mut() { - state_table.init_epoch(first_barrier.epoch); + state_table.init_epoch(first_epoch).await?; } let BackfillState { @@ -162,9 +166,6 @@ where // which will then be persisted. let mut current_state: Vec = vec![None; state_len]; - // The first barrier message should be propagated. - yield Message::Barrier(first_barrier); - // If no need backfill, but state was still "unfinished" we need to finish it. // So we just update the state + progress to meta at the next barrier to finish progress, // and forward other messages. diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 0a38c82f294f..7bd462064998 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -59,10 +59,10 @@ impl AppendOnlyDedupExecutor { // Consume the first barrier message and initialize state table. let barrier = expect_first_barrier(&mut input).await?; - self.state_table.init_epoch(barrier.epoch); - + let first_epoch = barrier.epoch; // The first barrier message should be propagated. 
yield Message::Barrier(barrier); + self.state_table.init_epoch(first_epoch).await?; #[for_await] for msg in input { diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index d34de5c009dd..8df058b9903a 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -321,8 +321,11 @@ impl DynamicFilterExecutor DynamicFilterExecutor HashAggExecutor { // First barrier let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - this.all_state_tables_mut().for_each(|table| { - table.init_epoch(barrier.epoch); - }); - + let first_epoch = barrier.epoch; yield Message::Barrier(barrier); + for table in this.all_state_tables_mut() { + table.init_epoch(first_epoch).await?; + } #[for_await] for msg in input { diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 6c3e51f5ee55..74db1d267b1a 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -115,8 +115,8 @@ impl JoinSide { // self.ht.clear(); } - pub fn init(&mut self, epoch: EpochPair) { - self.ht.init(epoch); + pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + self.ht.init(epoch).await } } @@ -465,11 +465,12 @@ impl HashJoinExecutor JoinHashMap { } } - pub fn init(&mut self, epoch: EpochPair) { - self.state.table.init_epoch(epoch); + pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { + self.state.table.init_epoch(epoch).await?; if let Some(degree_state) = &mut self.degree_state { - degree_state.table.init_epoch(epoch); + degree_state.table.init_epoch(epoch).await?; } + Ok(()) } /// Update the vnode bitmap and manipulate the cache if necessary. diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index c2cbef8e6114..4bd446222e54 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -170,10 +170,10 @@ impl MaterializeExecutor { let mut input = self.input.execute(); let barrier = expect_first_barrier(&mut input).await?; - self.state_table.init_epoch(barrier.epoch); - + let first_epoch = barrier.epoch; // The first barrier message should be propagated. 
yield Message::Barrier(barrier); + self.state_table.init_epoch(first_epoch).await?; #[for_await] for msg in input { diff --git a/src/stream/src/executor/mview/test_utils.rs b/src/stream/src/executor/mview/test_utils.rs index c194142c0a49..b6dfcdcda1d3 100644 --- a/src/stream/src/executor/mview/test_utils.rs +++ b/src/stream/src/executor/mview/test_utils.rs @@ -55,7 +55,7 @@ pub async fn gen_basic_table(row_count: usize) -> StorageTable vec![0, 1, 2], ); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); - state.init_epoch(epoch); + state.init_epoch(epoch).await.unwrap(); for idx in 0..row_count { let idx = idx as i32; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index df77c459a878..0ee7069e2aef 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -127,9 +127,23 @@ impl NowExecutor { ); } for barrier in barriers { + let new_timestamp = Some(barrier.get_curr_epoch().as_scalar()); + let pause_mutation = + barrier + .mutation + .as_deref() + .and_then(|mutation| match mutation { + Mutation::Pause => Some(true), + Mutation::Resume => Some(false), + _ => None, + }); + if !initialized { + let first_epoch = barrier.epoch; + let is_pause_on_startup = barrier.is_pause_on_startup(); + yield Message::Barrier(barrier); // Handle the initial barrier. - state_table.init_epoch(barrier.epoch); + state_table.init_epoch(first_epoch).await?; let state_row = { let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); @@ -144,25 +158,20 @@ impl NowExecutor { } }; last_timestamp = state_row.and_then(|row| row[0].clone()); - paused = barrier.is_pause_on_startup(); + paused = is_pause_on_startup; initialized = true; } else { state_table.commit(barrier.epoch).await?; + yield Message::Barrier(barrier); } // Extract timestamp from the current epoch. - curr_timestamp = Some(barrier.get_curr_epoch().as_scalar()); + curr_timestamp = new_timestamp; // Update paused state. - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => paused = true, - Mutation::Resume => paused = false, - _ => {} - } + if let Some(pause_mutation) = pause_mutation { + paused = pause_mutation; } - - yield Message::Barrier(barrier); } // Do not yield any messages if paused. 
diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 263b578d5d83..3974947389ee 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -347,9 +347,9 @@ impl EowcOverWindowExecutor { let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - this.state_table.init_epoch(barrier.epoch); - + let first_epoch = barrier.epoch; yield Message::Barrier(barrier); + this.state_table.init_epoch(first_epoch).await?; #[for_await] for msg in input { diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 16bc8065f2a0..fa17d5f5a78f 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -634,9 +634,9 @@ impl OverWindowExecutor { let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - this.state_table.init_epoch(barrier.epoch); - + let first_epoch = barrier.epoch; yield Message::Barrier(barrier); + this.state_table.init_epoch(first_epoch).await?; #[for_await] for msg in input { diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index 6f8526661589..5bbd925dea72 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -241,9 +241,12 @@ impl SimpleAggExecutor { let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - this.all_state_tables_mut().for_each(|table| { - table.init_epoch(barrier.epoch); - }); + let first_epoch = barrier.epoch; + yield Message::Barrier(barrier); + + for table in this.all_state_tables_mut() { + table.init_epoch(first_epoch).await?; + } let distinct_dedup = DistinctDeduplicater::new( &this.agg_calls, @@ -252,8 +255,6 @@ impl SimpleAggExecutor { &this.actor_ctx, ); - yield Message::Barrier(barrier); - // This will fetch previous agg states from the intermediate state table. let mut vars = ExecutionVars { agg_group: AggGroup::create( diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 206b8c498981..328bf07bf795 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -278,18 +278,15 @@ impl SinkExecutor { ) { pin_mut!(input); let barrier = expect_first_barrier(&mut input).await?; - let epoch_pair = barrier.epoch; + let is_pause_on_startup = barrier.is_pause_on_startup(); + // Propagate the first barrier + yield Message::Barrier(barrier); - log_writer - .init(epoch_pair, barrier.is_pause_on_startup()) - .await?; + log_writer.init(epoch_pair, is_pause_on_startup).await?; let mut is_paused = false; - // Propagate the first barrier - yield Message::Barrier(barrier); - #[for_await] for msg in input { match msg? 
{ diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 1556ebf419aa..98f840b8349e 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -76,8 +76,9 @@ impl SortExecutor { let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - this.buffer_table.init_epoch(barrier.epoch); + let first_epoch = barrier.epoch; yield Message::Barrier(barrier); + this.buffer_table.init_epoch(first_epoch).await?; let mut vars = ExecutionVars { buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table), diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 766c42a5e2c8..ecea95a0484c 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -193,6 +193,9 @@ impl FsFetchExecutor { async fn into_stream(mut self) { let mut upstream = self.upstream.take().unwrap().execute(); let barrier = expect_first_barrier(&mut upstream).await?; + let first_epoch = barrier.epoch; + let is_pause_on_startup = barrier.is_pause_on_startup(); + yield Message::Barrier(barrier); let mut core = self.stream_source_core.take().unwrap(); let mut state_store_handler = core.split_state_store; @@ -209,13 +212,13 @@ impl FsFetchExecutor { unreachable!("Partition and offset columns must be set."); }; // Initialize state table. - state_store_handler.init_epoch(barrier.epoch); + state_store_handler.init_epoch(first_epoch).await?; let mut splits_on_fetch: usize = 0; let mut stream = StreamReaderWithPause::::new(upstream, stream::pending().boxed()); - if barrier.is_pause_on_startup() { + if is_pause_on_startup { stream.pause_stream(); } @@ -233,8 +236,6 @@ impl FsFetchExecutor { ) .await?; - yield Message::Barrier(barrier); - while let Some(msg) = stream.next().await { match msg { Err(e) => { diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 0f3115c46c62..b6c8e888f26a 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -299,6 +299,16 @@ impl FsSourceExecutor { self.stream_source_core.source_id ) })?; + // If the first barrier requires us to pause on startup, pause the stream. + let start_with_paused = barrier.is_pause_on_startup(); + let first_epoch = barrier.epoch; + let mut boot_state = Vec::default(); + if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) { + boot_state = splits.to_vec(); + } + let boot_state = boot_state; + + yield Message::Barrier(barrier); let source_desc_builder: SourceDescBuilder = self.stream_source_core.source_desc_builder.take().unwrap(); @@ -312,17 +322,10 @@ impl FsSourceExecutor { unreachable!("Partition and offset columns must be set."); }; - // If the first barrier requires us to pause on startup, pause the stream. 
- let start_with_paused = barrier.is_pause_on_startup(); - - let mut boot_state = Vec::default(); - if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) { - boot_state = splits.to_vec(); - } - self.stream_source_core .split_state_store - .init_epoch(barrier.epoch); + .init_epoch(first_epoch) + .await?; let all_completed: HashSet = self .stream_source_core @@ -370,8 +373,6 @@ impl FsSourceExecutor { command_paused = true; } - yield Message::Barrier(barrier); - // We allow data to flow for 5 * `expected_barrier_latency_ms` milliseconds, considering // some other latencies like network and cost in Meta. let mut max_wait_barrier_time_ms = diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 55806150bb87..9df74a719d46 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -326,6 +326,13 @@ impl SourceBackfillExecutorInner { // Poll the upstream to get the first barrier. let barrier = expect_first_barrier(&mut input).await?; + let first_epoch = barrier.epoch; + let owned_splits = barrier + .initial_split_assignment(self.actor_ctx.id) + .unwrap_or(&[]) + .to_vec(); + let is_pause_on_startup = barrier.is_pause_on_startup(); + yield Message::Barrier(barrier); let mut core = self.stream_source_core; @@ -338,12 +345,7 @@ impl SourceBackfillExecutorInner { unreachable!("Partition and offset columns must be set."); }; - let mut owned_splits = Vec::default(); - if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) { - owned_splits = splits.to_vec(); - } - - self.backfill_state_store.init_epoch(barrier.epoch); + self.backfill_state_store.init_epoch(first_epoch).await?; let mut backfill_states: BackfillStates = HashMap::new(); for split in &owned_splits { @@ -422,7 +424,7 @@ impl SourceBackfillExecutorInner { } // If the first barrier requires us to pause on startup, pause the stream. - if barrier.is_pause_on_startup() { + if is_pause_on_startup { command_paused = true; pause_reader!(); } @@ -433,7 +435,7 @@ impl SourceBackfillExecutorInner { tokio::spawn(async move { // This is for self.backfill_finished() to be safe. // We wait for 1st epoch's curr, i.e., the 2nd epoch's prev. 
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 55806150bb87..9df74a719d46 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -326,6 +326,13 @@ impl SourceBackfillExecutorInner {
         // Poll the upstream to get the first barrier.
         let barrier = expect_first_barrier(&mut input).await?;
+        let first_epoch = barrier.epoch;
+        let owned_splits = barrier
+            .initial_split_assignment(self.actor_ctx.id)
+            .unwrap_or(&[])
+            .to_vec();
+        let is_pause_on_startup = barrier.is_pause_on_startup();
+        yield Message::Barrier(barrier);

         let mut core = self.stream_source_core;
@@ -338,12 +345,7 @@ impl SourceBackfillExecutorInner {
             unreachable!("Partition and offset columns must be set.");
         };

-        let mut owned_splits = Vec::default();
-        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
-            owned_splits = splits.to_vec();
-        }
-
-        self.backfill_state_store.init_epoch(barrier.epoch);
+        self.backfill_state_store.init_epoch(first_epoch).await?;

         let mut backfill_states: BackfillStates = HashMap::new();
         for split in &owned_splits {
@@ -422,7 +424,7 @@ impl SourceBackfillExecutorInner {
         }

         // If the first barrier requires us to pause on startup, pause the stream.
-        if barrier.is_pause_on_startup() {
+        if is_pause_on_startup {
             command_paused = true;
             pause_reader!();
         }
@@ -433,7 +435,7 @@ impl SourceBackfillExecutorInner {
             tokio::spawn(async move {
                 // This is for self.backfill_finished() to be safe.
                 // We wait for 1st epoch's curr, i.e., the 2nd epoch's prev.
-                let epoch = barrier.epoch.curr;
+                let epoch = first_epoch.curr;
                 tracing::info!("waiting for epoch: {}", epoch);
                 state_store
                     .try_wait_epoch(
@@ -445,7 +447,6 @@ impl SourceBackfillExecutorInner {
                 STATE_TABLE_INITIALIZED.call_once(|| ());
                 tracing::info!("finished waiting for epoch: {}", epoch);
             });
-        yield Message::Barrier(barrier);

         {
             let source_backfill_row_count = self
diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs
index b0ca8d363b9d..384a86211fdc 100644
--- a/src/stream/src/executor/source/source_backfill_state_table.rs
+++ b/src/stream/src/executor/source/source_backfill_state_table.rs
@@ -40,8 +40,8 @@ impl BackfillStateTableHandler {
         }
     }

-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_store.init_epoch(epoch);
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_store.init_epoch(epoch).await
    }

     fn string_to_scalar(rhs: impl Into) -> ScalarImpl {
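The `try_wait_epoch` task above leans on the comment "We wait for 1st epoch's curr, i.e., the 2nd epoch's prev": consecutive `EpochPair`s share an edge, so waiting for the first barrier's `curr` is exactly waiting for the storage state that barrier produces, after which the freshly initialized state table is safely readable. A tiny illustration with a stand-in `EpochPair` (RisingWave's has the same two fields; the literal values here are arbitrary):

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct EpochPair {
        curr: u64,
        prev: u64,
    }

    fn main() {
        let first = EpochPair { curr: 2, prev: 1 };
        let second = EpochPair { curr: 3, prev: 2 };
        // Waiting until `first.curr` is committed is the same as waiting for
        // the second barrier's `prev`, i.e. the state the first barrier wrote.
        assert_eq!(first.curr, second.prev);
    }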
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 6e583caf739a..32991b8e2b23 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -430,6 +430,17 @@ impl SourceExecutor {
                 self.stream_source_core.as_ref().unwrap().source_id
             )
         })?;
+        let first_epoch = barrier.epoch;
+        let mut boot_state =
+            if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
+                tracing::debug!(?splits, "boot with splits");
+                splits.to_vec()
+            } else {
+                Vec::default()
+            };
+        let is_pause_on_startup = barrier.is_pause_on_startup();
+
+        yield Message::Barrier(barrier);

         let mut core = self.stream_source_core.unwrap();
@@ -447,13 +458,7 @@ impl SourceExecutor {
             unreachable!("Partition and offset columns must be set.");
         };

-        let mut boot_state = Vec::default();
-        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
-            tracing::debug!(?splits, "boot with splits");
-            boot_state = splits.to_vec();
-        }
-
-        core.split_state_store.init_epoch(barrier.epoch);
+        core.split_state_store.init_epoch(first_epoch).await?;

         for ele in &mut boot_state {
             if let Some(recover_state) = core
@@ -507,7 +512,7 @@ impl SourceExecutor {
         // - For shared source, pause until there's a MV.
         // - If the first barrier requires us to pause on startup, pause the stream.
-        if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() {
+        if (self.is_shared && is_uninitialized) || is_pause_on_startup {
             tracing::info!(
                 is_shared = self.is_shared,
                 is_uninitialized = is_uninitialized,
@@ -517,8 +522,6 @@ impl SourceExecutor {
             command_paused = true;
         }

-        yield Message::Barrier(barrier);
-
         // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
         // milliseconds, considering some other latencies like network and cost in Meta.
         let mut max_wait_barrier_time_ms =
@@ -1108,7 +1111,10 @@ mod tests {
         )
         .await;
         // there must exist state for new add partition
-        source_state_handler.init_epoch(EpochPair::new_test_epoch(test_epoch(2)));
+        source_state_handler
+            .init_epoch(EpochPair::new_test_epoch(test_epoch(2)))
+            .await
+            .unwrap();
         source_state_handler
             .get(new_assignment[1].id())
             .await
@@ -1147,7 +1153,10 @@ mod tests {
         )
         .await;

-        source_state_handler.init_epoch(EpochPair::new_test_epoch(5 * test_epoch(1)));
+        source_state_handler
+            .init_epoch(EpochPair::new_test_epoch(5 * test_epoch(1)))
+            .await
+            .unwrap();

         assert!(source_state_handler
             .try_recover_from_state_store(&prev_assignment[0])
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index 1c9615c542bf..dfc12fcd710d 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -71,8 +71,8 @@ impl SourceStateTableHandler {
         }
     }

-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_table.init_epoch(epoch);
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_table.init_epoch(epoch).await
     }

     fn string_to_scalar(rhs: impl Into) -> ScalarImpl {
@@ -283,7 +283,7 @@ pub(crate) mod tests {
         let init_epoch = EpochPair::new_test_epoch(init_epoch_num);
         let next_epoch = EpochPair::new_test_epoch(init_epoch_num + test_epoch(1));

-        state_table.init_epoch(init_epoch);
+        state_table.init_epoch(init_epoch).await.unwrap();
         state_table.insert(OwnedRow::new(vec![a.clone(), b.clone()]));
         state_table.commit(next_epoch).await.unwrap();

@@ -308,7 +308,7 @@ pub(crate) mod tests {
         let epoch_2 = EpochPair::new_test_epoch(test_epoch(2));
         let epoch_3 = EpochPair::new_test_epoch(test_epoch(3));

-        state_table_handler.init_epoch(epoch_1);
+        state_table_handler.init_epoch(epoch_1).await?;
         state_table_handler
             .set_states(vec![split_impl.clone()])
             .await?;
diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs
index 8a994acb5e8d..c1d5cbc0486d 100644
--- a/src/stream/src/executor/temporal_join.rs
+++ b/src/stream/src/executor/temporal_join.rs
@@ -811,19 +811,29 @@ impl {
+                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
+                    let barrier_epoch = barrier.epoch;
                     if !APPEND_ONLY {
                         if wait_first_barrier {
                             wait_first_barrier = false;
-                            self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch);
+                            yield Message::Barrier(barrier);
+                            self.memo_table
+                                .as_mut()
+                                .unwrap()
+                                .init_epoch(barrier_epoch)
+                                .await?;
                         } else {
                             self.memo_table
                                 .as_mut()
                                 .unwrap()
                                 .commit(barrier.epoch)
                                 .await?;
+                            yield Message::Barrier(barrier);
                         }
+                    } else {
+                        yield Message::Barrier(barrier);
                     }
-                    if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
+                    if let Some(vnodes) = update_vnode_bitmap {
                         let prev_vnodes =
                             self.right_table.source.update_vnode_bitmap(vnodes.clone());
                         if cache_may_stale(&prev_vnodes, &vnodes) {
@@ -835,8 +845,7 @@ impl
     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
-        Ok(())
+        self.managed_state.init_epoch(epoch).await
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option {
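The temporal-join hunk is the one place where the first-barrier and steady-state paths diverge: on the first barrier the yield happens before `init_epoch`, while on every later barrier the memo table is committed before the barrier is forwarded. A sketch of that branch under the same stand-in types as before (`MemoTable` mirrors `memo_table`; `async-stream` again replaces the real `#[try_stream]` plumbing):

    use async_stream::try_stream;
    use futures::Stream;

    #[derive(Clone, Copy)]
    struct EpochPair {
        curr: u64,
        prev: u64,
    }

    struct Barrier {
        epoch: EpochPair,
    }

    enum Message {
        Barrier(Barrier),
    }

    struct MemoTable;

    impl MemoTable {
        async fn init_epoch(&mut self, _e: EpochPair) -> Result<(), String> {
            Ok(())
        }
        async fn commit(&mut self, _e: EpochPair) -> Result<(), String> {
            Ok(())
        }
    }

    // Per-barrier branch in the non-append-only temporal join, simplified.
    fn on_barriers(
        mut memo: MemoTable,
        barriers: Vec<Barrier>,
    ) -> impl Stream<Item = Result<Message, String>> {
        try_stream! {
            let mut wait_first_barrier = true;
            for barrier in barriers {
                let barrier_epoch = barrier.epoch;
                if wait_first_barrier {
                    wait_first_barrier = false;
                    // First barrier: forward it before the (possibly blocking) init.
                    yield Message::Barrier(barrier);
                    memo.init_epoch(barrier_epoch).await?;
                } else {
                    // Later barriers: flush this epoch's state, then forward.
                    memo.commit(barrier_epoch).await?;
                    yield Message::Barrier(barrier);
                }
            }
        }
    }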
diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index 2f9bd2d54702..2cf1741169ce 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -202,8 +202,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
-        Ok(())
+        self.managed_state.init_epoch(epoch).await
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option {
diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs
index 1b45c82c9c83..2dcf36b2250b 100644
--- a/src/stream/src/executor/top_n/top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/top_n_appendonly.rs
@@ -136,7 +136,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch).await?;
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs
index 5c9370a9f35d..ebddd801a579 100644
--- a/src/stream/src/executor/top_n/top_n_plain.rs
+++ b/src/stream/src/executor/top_n/top_n_plain.rs
@@ -170,7 +170,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch).await?;
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs
index 52e5fd176a97..919197f48ed3 100644
--- a/src/stream/src/executor/top_n/top_n_state.rs
+++ b/src/stream/src/executor/top_n/top_n_state.rs
@@ -68,8 +68,8 @@ impl ManagedTopNState {
     }

     /// Init epoch for the managed state table.
-    pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_table.init_epoch(epoch)
+    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
+        self.state_table.init_epoch(epoch).await
     }

     /// Update vnode bitmap of state table, returning `cache_may_stale`.
@@ -352,7 +352,9 @@ mod tests {
             &[0, 1],
         )
         .await;
-        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)));
+        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
+            .await
+            .unwrap();
         tb
     };

@@ -432,7 +434,9 @@ mod tests {
             &[0, 1],
         )
         .await;
-        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)));
+        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
+            .await
+            .unwrap();
         tb
     };

@@ -479,7 +483,9 @@ mod tests {
             &[0, 1],
         )
         .await;
-        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)));
+        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
+            .await
+            .unwrap();
         tb
     };
diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs
index 207c6a6d9366..761556319e6b 100644
--- a/src/stream/src/executor/top_n/utils.rs
+++ b/src/stream/src/executor/top_n/utils.rs
@@ -87,9 +87,9 @@ where
         let mut input = self.input.execute();

         let barrier = expect_first_barrier(&mut input).await?;
-        self.inner.init(barrier.epoch).await?;
-
+        let barrier_epoch = barrier.epoch;
         yield Message::Barrier(barrier);
+        self.inner.init(barrier_epoch).await?;

         #[for_await]
         for msg in input {
diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index 47a5448435b8..92a26d6c4e32 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -98,11 +98,12 @@ impl WatermarkFilterExecutor {
         let mut input = input.execute();

         let first_barrier = expect_first_barrier(&mut input).await?;
-        let prev_epoch = first_barrier.epoch.prev;
-        table.init_epoch(first_barrier.epoch);
+        let first_epoch = first_barrier.epoch;
         let mut is_paused = first_barrier.is_pause_on_startup();
         // The first barrier message should be propagated.
         yield Message::Barrier(first_barrier);
+        let prev_epoch = first_epoch.prev;
+        table.init_epoch(first_epoch).await?;

         // Initiate and yield the first watermark.
         let mut current_watermark = Self::get_global_max_watermark(
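On the test side the change is mechanical: `init_epoch` is awaited and its `Result` unwrapped. A self-contained sketch of the idiom, with stand-ins that mimic (but do not import) RisingWave's `test_epoch`/`new_test_epoch` helpers:

    #[derive(Clone, Copy)]
    struct EpochPair {
        curr: u64,
        prev: u64,
    }

    impl EpochPair {
        fn new_test_epoch(curr: u64) -> Self {
            // Test helper: prev is assumed one epoch unit (1 << 16) below curr.
            Self { curr, prev: curr - (1 << 16) }
        }
    }

    // Assumed to match the convention that the physical epoch shifts the
    // logical value left by 16 bits.
    fn test_epoch(n: u64) -> u64 {
        n << 16
    }

    struct StateTable;

    impl StateTable {
        async fn init_epoch(&mut self, _e: EpochPair) -> Result<(), String> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn init_epoch_is_awaited() {
        let mut tb = StateTable;
        // Before: tb.init_epoch(...) was fire-and-forget and infallible.
        // After: it is awaited and the Result is surfaced.
        tb.init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
            .await
            .unwrap();
    }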
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 7ac42fe5c46d..0b5f6d6178e6 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -49,7 +49,7 @@ pub use progress::CreateMviewProgressReporter;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::runtime::BackgroundShutdownRuntime;
 use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
-use risingwave_hummock_sdk::{HummockVersionId, LocalSstableInfo, SyncResult};
+use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
 use risingwave_pb::stream_plan::barrier::BarrierKind;
 use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request};
 use risingwave_pb::stream_service::streaming_control_stream_response::{
@@ -334,7 +334,7 @@ impl LocalBarrierWorker {
         match actor_op {
             LocalActorOperation::NewControlStream { handle, init_request } => {
                 self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
-                self.reset(HummockVersionId::new(init_request.version_id)).await;
+                self.reset().await;
                 self.state.add_subscriptions(init_request.subscriptions);
                 self.control_stream_handle = handle;
                 self.control_stream_handle.send_response(StreamingControlStreamResponse {
@@ -916,7 +916,6 @@ pub(crate) mod barrier_test_utils {
                     UnboundedReceiverStream::new(request_rx).boxed(),
                 ),
                 init_request: InitRequest {
-                    version_id: 0,
                     subscriptions: vec![],
                 },
             });
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index 799f98fc1a84..361c5d9582d6 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -28,7 +28,6 @@ use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
 use risingwave_common::config::MetricLevel;
 use risingwave_common::{bail, must_match};
-use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::common::ActorInfo;
 use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_pb::stream_plan;
@@ -249,7 +248,7 @@ impl LocalStreamManager {

 impl LocalBarrierWorker {
     /// Force stop all actors on this worker, and then drop their resources.
-    pub(super) async fn reset(&mut self, version_id: HummockVersionId) {
+    pub(super) async fn reset(&mut self) {
         self.state.abort_actors().await;
         if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
             m.clear();
@@ -257,7 +256,7 @@ impl LocalBarrierWorker {
         }

         if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
             hummock
-                .clear_shared_buffer(version_id)
+                .clear_shared_buffer()
                 .verbose_instrument_await("store_clear_shared_buffer")
                 .await
         }
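The last two files drop the `HummockVersionId` plumbing from the worker-reset path: meta no longer sends a version id in `InitRequest`, so `reset` and `clear_shared_buffer` take no version argument and the store appears to pick its recovery version internally. A stand-in sketch of the resulting call shape (types and bodies simplified; not the actual RisingWave signatures):

    struct HummockStore;

    impl HummockStore {
        // Before: async fn clear_shared_buffer(&self, version_id: HummockVersionId)
        async fn clear_shared_buffer(&self) {
            // Drop uncommitted shared-buffer state; the store, not the caller,
            // decides which pinned version to recover to.
        }
    }

    struct LocalBarrierWorker {
        hummock: Option<HummockStore>,
    }

    impl LocalBarrierWorker {
        // Before: async fn reset(&mut self, version_id: HummockVersionId)
        async fn reset(&mut self) {
            if let Some(hummock) = self.hummock.as_ref() {
                hummock.clear_shared_buffer().await;
            }
        }
    }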