Skip to content

Commit

Permalink
WIP RefCountedHashMap
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Nov 7, 2024
1 parent 5ebc5a7 commit 629c17b
Showing 1 changed file with 181 additions and 33 deletions.
214 changes: 181 additions & 33 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4270,6 +4270,8 @@ impl AuthorityPerEpochStore {
}

mod quarantine {
use mysten_common::fatal;

use super::*;

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
Expand All @@ -4287,9 +4289,13 @@ mod quarantine {

builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

// Any un-committed next versions are stored here. A ref-count is used to
// track which objects still exist in some element of output_queue.
shared_object_next_versions: HashMap<ObjectID, (usize, SequenceNumber)>,
// Any un-committed next versions are stored here.
shared_object_next_versions: RefCountedHashMap<ObjectID, SequenceNumber>,

// The most recent congestion control debts for objects. Uses a ref-count to track
// which objects still exist in some element of output_queue.
congestion_control_randomness_object_debts: RefCountedHashMap<ObjectID, u64>,
congestion_control_object_debts: RefCountedHashMap<ObjectID, u64>,
}

impl ConsensusOutputQuarantine {
Expand All @@ -4300,8 +4306,9 @@ mod quarantine {
output_queue: VecDeque::new(),
builder_checkpoint_summary: BTreeMap::new(),
builder_digest_to_checkpoint: HashMap::new(),
//next_checkpoint_sequence_to_commit: None,
shared_object_next_versions: HashMap::new(),
shared_object_next_versions: Default::default(),
congestion_control_randomness_object_debts: Default::default(),
congestion_control_object_debts: Default::default(),
}
}
}
Expand All @@ -4312,6 +4319,7 @@ mod quarantine {
// Push all data gathered from a consensus commit into the quarantine.
pub(super) fn push_consensus_output(&mut self, output: ConsensusCommitOutput) {
self.insert_shared_object_next_versions(&output);
self.insert_congestion_control_debts(&output);
self.output_queue.push_back(output);
}

Expand Down Expand Up @@ -4502,7 +4510,7 @@ mod quarantine {
let mut fallback_indices = Vec::with_capacity(object_ids.len());

for (i, object_id) in object_ids.iter().enumerate() {
if let Some((_, next_version)) = self.shared_object_next_versions.get(object_id) {
if let Some(next_version) = self.shared_object_next_versions.get(object_id) {
results.push(Some(*next_version));
} else {
results.push(None);
Expand All @@ -4522,45 +4530,36 @@ mod quarantine {
Ok(results)
}

pub(super) fn insert_shared_object_next_versions(
&mut self,
output: &ConsensusCommitOutput,
) {
fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
for (object_id, next_version) in next_versions {
let entry = self.shared_object_next_versions.entry(*object_id);
match entry {
hash_map::Entry::Occupied(mut entry) => {
let (ref_count, v) = entry.get_mut();
*ref_count += 1;
*v = *next_version;
}
hash_map::Entry::Vacant(entry) => {
entry.insert((1, *next_version));
}
}
self.shared_object_next_versions
.insert(*object_id, *next_version);
}
}
}

fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
for (object_id, debt) in output.congestion_control_object_debts.iter() {
self.congestion_control_object_debts.insert(*object_id, *debt);
}

for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
self.congestion_control_randomness_object_debts.insert(*object_id, *debt);
}
}

pub(super) fn remove_shared_object_next_versions(
&mut self,
output: &ConsensusCommitOutput,
) {
if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
for object_id in next_versions.keys() {
let entry = self.shared_object_next_versions.entry(*object_id);
match entry {
hash_map::Entry::Occupied(mut entry) => {
let (ref_count, _) = entry.get_mut();
*ref_count -= 1;
if *ref_count == 0 {
entry.remove();
}
}
hash_map::Entry::Vacant(_) => {
panic!("Shared object next version not found");
}
if !self.shared_object_next_versions.remove(object_id) {
fatal!(
"Shared object next version not found in quarantine: {:?}",
object_id
);
}
}
}
Expand Down Expand Up @@ -4657,6 +4656,92 @@ mod quarantine {
.filter_map(|output| output.get_randomness_last_round_timestamp())
.next()
}

pub(super) fn load_initial_object_debts(
&self,
epoch_store: &AuthorityPerEpochStore,
current_round: Round,
for_randomness: bool,
per_commit_budget: u64,
transactions: &[VerifiedSequencedConsensusTransaction],
) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
let tables = epoch_store.tables()?;
let (hash_table, db_table) = if for_randomness {
(&self.congestion_control_randomness_object_debts, &tables.congestion_control_randomness_object_debts)
} else {
(&self.congestion_control_object_debts, &tables.congestion_control_object_debts)
};
let shared_input_object_ids: BTreeSet<_> = transactions
.iter()
.filter_map(|tx| {
if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::CertifiedTransaction(tx),
..
}) = &tx.0.transaction
{
Some(tx.shared_input_objects().map(|obj| obj.id))
} else {
None
}
})
.flatten()
.collect();

let num_ids = shared_input_object_ids.len();
let mut results = Vec::with_capacity(num_ids);
let mut fallback_keys = Vec::with_capacity(num_ids);
let mut fallback_indices = Vec::with_capacity(num_ids);

for (i, object_id) in shared_input_object_ids.iter().enumerate() {
if let Some(debt) = hash_table.get(object_id) {
results.push(Some(*debt));
} else {
results.push(None);
fallback_keys.push(object_id);
fallback_indices.push(i);
}
}

let fallback_results = db_table.multi_get(fallback_keys)?;
assert_eq!(fallback_results.len(), fallback_indices.len());
for (i, result) in fallback_indices.into_iter().zip(fallback_results) {
results[i] = result;
}

Ok(results.into_iter().zip(shared_input_object_ids.into_iter())
.map(|(debt, object_id)| {
let (round, debt) = debt.into_v1();
(
object_id,
// Stored debts already account for the budget of the round in which
// they were accumulated. Application of budget from future rounds to
// the debt is handled here.
debt.saturating_sub(per_commit_budget * (current_round - round - 1)),
)
}))

)

Ok(results)



Ok(hash_table
.multi_get(shared_input_object_ids.iter())?
.into_iter()
.flatten()
.zip(shared_input_object_ids)
.map(move |(debt, object_id)| {
let (round, debt) = debt.into_v1();
(
object_id,
// Stored debts already account for the budget of the round in which
// they were accumulated. Application of budget from future rounds to
// the debt is handled here.
debt.saturating_sub(per_commit_budget * (current_round - round - 1)),
)
}))
}
}
}

Expand Down Expand Up @@ -5021,3 +5106,66 @@ impl From<LockDetails> for LockDetailsWrapper {
LockDetailsWrapper::V1(details)
}
}

// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
K: Copy + Eq + std::hash::Hash,
{
pub fn new() -> Self {
Self {
map: HashMap::new(),
}
}

pub fn insert(&mut self, key: K, value: V) {
let entry = self.map.entry(key);
match entry {
hash_map::Entry::Occupied(mut entry) => {
let (ref_count, v) = entry.get_mut();
*ref_count += 1;
*v = value;
}
hash_map::Entry::Vacant(entry) => {
entry.insert((1, value));
}
}
}

// Returns true if the key was present, false otherwise.
// Note that the key may not be removed if present, as it may have a refcount > 1.
pub fn remove(&mut self, key: &K) -> bool {
let entry = self.map.entry(*key);
match entry {
hash_map::Entry::Occupied(mut entry) => {
let (ref_count, _) = entry.get_mut();
*ref_count -= 1;
if *ref_count == 0 {
entry.remove();
}
true
}
hash_map::Entry::Vacant(_) => false,
}
}

pub fn get(&self, key: &K) -> Option<&V> {
self.map.get(key).map(|(_, v)| v)
}

pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.map.iter().map(|(k, (_, v))| (k, v))
}
}

0 comments on commit 629c17b

Please sign in to comment.