diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 60bbb4cee7fa..92fc4dc31a2e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -767,10 +767,10 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let node_to_collect = match self - .control_stream_manager - .inject_barrier(command_ctx.clone()) - { + let node_to_collect = match self.control_stream_manager.inject_barrier( + command_ctx.clone(), + self.state.inflight_actor_infos.existing_table_ids(), + ) { Ok(node_to_collect) => node_to_collect, Err(err) => { for notifier in notifiers { diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 3bb51b3b5ef7..0ead9779e914 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -477,8 +477,8 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - let mut node_to_collect = - control_stream_manager.inject_barrier(command_ctx.clone())?; + let mut node_to_collect = control_stream_manager + .inject_barrier(command_ctx.clone(), info.existing_table_ids())?; while !node_to_collect.is_empty() { let (worker_id, prev_epoch, _) = control_stream_manager .next_complete_barrier_response() diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 0a7e6d4e1e95..c1a337bde046 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -24,6 +24,7 @@ use futures::future::try_join_all; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; +use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorId; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -247,6 +248,7 @@ impl ControlStreamManager { pub(super) fn inject_barrier( &mut self, command_context: Arc, + table_ids_to_sync: HashSet, ) -> MetaResult> { fail_point!("inject_barrier_err", |_| risingwave_common::bail!( "inject_barrier_err" @@ -263,9 +265,13 @@ impl ControlStreamManager { if actor_ids_to_collect.is_empty() { // No need to send or collect barrier for this node. assert!(actor_ids_to_send.is_empty()); - Ok(()) - } else { + } + { let Some(node) = self.nodes.get_mut(node_id) else { + if actor_ids_to_collect.is_empty() { + // Worker node get disconnected but has no actor to collect. Simply skip it. + return Ok(()); + } return Err( anyhow!("unconnected worker node: {:?}", worker_node.host).into() ); @@ -294,9 +300,7 @@ impl ControlStreamManager { barrier: Some(barrier), actor_ids_to_send, actor_ids_to_collect, - table_ids_to_sync: command_context - .info - .existing_table_ids() + table_ids_to_sync: table_ids_to_sync .iter() .map(|table_id| table_id.table_id) .collect(), diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 96f237704abf..9f862e3300dc 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -15,7 +15,7 @@ #[cfg(test)] pub(crate) mod tests { - use std::collections::{BTreeMap, BTreeSet, VecDeque}; + use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; use std::ops::Bound; use std::sync::Arc; @@ -156,6 +156,9 @@ pub(crate) mod tests { value_size: usize, epochs: Vec, ) { + for epoch in &epochs { + storage.start_epoch(*epoch, HashSet::from_iter([Default::default()])); + } let mut local = storage .new_local(NewLocalOptions::for_test(TableId::default())) .await; @@ -534,17 +537,16 @@ pub(crate) mod tests { existing_table_id: u32, keys_per_epoch: usize, ) { + let table_id = existing_table_id.into(); let kv_count: u16 = 128; let mut epoch = test_epoch(1); - let mut local = storage - .new_local(NewLocalOptions::for_test(existing_table_id.into())) - .await; + let mut local = storage.new_local(NewLocalOptions::for_test(table_id)).await; + + storage.start_epoch(epoch, HashSet::from_iter([table_id])); // 1. add sstables let val = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value for idx in 0..kv_count { - epoch.inc_epoch(); - if idx == 0 { local.init_for_test(epoch).await.unwrap(); } @@ -559,9 +561,11 @@ pub(crate) mod tests { } local.flush().await.unwrap(); let next_epoch = epoch.next_epoch(); + storage.start_epoch(next_epoch, HashSet::from_iter([table_id])); local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); flush_and_commit(&hummock_meta_client, storage, epoch).await; + epoch.inc_epoch(); } } @@ -727,9 +731,10 @@ pub(crate) mod tests { .await; let vnode = VirtualNode::from_index(1); + global_storage.start_epoch(epoch, HashSet::from_iter([1.into(), 2.into()])); for index in 0..kv_count { - epoch.inc_epoch(); let next_epoch = epoch.next_epoch(); + global_storage.start_epoch(next_epoch, HashSet::from_iter([1.into(), 2.into()])); if index == 0 { storage_1.init_for_test(epoch).await.unwrap(); storage_2.init_for_test(epoch).await.unwrap(); @@ -755,6 +760,7 @@ pub(crate) mod tests { let res = global_storage.seal_and_sync_epoch(epoch).await.unwrap(); hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + epoch.inc_epoch(); } // Mimic dropping table @@ -838,7 +844,6 @@ pub(crate) mod tests { .unwrap(); assert!(compact_task.is_none()); - epoch.inc_epoch(); // to update version for hummock_storage global_storage.wait_version(version).await; @@ -921,12 +926,14 @@ pub(crate) mod tests { let vnode = VirtualNode::from_index(1); let mut epoch_set = BTreeSet::new(); + storage.start_epoch(epoch, HashSet::from_iter([existing_table_id.into()])); + let mut local = storage .new_local(NewLocalOptions::for_test(existing_table_id.into())) .await; for i in 0..kv_count { - epoch += millisec_interval_epoch; let next_epoch = epoch + millisec_interval_epoch; + storage.start_epoch(next_epoch, HashSet::from_iter([existing_table_id.into()])); if i == 0 { local.init_for_test(epoch).await.unwrap(); } @@ -944,6 +951,7 @@ pub(crate) mod tests { let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + epoch += millisec_interval_epoch; } let manual_compcation_option = ManualCompactionOption { @@ -969,7 +977,10 @@ pub(crate) mod tests { retention_seconds: Some(retention_seconds_expire_second), }, )]); - compact_task.current_epoch_time = epoch; + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch; // assert compact_task assert_eq!( @@ -1123,12 +1134,13 @@ pub(crate) mod tests { let mut local = storage .new_local(NewLocalOptions::for_test(existing_table_id.into())) .await; + storage.start_epoch(epoch, HashSet::from_iter([existing_table_id.into()])); for i in 0..kv_count { - epoch += millisec_interval_epoch; if i == 0 { local.init_for_test(epoch).await.unwrap(); } let next_epoch = epoch + millisec_interval_epoch; + storage.start_epoch(next_epoch, HashSet::from_iter([existing_table_id.into()])); epoch_set.insert(epoch); let ramdom_key = [key_prefix.as_ref(), &rand::thread_rng().gen::<[u8; 32]>()].concat(); @@ -1139,6 +1151,7 @@ pub(crate) mod tests { local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + epoch += millisec_interval_epoch; } let manual_compcation_option = ManualCompactionOption { @@ -1166,7 +1179,10 @@ pub(crate) mod tests { let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = epoch; + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch; // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 8d721e9e560c..b9e576b547d7 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; // Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -101,6 +102,9 @@ async fn test_storage_basic() { // epoch 0 is reserved by storage service let epoch1 = test_epoch(1); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.init_for_test(epoch1).await.unwrap(); // Write the first batch. @@ -165,6 +169,9 @@ async fn test_storage_basic() { assert_eq!(value, None); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( @@ -197,6 +204,9 @@ async fn test_storage_basic() { // Write the third batch. let epoch3 = epoch2.next_epoch(); + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( @@ -457,6 +467,9 @@ async fn test_state_store_sync() { let base_epoch = read_version.read().committed().max_committed_epoch(); let epoch1 = test_epoch(base_epoch.next_epoch()); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.init_for_test(epoch1).await.unwrap(); // ingest 16B batch @@ -511,6 +524,9 @@ async fn test_state_store_sync() { .unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // ingest more 8B then will trigger a sync behind the scene @@ -531,6 +547,9 @@ async fn test_state_store_sync() { .unwrap(); let epoch3 = epoch2.next_epoch(); + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); @@ -809,6 +828,9 @@ async fn test_delete_get() { .max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.init_for_test(epoch1).await.unwrap(); let batch1 = vec![ @@ -833,6 +855,9 @@ async fn test_delete_get() { .unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env @@ -896,6 +921,9 @@ async fn test_multiple_epoch_sync() { .max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.init_for_test(epoch1).await.unwrap(); let batch1 = vec![ ( @@ -919,6 +947,9 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -936,6 +967,9 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch3 = epoch2.next_epoch(); + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); let batch3 = vec![ ( @@ -1011,6 +1045,9 @@ async fn test_multiple_epoch_sync() { test_get().await; let epoch4 = epoch3.next_epoch(); + test_env + .storage + .start_epoch(epoch4, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test()); test_env.storage.seal_epoch(epoch1, false); let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); @@ -1043,6 +1080,9 @@ async fn test_iter_with_min_epoch() { .await; let epoch1 = (31 * 1000) << 16; + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); let gen_key = |index: usize| -> TableKey { gen_key_from_str(VirtualNode::ZERO, format!("\0\0key_{}", index).as_str()) @@ -1069,6 +1109,9 @@ async fn test_iter_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) @@ -1087,6 +1130,9 @@ async fn test_iter_with_min_epoch() { .unwrap(); let epoch3 = (33 * 1000) << 16; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); { @@ -1279,6 +1325,9 @@ async fn test_hummock_version_reader() { let hummock_version_reader = test_env.storage.version_reader(); let epoch1 = (31 * 1000) << 16; + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); let gen_key = |index: usize| -> TableKey { gen_key_from_str(VirtualNode::ZERO, format!("\0\0key_{}", index).as_str()) @@ -1292,12 +1341,18 @@ async fn test_hummock_version_reader() { .collect(); let epoch2 = (32 * 1000) << 16; + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) .collect(); let epoch3 = (33 * 1000) << 16; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); // epoch 3 write let batch_epoch3: Vec<(TableKey, StorageValue)> = (40..50) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) @@ -1340,6 +1395,9 @@ async fn test_hummock_version_reader() { .unwrap(); let epoch4 = (34 * 1000) << 16; + test_env + .storage + .start_epoch(epoch4, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test()); { @@ -1710,6 +1768,9 @@ async fn test_get_with_min_epoch() { .await; let epoch1 = (31 * 1000) << 16; + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.init_for_test(epoch1).await.unwrap(); let gen_key = |index: usize| -> TableKey { @@ -1735,6 +1796,9 @@ async fn test_get_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) @@ -1969,6 +2033,9 @@ async fn test_table_watermark() { }); let epoch1 = (31 * 1000) << 16; + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID])); local1.init_for_test(epoch1).await.unwrap(); local1.update_vnode_bitmap(vnode_bitmap1.clone()); local2.init_for_test(epoch1).await.unwrap(); @@ -2057,6 +2124,9 @@ async fn test_table_watermark() { let watermark1 = 10; let epoch2 = (32 * 1000) << 16; + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); for (local, vnode_bitmap) in [ (&mut local1, vnode_bitmap1.clone()), (&mut local2, vnode_bitmap2.clone()), @@ -2159,6 +2229,9 @@ async fn test_table_watermark() { let batch2_epoch2 = gen_batch(vnode2, epoch2_indexes()); let epoch3 = (33 * 1000) << 16; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); for (local, batch) in [(&mut local1, batch1_epoch2), (&mut local2, batch2_epoch2)] { for (key, value) in batch { @@ -2372,6 +2445,9 @@ async fn test_table_watermark() { let (mut local1, mut local2) = test_after_epoch2(local1, local2).await; let epoch4 = (34 * 1000) << 16; + test_env + .storage + .start_epoch(epoch4, HashSet::from_iter([TEST_TABLE_ID])); for (local, vnode_bitmap) in [ (&mut local1, vnode_bitmap1.clone()), diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 402952dd0968..bde3c046ed6c 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; // Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -113,6 +114,7 @@ async fn test_snapshot_inner( .await; let epoch1 = test_epoch(1); + hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local.init_for_test(epoch1).await.unwrap(); local .ingest_batch( @@ -134,6 +136,7 @@ async fn test_snapshot_inner( .await .unwrap(); let epoch2 = epoch1.next_epoch(); + hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); if enable_sync { let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); @@ -174,6 +177,7 @@ async fn test_snapshot_inner( .await .unwrap(); let epoch3 = epoch2.next_epoch(); + hummock_storage.start_epoch(epoch3, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); if enable_sync { let res = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); @@ -243,6 +247,7 @@ async fn test_snapshot_range_scan_inner( let mut local = hummock_storage .new_local(NewLocalOptions::for_test(Default::default())) .await; + hummock_storage.start_epoch(epoch, HashSet::from_iter([Default::default()])); local.init_for_test(epoch).await.unwrap(); local diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 2ed1f4359aaa..5c13d73f07ec 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; @@ -25,7 +26,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::{test_epoch, EpochExt}; +use risingwave_common::util::epoch::{test_epoch, EpochExt, MAX_EPOCH}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, @@ -133,6 +134,7 @@ async fn test_basic_v2() { // epoch 0 is reserved by storage service let epoch1 = test_epoch(1); + hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local.init_for_test(epoch1).await.unwrap(); // try to write an empty batch, and hummock should write nothing @@ -162,6 +164,7 @@ async fn test_basic_v2() { .unwrap(); let epoch2 = epoch1.next_epoch(); + hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // Get the value after flushing to remote. @@ -219,6 +222,7 @@ async fn test_basic_v2() { .unwrap(); let epoch3 = epoch2.next_epoch(); + hummock_storage.start_epoch(epoch3, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); // Get the value after flushing to remote. @@ -432,6 +436,7 @@ async fn test_state_store_sync_v2() { let mut local = hummock_storage .new_local(NewLocalOptions::for_test(Default::default())) .await; + hummock_storage.start_epoch(epoch, HashSet::from_iter([Default::default()])); local.init_for_test(epoch).await.unwrap(); local .ingest_batch( @@ -481,6 +486,7 @@ async fn test_state_store_sync_v2() { // ); epoch.inc_epoch(); + hummock_storage.start_epoch(epoch, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch, SealCurrentEpochOptions::for_test()); // ingest more 8B then will trigger a sync behind the scene @@ -1022,6 +1028,7 @@ async fn test_delete_get_v2() { let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); + hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); let batch1 = vec![ ( gen_key_from_str(VirtualNode::ZERO, "aa"), @@ -1047,6 +1054,7 @@ async fn test_delete_get_v2() { .await .unwrap(); let epoch2 = epoch1.next_epoch(); + hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); @@ -1107,6 +1115,7 @@ async fn test_multiple_epoch_sync_v2() { let mut local = hummock_storage .new_local(NewLocalOptions::for_test(TableId::default())) .await; + hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local.init_for_test(epoch1).await.unwrap(); local .ingest_batch( @@ -1120,6 +1129,7 @@ async fn test_multiple_epoch_sync_v2() { .unwrap(); let epoch2 = epoch1.next_epoch(); + hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -1137,6 +1147,7 @@ async fn test_multiple_epoch_sync_v2() { .unwrap(); let epoch3 = epoch2.next_epoch(); + hummock_storage.start_epoch(epoch3, HashSet::from_iter([Default::default()])); let batch3 = vec![ ( gen_key_from_str(VirtualNode::ZERO, "aa"), @@ -1245,6 +1256,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); + hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local_hummock_storage.init_for_test(epoch1).await.unwrap(); local_hummock_storage .insert( @@ -1270,6 +1282,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { ); let epoch2 = epoch1.next_epoch(); + hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local_hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); local_hummock_storage .delete( @@ -1536,6 +1549,10 @@ async fn test_iter_log() { let key_count = 10000; let test_log_data = gen_test_data(epoch_count, key_count, 0.05, 0.2); + for (epoch, _) in &test_log_data { + hummock_storage.start_epoch(*epoch, HashSet::from_iter([table_id])); + } + hummock_storage.start_epoch(MAX_EPOCH, HashSet::from_iter([table_id])); let in_memory_state_store = MemoryStateStore::new(); let mut in_memory_local = in_memory_state_store diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index b126974c7c08..f4038f0d7d52 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -737,6 +737,9 @@ impl HummockEventHandler { HummockEvent::Shutdown => { unreachable!("shutdown is handled specially") } + HummockEvent::StartEpoch { epoch, table_ids } => { + self.uploader.start_epoch(epoch, table_ids); + } HummockEvent::InitEpoch { instance_id, init_epoch, @@ -1146,6 +1149,11 @@ mod tests { rx.await.unwrap() }; + send_event(HummockEvent::StartEpoch { + epoch: epoch1, + table_ids: HashSet::from_iter([TEST_TABLE_ID]), + }); + send_event(HummockEvent::InitEpoch { instance_id: guard.instance_id, init_epoch: epoch1, @@ -1161,6 +1169,11 @@ mod tests { imm: imm1, }); + send_event(HummockEvent::StartEpoch { + epoch: epoch2, + table_ids: HashSet::from_iter([TEST_TABLE_ID]), + }); + send_event(HummockEvent::LocalSealEpoch { instance_id: guard.instance_id, next_epoch: epoch2, @@ -1178,6 +1191,10 @@ mod tests { }); let epoch3 = epoch2.next_epoch(); + send_event(HummockEvent::StartEpoch { + epoch: epoch3, + table_ids: HashSet::from_iter([TEST_TABLE_ID]), + }); send_event(HummockEvent::LocalSealEpoch { instance_id: guard.instance_id, next_epoch: epoch3, diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 996d5d6a6df7..74a69bfa7194 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -74,6 +74,11 @@ pub enum HummockEvent { imm: ImmutableMemtable, }, + StartEpoch { + epoch: HummockEpoch, + table_ids: HashSet, + }, + InitEpoch { instance_id: LocalInstanceId, init_epoch: HummockEpoch, @@ -117,6 +122,10 @@ impl HummockEvent { HummockEvent::Shutdown => "Shutdown".to_string(), + HummockEvent::StartEpoch { epoch, table_ids } => { + format!("StartEpoch {} {:?}", epoch, table_ids) + } + HummockEvent::InitEpoch { instance_id, init_epoch, diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 06f0c3aff77a..f0a18aa9eca6 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -615,6 +615,11 @@ struct TableUnsyncData { BTreeMap, BitmapBuilder)>, )>, spill_tasks: BTreeMap>, + unsync_epochs: BTreeMap, + // Initialized to be `None`. Transform to `Some(_)` when called + // `local_seal_epoch` with a non-existing epoch, to mark that + // the fragment of the table has stopped. + stopped_next_epoch: Option, // newer epoch at the front syncing_epochs: VecDeque, max_synced_epoch: Option, @@ -627,11 +632,21 @@ impl TableUnsyncData { instance_data: Default::default(), table_watermarks: None, spill_tasks: Default::default(), + unsync_epochs: Default::default(), + stopped_next_epoch: None, syncing_epochs: Default::default(), max_synced_epoch: committed_epoch, } } + fn new_epoch(&mut self, epoch: HummockEpoch) { + debug!(table_id = ?self.table_id, epoch, "table new epoch"); + if let Some(latest_epoch) = self.max_epoch() { + assert_gt!(epoch, latest_epoch); + } + self.unsync_epochs.insert(epoch, ()); + } + fn sync( &mut self, epoch: HummockEpoch, @@ -646,6 +661,13 @@ impl TableUnsyncData { if let Some(prev_epoch) = self.max_sync_epoch() { assert_gt!(epoch, prev_epoch) } + let epochs = take_before_epoch(&mut self.unsync_epochs, epoch); + assert_eq!( + *epochs.last_key_value().expect("non-empty").0, + epoch, + "{epochs:?} {epoch} {:?}", + self.table_id + ); self.syncing_epochs.push_front(epoch); ( self.instance_data @@ -711,8 +733,17 @@ impl TableUnsyncData { .or(self.max_synced_epoch) } + fn max_epoch(&self) -> Option { + self.unsync_epochs + .first_key_value() + .map(|(epoch, _)| *epoch) + .or_else(|| self.max_sync_epoch()) + } + fn is_empty(&self) -> bool { - self.instance_data.is_empty() && self.syncing_epochs.is_empty() + self.instance_data.is_empty() + && self.syncing_epochs.is_empty() + && self.unsync_epochs.is_empty() } } @@ -727,7 +758,7 @@ struct UnsyncData { instance_table_id: HashMap, // TODO: this is only used in spill to get existing epochs and can be removed // when we support spill not based on epoch - epochs: BTreeMap, + epochs: BTreeMap>, } impl UnsyncData { @@ -736,27 +767,15 @@ impl UnsyncData { table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch, - context: &UploaderContext, ) { debug!( table_id = table_id.table_id, instance_id, init_epoch, "init epoch" ); - let table_data = self.table_data.entry(table_id).or_insert_with(|| { - TableUnsyncData::new( - table_id, - context - .pinned_version - .version() - .state_table_info - .info() - .get(&table_id) - .map(|info| info.committed_epoch), - ) - }); - if let Some(max_prev_epoch) = table_data.max_sync_epoch() { - assert_gt!(init_epoch, max_prev_epoch); - } + let table_data = self + .table_data + .get_mut(&table_id) + .unwrap_or_else(|| panic!("should exist. {table_id:?}")); assert!(table_data .instance_data .insert( @@ -768,7 +787,7 @@ impl UnsyncData { .instance_table_id .insert(instance_id, table_id) .is_none()); - self.epochs.insert(init_epoch, ()); + assert!(table_data.unsync_epochs.contains_key(&init_epoch)); } fn instance_data( @@ -807,7 +826,20 @@ impl UnsyncData { .get_mut(&instance_id) .expect("should exist"); let epoch = instance_data.local_seal_epoch(next_epoch); - self.epochs.insert(next_epoch, ()); + // When drop/cancel a streaming job, for the barrier to stop actor, the + // local instance will call `local_seal_epoch`, but the `next_epoch` won't be + // called `start_epoch` because we have stopped writing on it. + if !table_data.unsync_epochs.contains_key(&next_epoch) { + if let Some(stopped_next_epoch) = table_data.stopped_next_epoch { + assert_eq!(stopped_next_epoch, next_epoch); + } else { + if let Some(max_epoch) = table_data.max_epoch() { + assert_gt!(next_epoch, max_epoch); + } + debug!(?table_id, epoch, next_epoch, "table data has stopped"); + table_data.stopped_next_epoch = Some(next_epoch); + } + } if let Some((direction, table_watermarks)) = opts.table_watermarks { table_data.add_table_watermarks(epoch, table_watermarks, direction); } @@ -838,20 +870,29 @@ impl UploaderData { sync_result_sender: oneshot::Sender>, ) { // clean old epochs - let _epochs = take_before_epoch(&mut self.unsync_data.epochs, epoch); + let epochs = take_before_epoch(&mut self.unsync_data.epochs, epoch); + if cfg!(debug_assertions) { + for epoch_table_ids in epochs.into_values() { + assert_eq!(epoch_table_ids, table_ids); + } + } let mut all_table_watermarks = HashMap::new(); let mut uploading_tasks = HashSet::new(); let mut spilled_tasks = BTreeSet::new(); let mut flush_payload = HashMap::new(); - let mut table_ids_to_ack = HashSet::new(); - for (table_id, table_data) in &mut self.unsync_data.table_data { + for (table_id, table_data) in &self.unsync_data.table_data { if !table_ids.contains(table_id) { table_data.assert_after_epoch(epoch); - continue; } - table_ids_to_ack.insert(*table_id); + } + for table_id in &table_ids { + let table_data = self + .unsync_data + .table_data + .get_mut(table_id) + .expect("should exist"); let (unflushed_payload, table_watermarks, task_ids) = table_data.sync(epoch); for (instance_id, payload) in unflushed_payload { if !payload.is_empty() { @@ -898,10 +939,7 @@ impl UploaderData { .map(|task_id| { let (sst, spill_table_ids) = self.spilled_data.remove(task_id).expect("should exist"); - assert!( - spill_table_ids.is_subset(&table_ids), - "spill_table_ids: {spill_table_ids:?}, table_ids: {table_ids:?}" - ); + assert_eq!(spill_table_ids, table_ids); sst }) .collect(); @@ -911,7 +949,6 @@ impl UploaderData { SyncingData { sync_epoch: epoch, table_ids, - table_ids_to_ack, remaining_uploading_tasks: uploading_tasks, uploaded, table_watermarks: all_table_watermarks, @@ -937,8 +974,6 @@ impl UnsyncData { struct SyncingData { sync_epoch: HummockEpoch, table_ids: HashSet, - /// Subset of `table_ids` that has existing instance - table_ids_to_ack: HashSet, remaining_uploading_tasks: HashSet, // newer data at the front uploaded: VecDeque>, @@ -1079,7 +1114,7 @@ impl HummockUploader { return; }; data.unsync_data - .init_instance(table_id, instance_id, init_epoch, &self.context); + .init_instance(table_id, instance_id, init_epoch); } pub(super) fn local_seal_epoch( @@ -1095,6 +1130,32 @@ impl HummockUploader { .local_seal_epoch(instance_id, next_epoch, opts); } + pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet) { + let UploaderState::Working(data) = &mut self.state else { + return; + }; + for table_id in &table_ids { + let table_data = data + .unsync_data + .table_data + .entry(*table_id) + .or_insert_with(|| { + TableUnsyncData::new( + *table_id, + self.context + .pinned_version + .version() + .state_table_info + .info() + .get(table_id) + .map(|info| info.committed_epoch), + ) + }); + table_data.new_epoch(epoch); + } + data.unsync_data.epochs.insert(epoch, table_ids); + } + pub(super) fn start_sync_epoch( &mut self, epoch: HummockEpoch, @@ -1150,7 +1211,7 @@ impl HummockUploader { if self.context.buffer_tracker.need_flush() { let mut curr_batch_flush_size = 0; // iterate from older epoch to newer epoch - for epoch in &mut data.unsync_data.epochs.keys() { + for (epoch, table_ids) in &data.unsync_data.epochs { if !self .context .buffer_tracker @@ -1160,7 +1221,12 @@ impl HummockUploader { } let mut spilled_table_ids = HashSet::new(); let mut payload = HashMap::new(); - for (table_id, table_data) in &mut data.unsync_data.table_data { + for table_id in table_ids { + let table_data = data + .unsync_data + .table_data + .get_mut(table_id) + .expect("should exist"); for (instance_id, instance_data) in &mut table_data.instance_data { let instance_payload = instance_data.spill(*epoch); if !instance_payload.is_empty() { @@ -1240,8 +1306,7 @@ impl UploaderData { let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty"); let SyncingData { sync_epoch, - table_ids: _table_ids, - table_ids_to_ack, + table_ids, remaining_uploading_tasks: _, uploaded, table_watermarks, @@ -1252,7 +1317,7 @@ impl UploaderData { .uploader_syncing_epoch_count .set(self.syncing_data.len() as _); - for table_id in table_ids_to_ack { + for table_id in table_ids { if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) { table_data.ack_synced(sync_epoch); if table_data.is_empty() { @@ -1632,6 +1697,18 @@ pub(crate) mod tests { SealCurrentEpochOptions::for_test(), ); } + + fn start_epochs_for_test(&mut self, epochs: impl IntoIterator) { + let mut last_epoch = None; + for epoch in epochs { + last_epoch = Some(epoch); + self.start_epoch(epoch, HashSet::from_iter([TEST_TABLE_ID])); + } + self.start_epoch( + last_epoch.unwrap().next_epoch(), + HashSet::from_iter([TEST_TABLE_ID]), + ); + } } #[tokio::test] @@ -1709,6 +1786,7 @@ pub(crate) mod tests { async fn test_uploader_basic() { let mut uploader = test_uploader(dummy_success_upload_future); let epoch1 = INITIAL_EPOCH.next_epoch(); + uploader.start_epochs_for_test([epoch1]); let imm = gen_imm(epoch1).await; uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); @@ -1771,6 +1849,7 @@ pub(crate) mod tests { let epoch1 = INITIAL_EPOCH.next_epoch(); let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_epochs_for_test([epoch1]); uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); @@ -1799,6 +1878,7 @@ pub(crate) mod tests { let mut uploader = test_uploader(dummy_success_upload_future); let epoch1 = INITIAL_EPOCH.next_epoch(); let epoch2 = epoch1.next_epoch(); + uploader.start_epochs_for_test([epoch1, epoch2]); let imm = gen_imm(epoch2).await; // epoch1 is empty while epoch2 is not. Going to seal empty epoch1. uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); @@ -1851,6 +1931,7 @@ pub(crate) mod tests { let version4 = initial_pinned_version.new_pin_version(test_hummock_version(epoch4)); let version5 = initial_pinned_version.new_pin_version(test_hummock_version(epoch5)); + uploader.start_epochs_for_test([epoch6]); uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6); uploader.update_pinned_version(version1); @@ -1980,6 +2061,9 @@ pub(crate) mod tests { let epoch1 = INITIAL_EPOCH.next_epoch(); let epoch2 = epoch1.next_epoch(); + let epoch3 = epoch2.next_epoch(); + let epoch4 = epoch3.next_epoch(); + uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]); let memory_limiter = buffer_tracker.get_memory_limiter().clone(); let memory_limiter = Some(memory_limiter.deref()); @@ -2039,7 +2123,6 @@ pub(crate) mod tests { let (sync_tx1, mut sync_rx1) = oneshot::channel(); uploader.start_sync_epoch(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID])); await_start1_4.await; - let epoch3 = epoch2.next_epoch(); uploader.local_seal_epoch_for_test(instance_id1, epoch2); uploader.local_seal_epoch_for_test(instance_id2, epoch2); @@ -2071,7 +2154,6 @@ pub(crate) mod tests { // sealed: uploaded sst([imm2]) // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1]) - let epoch4 = epoch3.next_epoch(); uploader.local_seal_epoch_for_test(instance_id1, epoch3); let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await; uploader.add_imm(instance_id1, imm4.clone()); @@ -2216,6 +2298,7 @@ pub(crate) mod tests { let epoch1 = INITIAL_EPOCH.next_epoch(); let epoch2 = epoch1.next_epoch(); + uploader.start_epochs_for_test([epoch1, epoch2]); let instance_id1 = 1; let instance_id2 = 2; let flush_threshold = buffer_tracker.flush_threshold(); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index fcc80a1e54e1..511d9dd33814 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -499,6 +499,15 @@ impl HummockStorage { ) } + /// Declare the start of an epoch. This information is provided for spill so that the spill task won't + /// include data of two or more syncs. + // TODO: remove this method when we support spill task that can include data of more two or more syncs + pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet) { + let _ = self + .hummock_event_sender + .send(HummockEvent::StartEpoch { epoch, table_ids }); + } + pub fn sstable_store(&self) -> SstableStoreRef { self.context.sstable_store.clone() } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 9f9d2e3abe37..05d8ae73ba99 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -424,6 +424,7 @@ impl LogStoreFactory for KvLogStoreFactory { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::future::{poll_fn, Future}; use std::iter::empty; use std::pin::pin; @@ -433,6 +434,7 @@ mod tests { use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; + use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{EpochExt, EpochPair}; use risingwave_connector::sink::log_store::{ @@ -495,12 +497,18 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch2, false).await.unwrap(); writer.write_chunk(stream_chunk2.clone()).await.unwrap(); let epoch3 = epoch2.next_epoch(); @@ -596,12 +604,18 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch2, false).await.unwrap(); writer.write_chunk(stream_chunk2.clone()).await.unwrap(); let epoch3 = epoch2.next_epoch(); @@ -678,6 +692,9 @@ mod tests { pk_info, ); let (mut reader, mut writer) = factory.build().await; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch3), false) .await @@ -776,6 +793,9 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch1), false) .await @@ -783,6 +803,9 @@ mod tests { writer.write_chunk(stream_chunk1_1.clone()).await.unwrap(); writer.write_chunk(stream_chunk1_2.clone()).await.unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch2, true).await.unwrap(); writer.write_chunk(stream_chunk2.clone()).await.unwrap(); @@ -883,6 +906,9 @@ mod tests { ); let (mut reader, mut writer) = factory.build().await; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch3), false) .await @@ -993,6 +1019,9 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer1 .init(EpochPair::new_test_epoch(epoch1), false) .await @@ -1007,6 +1036,9 @@ mod tests { writer1.write_chunk(chunk1_1.clone()).await.unwrap(); writer2.write_chunk(chunk1_2.clone()).await.unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TableId::new(table.id)])); writer1.flush_current_epoch(epoch2, false).await.unwrap(); writer2.flush_current_epoch(epoch2, false).await.unwrap(); let [chunk2_1, chunk2_2] = gen_multi_vnode_stream_chunks::<2>(200, 100, pk_info); @@ -1108,6 +1140,9 @@ mod tests { pk_info, ); let (mut reader, mut writer) = factory.build().await; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new(epoch3, epoch2), false) .await @@ -1177,6 +1212,9 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch1), false) .await @@ -1314,15 +1352,24 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch2, true).await.unwrap(); writer.write_chunk(stream_chunk2.clone()).await.unwrap(); let epoch3 = epoch2.next_epoch(); + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch3, true).await.unwrap(); writer.write_chunk(stream_chunk3.clone()).await.unwrap(); writer.flush_current_epoch(u64::MAX, true).await.unwrap(); @@ -1411,6 +1458,9 @@ mod tests { let (mut reader, mut writer) = factory.build().await; let epoch4 = epoch3.next_epoch(); + test_env + .storage + .start_epoch(epoch4, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new(epoch4, epoch3), false) .await @@ -1471,6 +1521,9 @@ mod tests { .unwrap(); writer.write_chunk(stream_chunk4.clone()).await.unwrap(); let epoch5 = epoch4 + 1; + test_env + .storage + .start_epoch(epoch5, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch5, true).await.unwrap(); writer.write_chunk(stream_chunk5.clone()).await.unwrap(); @@ -1629,12 +1682,18 @@ mod tests { .version() .max_committed_epoch .next_epoch(); + test_env + .storage + .start_epoch(epoch1, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch1), false) .await .unwrap(); writer.write_chunk(stream_chunk1.clone()).await.unwrap(); let epoch2 = epoch1.next_epoch(); + test_env + .storage + .start_epoch(epoch2, HashSet::from_iter([TableId::new(table.id)])); writer.flush_current_epoch(epoch2, false).await.unwrap(); writer.write_chunk(stream_chunk2.clone()).await.unwrap(); let epoch3 = epoch2.next_epoch(); @@ -1693,6 +1752,9 @@ mod tests { pk_info, ); let (mut reader, mut writer) = factory.build().await; + test_env + .storage + .start_epoch(epoch3, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch3), false) .await @@ -1754,6 +1816,9 @@ mod tests { pk_info, ); let (mut reader, mut writer) = factory.build().await; + test_env + .storage + .start_epoch(epoch4, HashSet::from_iter([TableId::new(table.id)])); writer .init(EpochPair::new_test_epoch(epoch4), false) .await diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 89944cdfc487..72ffa72479cf 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::ops::Bound::{self, *}; use futures::{pin_mut, StreamExt}; @@ -61,6 +62,9 @@ async fn test_state_table_update_insert() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ @@ -78,6 +82,9 @@ async fn test_state_table_update_insert() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); state_table.delete(OwnedRow::new(vec![ @@ -134,6 +141,9 @@ async fn test_state_table_update_insert() { ); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); let row6_commit = state_table @@ -171,6 +181,9 @@ async fn test_state_table_update_insert() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); // one epoch: delete (1, 2, 3, 4), insert (5, 6, 7, None), delete(5, 6, 7, None) @@ -200,6 +213,9 @@ async fn test_state_table_update_insert() { assert_eq!(row1, None); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); let row1_commit = state_table @@ -239,6 +255,9 @@ async fn test_state_table_iter_with_prefix() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ @@ -265,6 +284,9 @@ async fn test_state_table_iter_with_prefix() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ @@ -368,6 +390,9 @@ async fn test_state_table_iter_with_pk_range() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ @@ -394,6 +419,9 @@ async fn test_state_table_iter_with_pk_range() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); state_table.insert(OwnedRow::new(vec![ @@ -501,6 +529,9 @@ async fn test_mem_table_assertion() { StateTable::from_table_catalog(&table, test_env.storage.clone(), None).await; let epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ Some(1_i32.into()), @@ -544,6 +575,9 @@ async fn test_state_table_iter_with_value_indices() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ @@ -599,6 +633,9 @@ async fn test_state_table_iter_with_value_indices() { } epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); // write [3, 33, 333], [4, 44, 444], [5, 55, 555], [7, 77, 777], [8, 88, 888]into mem_table, @@ -711,6 +748,9 @@ async fn test_state_table_iter_with_shuffle_value_indices() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ @@ -787,6 +827,9 @@ async fn test_state_table_iter_with_shuffle_value_indices() { } epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); // write [3, 33, 333], [4, 44, 444], [5, 55, 555], [7, 77, 777], [8, 88, 888]into mem_table, @@ -952,6 +995,9 @@ async fn test_state_table_write_chunk() { .await; let epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); let chunk = StreamChunk::from_rows( @@ -1081,6 +1127,9 @@ async fn test_state_table_write_chunk_visibility() { .await; let epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); let chunk = StreamChunk::from_rows( @@ -1205,6 +1254,9 @@ async fn test_state_table_write_chunk_value_indices() { .await; let epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); let chunk = StreamChunk::from_rows( @@ -1299,6 +1351,9 @@ async fn test_state_table_watermark_cache_ignore_null() { WatermarkCacheStateTable::from_table_catalog(&table, test_env.storage.clone(), None).await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); let rows = vec![ @@ -1347,6 +1402,9 @@ async fn test_state_table_watermark_cache_ignore_null() { state_table.update_watermark(watermark, true); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); let cache = state_table.get_watermark_cache(); @@ -1419,6 +1477,9 @@ async fn test_state_table_watermark_cache_write_chunk() { WatermarkCacheStateTable::from_table_catalog(&table, test_env.storage.clone(), None).await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); let cache = state_table.get_watermark_cache(); @@ -1428,6 +1489,9 @@ async fn test_state_table_watermark_cache_write_chunk() { state_table.update_watermark(watermark, true); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); let inserts_1 = vec![ @@ -1537,6 +1601,9 @@ async fn test_state_table_watermark_cache_write_chunk() { state_table.update_watermark(watermark, true); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); // After sync, we should scan all rows into watermark cache. @@ -1585,6 +1652,9 @@ async fn test_state_table_watermark_cache_refill() { WatermarkCacheStateTable::from_table_catalog(&table, test_env.storage.clone(), None).await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); let rows = vec![ @@ -1634,6 +1704,9 @@ async fn test_state_table_watermark_cache_refill() { state_table.update_watermark(watermark, true); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); // After the first barrier, watermark cache won't be filled. @@ -1675,6 +1748,9 @@ async fn test_state_table_iter_prefix_and_sub_range() { StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); state_table.insert(OwnedRow::new(vec![ @@ -1700,6 +1776,9 @@ async fn test_state_table_iter_prefix_and_sub_range() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); @@ -1870,6 +1949,9 @@ async fn test_replicated_state_table_replication() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.init_epoch(epoch); replicated_state_table.init_epoch(epoch).await.unwrap(); @@ -1881,6 +1963,12 @@ async fn test_replicated_state_table_replication() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); replicated_state_table.commit(epoch).await.unwrap(); test_env.commit_epoch(epoch.prev).await; @@ -1941,6 +2029,12 @@ async fn test_replicated_state_table_replication() { replicated_state_table.write_chunk(replicate_chunk); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state_table.commit(epoch).await.unwrap(); replicated_state_table.commit(epoch).await.unwrap(); diff --git a/src/stream/src/common/table/test_storage_table.rs b/src/stream/src/common/table/test_storage_table.rs index 098632192d02..1f130330e3be 100644 --- a/src/stream/src/common/table/test_storage_table.rs +++ b/src/stream/src/common/table/test_storage_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use futures::{pin_mut, StreamExt}; use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; @@ -77,6 +79,9 @@ async fn test_storage_table_value_indices() { value_indices.into_iter().map(|v| v as usize).collect_vec(), ); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.init_epoch(epoch); state.insert(OwnedRow::new(vec![ @@ -110,6 +115,9 @@ async fn test_storage_table_value_indices() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.commit(epoch).await.unwrap(); test_env.commit_epoch(epoch.prev).await; @@ -197,6 +205,9 @@ async fn test_shuffled_column_id_for_storage_table_get_row() { .await; let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.init_epoch(epoch); let table = StorageTable::for_test( @@ -223,6 +234,9 @@ async fn test_shuffled_column_id_for_storage_table_get_row() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.commit(epoch).await.unwrap(); test_env.commit_epoch(epoch.prev).await; @@ -310,6 +324,9 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() { value_indices, ); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.init_epoch(epoch); state.insert(OwnedRow::new(vec![Some(1_i32.into()), None, None])); @@ -326,6 +343,9 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() { Some(222_i32.into()), ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.commit(epoch).await.unwrap(); test_env.commit_epoch(epoch.prev).await; @@ -415,6 +435,9 @@ async fn test_batch_scan_with_value_indices() { value_indices, ); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.init_epoch(epoch); state.insert(OwnedRow::new(vec![ @@ -437,6 +460,9 @@ async fn test_batch_scan_with_value_indices() { ])); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.commit(epoch).await.unwrap(); test_env.commit_epoch(epoch.prev).await; @@ -513,6 +539,9 @@ async fn test_batch_scan_chunk_with_value_indices() { value_indices.clone(), ); let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.init_epoch(epoch); let gen_row = |i: i32, is_update: bool| { @@ -554,6 +583,12 @@ async fn test_batch_scan_chunk_with_value_indices() { .collect_vec(); epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); state.commit(epoch).await.unwrap(); test_env.commit_epoch(epoch.prev).await; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index bc33f434bf22..a1108d9e4627 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -701,9 +701,6 @@ impl LocalBarrierWorker { to_collect ); - // There must be some actors to collect from. - assert!(!to_collect.is_empty()); - for actor_id in &to_collect { if let Some(e) = self.failure_actors.get(actor_id) { // The failure actors could exit before the barrier is issued, while their diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 40b47ee26e8f..f4a3fb31c03c 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -27,6 +27,7 @@ use futures::{FutureExt, StreamExt, TryFutureExt}; use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; use risingwave_common::must_match; +use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; @@ -49,7 +50,8 @@ struct IssuedState { pub barrier_inflight_latency: HistogramTimer, - pub table_ids: HashSet, + /// Only be `Some(_)` when `kind` is `Checkpoint` + pub table_ids: Option>, pub kind: BarrierKind, } @@ -202,6 +204,8 @@ pub(super) struct ManagedBarrierState { mutation_subscribers: HashMap, + prev_barrier_table_ids: Option<(EpochPair, HashSet)>, + /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. pub(super) create_mview_progress: HashMap>, @@ -235,6 +239,7 @@ impl ManagedBarrierState { Self { epoch_barrier_state_map: BTreeMap::default(), mutation_subscribers: Default::default(), + prev_barrier_table_ids: None, create_mview_progress: Default::default(), state_store, streaming_metrics, @@ -403,7 +408,7 @@ impl ManagedBarrierState { state_store, &self.streaming_metrics, prev_epoch, - table_ids, + table_ids.expect("should be Some on BarrierKind::Checkpoint"), )) }) } @@ -527,6 +532,50 @@ impl ManagedBarrierState { .streaming_metrics .barrier_inflight_latency .start_timer(); + + if let Some(hummock) = self.state_store.as_hummock() { + hummock.start_epoch(barrier.epoch.curr, table_ids.clone()); + } + + let table_ids = match barrier.kind { + BarrierKind::Unspecified => { + unreachable!() + } + BarrierKind::Initial => { + assert!( + self.prev_barrier_table_ids.is_none(), + "non empty table_ids at initial barrier: {:?}", + self.prev_barrier_table_ids + ); + info!(epoch = ?barrier.epoch, "initialize at Initial barrier"); + self.prev_barrier_table_ids = Some((barrier.epoch, table_ids)); + None + } + BarrierKind::Barrier => { + if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() { + assert_eq!(prev_epoch.curr, barrier.epoch.prev); + assert_eq!(prev_table_ids, &table_ids); + *prev_epoch = barrier.epoch; + } else { + info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier"); + self.prev_barrier_table_ids = Some((barrier.epoch, table_ids)); + } + None + } + BarrierKind::Checkpoint => Some( + if let Some((prev_epoch, prev_table_ids)) = self + .prev_barrier_table_ids + .replace((barrier.epoch, table_ids)) + { + assert_eq!(prev_epoch.curr, barrier.epoch.prev); + prev_table_ids + } else { + info!(epoch = ?barrier.epoch, "initialize at Checkpoint barrier"); + HashSet::new() + }, + ), + }; + if let Some(BarrierState { ref inner, .. }) = self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev) {