Skip to content

Commit

Permalink
Add ExecutionTimeEstimate mode for congestion control (#20994)
Browse files Browse the repository at this point in the history
## Description 

- `ExecutionTimeObserver` tracks local measurements of execution time
and submits observations to consensus when the moving average changes by
more than a threshold.

- `ExecutionTimeEstimator` records execution time observations received
from consensus and computes stake-weighted medians for use in congestion
control.

This PR contains the core implementation of the feature but is missing
some important components required for use in production:
- storage of received observations for crash recovery
- propagation of observations to next epoch
- metrics

The heuristics it uses are as simple as possible for a first pass and
will likely require some tuning.

## Test plan 

Added and updated unit tests. The feature is disabled for now by protocol config.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
aschran authored Jan 31, 2025
1 parent 9a95135 commit d6ed8a5
Show file tree
Hide file tree
Showing 19 changed files with 1,373 additions and 68 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ shell-words = "1.1.0"
shellexpand = "3.1.0"
signature = "1.6.0"
similar = "2.4.0"
simple_moving_average = "1.0.2"
simple-server-timing-header = "0.1.1"
slip10_ed25519 = "0.1.3"
smallvec = "1.10.0"
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ mod test {
config.set_gas_budget_based_txn_cost_cap_factor_for_testing(total_gas_limit/cap_factor_denominator);
config.set_gas_budget_based_txn_cost_absolute_cap_commit_count_for_testing(absolute_cap_factor);
},
// TODO: Enable once ExecutionTimeEstimate mode is functional across epochs.
PerObjectCongestionControlMode::ExecutionTimeEstimate => unimplemented!(),
}
config.set_max_deferral_rounds_for_congestion_control_for_testing(max_deferral_rounds);
config.set_max_txn_cost_overage_per_object_in_commit_for_testing(
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ pub struct NodeConfig {
/// By default, write stall is enabled on validators but not on fullnodes.
#[serde(skip_serializing_if = "Option::is_none")]
pub enable_db_write_stall: Option<bool>,

/// Size of the channel used for buffering local execution time observations.
///
/// If unspecified, this will default to `128`.
#[serde(default = "default_local_execution_time_channel_capacity")]
pub local_execution_time_channel_capacity: usize,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -544,6 +550,10 @@ pub fn default_end_of_epoch_broadcast_channel_capacity() -> usize {
128
}

/// Serde default for `NodeConfig::local_execution_time_channel_capacity`.
///
/// Returns the channel capacity (`128`) used when the field is absent from
/// the node configuration.
pub fn default_local_execution_time_channel_capacity() -> usize {
    const DEFAULT_CAPACITY: usize = 128;
    DEFAULT_CAPACITY
}

/// Always returns `true`.
///
/// NOTE(review): presumably referenced by name from `#[serde(default = "...")]`
/// attributes elsewhere in this file — confirm against the callers.
pub fn bool_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
simple_moving_average.workspace = true
static_assertions.workspace = true
tap.workspace = true
tempfile.workspace = true
Expand Down
71 changes: 47 additions & 24 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::{
Expand All @@ -50,6 +51,7 @@ use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
use sui_config::NodeConfig;
use sui_types::crypto::RandomnessRound;
use sui_types::dynamic_field::visitor as DFV;
use sui_types::execution::ExecutionTiming;
use sui_types::execution_status::ExecutionStatus;
use sui_types::inner_temporary_store::PackageStoreWithFallback;
use sui_types::layout_resolver::into_struct_layout;
Expand Down Expand Up @@ -214,6 +216,7 @@ pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod execution_time_estimator;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod test_authority_builder;
Expand Down Expand Up @@ -1235,6 +1238,8 @@ impl AuthorityState {
return Ok((effects, None));
}

let execution_start_time = Instant::now();

let input_objects =
self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

Expand All @@ -1246,18 +1251,27 @@ impl AuthorityState {
expected_effects_digest = epoch_store.get_signed_effects_digest(tx_digest)?;
}

self.process_certificate(
tx_guard,
certificate,
input_objects,
expected_effects_digest,
epoch_store,
)
.await
.tap_err(|e| info!("process_certificate failed: {e}"))
.tap_ok(
|(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
)
let (effects, timings, execution_error_opt) = self
.process_certificate(
tx_guard,
certificate,
input_objects,
expected_effects_digest,
epoch_store,
)
.await
.tap_err(|e| info!("process_certificate failed: {e}"))
.tap_ok(
|(fx, _, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
)?;

epoch_store.record_local_execution_time(
certificate.data().transaction_data(),
timings,
execution_start_time.elapsed(),
);

Ok((effects, execution_error_opt))
}

pub fn read_objects_for_execution(
Expand Down Expand Up @@ -1357,7 +1371,11 @@ impl AuthorityState {
input_objects: InputObjects,
expected_effects_digest: Option<TransactionEffectsDigest>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<(TransactionEffects, Option<ExecutionError>)> {
) -> SuiResult<(
TransactionEffects,
Vec<ExecutionTiming>,
Option<ExecutionError>,
)> {
let process_certificate_start_time = tokio::time::Instant::now();
let digest = *certificate.digest();

Expand Down Expand Up @@ -1396,12 +1414,9 @@ impl AuthorityState {
// non-transient (transaction input is invalid, move vm errors). However, all errors from
// this function occur before we have written anything to the db, so we commit the tx
// guard and rely on the client to retry the tx (if it was transient).
let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
&execution_guard,
certificate,
input_objects,
epoch_store,
) {
let (inner_temporary_store, effects, timings, execution_error_opt) = match self
.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
{
Err(e) => {
info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
tx_guard.release();
Expand Down Expand Up @@ -1496,7 +1511,7 @@ impl AuthorityState {
.execution_gas_latency_ratio
.observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
};
Ok((effects, execution_error_opt))
Ok((effects, timings, execution_error_opt))
}

#[instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -1632,6 +1647,7 @@ impl AuthorityState {
) -> SuiResult<(
InnerTemporaryStore,
TransactionEffects,
Vec<ExecutionTiming>,
Option<ExecutionError>,
)> {
let _scope = monitored_scope("Execution::prepare_certificate");
Expand All @@ -1658,7 +1674,7 @@ impl AuthorityState {
let (kind, signer, gas) = transaction_data.execution_parts();

#[allow(unused_mut)]
let (inner_temp_store, _, mut effects, _timings, execution_error_opt) =
let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
epoch_store.executor().execute_transaction_to_effects(
self.get_backing_store().as_ref(),
protocol_config,
Expand Down Expand Up @@ -1694,7 +1710,12 @@ impl AuthorityState {
.observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
}

Ok((inner_temp_store, effects, execution_error_opt.err()))
Ok((
inner_temp_store,
effects,
timings,
execution_error_opt.err(),
))
}

pub fn prepare_certificate_for_benchmark(
Expand All @@ -1710,7 +1731,9 @@ impl AuthorityState {
let lock: RwLock<EpochId> = RwLock::new(epoch_store.epoch());
let execution_guard = lock.try_read().unwrap();

self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
let (inner_temp_store, effects, _timings, execution_error_opt) =
self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)?;
Ok((inner_temp_store, effects, execution_error_opt))
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -5083,7 +5106,7 @@ impl AuthorityState {
let input_objects =
self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

let (temporary_store, effects, _execution_error_opt) =
let (temporary_store, effects, _timings, _execution_error_opt) =
self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
let system_obj = get_sui_system_state(&temporary_store.written)
.expect("change epoch tx must write to system object");
Expand Down
Loading

0 comments on commit d6ed8a5

Please sign in to comment.