From 1b359b0cb7320988ed5d61d40fc560f9bb1897db Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 9 Oct 2024 01:59:14 +0800 Subject: [PATCH 01/11] refactor --- proto/hummock.proto | 1 - src/meta/src/barrier/mod.rs | 30 ++++--- src/meta/src/hummock/manager/commit_epoch.rs | 82 +++++++++++-------- src/meta/src/hummock/manager/context.rs | 10 +-- src/meta/src/hummock/manager/time_travel.rs | 5 +- src/meta/src/hummock/manager/transaction.rs | 24 +++--- .../src/hummock/mock_hummock_meta_client.rs | 13 ++- src/storage/hummock_sdk/src/compact_task.rs | 3 - .../hummock_test/src/hummock_storage_tests.rs | 36 +++----- .../hummock_test/src/state_store_tests.rs | 9 +- src/storage/src/hummock/validator.rs | 14 +--- 11 files changed, 110 insertions(+), 117 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 0fab515e546e6..28bc5294725c5 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -525,7 +525,6 @@ message ReportCompactionTaskResponse { message ValidationTask { repeated SstableInfo sst_infos = 1; map sst_id_to_worker_id = 2; - uint64 epoch = 3; } // Delete SSTs in object store diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 463de3f6febe4..15d6600f68033 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1166,20 +1166,23 @@ impl GlobalBarrierManagerContext { let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); assert!(old_value_sst.is_empty()); - let new_table_fragment_info = if is_first_time { - NewTableFragmentInfo::NewCompactionGroup { + let new_table_fragment_infos = if is_first_time { + vec![NewTableFragmentInfo::NewCompactionGroup { table_ids: tables_to_commit.clone(), - } + }] } else { - NewTableFragmentInfo::None + vec![] }; + let tables_to_commit = tables_to_commit + .into_iter() + .map(|table_id| (table_id, epoch)) + .collect(); let info = CommitEpochInfo { sstables, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: Default::default(), - committed_epoch: epoch, tables_to_commit, }; self.hummock_manager.commit_epoch(info).await?; @@ -1712,21 +1715,21 @@ fn collect_commit_epoch_info( let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = collect_resp_info(resps); - let new_table_fragment_info = if let Command::CreateStreamingJob { info, job_type } = + let new_table_fragment_infos = if let Command::CreateStreamingJob { info, job_type } = &command_ctx.command && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) { let table_fragments = &info.table_fragments; - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: table_fragments.mv_table_id().map(TableId::new), internal_table_ids: table_fragments .internal_table_ids() .into_iter() .map(TableId::new) .collect(), - } + }] } else { - NewTableFragmentInfo::None + vec![] }; let mut mv_log_store_truncate_epoch = HashMap::new(); @@ -1767,14 +1770,17 @@ fn collect_commit_epoch_info( ); let epoch = command_ctx.prev_epoch.value().0; + let tables_to_commit = tables_to_commit + .into_iter() + .map(|table_id| (table_id, epoch)) + .collect(); CommitEpochInfo { sstables: synced_ssts, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: table_new_change_log, - committed_epoch: epoch, tables_to_commit, } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e444f2c263bb3..3da93d61ab70b 100644 
--- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -28,6 +29,7 @@ use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::hummock::compact_task::{self}; +use risingwave_pb::hummock::CompactionConfig; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; @@ -47,7 +49,6 @@ use crate::hummock::{ }; pub enum NewTableFragmentInfo { - None, Normal { mv_table_id: Option, internal_table_ids: Vec, @@ -61,10 +62,10 @@ pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, pub sst_to_context: HashMap, - pub new_table_fragment_info: NewTableFragmentInfo, + pub new_table_fragment_infos: Vec, pub change_log_delta: HashMap, - pub committed_epoch: u64, - pub tables_to_commit: HashSet, + /// `table_id` -> `committed_epoch` + pub tables_to_commit: HashMap, } impl HummockManager { @@ -75,9 +76,8 @@ impl HummockManager { mut sstables, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta, - committed_epoch, tables_to_commit, } = commit_info; let mut versioning_guard = self.versioning.write().await; @@ -91,7 +91,6 @@ impl HummockManager { let versioning: &mut Versioning = &mut versioning_guard; self.commit_epoch_sanity_check( - committed_epoch, &tables_to_commit, &sstables, &sst_to_context, @@ -124,15 +123,18 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); + let mut new_table_ids = HashMap::new(); + let mut new_compaction_groups = HashMap::new(); + let mut compaction_group_manager_txn = None; + let mut compaction_group_config: Option> = None; // Add new table - let (new_table_ids, new_compaction_group, compaction_group_manager_txn) = + for new_table_fragment_info in new_table_fragment_infos { match new_table_fragment_info { NewTableFragmentInfo::Normal { mv_table_id, internal_table_ids, } => { - let mut new_table_ids = HashMap::new(); on_handle_add_new_table( state_table_info, &internal_table_ids, @@ -148,24 +150,40 @@ impl HummockManager { &mut table_compaction_group_mapping, &mut new_table_ids, )?; - (new_table_ids, None, None) } NewTableFragmentInfo::NewCompactionGroup { table_ids } => { - let compaction_group_manager_guard = - self.compaction_group_manager.write().await; - let compaction_group_config = - compaction_group_manager_guard.default_compaction_config(); - let mut compaction_group_manager = - CompactionGroupManager::start_owned_compaction_groups_txn( - compaction_group_manager_guard, - ); - let mut new_table_ids = HashMap::new(); + let (compaction_group_manager, compaction_group_config) = + if let Some(compaction_group_manager) = &mut compaction_group_manager_txn { + ( + compaction_group_manager, + (*compaction_group_config + .as_ref() + .expect("must be set with compaction_group_manager_txn")) + .clone(), + ) + } else { + let compaction_group_manager_guard = + self.compaction_group_manager.write().await; + let new_compaction_group_config = + compaction_group_manager_guard.default_compaction_config(); + compaction_group_config = Some(new_compaction_group_config.clone()); + ( + compaction_group_manager_txn.insert( + 
CompactionGroupManager::start_owned_compaction_groups_txn( + compaction_group_manager_guard, + ), + ), + new_compaction_group_config, + ) + }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + new_compaction_groups + .insert(new_compaction_group_id, compaction_group_config.clone()); compaction_group_manager.insert( new_compaction_group_id, CompactionGroup { group_id: new_compaction_group_id, - compaction_config: compaction_group_config.clone(), + compaction_config: compaction_group_config, }, ); @@ -176,14 +194,9 @@ impl HummockManager { &mut table_compaction_group_mapping, &mut new_table_ids, )?; - ( - new_table_ids, - Some((new_compaction_group_id, (*compaction_group_config).clone())), - Some(compaction_group_manager), - ) } - NewTableFragmentInfo::None => (HashMap::new(), None, None), - }; + } + } let commit_sstables = self .correct_commit_ssts(sstables, &table_compaction_group_mapping) @@ -192,9 +205,8 @@ impl HummockManager { let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); let time_travel_delta = version.pre_commit_epoch( - committed_epoch, &tables_to_commit, - new_compaction_group, + new_compaction_groups, commit_sstables, &new_table_ids, new_table_watermarks, @@ -253,9 +265,14 @@ impl HummockManager { .values() .map(|g| (g.group_id, g.parent_group_id)) .collect(); - let time_travel_tables_to_commit = table_compaction_group_mapping - .iter() - .filter(|(table_id, _)| tables_to_commit.contains(table_id)); + let time_travel_tables_to_commit = + table_compaction_group_mapping + .iter() + .filter_map(|(table_id, cg_id)| { + tables_to_commit + .get(table_id) + .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) + }); let mut txn = sql_store.conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( @@ -265,7 +282,6 @@ impl HummockManager { &group_parents, &versioning.last_time_travel_snapshot_sst_ids, time_travel_tables_to_commit, - committed_epoch, ) .await?; commit_multi_var_with_provided_txn!( diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index b42dd9e54d0ed..257613c9926be 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, + HummockContextId, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, INVALID_VERSION_ID, }; use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask}; @@ -188,8 +188,7 @@ impl HummockManager { pub async fn commit_epoch_sanity_check( &self, - committed_epoch: HummockEpoch, - tables_to_commit: &HashSet, + tables_to_commit: &HashMap, sstables: &[LocalSstableInfo], sst_to_context: &HashMap, current_version: &HummockVersion, @@ -215,9 +214,9 @@ impl HummockManager { } // sanity check on monotonically increasing table committed epoch - for table_id in tables_to_commit { + for (table_id, committed_epoch) in tables_to_commit { if let Some(info) = current_version.state_table_info.info().get(table_id) { - if committed_epoch <= info.committed_epoch { + if *committed_epoch <= info.committed_epoch { return Err(anyhow::anyhow!( "table {} Epoch {} <= committed_epoch {}", table_id, @@ -264,7 +263,6 @@ impl HummockManager { .send_event(ResponseEvent::ValidationTask(ValidationTask { sst_infos: 
sst_infos.into_iter().map(|sst| sst.into()).collect_vec(), sst_id_to_worker_id: sst_to_context.clone(), - epoch: committed_epoch, })) .is_err() { diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 044d8503fc94e..42037e65f7423 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -377,8 +377,7 @@ impl HummockManager { delta: HummockVersionDelta, group_parents: &HashMap, skip_sst_ids: &HashSet, - tables_to_commit: impl Iterator, - committed_epoch: u64, + tables_to_commit: impl Iterator, ) -> Result>> { let select_groups = group_parents .iter() @@ -415,7 +414,7 @@ impl HummockManager { Ok(count) } - for (table_id, cg_id) in tables_to_commit { + for (table_id, cg_id, committed_epoch) in tables_to_commit { if !select_groups.contains(cg_id) { continue; } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index b8e9335a161b6..87ec960bca1d1 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -23,9 +25,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{ GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; -use risingwave_hummock_sdk::{ - CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, -}; +use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats, StateTableInfoDelta, @@ -113,9 +113,8 @@ impl<'a> HummockVersionTransaction<'a> { /// Returns a duplicate delta, used by time travel. 
pub(super) fn pre_commit_epoch( &mut self, - committed_epoch: HummockEpoch, - tables_to_commit: &HashSet, - new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>, + tables_to_commit: &HashMap, + new_compaction_groups: HashMap>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, @@ -125,7 +124,7 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - if let Some((compaction_group_id, compaction_group_config)) = new_compaction_group { + for (compaction_group_id, compaction_group_config) in new_compaction_groups { { let group_deltas = &mut new_version_delta .group_deltas @@ -135,7 +134,7 @@ impl<'a> HummockVersionTransaction<'a> { #[expect(deprecated)] group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some(compaction_group_config.clone()), + group_config: Some((*compaction_group_config).clone()), group_id: compaction_group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, @@ -160,7 +159,7 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or(committed_epoch); + .unwrap_or(Epoch::now().0); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) @@ -185,6 +184,7 @@ impl<'a> HummockVersionTransaction<'a> { "newly added table exists previously: {:?}", table_id ); + let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit"); delta.state_table_info_delta.insert( *table_id, StateTableInfoDelta { @@ -194,7 +194,7 @@ impl<'a> HummockVersionTransaction<'a> { ); } - for table_id in tables_to_commit { + for (table_id, committed_epoch) in tables_to_commit { if new_table_ids.contains_key(table_id) { continue; } @@ -206,7 +206,7 @@ impl<'a> HummockVersionTransaction<'a> { .insert( *table_id, StateTableInfoDelta { - committed_epoch, + committed_epoch: *committed_epoch, compaction_group_id: info.compaction_group_id, } ) diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index ba54bc64969ad..c045a46dd1d95 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -171,20 +171,20 @@ impl HummockMetaClient for MockHummockMetaClient { .chain(table_ids.iter().cloned()) .collect::>(); - let new_table_fragment_info = if commit_table_ids + let new_table_fragment_infos = if commit_table_ids .iter() .all(|table_id| table_ids.contains(table_id)) { - NewTableFragmentInfo::None + vec![] } else { - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: None, internal_table_ids: commit_table_ids .iter() .cloned() .map(TableId::from) .collect_vec(), - } + }] }; let sst_to_context = sync_result @@ -215,13 +215,12 @@ impl HummockMetaClient for MockHummockMetaClient { sstables: sync_result.uncommitted_ssts, new_table_watermarks: new_table_watermark, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: table_change_log, - committed_epoch: epoch, tables_to_commit: commit_table_ids .iter() .cloned() - .map(TableId::from) + .map(|table_id| (TableId::new(table_id), epoch)) .collect(), }) .await diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index ff76e3a70dde8..126aced6e9afe 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ 
b/src/storage/hummock_sdk/src/compact_task.rs @@ -325,7 +325,6 @@ impl From<&CompactTask> for PbCompactTask { pub struct ValidationTask { pub sst_infos: Vec, pub sst_id_to_worker_id: HashMap, - pub epoch: u64, } impl From for ValidationTask { @@ -337,7 +336,6 @@ impl From for ValidationTask { .map(SstableInfo::from) .collect_vec(), sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.clone(), - epoch: pb_validation_task.epoch, } } } @@ -351,7 +349,6 @@ impl From for PbValidationTask { .map(|sst| sst.into()) .collect_vec(), sst_id_to_worker_id: validation_task.sst_id_to_worker_id.clone(), - epoch: validation_task.epoch, } } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7c70721f04d82..4e6ab26a539c6 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2585,9 +2585,12 @@ async fn test_commit_multi_epoch() { let initial_epoch = INVALID_EPOCH; let commit_epoch = - |epoch, sst: SstableInfo, new_table_fragment_info, tables_to_commit: &[TableId]| { + |epoch, sst: SstableInfo, new_table_fragment_infos, tables_to_commit: &[TableId]| { let manager = &test_env.manager; - let tables_to_commit = tables_to_commit.iter().cloned().collect(); + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); async move { manager .commit_epoch(CommitEpochInfo { @@ -2610,9 +2613,8 @@ async fn test_commit_multi_epoch() { sst_info: sst, created_at: u64::MAX, }], - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: Default::default(), - committed_epoch: epoch, tables_to_commit, }) .await @@ -2633,10 +2635,10 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst1_epoch1.clone(), - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: None, internal_table_ids: vec![existing_table_id], - }, + }], &[existing_table_id], ) .await; @@ -2678,13 +2680,7 @@ async fn test_commit_multi_epoch() { let epoch2 = epoch1.next_epoch(); - commit_epoch( - epoch2, - sst1_epoch2.clone(), - NewTableFragmentInfo::None, - &[existing_table_id], - ) - .await; + commit_epoch(epoch2, sst1_epoch2.clone(), vec![], &[existing_table_id]).await; { let version = test_env.manager.get_current_version().await; @@ -2727,9 +2723,9 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst2_epoch1.clone(), - NewTableFragmentInfo::NewCompactionGroup { + vec![NewTableFragmentInfo::NewCompactionGroup { table_ids: HashSet::from_iter([new_table_id]), - }, + }], &[new_table_id], ) .await; @@ -2764,13 +2760,7 @@ async fn test_commit_multi_epoch() { ..Default::default() }; - commit_epoch( - epoch2, - sst2_epoch2.clone(), - NewTableFragmentInfo::None, - &[new_table_id], - ) - .await; + commit_epoch(epoch2, sst2_epoch2.clone(), vec![], &[new_table_id]).await; { let version = test_env.manager.get_current_version().await; @@ -2804,7 +2794,7 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch3, sst_epoch3.clone(), - NewTableFragmentInfo::None, + vec![], &[existing_table_id, new_table_id], ) .await; diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 7b2c5b5e60649..0c511dbfb2068 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, }; use risingwave_meta::hummock::test_utils::setup_compute_env; -use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo}; +use risingwave_meta::hummock::CommitEpochInfo; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::iterator::change_log::test_utils::{ apply_test_log_data, gen_test_data, @@ -1407,10 +1407,9 @@ async fn test_replicated_local_hummock_storage() { sstables: vec![], new_table_watermarks: Default::default(), sst_to_context: Default::default(), - new_table_fragment_info: NewTableFragmentInfo::None, + new_table_fragment_infos: vec![], change_log_delta: Default::default(), - committed_epoch: epoch0, - tables_to_commit: HashSet::from_iter([TEST_TABLE_ID]), + tables_to_commit: HashMap::from_iter([(TEST_TABLE_ID, epoch0)]), }) .await .unwrap(); diff --git a/src/storage/src/hummock/validator.rs b/src/storage/src/hummock/validator.rs index cc95b7089b664..2c0efbb3ca934 100644 --- a/src/storage/src/hummock/validator.rs +++ b/src/storage/src/hummock/validator.rs @@ -38,12 +38,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) .sst_id_to_worker_id .get(&sst.object_id) .expect("valid worker_id"); - tracing::debug!( - "Validating SST {} from worker {}, epoch {}", - sst.object_id, - worker_id, - task.epoch - ); + tracing::debug!("Validating SST {} from worker {}", sst.object_id, worker_id,); let holder = match sstable_store.sstable(&sst, unused.borrow_mut()).await { Ok(holder) => holder, Err(_err) => { @@ -100,12 +95,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) break; } } - tracing::debug!( - "Validated {} keys for SST {}, epoch {}", - key_counts, - sst.object_id, - task.epoch - ); + tracing::debug!("Validated {} keys for SST {}", key_counts, sst.object_id,); iter.collect_local_statistic(&mut unused); unused.ignore(); } From 07a33c3bec959b9cd9beea752a2050a8ef45b28f Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 9 Oct 2024 10:58:20 +0800 Subject: [PATCH 02/11] refine --- src/meta/src/barrier/mod.rs | 104 ++++++++++++++---------------------- 1 file changed, 39 insertions(+), 65 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 15d6600f68033..2bbc4083299d4 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -484,11 +484,12 @@ impl CheckpointControl { let finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); - if let Err(e) = self - .context - .clone() - .complete_barrier(node, finished_jobs, HashMap::new()) - .await + if node.command_ctx.kind.is_checkpoint() + && let Err(e) = self + .context + .clone() + .complete_barrier(node, finished_jobs, HashMap::new()) + .await { error!( prev_epoch, @@ -568,7 +569,7 @@ enum CompletingCommand { // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. 
If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>>, + join_handle: JoinHandle>, }, CreatingStreamingJob { table_id: TableId, @@ -1194,7 +1195,7 @@ impl GlobalBarrierManagerContext { node: EpochNode, mut finished_jobs: Vec, backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { + ) -> MetaResult { tracing::trace!( prev_epoch = node.command_ctx.prev_epoch.value().0, kind = ?node.command_ctx.kind, @@ -1207,12 +1208,10 @@ impl GlobalBarrierManagerContext { state, .. } = node; + let epochs = must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs); assert!(state.node_to_collect.is_empty()); assert!(state.creating_jobs_to_wait.is_empty()); let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - if !state.finished_table_ids.is_empty() { - assert!(command_ctx.kind.is_checkpoint()); - } finished_jobs.extend(state.finished_table_ids.into_values().map(|info| { TrackingJob::New(TrackingCommand { info, @@ -1220,14 +1219,18 @@ impl GlobalBarrierManagerContext { }) })); - let result = self - .update_snapshot( - &command_ctx, - state.table_ids_to_commit, + let result: MetaResult = try { + let commit_info = collect_commit_epoch_info( state.resps, + &command_ctx, + epochs, backfill_pinned_log_epoch, - ) - .await; + state.table_ids_to_commit, + ); + self.hummock_manager.commit_epoch(commit_info).await?; + command_ctx.post_collect().await?; + self.hummock_manager.get_version_stats().await + }; let version_stats = match result { Ok(version_stats) => version_stats, @@ -1255,45 +1258,6 @@ impl GlobalBarrierManagerContext { Ok(version_stats) } - async fn update_snapshot( - &self, - command_ctx: &CommandContext, - tables_to_commit: HashSet, - resps: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { - { - { - match &command_ctx.kind { - BarrierKind::Initial => {} - BarrierKind::Checkpoint(epochs) => { - let commit_info = collect_commit_epoch_info( - resps, - command_ctx, - epochs, - backfill_pinned_log_epoch, - tables_to_commit, - ); - self.hummock_manager.commit_epoch(commit_info).await?; - } - BarrierKind::Barrier => { - // if we collect a barrier(checkpoint = false), - // we need to ensure that command is Plain and the notifier's checkpoint is - // false - assert!(!command_ctx.command.need_checkpoint()); - } - } - - command_ctx.post_collect().await?; - Ok(if command_ctx.kind.is_checkpoint() { - Some(self.hummock_manager.get_version_stats().await) - } else { - None - }) - } - } - } - pub fn hummock_manager(&self) -> &HummockManagerRef { &self.hummock_manager } @@ -1392,18 +1356,26 @@ impl CheckpointControl { pub(super) async fn next_completed_barrier( &mut self, ) -> MetaResult> { - if matches!(&self.completing_command, CompletingCommand::None) { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + while let CompletingCommand::None = &self.completing_command + && let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() + { { let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); - let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); let finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); + if !node.command_ctx.kind.is_checkpoint() { + assert!(finished_jobs.is_empty()); + node.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + continue; + } + let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); let command_ctx = node.command_ctx.clone(); let join_handle = tokio::spawn(self.context.clone().complete_barrier( node, @@ -1424,7 +1396,11 @@ impl CheckpointControl { join_handle, table_ids_to_finish, }; - } else { + } + } + + if matches!(&self.completing_command, CompletingCommand::None) { + { for (table_id, job) in &mut self.creating_streaming_job_controls { let (upstream_epochs_to_notify, commit_info) = job.start_completing(); for upstream_epoch in upstream_epochs_to_notify { @@ -1482,9 +1458,7 @@ impl CheckpointControl { let completed_command = replace(&mut self.completing_command, next_completing_command_status); join_result.map(move | version_stats| { - if let Some(new_version_stats) = version_stats { - self.hummock_version_stats = new_version_stats; - } + self.hummock_version_stats = version_stats; must_match!( completed_command, CompletingCommand::GlobalStreamingGraph { command_ctx, table_ids_to_finish, require_next_checkpoint, .. } => { From f50cb2bc54bef9852f41dbbc11a1e6fe47cd1bd9 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 9 Oct 2024 13:08:04 +0800 Subject: [PATCH 03/11] fix timeout --- src/meta/src/hummock/manager/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 87ec960bca1d1..cef638908e59d 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -159,7 +159,7 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or(Epoch::now().0); + .unwrap_or_else(|| Epoch::now().0); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) From c2789e89206f0733b762501a4d105a191a2fbaba Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 9 Oct 2024 13:54:54 +0800 Subject: [PATCH 04/11] avoid calling now --- src/meta/src/barrier/mod.rs | 9 +++++---- src/meta/src/hummock/manager/transaction.rs | 9 +++++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 2bbc4083299d4..cfabf840e14f2 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1244,10 +1244,11 @@ impl GlobalBarrierManagerContext { notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); - try_join_all(finished_jobs.into_iter().map(|finished_job| { - let metadata_manager = &self.metadata_manager; - async move { finished_job.finish(metadata_manager).await } - })) + try_join_all( + finished_jobs + .into_iter() + .map(|finished_job| finished_job.finish(&self.metadata_manager)), + ) .await?; let duration_sec = enqueue_time.stop_and_record(); self.report_complete_event(duration_sec, &command_ctx); diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs 
index cef638908e59d..56b60a6535722 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -17,7 +17,6 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -159,7 +158,13 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or_else(|| Epoch::now().0); + .unwrap_or_else(|| { + tables_to_commit + .values() + .cloned() + .max() + .expect("non empty tables_to_commit") + }); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) From f86c5c63d223abfacc76ec966398d6b1055ad129 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 9 Oct 2024 18:17:25 +0800 Subject: [PATCH 05/11] refactor --- src/meta/src/barrier/mod.rs | 415 +++++++++++++++---------------- src/meta/src/barrier/schedule.rs | 4 + 2 files changed, 199 insertions(+), 220 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index cfabf840e14f2..7b75cd0c964f3 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::future::pending; +use std::future::{pending, Future}; use std::mem::{replace, take}; use std::sync::Arc; use std::time::Duration; @@ -55,6 +54,7 @@ use crate::barrier::creating_job::CreatingStreamingJobControl; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; +use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -292,7 +292,6 @@ impl CheckpointControl { notifiers: Vec, node_to_collect: HashSet, jobs_to_wait: HashSet, - table_ids_to_commit: HashSet, ) { let timer = self.context.metrics.barrier_latency.start_timer(); @@ -334,8 +333,7 @@ impl CheckpointControl { node_to_collect, resps: vec![], creating_jobs_to_wait, - finished_table_ids: HashMap::new(), - table_ids_to_commit, + finished_jobs: HashMap::new(), }, command_ctx, notifiers, @@ -404,7 +402,7 @@ impl CheckpointControl { CompletingCommand::None | CompletingCommand::Err(_) | CompletingCommand::CreatingStreamingJob { .. } => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => Some(command_ctx), + CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); @@ -418,7 +416,7 @@ impl CheckpointControl { | CompletingCommand::Err(_) | CompletingCommand::CreatingStreamingJob { .. } => None, CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => - Some(command_ctx), + command_ctx.as_ref(), } .into_iter() ) @@ -434,16 +432,8 @@ impl CheckpointControl { // join spawned completing command to finish no matter it succeeds or not. 
let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { CompletingCommand::None => false, - CompletingCommand::GlobalStreamingGraph { - command_ctx, - join_handle, - .. - } => { - info!( - prev_epoch = ?command_ctx.prev_epoch, - curr_epoch = ?command_ctx.curr_epoch, - "waiting for completing command to finish in recovery" - ); + CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { + info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); @@ -473,36 +463,15 @@ impl CheckpointControl { }; if !is_err { // continue to finish the pending collected barrier. - while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() - { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); - let (prev_epoch, curr_epoch) = ( - node.command_ctx.prev_epoch.value().0, - node.command_ctx.curr_epoch.value().0, - ); - let finished_jobs = self - .create_mview_tracker - .apply_collected_command(&node, &self.hummock_version_stats); - if node.command_ctx.kind.is_checkpoint() - && let Err(e) = self - .context - .clone() - .complete_barrier(node, finished_jobs, HashMap::new()) - .await - { + while let Some((task, _)) = self.next_collected_checkpoint_barrier(None) { + if let Err(e) = self.context.clone().complete_barrier(task).await { error!( - prev_epoch, - curr_epoch, err = ?e.as_report(), "failed to complete barrier during recovery" ); break; } else { - info!( - prev_epoch, - curr_epoch, "succeed to complete barrier during recovery" - ) + info!("succeed to complete barrier during recovery") } } } @@ -548,9 +517,7 @@ struct BarrierEpochState { creating_jobs_to_wait: HashMap>, - finished_table_ids: HashMap, - - table_ids_to_commit: HashSet, + finished_jobs: HashMap, } impl BarrierEpochState { @@ -562,9 +529,8 @@ impl BarrierEpochState { enum CompletingCommand { None, GlobalStreamingGraph { - command_ctx: Arc, + command_ctx: Option>, table_ids_to_finish: HashSet, - require_next_checkpoint: bool, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command @@ -871,15 +837,9 @@ impl GlobalBarrierManager { } } } - complete_result = self.checkpoint_control.next_completed_barrier() => { + complete_result = self.checkpoint_control.next_completed_barrier(&mut self.scheduled_barriers) => { match complete_result { Ok(Some(output)) => { - // If there are remaining commands (that requires checkpoint to finish), we force - // the next barrier to be a checkpoint. 
- if output.require_next_checkpoint { - assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); - self.scheduled_barriers.force_checkpoint_in_next_barrier(); - } self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); @@ -1039,7 +999,7 @@ impl GlobalBarrierManager { pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), - table_ids_to_commit.clone(), + table_ids_to_commit, self.state.paused_reason(), command, kind, @@ -1091,7 +1051,6 @@ impl GlobalBarrierManager { notifiers, node_to_collect, jobs_to_wait, - table_ids_to_commit, ); Ok(()) @@ -1156,6 +1115,14 @@ impl GlobalBarrierManager { } } +struct CompleteBarrierTask { + commit_info: CommitEpochInfo, + finished_jobs: Vec, + notifiers: Vec, + /// Some((`command_ctx`, `enqueue_time`)) + command_context: Option<(Arc, HistogramTimer)>, +} + impl GlobalBarrierManagerContext { async fn complete_creating_job_barrier( self, @@ -1190,73 +1157,43 @@ impl GlobalBarrierManagerContext { Ok(()) } - async fn complete_barrier( - self, - node: EpochNode, - mut finished_jobs: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult { - tracing::trace!( - prev_epoch = node.command_ctx.prev_epoch.value().0, - kind = ?node.command_ctx.kind, - "complete barrier" - ); - let EpochNode { - command_ctx, - notifiers, - enqueue_time, - state, - .. - } = node; - let epochs = must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs); - assert!(state.node_to_collect.is_empty()); - assert!(state.creating_jobs_to_wait.is_empty()); - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - finished_jobs.extend(state.finished_table_ids.into_values().map(|info| { - TrackingJob::New(TrackingCommand { - info, - replace_table_info: None, - }) - })); - - let result: MetaResult = try { - let commit_info = collect_commit_epoch_info( - state.resps, - &command_ctx, - epochs, - backfill_pinned_log_epoch, - state.table_ids_to_commit, - ); - self.hummock_manager.commit_epoch(commit_info).await?; - command_ctx.post_collect().await?; - self.hummock_manager.get_version_stats().await + async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { + let result: MetaResult<()> = try { + let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + self.hummock_manager.commit_epoch(task.commit_info).await?; + if let Some((command_ctx, _)) = &task.command_context { + command_ctx.post_collect().await?; + } + + wait_commit_timer.observe_duration(); }; - let version_stats = match result { - Ok(version_stats) => version_stats, - Err(e) => { - for notifier in notifiers { + { + if let Err(e) = result { + for notifier in task.notifiers { notifier.notify_collection_failed(e.clone()); } return Err(e); } - }; - notifiers.into_iter().for_each(|notifier| { - notifier.notify_collected(); - }); - try_join_all( - finished_jobs - .into_iter() - .map(|finished_job| finished_job.finish(&self.metadata_manager)), - ) - .await?; - let duration_sec = enqueue_time.stop_and_record(); - self.report_complete_event(duration_sec, &command_ctx); - wait_commit_timer.observe_duration(); - self.metrics - .last_committed_barrier_time - .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); - Ok(version_stats) + task.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + try_join_all( + task.finished_jobs + .into_iter() + .map(|finished_job| finished_job.finish(&self.metadata_manager)), + ) + .await?; + if let 
Some((command_ctx, enqueue_time)) = task.command_context { + let duration_sec = enqueue_time.stop_and_record(); + self.report_complete_event(duration_sec, &command_ctx); + self.metrics + .last_committed_barrier_time + .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); + } + } + + Ok(self.hummock_manager.get_version_stats().await) } pub fn hummock_manager(&self) -> &HummockManagerRef { @@ -1323,8 +1260,6 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { - command_ctx: Arc, - require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1354,19 +1289,18 @@ impl CheckpointControl { .collect() } - pub(super) async fn next_completed_barrier( + fn next_collected_checkpoint_barrier( &mut self, - ) -> MetaResult> { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - while let CompletingCommand::None = &self.completing_command - && let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() + mut scheduled_barriers: Option<&mut ScheduledBarriers>, + ) -> Option<(CompleteBarrierTask, HashSet)> { + while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() && !state.is_inflight() { { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); + let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); - let finished_jobs = self + assert!(node.state.node_to_collect.is_empty()); + let mut finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); if !node.command_ctx.kind.is_checkpoint() { @@ -1374,29 +1308,71 @@ impl CheckpointControl { node.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); + if let Some(scheduled_barriers) = &mut scheduled_barriers + && !scheduled_barriers.is_forced_next_checkpoint() + && self.create_mview_tracker.has_pending_finished_jobs() + && self + .command_ctx_queue + .values() + .all(|node| !node.command_ctx.kind.is_checkpoint()) + { + scheduled_barriers.force_checkpoint_in_next_barrier(); + } continue; } - let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); - let command_ctx = node.command_ctx.clone(); - let join_handle = tokio::spawn(self.context.clone().complete_barrier( - node, - finished_jobs, + let commit_info = collect_commit_epoch_info( + take(&mut node.state.resps), + &node.command_ctx, self.collect_backfill_pinned_upstream_log_epoch(), + ); + let table_ids_to_finish = node + .state + .finished_jobs + .drain() + .map(|(table_id, info)| { + finished_jobs.push(TrackingJob::New(TrackingCommand { + info, + replace_table_info: None, + })); + table_id + }) + .collect(); + return Some(( + CompleteBarrierTask { + commit_info, + finished_jobs, + notifiers: node.notifiers, + command_context: Some((node.command_ctx, node.enqueue_time)), + }, + table_ids_to_finish, )); - let require_next_checkpoint = - if self.create_mview_tracker.has_pending_finished_jobs() { - self.command_ctx_queue - .values() - .all(|node| !node.command_ctx.kind.is_checkpoint()) - } else { - false + } + } + None + } + + pub(super) fn next_completed_barrier<'a>( + &'a mut self, + scheduled_barriers: &mut ScheduledBarriers, + ) -> impl Future>> + 'a { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. 
+ if let CompletingCommand::None = &self.completing_command { + if let Some((task, table_ids_to_finish)) = + self.next_collected_checkpoint_barrier(Some(scheduled_barriers)) + { + { + let command_ctx = task + .command_context + .as_ref() + .map(|(command_ctx, _)| command_ctx.clone()); + let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); + self.completing_command = CompletingCommand::GlobalStreamingGraph { + command_ctx, + join_handle, + table_ids_to_finish, }; - self.completing_command = CompletingCommand::GlobalStreamingGraph { - command_ctx, - require_next_checkpoint, - join_handle, - table_ids_to_finish, - }; + } } } @@ -1442,93 +1418,93 @@ impl CheckpointControl { } } - match &mut self.completing_command { - CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. - let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None - }; - let completed_command = - replace(&mut self.completing_command, next_completing_command_status); - join_result.map(move | version_stats| { + async move { + match &mut self.completing_command { + CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { + let join_result: MetaResult<_> = try { + join_handle + .await + .context("failed to join completing command")?? + }; + // It's important to reset the completing_command after await no matter the result is err + // or not, and otherwise the join handle will be polled again after ready. + let next_completing_command_status = if let Err(e) = &join_result { + CompletingCommand::Err(e.clone()) + } else { + CompletingCommand::None + }; + let completed_command = + replace(&mut self.completing_command, next_completing_command_status); + join_result.map(move |version_stats| { self.hummock_version_stats = version_stats; must_match!( completed_command, - CompletingCommand::GlobalStreamingGraph { command_ctx, table_ids_to_finish, require_next_checkpoint, .. } => { + CompletingCommand::GlobalStreamingGraph { table_ids_to_finish, .. } => { Some(BarrierCompleteOutput { - command_ctx, - require_next_checkpoint, table_ids_to_finish, }) } ) }) - } - CompletingCommand::CreatingStreamingJob { - table_id, - epoch, - join_handle, - } => { - let table_id = *table_id; - let epoch = *epoch; - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. 
- let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None - }; - self.completing_command = next_completing_command_status; - if let Some((upstream_epoch, is_finished)) = self - .creating_streaming_job_controls - .get_mut(&table_id) - .expect("should exist") - .ack_completed(epoch) - { - let wait_progress_timer = self - .command_ctx_queue - .get_mut(&upstream_epoch) + } + CompletingCommand::CreatingStreamingJob { + table_id, + epoch, + join_handle, + } => { + let table_id = *table_id; + let epoch = *epoch; + let join_result: MetaResult<_> = try { + join_handle + .await + .context("failed to join completing command")?? + }; + // It's important to reset the completing_command after await no matter the result is err + // or not, and otherwise the join handle will be polled again after ready. + let next_completing_command_status = if let Err(e) = &join_result { + CompletingCommand::Err(e.clone()) + } else { + CompletingCommand::None + }; + self.completing_command = next_completing_command_status; + if let Some((upstream_epoch, is_finished)) = self + .creating_streaming_job_controls + .get_mut(&table_id) .expect("should exist") - .state - .creating_jobs_to_wait - .remove(&table_id) - .expect("should exist"); - if let Some(timer) = wait_progress_timer { - timer.observe_duration(); - } - if is_finished { - debug!(epoch, ?table_id, "finish creating job"); - let creating_streaming_job = self - .creating_streaming_job_controls - .remove(&table_id) - .expect("should exist"); - assert!(creating_streaming_job.is_finished()); - assert!(self + .ack_completed(epoch) + { + let wait_progress_timer = self .command_ctx_queue .get_mut(&upstream_epoch) .expect("should exist") .state - .finished_table_ids - .insert(table_id, creating_streaming_job.info) - .is_none()); + .creating_jobs_to_wait + .remove(&table_id) + .expect("should exist"); + if let Some(timer) = wait_progress_timer { + timer.observe_duration(); + } + if is_finished { + debug!(epoch, ?table_id, "finish creating job"); + let creating_streaming_job = self + .creating_streaming_job_controls + .remove(&table_id) + .expect("should exist"); + assert!(creating_streaming_job.is_finished()); + assert!(self + .command_ctx_queue + .get_mut(&upstream_epoch) + .expect("should exist") + .state + .finished_jobs + .insert(table_id, creating_streaming_job.info) + .is_none()); + } } + join_result.map(|_| None) } - join_result.map(|_| None) + CompletingCommand::None | CompletingCommand::Err(_) => pending().await, } - CompletingCommand::None | CompletingCommand::Err(_) => pending().await, } } } @@ -1683,9 +1659,7 @@ fn collect_resp_info( fn collect_commit_epoch_info( resps: Vec, command_ctx: &CommandContext, - epochs: &Vec, backfill_pinned_log_epoch: HashMap)>, - tables_to_commit: HashSet, ) -> CommitEpochInfo { let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = collect_resp_info(resps); @@ -1740,14 +1714,15 @@ fn collect_commit_epoch_info( let table_new_change_log = build_table_change_log_delta( old_value_ssts.into_iter(), synced_ssts.iter().map(|sst| &sst.sst_info), - epochs, + must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs), mv_log_store_truncate_epoch.into_iter(), ); let epoch = command_ctx.prev_epoch.value().0; - let tables_to_commit = tables_to_commit - .into_iter() - .map(|table_id| (table_id, epoch)) + let tables_to_commit = command_ctx + .table_ids_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) .collect(); 
CommitEpochInfo { diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index c39fdb56e4fb7..59eb11d4342aa 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -451,6 +451,10 @@ impl ScheduledBarriers { self.force_checkpoint = true; } + pub fn is_forced_next_checkpoint(&self) -> bool { + self.force_checkpoint + } + /// Update the `checkpoint_frequency` pub fn set_checkpoint_frequency(&mut self, frequency: usize) { self.checkpoint_frequency = frequency; From f8df42d575402116e5fc313048154644b3381831 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 9 Oct 2024 19:35:29 +0800 Subject: [PATCH 06/11] refactor --- src/meta/src/barrier/mod.rs | 360 +++++++++---------- src/meta/src/barrier/schedule.rs | 4 - src/meta/src/hummock/manager/commit_epoch.rs | 1 + src/meta/src/lib.rs | 1 + 4 files changed, 162 insertions(+), 204 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 7b75cd0c964f3..401e983dabbcb 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -14,7 +14,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::future::{pending, Future}; +use std::future::pending; use std::mem::{replace, take}; use std::sync::Arc; use std::time::Duration; @@ -54,7 +54,6 @@ use crate::barrier::creating_job::CreatingStreamingJobControl; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; -use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -399,9 +398,7 @@ impl CheckpointControl { .last_key_value() .map(|(_, x)| &x.command_ctx) .or(match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, + CompletingCommand::None | CompletingCommand::Err(_) => None, CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) @@ -412,9 +409,7 @@ impl CheckpointControl { .map(|node| &node.command_ctx) .chain( match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, + CompletingCommand::None | CompletingCommand::Err(_) => None, CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(), } @@ -447,23 +442,10 @@ impl CheckpointControl { } } CompletingCommand::Err(_) => true, - CompletingCommand::CreatingStreamingJob { join_handle, .. } => { - match join_handle.await { - Err(e) => { - warn!(err = ?e.as_report(), "failed to join completing task"); - true - } - Ok(Err(e)) => { - warn!(err = ?e.as_report(), "failed to complete barrier during clear"); - true - } - Ok(Ok(_)) => false, - } - } }; if !is_err { // continue to finish the pending collected barrier. 
- while let Some((task, _)) = self.next_collected_checkpoint_barrier(None) { + while let Some(task) = self.next_complete_barrier_task() { if let Err(e) = self.context.clone().complete_barrier(task).await { error!( err = ?e.as_report(), @@ -531,17 +513,14 @@ enum CompletingCommand { GlobalStreamingGraph { command_ctx: Option>, table_ids_to_finish: HashSet, + creating_job_epochs: Vec<(TableId, u64)>, + require_next_checkpoint: bool, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier join_handle: JoinHandle>, }, - CreatingStreamingJob { - table_id: TableId, - epoch: u64, - join_handle: JoinHandle>, - }, #[expect(dead_code)] Err(MetaError), } @@ -837,14 +816,18 @@ impl GlobalBarrierManager { } } } - complete_result = self.checkpoint_control.next_completed_barrier(&mut self.scheduled_barriers) => { + complete_result = self.checkpoint_control.next_completed_barrier() => { match complete_result { - Ok(Some(output)) => { + Ok(output) => { + // If there are remaining commands (that requires checkpoint to finish), we force + // the next barrier to be a checkpoint. + if output.require_next_checkpoint { + self.scheduled_barriers.force_checkpoint_in_next_barrier(); + } self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); } - Ok(None) => {} Err(e) => { self.failure_recovery(e).await; } @@ -1115,46 +1098,48 @@ impl GlobalBarrierManager { } } +#[derive(Default)] struct CompleteBarrierTask { commit_info: CommitEpochInfo, finished_jobs: Vec, notifiers: Vec, /// Some((`command_ctx`, `enqueue_time`)) command_context: Option<(Arc, HistogramTimer)>, + table_ids_to_finish: HashSet, + creating_job_epochs: Vec<(TableId, u64)>, + require_next_checkpoint: bool, } impl GlobalBarrierManagerContext { - async fn complete_creating_job_barrier( - self, + fn collect_creating_job_commit_epoch_info( + commit_info: &mut CommitEpochInfo, epoch: u64, resps: Vec, - tables_to_commit: HashSet, + tables_to_commit: impl Iterator, is_first_time: bool, - ) -> MetaResult<()> { + ) { let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); assert!(old_value_sst.is_empty()); - let new_table_fragment_infos = if is_first_time { - vec![NewTableFragmentInfo::NewCompactionGroup { - table_ids: tables_to_commit.clone(), - }] - } else { - vec![] - }; - let tables_to_commit = tables_to_commit - .into_iter() - .map(|table_id| (table_id, epoch)) - .collect(); - let info = CommitEpochInfo { - sstables, - new_table_watermarks, - sst_to_context, - new_table_fragment_infos, - change_log_delta: Default::default(), - tables_to_commit, + commit_info.sst_to_context.extend(sst_to_context); + commit_info.sstables.extend(sstables); + commit_info + .new_table_watermarks + .extend(new_table_watermarks); + let tables_to_commit: HashSet<_> = tables_to_commit.collect(); + tables_to_commit.iter().for_each(|table_id| { + commit_info + .tables_to_commit + .try_insert(*table_id, epoch) + .expect("non duplicate"); + }); + if is_first_time { + commit_info + .new_table_fragment_infos + .push(NewTableFragmentInfo::NewCompactionGroup { + table_ids: tables_to_commit, + }); }; - self.hummock_manager.commit_epoch(info).await?; - Ok(()) } async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { @@ -1260,6 +1245,7 @@ impl GlobalBarrierManagerContext 
{ } struct BarrierCompleteOutput { + require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1289,10 +1275,8 @@ impl CheckpointControl { .collect() } - fn next_collected_checkpoint_barrier( - &mut self, - mut scheduled_barriers: Option<&mut ScheduledBarriers>, - ) -> Option<(CompleteBarrierTask, HashSet)> { + fn next_complete_barrier_task(&mut self) -> Option { + let mut task = None; while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() && !state.is_inflight() { @@ -1308,16 +1292,6 @@ impl CheckpointControl { node.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); - if let Some(scheduled_barriers) = &mut scheduled_barriers - && !scheduled_barriers.is_forced_next_checkpoint() - && self.create_mview_tracker.has_pending_finished_jobs() - && self - .command_ctx_queue - .values() - .all(|node| !node.command_ctx.kind.is_checkpoint()) - { - scheduled_barriers.force_checkpoint_in_next_barrier(); - } continue; } let commit_info = collect_commit_epoch_info( @@ -1337,46 +1311,27 @@ impl CheckpointControl { table_id }) .collect(); - return Some(( - CompleteBarrierTask { - commit_info, - finished_jobs, - notifiers: node.notifiers, - command_context: Some((node.command_ctx, node.enqueue_time)), - }, - table_ids_to_finish, - )); - } - } - None - } - - pub(super) fn next_completed_barrier<'a>( - &'a mut self, - scheduled_barriers: &mut ScheduledBarriers, - ) -> impl Future>> + 'a { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - if let CompletingCommand::None = &self.completing_command { - if let Some((task, table_ids_to_finish)) = - self.next_collected_checkpoint_barrier(Some(scheduled_barriers)) - { - { - let command_ctx = task - .command_context - .as_ref() - .map(|(command_ctx, _)| command_ctx.clone()); - let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); - self.completing_command = CompletingCommand::GlobalStreamingGraph { - command_ctx, - join_handle, - table_ids_to_finish, + let require_next_checkpoint = + if self.create_mview_tracker.has_pending_finished_jobs() { + self.command_ctx_queue + .values() + .all(|node| !node.command_ctx.kind.is_checkpoint()) + } else { + false }; - } + task = Some(CompleteBarrierTask { + commit_info, + finished_jobs, + notifiers: node.notifiers, + command_context: Some((node.command_ctx, node.enqueue_time)), + table_ids_to_finish, + creating_job_epochs: vec![], + require_next_checkpoint, + }); + break; } } - - if matches!(&self.completing_command, CompletingCommand::None) { + { { for (table_id, job) in &mut self.creating_streaming_job_controls { let (upstream_epochs_to_notify, commit_info) = job.start_completing(); @@ -1394,117 +1349,122 @@ impl CheckpointControl { } } if let Some((epoch, resps, is_first_time)) = commit_info { - let tables_to_commit = job - .info - .table_fragments - .all_table_ids() - .map(TableId::new) - .collect(); - let join_handle = - tokio::spawn(self.context.clone().complete_creating_job_barrier( - epoch, - resps, - tables_to_commit, - is_first_time, - )); - self.completing_command = CompletingCommand::CreatingStreamingJob { - table_id: *table_id, + let task = task.get_or_insert_default(); + GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( + &mut task.commit_info, epoch, - join_handle, - }; - break; + resps, + job.info.table_fragments.all_table_ids().map(TableId::new), + is_first_time, + ); + task.creating_job_epochs.push((*table_id, epoch)); } } } } + task + } - 
async move { - match &mut self.completing_command { - CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. - let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None + pub(super) async fn next_completed_barrier(&mut self) -> MetaResult { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let CompletingCommand::None = &self.completing_command { + if let Some(task) = self.next_complete_barrier_task() { + { + let command_ctx = task + .command_context + .as_ref() + .map(|(command_ctx, _)| command_ctx.clone()); + let table_ids_to_finish = task.table_ids_to_finish.clone(); + let creating_job_epochs = task.creating_job_epochs.clone(); + let require_next_checkpoint = task.require_next_checkpoint; + let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); + self.completing_command = CompletingCommand::GlobalStreamingGraph { + command_ctx, + join_handle, + table_ids_to_finish, + creating_job_epochs, + require_next_checkpoint, }; - let completed_command = - replace(&mut self.completing_command, next_completing_command_status); - join_result.map(move |version_stats| { - self.hummock_version_stats = version_stats; - must_match!( - completed_command, - CompletingCommand::GlobalStreamingGraph { table_ids_to_finish, .. } => { - Some(BarrierCompleteOutput { - table_ids_to_finish, - }) - } - ) - }) } - CompletingCommand::CreatingStreamingJob { - table_id, - epoch, - join_handle, - } => { - let table_id = *table_id; - let epoch = *epoch; - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. - let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None - }; - self.completing_command = next_completing_command_status; - if let Some((upstream_epoch, is_finished)) = self - .creating_streaming_job_controls - .get_mut(&table_id) + } + } + + let CompletingCommand::GlobalStreamingGraph { join_handle, .. } = + &mut self.completing_command + else { + return pending().await; + }; + + let (table_ids_to_finish, creating_job_epochs, require_next_checkpoint) = { + { + let join_result: MetaResult<_> = try { + join_handle + .await + .context("failed to join completing command")?? + }; + // It's important to reset the completing_command after await no matter the result is err + // or not, and otherwise the join handle will be polled again after ready. + let next_completing_command_status = if let Err(e) = &join_result { + CompletingCommand::Err(e.clone()) + } else { + CompletingCommand::None + }; + let completed_command = + replace(&mut self.completing_command, next_completing_command_status); + self.hummock_version_stats = join_result?; + + must_match!(completed_command, CompletingCommand::GlobalStreamingGraph { + table_ids_to_finish, + creating_job_epochs, + require_next_checkpoint, + .. 
+ } => (table_ids_to_finish, creating_job_epochs, require_next_checkpoint)) + } + }; + + { + for (table_id, epoch) in creating_job_epochs { + if let Some((upstream_epoch, is_finished)) = self + .creating_streaming_job_controls + .get_mut(&table_id) + .expect("should exist") + .ack_completed(epoch) + { + let wait_progress_timer = self + .command_ctx_queue + .get_mut(&upstream_epoch) .expect("should exist") - .ack_completed(epoch) - { - let wait_progress_timer = self + .state + .creating_jobs_to_wait + .remove(&table_id) + .expect("should exist"); + if let Some(timer) = wait_progress_timer { + timer.observe_duration(); + } + if is_finished { + debug!(epoch, ?table_id, "finish creating job"); + let creating_streaming_job = self + .creating_streaming_job_controls + .remove(&table_id) + .expect("should exist"); + assert!(creating_streaming_job.is_finished()); + assert!(self .command_ctx_queue .get_mut(&upstream_epoch) .expect("should exist") .state - .creating_jobs_to_wait - .remove(&table_id) - .expect("should exist"); - if let Some(timer) = wait_progress_timer { - timer.observe_duration(); - } - if is_finished { - debug!(epoch, ?table_id, "finish creating job"); - let creating_streaming_job = self - .creating_streaming_job_controls - .remove(&table_id) - .expect("should exist"); - assert!(creating_streaming_job.is_finished()); - assert!(self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .finished_jobs - .insert(table_id, creating_streaming_job.info) - .is_none()); - } + .finished_jobs + .insert(table_id, creating_streaming_job.info) + .is_none()); } - join_result.map(|_| None) } - CompletingCommand::None | CompletingCommand::Err(_) => pending().await, } + + Ok(BarrierCompleteOutput { + require_next_checkpoint, + table_ids_to_finish, + }) } } } diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 59eb11d4342aa..c39fdb56e4fb7 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -451,10 +451,6 @@ impl ScheduledBarriers { self.force_checkpoint = true; } - pub fn is_forced_next_checkpoint(&self) -> bool { - self.force_checkpoint - } - /// Update the `checkpoint_frequency` pub fn set_checkpoint_frequency(&mut self, frequency: usize) { self.checkpoint_frequency = frequency; diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 3da93d61ab70b..358e8907e5d5d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -58,6 +58,7 @@ pub enum NewTableFragmentInfo { }, } +#[derive(Default)] pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index eab9dd1287ebf..ced439bac63ab 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -31,6 +31,7 @@ #![feature(const_option)] #![feature(anonymous_lifetime_in_impl_trait)] #![feature(duration_millis_float)] +#![feature(option_get_or_insert_default)] pub mod backup_restore; pub mod barrier; From d85167ed26d059e7b13ef7890ee92345c630e1df Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 10 Oct 2024 13:12:11 +0800 Subject: [PATCH 07/11] temp revert to 1b359b0cb7 for test timeout --- src/meta/src/barrier/mod.rs | 470 +++++++++++-------- src/meta/src/hummock/manager/commit_epoch.rs | 1 - src/meta/src/hummock/manager/transaction.rs | 9 +- src/meta/src/lib.rs | 1 - 4 files changed, 282 insertions(+), 199 deletions(-) diff --git a/src/meta/src/barrier/mod.rs 
b/src/meta/src/barrier/mod.rs index 401e983dabbcb..15d6600f68033 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::pending; @@ -291,6 +292,7 @@ impl CheckpointControl { notifiers: Vec, node_to_collect: HashSet, jobs_to_wait: HashSet, + table_ids_to_commit: HashSet, ) { let timer = self.context.metrics.barrier_latency.start_timer(); @@ -332,7 +334,8 @@ impl CheckpointControl { node_to_collect, resps: vec![], creating_jobs_to_wait, - finished_jobs: HashMap::new(), + finished_table_ids: HashMap::new(), + table_ids_to_commit, }, command_ctx, notifiers, @@ -398,8 +401,10 @@ impl CheckpointControl { .last_key_value() .map(|(_, x)| &x.command_ctx) .or(match &self.completing_command { - CompletingCommand::None | CompletingCommand::Err(_) => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(), + CompletingCommand::None + | CompletingCommand::Err(_) + | CompletingCommand::CreatingStreamingJob { .. } => None, + CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => Some(command_ctx), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); @@ -409,9 +414,11 @@ impl CheckpointControl { .map(|node| &node.command_ctx) .chain( match &self.completing_command { - CompletingCommand::None | CompletingCommand::Err(_) => None, + CompletingCommand::None + | CompletingCommand::Err(_) + | CompletingCommand::CreatingStreamingJob { .. } => None, CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => - command_ctx.as_ref(), + Some(command_ctx), } .into_iter() ) @@ -427,8 +434,16 @@ impl CheckpointControl { // join spawned completing command to finish no matter it succeeds or not. let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { CompletingCommand::None => false, - CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { - info!("waiting for completing command to finish in recovery"); + CompletingCommand::GlobalStreamingGraph { + command_ctx, + join_handle, + .. + } => { + info!( + prev_epoch = ?command_ctx.prev_epoch, + curr_epoch = ?command_ctx.curr_epoch, + "waiting for completing command to finish in recovery" + ); match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); @@ -442,18 +457,51 @@ impl CheckpointControl { } } CompletingCommand::Err(_) => true, + CompletingCommand::CreatingStreamingJob { join_handle, .. } => { + match join_handle.await { + Err(e) => { + warn!(err = ?e.as_report(), "failed to join completing task"); + true + } + Ok(Err(e)) => { + warn!(err = ?e.as_report(), "failed to complete barrier during clear"); + true + } + Ok(Ok(_)) => false, + } + } }; if !is_err { // continue to finish the pending collected barrier. - while let Some(task) = self.next_complete_barrier_task() { - if let Err(e) = self.context.clone().complete_barrier(task).await { + while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() + { + let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); + let (prev_epoch, curr_epoch) = ( + node.command_ctx.prev_epoch.value().0, + node.command_ctx.curr_epoch.value().0, + ); + let finished_jobs = self + .create_mview_tracker + .apply_collected_command(&node, &self.hummock_version_stats); + if let Err(e) = self + .context + .clone() + .complete_barrier(node, finished_jobs, HashMap::new()) + .await + { error!( + prev_epoch, + curr_epoch, err = ?e.as_report(), "failed to complete barrier during recovery" ); break; } else { - info!("succeed to complete barrier during recovery") + info!( + prev_epoch, + curr_epoch, "succeed to complete barrier during recovery" + ) } } } @@ -499,7 +547,9 @@ struct BarrierEpochState { creating_jobs_to_wait: HashMap>, - finished_jobs: HashMap, + finished_table_ids: HashMap, + + table_ids_to_commit: HashSet, } impl BarrierEpochState { @@ -511,15 +561,19 @@ impl BarrierEpochState { enum CompletingCommand { None, GlobalStreamingGraph { - command_ctx: Option>, + command_ctx: Arc, table_ids_to_finish: HashSet, - creating_job_epochs: Vec<(TableId, u64)>, require_next_checkpoint: bool, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>, + join_handle: JoinHandle>>, + }, + CreatingStreamingJob { + table_id: TableId, + epoch: u64, + join_handle: JoinHandle>, }, #[expect(dead_code)] Err(MetaError), @@ -818,16 +872,18 @@ impl GlobalBarrierManager { } complete_result = self.checkpoint_control.next_completed_barrier() => { match complete_result { - Ok(output) => { + Ok(Some(output)) => { // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. 
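The comment above describes the rule behind the `require_next_checkpoint` flag: a finished create-mview job can only be committed at a checkpoint barrier, so a checkpoint is forced only when such a job is pending and no checkpoint barrier is already queued. A minimal standalone sketch of that predicate (function and parameter names here are illustrative, not the project's API):

fn require_next_checkpoint(has_pending_finished_jobs: bool, queued_is_checkpoint: &[bool]) -> bool {
    // Force a checkpoint only when something is waiting to be finished
    // and none of the already-queued barriers would act as one.
    has_pending_finished_jobs && queued_is_checkpoint.iter().all(|&is_ckpt| !is_ckpt)
}

fn main() {
    assert!(require_next_checkpoint(true, &[false, false])); // only plain barriers queued
    assert!(!require_next_checkpoint(true, &[false, true])); // a checkpoint is already queued
    assert!(!require_next_checkpoint(false, &[]));           // nothing finished, nothing to force
}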
if output.require_next_checkpoint { + assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); self.scheduled_barriers.force_checkpoint_in_next_barrier(); } self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); } + Ok(None) => {} Err(e) => { self.failure_recovery(e).await; } @@ -982,7 +1038,7 @@ impl GlobalBarrierManager { pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), - table_ids_to_commit, + table_ids_to_commit.clone(), self.state.paused_reason(), command, kind, @@ -1034,6 +1090,7 @@ impl GlobalBarrierManager { notifiers, node_to_collect, jobs_to_wait, + table_ids_to_commit, ); Ok(()) @@ -1098,87 +1155,143 @@ impl GlobalBarrierManager { } } -#[derive(Default)] -struct CompleteBarrierTask { - commit_info: CommitEpochInfo, - finished_jobs: Vec, - notifiers: Vec, - /// Some((`command_ctx`, `enqueue_time`)) - command_context: Option<(Arc, HistogramTimer)>, - table_ids_to_finish: HashSet, - creating_job_epochs: Vec<(TableId, u64)>, - require_next_checkpoint: bool, -} - impl GlobalBarrierManagerContext { - fn collect_creating_job_commit_epoch_info( - commit_info: &mut CommitEpochInfo, + async fn complete_creating_job_barrier( + self, epoch: u64, resps: Vec, - tables_to_commit: impl Iterator, + tables_to_commit: HashSet, is_first_time: bool, - ) { + ) -> MetaResult<()> { let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); assert!(old_value_sst.is_empty()); - commit_info.sst_to_context.extend(sst_to_context); - commit_info.sstables.extend(sstables); - commit_info - .new_table_watermarks - .extend(new_table_watermarks); - let tables_to_commit: HashSet<_> = tables_to_commit.collect(); - tables_to_commit.iter().for_each(|table_id| { - commit_info - .tables_to_commit - .try_insert(*table_id, epoch) - .expect("non duplicate"); - }); - if is_first_time { - commit_info - .new_table_fragment_infos - .push(NewTableFragmentInfo::NewCompactionGroup { - table_ids: tables_to_commit, - }); + let new_table_fragment_infos = if is_first_time { + vec![NewTableFragmentInfo::NewCompactionGroup { + table_ids: tables_to_commit.clone(), + }] + } else { + vec![] + }; + let tables_to_commit = tables_to_commit + .into_iter() + .map(|table_id| (table_id, epoch)) + .collect(); + let info = CommitEpochInfo { + sstables, + new_table_watermarks, + sst_to_context, + new_table_fragment_infos, + change_log_delta: Default::default(), + tables_to_commit, }; + self.hummock_manager.commit_epoch(info).await?; + Ok(()) } - async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { - let result: MetaResult<()> = try { - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - self.hummock_manager.commit_epoch(task.commit_info).await?; - if let Some((command_ctx, _)) = &task.command_context { - command_ctx.post_collect().await?; - } - - wait_commit_timer.observe_duration(); - }; + async fn complete_barrier( + self, + node: EpochNode, + mut finished_jobs: Vec, + backfill_pinned_log_epoch: HashMap)>, + ) -> MetaResult> { + tracing::trace!( + prev_epoch = node.command_ctx.prev_epoch.value().0, + kind = ?node.command_ctx.kind, + "complete barrier" + ); + let EpochNode { + command_ctx, + notifiers, + enqueue_time, + state, + .. 
+ } = node; + assert!(state.node_to_collect.is_empty()); + assert!(state.creating_jobs_to_wait.is_empty()); + let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + if !state.finished_table_ids.is_empty() { + assert!(command_ctx.kind.is_checkpoint()); + } + finished_jobs.extend(state.finished_table_ids.into_values().map(|info| { + TrackingJob::New(TrackingCommand { + info, + replace_table_info: None, + }) + })); + + let result = self + .update_snapshot( + &command_ctx, + state.table_ids_to_commit, + state.resps, + backfill_pinned_log_epoch, + ) + .await; - { - if let Err(e) = result { - for notifier in task.notifiers { + let version_stats = match result { + Ok(version_stats) => version_stats, + Err(e) => { + for notifier in notifiers { notifier.notify_collection_failed(e.clone()); } return Err(e); } - task.notifiers.into_iter().for_each(|notifier| { - notifier.notify_collected(); - }); - try_join_all( - task.finished_jobs - .into_iter() - .map(|finished_job| finished_job.finish(&self.metadata_manager)), - ) - .await?; - if let Some((command_ctx, enqueue_time)) = task.command_context { - let duration_sec = enqueue_time.stop_and_record(); - self.report_complete_event(duration_sec, &command_ctx); - self.metrics - .last_committed_barrier_time - .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); + }; + notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + try_join_all(finished_jobs.into_iter().map(|finished_job| { + let metadata_manager = &self.metadata_manager; + async move { finished_job.finish(metadata_manager).await } + })) + .await?; + let duration_sec = enqueue_time.stop_and_record(); + self.report_complete_event(duration_sec, &command_ctx); + wait_commit_timer.observe_duration(); + self.metrics + .last_committed_barrier_time + .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); + Ok(version_stats) + } + + async fn update_snapshot( + &self, + command_ctx: &CommandContext, + tables_to_commit: HashSet, + resps: Vec, + backfill_pinned_log_epoch: HashMap)>, + ) -> MetaResult> { + { + { + match &command_ctx.kind { + BarrierKind::Initial => {} + BarrierKind::Checkpoint(epochs) => { + let commit_info = collect_commit_epoch_info( + resps, + command_ctx, + epochs, + backfill_pinned_log_epoch, + tables_to_commit, + ); + self.hummock_manager.commit_epoch(commit_info).await?; + } + BarrierKind::Barrier => { + // if we collect a barrier(checkpoint = false), + // we need to ensure that command is Plain and the notifier's checkpoint is + // false + assert!(!command_ctx.command.need_checkpoint()); + } + } + + command_ctx.post_collect().await?; + Ok(if command_ctx.kind.is_checkpoint() { + Some(self.hummock_manager.get_version_stats().await) + } else { + None + }) } } - - Ok(self.hummock_manager.get_version_stats().await) } pub fn hummock_manager(&self) -> &HummockManagerRef { @@ -1245,6 +1358,7 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { + command_ctx: Arc, require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1275,42 +1389,27 @@ impl CheckpointControl { .collect() } - fn next_complete_barrier_task(&mut self) -> Option { - let mut task = None; - while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() - { + pub(super) async fn next_completed_barrier( + &mut self, + ) -> MetaResult> { + if matches!(&self.completing_command, CompletingCommand::None) { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() { - let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty"); + let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); - assert!(node.state.node_to_collect.is_empty()); - let mut finished_jobs = self + let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); + let finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); - if !node.command_ctx.kind.is_checkpoint() { - assert!(finished_jobs.is_empty()); - node.notifiers.into_iter().for_each(|notifier| { - notifier.notify_collected(); - }); - continue; - } - let commit_info = collect_commit_epoch_info( - take(&mut node.state.resps), - &node.command_ctx, + let command_ctx = node.command_ctx.clone(); + let join_handle = tokio::spawn(self.context.clone().complete_barrier( + node, + finished_jobs, self.collect_backfill_pinned_upstream_log_epoch(), - ); - let table_ids_to_finish = node - .state - .finished_jobs - .drain() - .map(|(table_id, info)| { - finished_jobs.push(TrackingJob::New(TrackingCommand { - info, - replace_table_info: None, - })); - table_id - }) - .collect(); + )); let require_next_checkpoint = if self.create_mview_tracker.has_pending_finished_jobs() { self.command_ctx_queue @@ -1319,20 +1418,13 @@ impl CheckpointControl { } else { false }; - task = Some(CompleteBarrierTask { - commit_info, - finished_jobs, - notifiers: node.notifiers, - command_context: Some((node.command_ctx, node.enqueue_time)), - table_ids_to_finish, - creating_job_epochs: vec![], + self.completing_command = CompletingCommand::GlobalStreamingGraph { + command_ctx, require_next_checkpoint, - }); - break; - } - } - { - { + join_handle, + table_ids_to_finish, + }; + } else { for (table_id, job) in &mut self.creating_streaming_job_controls { let (upstream_epochs_to_notify, commit_info) = job.start_completing(); for upstream_epoch in upstream_epochs_to_notify { @@ -1349,55 +1441,32 @@ impl CheckpointControl { } } if let Some((epoch, resps, is_first_time)) = commit_info { - let task = task.get_or_insert_default(); - GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( - &mut task.commit_info, + let tables_to_commit = job + .info + .table_fragments + .all_table_ids() + .map(TableId::new) + .collect(); + let join_handle = + tokio::spawn(self.context.clone().complete_creating_job_barrier( + epoch, + resps, + tables_to_commit, + is_first_time, + )); + self.completing_command = CompletingCommand::CreatingStreamingJob { + table_id: *table_id, epoch, - resps, - job.info.table_fragments.all_table_ids().map(TableId::new), - is_first_time, - ); - task.creating_job_epochs.push((*table_id, epoch)); + join_handle, + }; + break; } } } } - task - } - pub(super) async fn next_completed_barrier(&mut self) -> MetaResult { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. 
- if let CompletingCommand::None = &self.completing_command { - if let Some(task) = self.next_complete_barrier_task() { - { - let command_ctx = task - .command_context - .as_ref() - .map(|(command_ctx, _)| command_ctx.clone()); - let table_ids_to_finish = task.table_ids_to_finish.clone(); - let creating_job_epochs = task.creating_job_epochs.clone(); - let require_next_checkpoint = task.require_next_checkpoint; - let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); - self.completing_command = CompletingCommand::GlobalStreamingGraph { - command_ctx, - join_handle, - table_ids_to_finish, - creating_job_epochs, - require_next_checkpoint, - }; - } - } - } - - let CompletingCommand::GlobalStreamingGraph { join_handle, .. } = - &mut self.completing_command - else { - return pending().await; - }; - - let (table_ids_to_finish, creating_job_epochs, require_next_checkpoint) = { - { + match &mut self.completing_command { + CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { let join_result: MetaResult<_> = try { join_handle .await @@ -1412,19 +1481,42 @@ impl CheckpointControl { }; let completed_command = replace(&mut self.completing_command, next_completing_command_status); - self.hummock_version_stats = join_result?; - - must_match!(completed_command, CompletingCommand::GlobalStreamingGraph { - table_ids_to_finish, - creating_job_epochs, - require_next_checkpoint, - .. - } => (table_ids_to_finish, creating_job_epochs, require_next_checkpoint)) + join_result.map(move | version_stats| { + if let Some(new_version_stats) = version_stats { + self.hummock_version_stats = new_version_stats; + } + must_match!( + completed_command, + CompletingCommand::GlobalStreamingGraph { command_ctx, table_ids_to_finish, require_next_checkpoint, .. } => { + Some(BarrierCompleteOutput { + command_ctx, + require_next_checkpoint, + table_ids_to_finish, + }) + } + ) + }) } - }; - - { - for (table_id, epoch) in creating_job_epochs { + CompletingCommand::CreatingStreamingJob { + table_id, + epoch, + join_handle, + } => { + let table_id = *table_id; + let epoch = *epoch; + let join_result: MetaResult<_> = try { + join_handle + .await + .context("failed to join completing command")?? + }; + // It's important to reset the completing_command after await no matter the result is err + // or not, and otherwise the join handle will be polled again after ready. 
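The comment above captures a subtle rule around the stored `JoinHandle`: once it has been awaited through a mutable reference, the slot holding it must be swapped back to an idle state before the result is inspected, otherwise the next call would poll an already-finished handle. A minimal standalone sketch of that pattern, assuming the `tokio` runtime used by the surrounding code (the types and names are illustrative, not RisingWave's):

use std::mem::replace;
use tokio::task::JoinHandle;

enum Completing {
    None,
    Running(JoinHandle<u64>),
}

// Await the in-flight task, if any, and always reset the slot afterwards so the
// same JoinHandle is never polled again after it has completed.
async fn next_completed(slot: &mut Completing) -> Option<u64> {
    let Completing::Running(handle) = slot else {
        return None;
    };
    let join_result = handle.await;
    // Reset no matter whether the task succeeded or failed.
    let _finished = replace(slot, Completing::None);
    join_result.ok()
}

The patch performs the same swap but keeps a failed result in a dedicated `Err` variant instead of discarding it.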
+ let next_completing_command_status = if let Err(e) = &join_result { + CompletingCommand::Err(e.clone()) + } else { + CompletingCommand::None + }; + self.completing_command = next_completing_command_status; if let Some((upstream_epoch, is_finished)) = self .creating_streaming_job_controls .get_mut(&table_id) @@ -1454,17 +1546,14 @@ impl CheckpointControl { .get_mut(&upstream_epoch) .expect("should exist") .state - .finished_jobs + .finished_table_ids .insert(table_id, creating_streaming_job.info) .is_none()); } } + join_result.map(|_| None) } - - Ok(BarrierCompleteOutput { - require_next_checkpoint, - table_ids_to_finish, - }) + CompletingCommand::None | CompletingCommand::Err(_) => pending().await, } } } @@ -1619,7 +1708,9 @@ fn collect_resp_info( fn collect_commit_epoch_info( resps: Vec, command_ctx: &CommandContext, + epochs: &Vec, backfill_pinned_log_epoch: HashMap)>, + tables_to_commit: HashSet, ) -> CommitEpochInfo { let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = collect_resp_info(resps); @@ -1674,15 +1765,14 @@ fn collect_commit_epoch_info( let table_new_change_log = build_table_change_log_delta( old_value_ssts.into_iter(), synced_ssts.iter().map(|sst| &sst.sst_info), - must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs), + epochs, mv_log_store_truncate_epoch.into_iter(), ); let epoch = command_ctx.prev_epoch.value().0; - let tables_to_commit = command_ctx - .table_ids_to_commit - .iter() - .map(|table_id| (*table_id, epoch)) + let tables_to_commit = tables_to_commit + .into_iter() + .map(|table_id| (table_id, epoch)) .collect(); CommitEpochInfo { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 358e8907e5d5d..3da93d61ab70b 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -58,7 +58,6 @@ pub enum NewTableFragmentInfo { }, } -#[derive(Default)] pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 56b60a6535722..87ec960bca1d1 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -17,6 +17,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -158,13 +159,7 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or_else(|| { - tables_to_commit - .values() - .cloned() - .max() - .expect("non empty tables_to_commit") - }); + .unwrap_or(Epoch::now().0); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index ced439bac63ab..eab9dd1287ebf 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -31,7 +31,6 @@ #![feature(const_option)] #![feature(anonymous_lifetime_in_impl_trait)] #![feature(duration_millis_float)] -#![feature(option_get_or_insert_default)] pub mod backup_restore; pub mod barrier; From a734be6d22306e7bd2d18a7cc0ea76447e5df649 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 10 Oct 2024 15:26:25 +0800 Subject: [PATCH 08/11] Revert "temp revert to 1b359b0cb7 for test timeout" This reverts commit 
d85167ed26d059e7b13ef7890ee92345c630e1df. --- src/meta/src/barrier/mod.rs | 470 ++++++++----------- src/meta/src/hummock/manager/commit_epoch.rs | 1 + src/meta/src/hummock/manager/transaction.rs | 9 +- src/meta/src/lib.rs | 1 + 4 files changed, 199 insertions(+), 282 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 15d6600f68033..401e983dabbcb 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::pending; @@ -292,7 +291,6 @@ impl CheckpointControl { notifiers: Vec, node_to_collect: HashSet, jobs_to_wait: HashSet, - table_ids_to_commit: HashSet, ) { let timer = self.context.metrics.barrier_latency.start_timer(); @@ -334,8 +332,7 @@ impl CheckpointControl { node_to_collect, resps: vec![], creating_jobs_to_wait, - finished_table_ids: HashMap::new(), - table_ids_to_commit, + finished_jobs: HashMap::new(), }, command_ctx, notifiers, @@ -401,10 +398,8 @@ impl CheckpointControl { .last_key_value() .map(|(_, x)| &x.command_ctx) .or(match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => Some(command_ctx), + CompletingCommand::None | CompletingCommand::Err(_) => None, + CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); @@ -414,11 +409,9 @@ impl CheckpointControl { .map(|node| &node.command_ctx) .chain( match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, + CompletingCommand::None | CompletingCommand::Err(_) => None, CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => - Some(command_ctx), + command_ctx.as_ref(), } .into_iter() ) @@ -434,16 +427,8 @@ impl CheckpointControl { // join spawned completing command to finish no matter it succeeds or not. let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { CompletingCommand::None => false, - CompletingCommand::GlobalStreamingGraph { - command_ctx, - join_handle, - .. - } => { - info!( - prev_epoch = ?command_ctx.prev_epoch, - curr_epoch = ?command_ctx.curr_epoch, - "waiting for completing command to finish in recovery" - ); + CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { + info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); @@ -457,51 +442,18 @@ impl CheckpointControl { } } CompletingCommand::Err(_) => true, - CompletingCommand::CreatingStreamingJob { join_handle, .. } => { - match join_handle.await { - Err(e) => { - warn!(err = ?e.as_report(), "failed to join completing task"); - true - } - Ok(Err(e)) => { - warn!(err = ?e.as_report(), "failed to complete barrier during clear"); - true - } - Ok(Ok(_)) => false, - } - } }; if !is_err { // continue to finish the pending collected barrier. - while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() - { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); - let (prev_epoch, curr_epoch) = ( - node.command_ctx.prev_epoch.value().0, - node.command_ctx.curr_epoch.value().0, - ); - let finished_jobs = self - .create_mview_tracker - .apply_collected_command(&node, &self.hummock_version_stats); - if let Err(e) = self - .context - .clone() - .complete_barrier(node, finished_jobs, HashMap::new()) - .await - { + while let Some(task) = self.next_complete_barrier_task() { + if let Err(e) = self.context.clone().complete_barrier(task).await { error!( - prev_epoch, - curr_epoch, err = ?e.as_report(), "failed to complete barrier during recovery" ); break; } else { - info!( - prev_epoch, - curr_epoch, "succeed to complete barrier during recovery" - ) + info!("succeed to complete barrier during recovery") } } } @@ -547,9 +499,7 @@ struct BarrierEpochState { creating_jobs_to_wait: HashMap>, - finished_table_ids: HashMap, - - table_ids_to_commit: HashSet, + finished_jobs: HashMap, } impl BarrierEpochState { @@ -561,19 +511,15 @@ impl BarrierEpochState { enum CompletingCommand { None, GlobalStreamingGraph { - command_ctx: Arc, + command_ctx: Option>, table_ids_to_finish: HashSet, + creating_job_epochs: Vec<(TableId, u64)>, require_next_checkpoint: bool, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>>, - }, - CreatingStreamingJob { - table_id: TableId, - epoch: u64, - join_handle: JoinHandle>, + join_handle: JoinHandle>, }, #[expect(dead_code)] Err(MetaError), @@ -872,18 +818,16 @@ impl GlobalBarrierManager { } complete_result = self.checkpoint_control.next_completed_barrier() => { match complete_result { - Ok(Some(output)) => { + Ok(output) => { // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. 
if output.require_next_checkpoint { - assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); self.scheduled_barriers.force_checkpoint_in_next_barrier(); } self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); } - Ok(None) => {} Err(e) => { self.failure_recovery(e).await; } @@ -1038,7 +982,7 @@ impl GlobalBarrierManager { pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), - table_ids_to_commit.clone(), + table_ids_to_commit, self.state.paused_reason(), command, kind, @@ -1090,7 +1034,6 @@ impl GlobalBarrierManager { notifiers, node_to_collect, jobs_to_wait, - table_ids_to_commit, ); Ok(()) @@ -1155,143 +1098,87 @@ impl GlobalBarrierManager { } } +#[derive(Default)] +struct CompleteBarrierTask { + commit_info: CommitEpochInfo, + finished_jobs: Vec, + notifiers: Vec, + /// Some((`command_ctx`, `enqueue_time`)) + command_context: Option<(Arc, HistogramTimer)>, + table_ids_to_finish: HashSet, + creating_job_epochs: Vec<(TableId, u64)>, + require_next_checkpoint: bool, +} + impl GlobalBarrierManagerContext { - async fn complete_creating_job_barrier( - self, + fn collect_creating_job_commit_epoch_info( + commit_info: &mut CommitEpochInfo, epoch: u64, resps: Vec, - tables_to_commit: HashSet, + tables_to_commit: impl Iterator, is_first_time: bool, - ) -> MetaResult<()> { + ) { let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); assert!(old_value_sst.is_empty()); - let new_table_fragment_infos = if is_first_time { - vec![NewTableFragmentInfo::NewCompactionGroup { - table_ids: tables_to_commit.clone(), - }] - } else { - vec![] - }; - let tables_to_commit = tables_to_commit - .into_iter() - .map(|table_id| (table_id, epoch)) - .collect(); - let info = CommitEpochInfo { - sstables, - new_table_watermarks, - sst_to_context, - new_table_fragment_infos, - change_log_delta: Default::default(), - tables_to_commit, + commit_info.sst_to_context.extend(sst_to_context); + commit_info.sstables.extend(sstables); + commit_info + .new_table_watermarks + .extend(new_table_watermarks); + let tables_to_commit: HashSet<_> = tables_to_commit.collect(); + tables_to_commit.iter().for_each(|table_id| { + commit_info + .tables_to_commit + .try_insert(*table_id, epoch) + .expect("non duplicate"); + }); + if is_first_time { + commit_info + .new_table_fragment_infos + .push(NewTableFragmentInfo::NewCompactionGroup { + table_ids: tables_to_commit, + }); }; - self.hummock_manager.commit_epoch(info).await?; - Ok(()) } - async fn complete_barrier( - self, - node: EpochNode, - mut finished_jobs: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { - tracing::trace!( - prev_epoch = node.command_ctx.prev_epoch.value().0, - kind = ?node.command_ctx.kind, - "complete barrier" - ); - let EpochNode { - command_ctx, - notifiers, - enqueue_time, - state, - .. 
- } = node; - assert!(state.node_to_collect.is_empty()); - assert!(state.creating_jobs_to_wait.is_empty()); - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - if !state.finished_table_ids.is_empty() { - assert!(command_ctx.kind.is_checkpoint()); - } - finished_jobs.extend(state.finished_table_ids.into_values().map(|info| { - TrackingJob::New(TrackingCommand { - info, - replace_table_info: None, - }) - })); - - let result = self - .update_snapshot( - &command_ctx, - state.table_ids_to_commit, - state.resps, - backfill_pinned_log_epoch, - ) - .await; - - let version_stats = match result { - Ok(version_stats) => version_stats, - Err(e) => { - for notifier in notifiers { - notifier.notify_collection_failed(e.clone()); - } - return Err(e); + async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { + let result: MetaResult<()> = try { + let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + self.hummock_manager.commit_epoch(task.commit_info).await?; + if let Some((command_ctx, _)) = &task.command_context { + command_ctx.post_collect().await?; } + + wait_commit_timer.observe_duration(); }; - notifiers.into_iter().for_each(|notifier| { - notifier.notify_collected(); - }); - try_join_all(finished_jobs.into_iter().map(|finished_job| { - let metadata_manager = &self.metadata_manager; - async move { finished_job.finish(metadata_manager).await } - })) - .await?; - let duration_sec = enqueue_time.stop_and_record(); - self.report_complete_event(duration_sec, &command_ctx); - wait_commit_timer.observe_duration(); - self.metrics - .last_committed_barrier_time - .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); - Ok(version_stats) - } - async fn update_snapshot( - &self, - command_ctx: &CommandContext, - tables_to_commit: HashSet, - resps: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { { - { - match &command_ctx.kind { - BarrierKind::Initial => {} - BarrierKind::Checkpoint(epochs) => { - let commit_info = collect_commit_epoch_info( - resps, - command_ctx, - epochs, - backfill_pinned_log_epoch, - tables_to_commit, - ); - self.hummock_manager.commit_epoch(commit_info).await?; - } - BarrierKind::Barrier => { - // if we collect a barrier(checkpoint = false), - // we need to ensure that command is Plain and the notifier's checkpoint is - // false - assert!(!command_ctx.command.need_checkpoint()); - } + if let Err(e) = result { + for notifier in task.notifiers { + notifier.notify_collection_failed(e.clone()); } - - command_ctx.post_collect().await?; - Ok(if command_ctx.kind.is_checkpoint() { - Some(self.hummock_manager.get_version_stats().await) - } else { - None - }) + return Err(e); + } + task.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + try_join_all( + task.finished_jobs + .into_iter() + .map(|finished_job| finished_job.finish(&self.metadata_manager)), + ) + .await?; + if let Some((command_ctx, enqueue_time)) = task.command_context { + let duration_sec = enqueue_time.stop_and_record(); + self.report_complete_event(duration_sec, &command_ctx); + self.metrics + .last_committed_barrier_time + .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); } } + + Ok(self.hummock_manager.get_version_stats().await) } pub fn hummock_manager(&self) -> &HummockManagerRef { @@ -1358,7 +1245,6 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { - command_ctx: Arc, require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1389,27 +1275,42 @@ impl 
CheckpointControl { .collect() } - pub(super) async fn next_completed_barrier( - &mut self, - ) -> MetaResult> { - if matches!(&self.completing_command, CompletingCommand::None) { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() + fn next_complete_barrier_task(&mut self) -> Option { + let mut task = None; + while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() + { { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); + let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); - let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); - let finished_jobs = self + assert!(node.state.node_to_collect.is_empty()); + let mut finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); - let command_ctx = node.command_ctx.clone(); - let join_handle = tokio::spawn(self.context.clone().complete_barrier( - node, - finished_jobs, + if !node.command_ctx.kind.is_checkpoint() { + assert!(finished_jobs.is_empty()); + node.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + continue; + } + let commit_info = collect_commit_epoch_info( + take(&mut node.state.resps), + &node.command_ctx, self.collect_backfill_pinned_upstream_log_epoch(), - )); + ); + let table_ids_to_finish = node + .state + .finished_jobs + .drain() + .map(|(table_id, info)| { + finished_jobs.push(TrackingJob::New(TrackingCommand { + info, + replace_table_info: None, + })); + table_id + }) + .collect(); let require_next_checkpoint = if self.create_mview_tracker.has_pending_finished_jobs() { self.command_ctx_queue @@ -1418,13 +1319,20 @@ impl CheckpointControl { } else { false }; - self.completing_command = CompletingCommand::GlobalStreamingGraph { - command_ctx, - require_next_checkpoint, - join_handle, + task = Some(CompleteBarrierTask { + commit_info, + finished_jobs, + notifiers: node.notifiers, + command_context: Some((node.command_ctx, node.enqueue_time)), table_ids_to_finish, - }; - } else { + creating_job_epochs: vec![], + require_next_checkpoint, + }); + break; + } + } + { + { for (table_id, job) in &mut self.creating_streaming_job_controls { let (upstream_epochs_to_notify, commit_info) = job.start_completing(); for upstream_epoch in upstream_epochs_to_notify { @@ -1441,32 +1349,55 @@ impl CheckpointControl { } } if let Some((epoch, resps, is_first_time)) = commit_info { - let tables_to_commit = job - .info - .table_fragments - .all_table_ids() - .map(TableId::new) - .collect(); - let join_handle = - tokio::spawn(self.context.clone().complete_creating_job_barrier( - epoch, - resps, - tables_to_commit, - is_first_time, - )); - self.completing_command = CompletingCommand::CreatingStreamingJob { - table_id: *table_id, + let task = task.get_or_insert_default(); + GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( + &mut task.commit_info, epoch, - join_handle, - }; - break; + resps, + job.info.table_fragments.all_table_ids().map(TableId::new), + is_first_time, + ); + task.creating_job_epochs.push((*table_id, epoch)); } } } } + task + } - match &mut self.completing_command { - CompletingCommand::GlobalStreamingGraph { join_handle, .. 
} => { + pub(super) async fn next_completed_barrier(&mut self) -> MetaResult { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let CompletingCommand::None = &self.completing_command { + if let Some(task) = self.next_complete_barrier_task() { + { + let command_ctx = task + .command_context + .as_ref() + .map(|(command_ctx, _)| command_ctx.clone()); + let table_ids_to_finish = task.table_ids_to_finish.clone(); + let creating_job_epochs = task.creating_job_epochs.clone(); + let require_next_checkpoint = task.require_next_checkpoint; + let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); + self.completing_command = CompletingCommand::GlobalStreamingGraph { + command_ctx, + join_handle, + table_ids_to_finish, + creating_job_epochs, + require_next_checkpoint, + }; + } + } + } + + let CompletingCommand::GlobalStreamingGraph { join_handle, .. } = + &mut self.completing_command + else { + return pending().await; + }; + + let (table_ids_to_finish, creating_job_epochs, require_next_checkpoint) = { + { let join_result: MetaResult<_> = try { join_handle .await @@ -1481,42 +1412,19 @@ impl CheckpointControl { }; let completed_command = replace(&mut self.completing_command, next_completing_command_status); - join_result.map(move | version_stats| { - if let Some(new_version_stats) = version_stats { - self.hummock_version_stats = new_version_stats; - } - must_match!( - completed_command, - CompletingCommand::GlobalStreamingGraph { command_ctx, table_ids_to_finish, require_next_checkpoint, .. } => { - Some(BarrierCompleteOutput { - command_ctx, - require_next_checkpoint, - table_ids_to_finish, - }) - } - ) - }) + self.hummock_version_stats = join_result?; + + must_match!(completed_command, CompletingCommand::GlobalStreamingGraph { + table_ids_to_finish, + creating_job_epochs, + require_next_checkpoint, + .. + } => (table_ids_to_finish, creating_job_epochs, require_next_checkpoint)) } - CompletingCommand::CreatingStreamingJob { - table_id, - epoch, - join_handle, - } => { - let table_id = *table_id; - let epoch = *epoch; - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. 
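A few lines above, the restored `next_completed_barrier` returns `pending().await` when no completion task is in flight, so in a `tokio::select!` loop that branch simply never fires instead of resolving to a sentinel value the caller would have to special-case. A small self-contained sketch of the idiom, assuming the `tokio` crate with its usual `macros`, `rt`, and `time` features (the queue type and names are invented for illustration):

use std::future::pending;
use tokio::time::{sleep, Duration};

async fn next_completed(queue: &mut Vec<u64>) -> u64 {
    match queue.pop() {
        Some(done) => done,
        // Nothing has completed: park this branch forever so the caller's
        // `select!` keeps waiting on its other branches.
        None => pending::<u64>().await,
    }
}

#[tokio::main]
async fn main() {
    let mut queue = vec![42u64];
    loop {
        tokio::select! {
            done = next_completed(&mut queue) => println!("completed {done}"),
            _ = sleep(Duration::from_millis(10)) => break,
        }
    }
}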
- let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None - }; - self.completing_command = next_completing_command_status; + }; + + { + for (table_id, epoch) in creating_job_epochs { if let Some((upstream_epoch, is_finished)) = self .creating_streaming_job_controls .get_mut(&table_id) @@ -1546,14 +1454,17 @@ impl CheckpointControl { .get_mut(&upstream_epoch) .expect("should exist") .state - .finished_table_ids + .finished_jobs .insert(table_id, creating_streaming_job.info) .is_none()); } } - join_result.map(|_| None) } - CompletingCommand::None | CompletingCommand::Err(_) => pending().await, + + Ok(BarrierCompleteOutput { + require_next_checkpoint, + table_ids_to_finish, + }) } } } @@ -1708,9 +1619,7 @@ fn collect_resp_info( fn collect_commit_epoch_info( resps: Vec, command_ctx: &CommandContext, - epochs: &Vec, backfill_pinned_log_epoch: HashMap)>, - tables_to_commit: HashSet, ) -> CommitEpochInfo { let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = collect_resp_info(resps); @@ -1765,14 +1674,15 @@ fn collect_commit_epoch_info( let table_new_change_log = build_table_change_log_delta( old_value_ssts.into_iter(), synced_ssts.iter().map(|sst| &sst.sst_info), - epochs, + must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs), mv_log_store_truncate_epoch.into_iter(), ); let epoch = command_ctx.prev_epoch.value().0; - let tables_to_commit = tables_to_commit - .into_iter() - .map(|table_id| (table_id, epoch)) + let tables_to_commit = command_ctx + .table_ids_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) .collect(); CommitEpochInfo { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 3da93d61ab70b..358e8907e5d5d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -58,6 +58,7 @@ pub enum NewTableFragmentInfo { }, } +#[derive(Default)] pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 87ec960bca1d1..56b60a6535722 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -17,7 +17,6 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -159,7 +158,13 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or(Epoch::now().0); + .unwrap_or_else(|| { + tables_to_commit + .values() + .cloned() + .max() + .expect("non empty tables_to_commit") + }); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index eab9dd1287ebf..ced439bac63ab 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -31,6 +31,7 @@ #![feature(const_option)] #![feature(anonymous_lifetime_in_impl_trait)] #![feature(duration_millis_float)] +#![feature(option_get_or_insert_default)] pub mod backup_restore; pub mod barrier; From 9b7a63ab737c8d534c054ec39b07495ad0418f0b Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 10 Oct 2024 15:51:16 +0800 Subject: [PATCH 09/11] force checkpoint in 
normal barrier --- src/meta/src/barrier/mod.rs | 57 ++++++++++----------- src/meta/src/hummock/manager/transaction.rs | 17 +++--- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 401e983dabbcb..4d803fd7af367 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -14,7 +14,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::future::pending; +use std::future::{pending, Future}; use std::mem::{replace, take}; use std::sync::Arc; use std::time::Duration; @@ -54,6 +54,7 @@ use crate::barrier::creating_job::CreatingStreamingJobControl; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; +use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -445,7 +446,7 @@ impl CheckpointControl { }; if !is_err { // continue to finish the pending collected barrier. - while let Some(task) = self.next_complete_barrier_task() { + while let Some(task) = self.next_complete_barrier_task(None) { if let Err(e) = self.context.clone().complete_barrier(task).await { error!( err = ?e.as_report(), @@ -514,7 +515,6 @@ enum CompletingCommand { command_ctx: Option>, table_ids_to_finish: HashSet, creating_job_epochs: Vec<(TableId, u64)>, - require_next_checkpoint: bool, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command @@ -816,14 +816,9 @@ impl GlobalBarrierManager { } } } - complete_result = self.checkpoint_control.next_completed_barrier() => { + complete_result = self.checkpoint_control.next_completed_barrier(&mut self.scheduled_barriers) => { match complete_result { Ok(output) => { - // If there are remaining commands (that requires checkpoint to finish), we force - // the next barrier to be a checkpoint. - if output.require_next_checkpoint { - self.scheduled_barriers.force_checkpoint_in_next_barrier(); - } self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); @@ -1107,7 +1102,6 @@ struct CompleteBarrierTask { command_context: Option<(Arc, HistogramTimer)>, table_ids_to_finish: HashSet, creating_job_epochs: Vec<(TableId, u64)>, - require_next_checkpoint: bool, } impl GlobalBarrierManagerContext { @@ -1245,7 +1239,6 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { - require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1275,7 +1268,10 @@ impl CheckpointControl { .collect() } - fn next_complete_barrier_task(&mut self) -> Option { + fn next_complete_barrier_task( + &mut self, + mut scheduled_barriers: Option<&mut ScheduledBarriers>, + ) -> Option { let mut task = None; while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() && !state.is_inflight() @@ -1292,6 +1288,15 @@ impl CheckpointControl { node.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); + if let Some(scheduled_barriers) = &mut scheduled_barriers + && self.create_mview_tracker.has_pending_finished_jobs() + && self + .command_ctx_queue + .values() + .all(|node| !node.command_ctx.kind.is_checkpoint()) + { + scheduled_barriers.force_checkpoint_in_next_barrier(); + } continue; } let commit_info = collect_commit_epoch_info( @@ -1311,14 +1316,6 @@ impl CheckpointControl { table_id }) .collect(); - let require_next_checkpoint = - if self.create_mview_tracker.has_pending_finished_jobs() { - self.command_ctx_queue - .values() - .all(|node| !node.command_ctx.kind.is_checkpoint()) - } else { - false - }; task = Some(CompleteBarrierTask { commit_info, finished_jobs, @@ -1326,7 +1323,6 @@ impl CheckpointControl { command_context: Some((node.command_ctx, node.enqueue_time)), table_ids_to_finish, creating_job_epochs: vec![], - require_next_checkpoint, }); break; } @@ -1365,11 +1361,14 @@ impl CheckpointControl { task } - pub(super) async fn next_completed_barrier(&mut self) -> MetaResult { + pub(super) fn next_completed_barrier<'a>( + &'a mut self, + scheduled_barriers: &mut ScheduledBarriers, + ) -> impl Future> + 'a { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. if let CompletingCommand::None = &self.completing_command { - if let Some(task) = self.next_complete_barrier_task() { + if let Some(task) = self.next_complete_barrier_task(Some(scheduled_barriers)) { { let command_ctx = task .command_context @@ -1377,26 +1376,28 @@ impl CheckpointControl { .map(|(command_ctx, _)| command_ctx.clone()); let table_ids_to_finish = task.table_ids_to_finish.clone(); let creating_job_epochs = task.creating_job_epochs.clone(); - let require_next_checkpoint = task.require_next_checkpoint; let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); self.completing_command = CompletingCommand::GlobalStreamingGraph { command_ctx, join_handle, table_ids_to_finish, creating_job_epochs, - require_next_checkpoint, }; } } } + self.next_completed_barrier_inner() + } + + async fn next_completed_barrier_inner(&mut self) -> MetaResult { let CompletingCommand::GlobalStreamingGraph { join_handle, .. } = &mut self.completing_command else { return pending().await; }; - let (table_ids_to_finish, creating_job_epochs, require_next_checkpoint) = { + let (table_ids_to_finish, creating_job_epochs) = { { let join_result: MetaResult<_> = try { join_handle @@ -1417,9 +1418,8 @@ impl CheckpointControl { must_match!(completed_command, CompletingCommand::GlobalStreamingGraph { table_ids_to_finish, creating_job_epochs, - require_next_checkpoint, .. - } => (table_ids_to_finish, creating_job_epochs, require_next_checkpoint)) + } => (table_ids_to_finish, creating_job_epochs)) } }; @@ -1462,7 +1462,6 @@ impl CheckpointControl { } Ok(BarrierCompleteOutput { - require_next_checkpoint, table_ids_to_finish, }) } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 56b60a6535722..5e4bf772b6454 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
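Earlier in this patch, the barrier/mod.rs hunk turns `next_completed_barrier` into a plain (non-`async`) method: it does its synchronous scheduling work up front, while the `ScheduledBarriers` borrow is still held, and then returns the future of an inner async method that no longer needs that borrow. A stripped-down sketch of the shape, with placeholder types rather than the real ones:

use std::future::Future;

struct Control {
    collected: Vec<u64>,
}

impl Control {
    // Synchronous prologue: runs immediately at the call site, so `scheduled`
    // is only borrowed here and is not captured by the returned future.
    fn next_completed<'a>(
        &'a mut self,
        scheduled: &mut Vec<u64>,
    ) -> impl Future<Output = Option<u64>> + 'a {
        if let Some(x) = scheduled.pop() {
            self.collected.push(x);
        }
        self.next_completed_inner()
    }

    // Only this part is asynchronous; it borrows `self` alone.
    async fn next_completed_inner(&mut self) -> Option<u64> {
        self.collected.pop()
    }
}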
+use std::cell::LazyCell; use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -145,6 +146,14 @@ impl<'a> HummockVersionTransaction<'a> { } } + let max_epoch_to_commit = LazyCell::new(|| { + tables_to_commit + .values() + .cloned() + .max() + .expect("non empty tables_to_commit") + }); + // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { let l0_sub_level_id = new_version_delta @@ -158,13 +167,7 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or_else(|| { - tables_to_commit - .values() - .cloned() - .max() - .expect("non empty tables_to_commit") - }); + .unwrap_or_else(|| *max_epoch_to_commit); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) From 1e82a19697f3f0e9397fade546fd935de70db013 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 10 Oct 2024 17:50:17 +0800 Subject: [PATCH 10/11] rename --- src/meta/src/barrier/mod.rs | 49 +++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 327c1f99e2f5f..038aef4a550a9 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -218,7 +218,7 @@ struct CheckpointControl { /// Command that has been collected but is still completing. /// The join handle of the completing future is stored. - completing_command: CompletingCommand, + completing_task: CompletingTask, hummock_version_stats: HummockVersionStats, @@ -235,7 +235,7 @@ impl CheckpointControl { Self { command_ctx_queue: Default::default(), creating_streaming_job_controls: Default::default(), - completing_command: CompletingCommand::None, + completing_task: CompletingTask::None, hummock_version_stats: context.hummock_manager.get_version_stats().await, create_mview_tracker, context, @@ -244,8 +244,8 @@ impl CheckpointControl { fn total_command_num(&self) -> usize { self.command_ctx_queue.len() - + match &self.completing_command { - CompletingCommand::GlobalStreamingGraph { .. } => 1, + + match &self.completing_task { + CompletingTask::Completing { .. } => 1, _ => 0, } } @@ -398,9 +398,9 @@ impl CheckpointControl { .command_ctx_queue .last_key_value() .map(|(_, x)| &x.command_ctx) - .or(match &self.completing_command { - CompletingCommand::None | CompletingCommand::Err(_) => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(), + .or(match &self.completing_task { + CompletingTask::None | CompletingTask::Err(_) => None, + CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); @@ -409,10 +409,9 @@ impl CheckpointControl { .values() .map(|node| &node.command_ctx) .chain( - match &self.completing_command { - CompletingCommand::None | CompletingCommand::Err(_) => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => - command_ctx.as_ref(), + match &self.completing_task { + CompletingTask::None | CompletingTask::Err(_) => None, + CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), } .into_iter() ) @@ -426,9 +425,9 @@ impl CheckpointControl { /// We need to make sure there are no changes when doing recovery pub async fn clear_on_err(&mut self, err: &MetaError) { // join spawned completing command to finish no matter it succeeds or not. 
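The hummock/manager/transaction.rs hunk earlier in this patch region wraps the fallback sub-level id in `std::cell::LazyCell`, so the maximum committed epoch is computed only if some compaction group actually needs the fallback, and at most once. A minimal standalone example of that behavior (the data and names are invented for illustration):

use std::cell::LazyCell;

fn main() {
    let committed_epochs = [3u64, 7, 5];
    // The closure does not run at construction time; it runs on the first
    // dereference and the result is cached for later uses.
    let max_epoch = LazyCell::new(|| {
        println!("computing max committed epoch");
        committed_epochs.iter().copied().max().expect("non-empty")
    });

    let existing_sub_level_ids = [None, Some(9u64), None];
    let resolved: Vec<u64> = existing_sub_level_ids
        .iter()
        .copied()
        .map(|existing| existing.unwrap_or_else(|| *max_epoch))
        .collect();

    // "computing max committed epoch" was printed exactly once.
    assert_eq!(resolved, vec![7, 9, 7]);
}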
- let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { - CompletingCommand::None => false, - CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { + let is_err = match replace(&mut self.completing_task, CompletingTask::None) { + CompletingTask::None => false, + CompletingTask::Completing { join_handle, .. } => { info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { @@ -442,7 +441,7 @@ impl CheckpointControl { Ok(Ok(_)) => false, } } - CompletingCommand::Err(_) => true, + CompletingTask::Err(_) => true, }; if !is_err { // continue to finish the pending collected barrier. @@ -509,9 +508,9 @@ impl BarrierEpochState { } } -enum CompletingCommand { +enum CompletingTask { None, - GlobalStreamingGraph { + Completing { command_ctx: Option>, table_ids_to_finish: HashSet, creating_job_epochs: Vec<(TableId, u64)>, @@ -1369,7 +1368,7 @@ impl CheckpointControl { ) -> impl Future> + 'a { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. - if let CompletingCommand::None = &self.completing_command { + if let CompletingTask::None = &self.completing_task { if let Some(task) = self.next_complete_barrier_task(Some(scheduled_barriers)) { { let command_ctx = task @@ -1379,7 +1378,7 @@ impl CheckpointControl { let table_ids_to_finish = task.table_ids_to_finish.clone(); let creating_job_epochs = task.creating_job_epochs.clone(); let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); - self.completing_command = CompletingCommand::GlobalStreamingGraph { + self.completing_task = CompletingTask::Completing { command_ctx, join_handle, table_ids_to_finish, @@ -1393,9 +1392,7 @@ impl CheckpointControl { } async fn next_completed_barrier_inner(&mut self) -> MetaResult { - let CompletingCommand::GlobalStreamingGraph { join_handle, .. } = - &mut self.completing_command - else { + let CompletingTask::Completing { join_handle, .. } = &mut self.completing_task else { return pending().await; }; @@ -1409,15 +1406,15 @@ impl CheckpointControl { // It's important to reset the completing_command after await no matter the result is err // or not, and otherwise the join handle will be polled again after ready. let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) + CompletingTask::Err(e.clone()) } else { - CompletingCommand::None + CompletingTask::None }; let completed_command = - replace(&mut self.completing_command, next_completing_command_status); + replace(&mut self.completing_task, next_completing_command_status); self.hummock_version_stats = join_result?; - must_match!(completed_command, CompletingCommand::GlobalStreamingGraph { + must_match!(completed_command, CompletingTask::Completing { table_ids_to_finish, creating_job_epochs, .. From af34fd8ce8761dd6f107ea3e28c919b627481d21 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 14 Oct 2024 15:04:28 +0800 Subject: [PATCH 11/11] fix --- src/meta/src/barrier/mod.rs | 5 ++++- src/meta/src/hummock/manager/transaction.rs | 9 --------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 038aef4a550a9..27b83fcd87dfe 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -245,7 +245,10 @@ impl CheckpointControl { fn total_command_num(&self) -> usize { self.command_ctx_queue.len() + match &self.completing_task { - CompletingTask::Completing { .. 
} => 1, + CompletingTask::Completing { + command_ctx: Some(_), + .. + } => 1, _ => 0, } } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 5c2fc529dd754..a8d3645d29037 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::LazyCell; use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -146,14 +145,6 @@ impl<'a> HummockVersionTransaction<'a> { } } - let max_epoch_to_commit = LazyCell::new(|| { - tables_to_commit - .values() - .cloned() - .max() - .expect("non empty tables_to_commit") - }); - // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { let group_deltas = &mut new_version_delta
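
Not part of the patches above: a minimal, standalone Rust sketch of the per-table commit bookkeeping the series moves to, for readers skimming the diffs. It assumes simplified stand-in types (`TableId` as a bare `u32`, epochs as `u64`) and a free function rather than the real `HummockVersionTransaction` method; only the shape of `tables_to_commit` (`table_id` -> `committed_epoch`) and the max-committed-epoch fallback for the new L0 sub-level id are taken from the diffs, everything else is illustrative.

use std::collections::HashMap;

// Simplified stand-ins for the real TableId / epoch types.
type TableId = u32;
type Epoch = u64;

// Mirrors the shape of `tables_to_commit: HashMap<TableId, u64>` in
// `CommitEpochInfo`: each table carries its own committed epoch.
fn max_committed_epoch(tables_to_commit: &HashMap<TableId, Epoch>) -> Epoch {
    tables_to_commit
        .values()
        .copied()
        .max()
        .expect("non empty tables_to_commit")
}

// Pick the sub-level id for newly appended SSTs: continue from the last
// existing L0 sub-level if there is one, otherwise fall back to the max
// committed epoch of this commit (the value the removed `LazyCell` computed).
fn next_l0_sub_level_id(
    last_sub_level_id: Option<u64>,
    tables_to_commit: &HashMap<TableId, Epoch>,
) -> u64 {
    last_sub_level_id
        .map(|id| id + 1)
        .unwrap_or_else(|| max_committed_epoch(tables_to_commit))
}

fn main() {
    let tables_to_commit: HashMap<TableId, Epoch> =
        HashMap::from([(1, 100), (2, 100), (3, 97)]);

    // No existing sub-level: fall back to the max committed epoch (100).
    assert_eq!(next_l0_sub_level_id(None, &tables_to_commit), 100);
    // Existing sub-level 100: continue with 101.
    assert_eq!(next_l0_sub_level_id(Some(100), &tables_to_commit), 101);
}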