Skip to content

Commit

Permalink
WIP rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Nov 7, 2024
1 parent 629c17b commit 0226d34
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 163 deletions.
179 changes: 63 additions & 116 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use typed_store::{
use super::authority_store_tables::ENV_VAR_LOCKS_BLOCK_CACHE_SIZE;
use super::epoch_start_configuration::EpochStartConfigTrait;
use super::shared_object_congestion_tracker::{
CongestionPerObjectDebt, SharedObjectCongestionTracker,
CongestionPerObjectDebt, Debt, SharedObjectCongestionTracker,
};
use super::transaction_deferral::{transaction_deferral_within_limit, DeferralKey, DeferralReason};
use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
Expand Down Expand Up @@ -729,63 +729,6 @@ impl AuthorityEpochTables {
Ok(())
}

// TODO(quarantine)
pub fn load_initial_object_debts(
&self,
current_round: Round,
for_randomness: bool,
protocol_config: &ProtocolConfig,
transactions: &[VerifiedSequencedConsensusTransaction],
) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
let default_per_commit_budget = protocol_config
.max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
.unwrap_or(0);
let (table, per_commit_budget) = if for_randomness {
(
&self.congestion_control_randomness_object_debts,
protocol_config
.max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
.unwrap_or(default_per_commit_budget),
)
} else {
(
&self.congestion_control_object_debts,
default_per_commit_budget,
)
};

let shared_input_object_ids: BTreeSet<_> = transactions
.iter()
.filter_map(|tx| {
if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::CertifiedTransaction(tx),
..
}) = &tx.0.transaction
{
Some(tx.shared_input_objects().map(|obj| obj.id))
} else {
None
}
})
.flatten()
.collect();
Ok(table
.multi_get(shared_input_object_ids.iter())?
.into_iter()
.zip(shared_input_object_ids)
.filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
.map(move |(debt, object_id)| {
let (round, debt) = debt.into_v1();
(
object_id,
// Stored debts already account for the budget of the round in which
// they were accumulated. Application of budget from future rounds to
// the debt is handled here.
debt.saturating_sub(per_commit_budget * (current_round - round - 1)),
)
}))
}

fn get_all_deferred_transactions(
&self,
) -> SuiResult<BTreeMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>>> {
Expand Down Expand Up @@ -2870,21 +2813,26 @@ impl AuthorityPerEpochStore {

// We track transaction execution cost separately for regular transactions and transactions using randomness, since
// they will be in different PendingCheckpoints.
let tables = self.tables()?;
let shared_object_congestion_tracker = SharedObjectCongestionTracker::from_protocol_config(
&tables,
self.consensus_quarantine.read().load_initial_object_debts(
self,
consensus_commit_info.round,
false,
&sequenced_transactions,
)?,
self.protocol_config(),
consensus_commit_info.round,
false,
&sequenced_transactions,
)?;
let shared_object_using_randomness_congestion_tracker =
SharedObjectCongestionTracker::from_protocol_config(
&tables,
self.consensus_quarantine.read().load_initial_object_debts(
self,
consensus_commit_info.round,
true,
&sequenced_transactions,
)?,
self.protocol_config(),
consensus_commit_info.round,
true,
&sequenced_randomness_transactions,
)?;

// We always order transactions using randomness last.
Expand Down Expand Up @@ -4272,6 +4220,8 @@ impl AuthorityPerEpochStore {
mod quarantine {
use mysten_common::fatal;

use crate::authority::shared_object_congestion_tracker::Debt;

use super::*;

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
Expand All @@ -4294,8 +4244,9 @@ mod quarantine {

// The most recent congestion control debts for objects. Uses a ref-count to track
// which objects still exist in some element of output_queue.
congestion_control_randomness_object_debts: RefCountedHashMap<ObjectID, u64>,
congestion_control_object_debts: RefCountedHashMap<ObjectID, u64>,
congestion_control_randomness_object_debts:
RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
}

impl ConsensusOutputQuarantine {
Expand All @@ -4306,9 +4257,9 @@ mod quarantine {
output_queue: VecDeque::new(),
builder_checkpoint_summary: BTreeMap::new(),
builder_digest_to_checkpoint: HashMap::new(),
shared_object_next_versions: Default::default(),
congestion_control_randomness_object_debts: Default::default(),
congestion_control_object_debts: Default::default(),
shared_object_next_versions: RefCountedHashMap::new(),
congestion_control_randomness_object_debts: RefCountedHashMap::new(),
congestion_control_object_debts: RefCountedHashMap::new(),
}
}
}
Expand Down Expand Up @@ -4540,12 +4491,20 @@ mod quarantine {
}

fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
let current_round = output.consensus_round;

for (object_id, debt) in output.congestion_control_object_debts.iter() {
self.congestion_control_object_debts.insert(*object_id, *debt);
self.congestion_control_object_debts.insert(
*object_id,
CongestionPerObjectDebt::new(current_round, *debt),
);
}

for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
self.congestion_control_randomness_object_debts.insert(*object_id, *debt);
self.congestion_control_randomness_object_debts.insert(
*object_id,
CongestionPerObjectDebt::new(current_round, *debt),
);
}
}

Expand Down Expand Up @@ -4662,14 +4621,27 @@ mod quarantine {
epoch_store: &AuthorityPerEpochStore,
current_round: Round,
for_randomness: bool,
per_commit_budget: u64,
transactions: &[VerifiedSequencedConsensusTransaction],
) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
) -> SuiResult<impl IntoIterator<Item = (ObjectID, Debt)>> {
let protocol_config = epoch_store.protocol_config();
let tables = epoch_store.tables()?;
let (hash_table, db_table) = if for_randomness {
(&self.congestion_control_randomness_object_debts, &tables.congestion_control_randomness_object_debts)
let default_per_commit_budget = protocol_config
.max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
.unwrap_or(0);
let (hash_table, db_table, per_commit_budget) = if for_randomness {
(
&self.congestion_control_randomness_object_debts,
&tables.congestion_control_randomness_object_debts,
protocol_config
.max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
.unwrap_or(default_per_commit_budget),
)
} else {
(&self.congestion_control_object_debts, &tables.congestion_control_object_debts)
(
&self.congestion_control_object_debts,
&tables.congestion_control_object_debts,
default_per_commit_budget,
)
};
let shared_input_object_ids: BTreeSet<_> = transactions
.iter()
Expand All @@ -4694,7 +4666,7 @@ mod quarantine {

for (i, object_id) in shared_input_object_ids.iter().enumerate() {
if let Some(debt) = hash_table.get(object_id) {
results.push(Some(*debt));
results.push(Some(debt.into_v1()));
} else {
results.push(None);
fallback_keys.push(object_id);
Expand All @@ -4705,41 +4677,20 @@ mod quarantine {
let fallback_results = db_table.multi_get(fallback_keys)?;
assert_eq!(fallback_results.len(), fallback_indices.len());
for (i, result) in fallback_indices.into_iter().zip(fallback_results) {
results[i] = result;
results[i] = result.map(|debt| debt.into_v1());
}

Ok(results.into_iter().zip(shared_input_object_ids.into_iter())
.map(|(debt, object_id)| {
let (round, debt) = debt.into_v1();
(
object_id,
Ok(results
.into_iter()
.zip(shared_input_object_ids)
.filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
.map(move |((round, debt), object_id)| {
// Stored debts already account for the budget of the round in which
// they were accumulated. Application of budget from future rounds to
// the debt is handled here.
debt.saturating_sub(per_commit_budget * (current_round - round - 1)),
)
}))

)

Ok(results)



Ok(hash_table
.multi_get(shared_input_object_ids.iter())?
.into_iter()
.flatten()
.zip(shared_input_object_ids)
.map(move |(debt, object_id)| {
let (round, debt) = debt.into_v1();
(
object_id,
// Stored debts already account for the budget of the round in which
// they were accumulated. Application of budget from future rounds to
// the debt is handled here.
debt.saturating_sub(per_commit_budget * (current_round - round - 1)),
)
assert!(current_round > round);
let num_rounds = current_round - round - 1;
(object_id, debt.dec_by(per_commit_budget * num_rounds))
}))
}
}
Expand Down Expand Up @@ -4780,8 +4731,8 @@ pub(crate) struct ConsensusCommitOutput {
active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

// congestion control state
congestion_control_object_debts: Vec<(ObjectID, u64)>,
congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
congestion_control_object_debts: Vec<(ObjectID, Debt)>,
congestion_control_randomness_object_debts: Vec<(ObjectID, Debt)>,
}

impl ConsensusCommitOutput {
Expand Down Expand Up @@ -4911,13 +4862,13 @@ impl ConsensusCommitOutput {
self.active_jwks.insert((round, key));
}

fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, Debt)>) {
self.congestion_control_object_debts = object_debts;
}

fn set_congestion_control_randomness_object_debts(
&mut self,
object_debts: Vec<(ObjectID, u64)>,
object_debts: Vec<(ObjectID, Debt)>,
) {
self.congestion_control_randomness_object_debts = object_debts;
}
Expand Down Expand Up @@ -5164,8 +5115,4 @@ where
pub fn get(&self, key: &K) -> Option<&V> {
self.map.get(key).map(|(_, v)| v)
}

pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.map.iter().map(|(k, (_, v))| (k, v))
}
}
Loading

0 comments on commit 0226d34

Please sign in to comment.