diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index f475fb3fc4068..84e4a32ff5a52 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -4270,6 +4270,8 @@ impl AuthorityPerEpochStore { } mod quarantine { + use mysten_common::fatal; + use super::*; /// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints @@ -4287,9 +4289,13 @@ mod quarantine { builder_digest_to_checkpoint: HashMap, - // Any un-committed next versions are stored here. A ref-count is used to - // track which objects still exist in some element of output_queue. - shared_object_next_versions: HashMap, + // Any un-committed next versions are stored here. + shared_object_next_versions: RefCountedHashMap, + + // The most recent congestion control debts for objects. Uses a ref-count to track + // which objects still exist in some element of output_queue. + congestion_control_randomness_object_debts: RefCountedHashMap, + congestion_control_object_debts: RefCountedHashMap, } impl ConsensusOutputQuarantine { @@ -4300,8 +4306,9 @@ mod quarantine { output_queue: VecDeque::new(), builder_checkpoint_summary: BTreeMap::new(), builder_digest_to_checkpoint: HashMap::new(), - //next_checkpoint_sequence_to_commit: None, - shared_object_next_versions: HashMap::new(), + shared_object_next_versions: Default::default(), + congestion_control_randomness_object_debts: Default::default(), + congestion_control_object_debts: Default::default(), } } } @@ -4312,6 +4319,7 @@ mod quarantine { // Push all data gathered from a consensus commit into the quarantine. pub(super) fn push_consensus_output(&mut self, output: ConsensusCommitOutput) { self.insert_shared_object_next_versions(&output); + self.insert_congestion_control_debts(&output); self.output_queue.push_back(output); } @@ -4502,7 +4510,7 @@ mod quarantine { let mut fallback_indices = Vec::with_capacity(object_ids.len()); for (i, object_id) in object_ids.iter().enumerate() { - if let Some((_, next_version)) = self.shared_object_next_versions.get(object_id) { + if let Some(next_version) = self.shared_object_next_versions.get(object_id) { results.push(Some(*next_version)); } else { results.push(None); @@ -4522,45 +4530,36 @@ mod quarantine { Ok(results) } - pub(super) fn insert_shared_object_next_versions( - &mut self, - output: &ConsensusCommitOutput, - ) { + fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) { if let Some(next_versions) = output.next_shared_object_versions.as_ref() { for (object_id, next_version) in next_versions { - let entry = self.shared_object_next_versions.entry(*object_id); - match entry { - hash_map::Entry::Occupied(mut entry) => { - let (ref_count, v) = entry.get_mut(); - *ref_count += 1; - *v = *next_version; - } - hash_map::Entry::Vacant(entry) => { - entry.insert((1, *next_version)); - } - } + self.shared_object_next_versions + .insert(*object_id, *next_version); } } } + fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) { + for (object_id, debt) in output.congestion_control_object_debts.iter() { + self.congestion_control_object_debts.insert(*object_id, *debt); + } + + for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() { + self.congestion_control_randomness_object_debts.insert(*object_id, *debt); + } + } + pub(super) fn remove_shared_object_next_versions( &mut self, output: &ConsensusCommitOutput, ) { if let Some(next_versions) = output.next_shared_object_versions.as_ref() { for object_id in next_versions.keys() { - let entry = self.shared_object_next_versions.entry(*object_id); - match entry { - hash_map::Entry::Occupied(mut entry) => { - let (ref_count, _) = entry.get_mut(); - *ref_count -= 1; - if *ref_count == 0 { - entry.remove(); - } - } - hash_map::Entry::Vacant(_) => { - panic!("Shared object next version not found"); - } + if !self.shared_object_next_versions.remove(object_id) { + fatal!( + "Shared object next version not found in quarantine: {:?}", + object_id + ); } } } @@ -4657,6 +4656,92 @@ mod quarantine { .filter_map(|output| output.get_randomness_last_round_timestamp()) .next() } + + pub(super) fn load_initial_object_debts( + &self, + epoch_store: &AuthorityPerEpochStore, + current_round: Round, + for_randomness: bool, + per_commit_budget: u64, + transactions: &[VerifiedSequencedConsensusTransaction], + ) -> SuiResult> { + let tables = epoch_store.tables()?; + let (hash_table, db_table) = if for_randomness { + (&self.congestion_control_randomness_object_debts, &tables.congestion_control_randomness_object_debts) + } else { + (&self.congestion_control_object_debts, &tables.congestion_control_object_debts) + }; + let shared_input_object_ids: BTreeSet<_> = transactions + .iter() + .filter_map(|tx| { + if let SequencedConsensusTransactionKind::External(ConsensusTransaction { + kind: ConsensusTransactionKind::CertifiedTransaction(tx), + .. + }) = &tx.0.transaction + { + Some(tx.shared_input_objects().map(|obj| obj.id)) + } else { + None + } + }) + .flatten() + .collect(); + + let num_ids = shared_input_object_ids.len(); + let mut results = Vec::with_capacity(num_ids); + let mut fallback_keys = Vec::with_capacity(num_ids); + let mut fallback_indices = Vec::with_capacity(num_ids); + + for (i, object_id) in shared_input_object_ids.iter().enumerate() { + if let Some(debt) = hash_table.get(object_id) { + results.push(Some(*debt)); + } else { + results.push(None); + fallback_keys.push(object_id); + fallback_indices.push(i); + } + } + + let fallback_results = db_table.multi_get(fallback_keys)?; + assert_eq!(fallback_results.len(), fallback_indices.len()); + for (i, result) in fallback_indices.into_iter().zip(fallback_results) { + results[i] = result; + } + + Ok(results.into_iter().zip(shared_input_object_ids.into_iter()) + .map(|(debt, object_id)| { + let (round, debt) = debt.into_v1(); + ( + object_id, + // Stored debts already account for the budget of the round in which + // they were accumulated. Application of budget from future rounds to + // the debt is handled here. + debt.saturating_sub(per_commit_budget * (current_round - round - 1)), + ) + })) + + ) + + Ok(results) + + + + Ok(hash_table + .multi_get(shared_input_object_ids.iter())? + .into_iter() + .flatten() + .zip(shared_input_object_ids) + .map(move |(debt, object_id)| { + let (round, debt) = debt.into_v1(); + ( + object_id, + // Stored debts already account for the budget of the round in which + // they were accumulated. Application of budget from future rounds to + // the debt is handled here. + debt.saturating_sub(per_commit_budget * (current_round - round - 1)), + ) + })) + } } } @@ -5021,3 +5106,66 @@ impl From for LockDetailsWrapper { LockDetailsWrapper::V1(details) } } + +// A wrapper around HashMap that uses refcounts to keep entries alive until +// they are no longer needed. +// +// If there are N inserts for the same key, the key will not be removed until +// there are N removes. +// +// It is intended to track the *latest* value for a given key, so duplicate +// inserts are intended to overwrite any prior value. +#[derive(Debug, Default)] +struct RefCountedHashMap { + map: HashMap, +} + +impl RefCountedHashMap +where + K: Copy + Eq + std::hash::Hash, +{ + pub fn new() -> Self { + Self { + map: HashMap::new(), + } + } + + pub fn insert(&mut self, key: K, value: V) { + let entry = self.map.entry(key); + match entry { + hash_map::Entry::Occupied(mut entry) => { + let (ref_count, v) = entry.get_mut(); + *ref_count += 1; + *v = value; + } + hash_map::Entry::Vacant(entry) => { + entry.insert((1, value)); + } + } + } + + // Returns true if the key was present, false otherwise. + // Note that the key may not be removed if present, as it may have a refcount > 1. + pub fn remove(&mut self, key: &K) -> bool { + let entry = self.map.entry(*key); + match entry { + hash_map::Entry::Occupied(mut entry) => { + let (ref_count, _) = entry.get_mut(); + *ref_count -= 1; + if *ref_count == 0 { + entry.remove(); + } + true + } + hash_map::Entry::Vacant(_) => false, + } + } + + pub fn get(&self, key: &K) -> Option<&V> { + self.map.get(key).map(|(_, v)| v) + } + + pub fn iter(&self) -> impl Iterator { + self.map.iter().map(|(k, (_, v))| (k, v)) + } +}