Commit d59dead: fix

msmouse committed Dec 10, 2024
1 parent: d3f1e7e · commit: d59dead
Showing 13 changed files with 96 additions and 28 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -667,6 +667,7 @@ num-derive = "0.3.3"
 num-integer = "0.1.42"
 num-traits = "0.2.15"
 once_cell = "1.10.0"
+once_map = "0.4.21"
 ordered-float = "3.9.1"
 ouroboros = "0.15.6"
 owo-colors = "3.5.0"
@@ -771,6 +772,7 @@ shadow-rs = "0.16.2"
 simplelog = "0.9.0"
 smallbitvec = "2.5.1"
 smallvec = "1.8.0"
+stable_deref_trait = "1.2.0"
 static_assertions = "1.1.0"
 stats_alloc = "0.1.8"
 status-line = "0.2.0"
6 changes: 3 additions & 3 deletions execution/executor-types/src/execution_output.rs
@@ -76,8 +76,8 @@ impl ExecutionOutput {
             to_commit: TransactionsToKeep::new_empty(),
             to_discard: TransactionsWithOutput::new_empty(),
             to_retry: TransactionsWithOutput::new_empty(),
+            state_reads: ShardedStateCache::new_empty(state.next_version()),
             result_state: state,
-            state_reads: ShardedStateCache::default(),
             block_end_info: None,
             next_epoch_state: None,
             subscribable_events: Planned::ready(vec![]),
@@ -95,7 +95,7 @@ impl ExecutionOutput {
             to_discard: TransactionsWithOutput::new_empty(),
             to_retry: TransactionsWithOutput::new_empty(),
             result_state: LedgerState::new_empty(),
-            state_reads: ShardedStateCache::default(),
+            state_reads: ShardedStateCache::new_empty(0),
             block_end_info: None,
             next_epoch_state: None,
             subscribable_events: Planned::ready(vec![]),
@@ -115,7 +115,7 @@ impl ExecutionOutput {
             to_discard: TransactionsWithOutput::new_empty(),
             to_retry: TransactionsWithOutput::new_empty(),
             result_state: self.result_state.clone(),
-            state_reads: ShardedStateCache::default(),
+            state_reads: ShardedStateCache::new_empty(self.next_version()),
             block_end_info: None,
             next_epoch_state: self.next_epoch_state.clone(),
             subscribable_events: Planned::ready(vec![]),
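In the first hunk the new `state_reads` initializer is placed above `result_state` rather than replacing the old line in place: struct fields are evaluated in the order they are written, and `state` is moved into `result_state`, so `state.next_version()` has to be read first. A minimal sketch of that pattern with hypothetical types (not the real `ExecutionOutput`):

    struct State {
        next_version: u64,
    }

    struct Output {
        // Stands in for `state_reads: ShardedStateCache::new_empty(..)`.
        reads_next_version: u64,
        result_state: State,
    }

    fn build(state: State) -> Output {
        Output {
            // Evaluated first: a plain read of a Copy field while `state` is still live.
            reads_next_version: state.next_version,
            // Evaluated second: `state` is moved into the struct.
            result_state: state,
        }
    }

    fn main() {
        let out = build(State { next_version: 7 });
        assert_eq!(out.reads_next_version, 7);
        assert_eq!(out.result_state.next_version, 7);
    }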
19 changes: 13 additions & 6 deletions execution/executor-types/src/transactions_with_output.rs
@@ -129,8 +129,8 @@ impl TransactionsToKeep {
         self.borrow_state_update_refs().for_last_checkpoint.as_ref()
     }
 
-    pub fn state_update_refs_for_latest(&self) -> &BatchedStateUpdateRefs {
-        &self.borrow_state_update_refs().for_latest
+    pub fn state_update_refs_for_latest(&self) -> Option<&BatchedStateUpdateRefs> {
+        self.borrow_state_update_refs().for_latest.as_ref()
     }
 
     pub fn ends_with_sole_checkpoint(&self) -> bool {
@@ -186,7 +186,7 @@ impl Debug for TransactionsToKeep {
 pub struct StateUpdateRefs<'kv> {
     pub per_version: PerVersionStateUpdateRefs<'kv>,
     pub for_last_checkpoint: Option<BatchedStateUpdateRefs<'kv>>,
-    pub for_latest: BatchedStateUpdateRefs<'kv>,
+    pub for_latest: Option<BatchedStateUpdateRefs<'kv>>,
 }
 
 impl<'kv> StateUpdateRefs<'kv> {
@@ -212,7 +212,7 @@ impl<'kv> StateUpdateRefs<'kv> {
         last_checkpoint_index: Option<usize>,
     ) -> (
         Option<BatchedStateUpdateRefs<'kv>>,
-        BatchedStateUpdateRefs<'kv>,
+        Option<BatchedStateUpdateRefs<'kv>>,
     ) {
         let _timer = TIMER.timer_with(&["index_state_updates__collect_batch"]);
 
@@ -230,8 +230,15 @@ impl<'kv> StateUpdateRefs<'kv> {
                 num_versions -= idx + 1;
                 ret
             });
-        let updates_for_latest =
-            Self::collect_some_updates(first_version, num_versions, &mut shard_iters);
+        let updates_for_latest = if num_versions == 0 {
+            None
+        } else {
+            Some(Self::collect_some_updates(
+                first_version,
+                num_versions,
+                &mut shard_iters,
+            ))
+        };
 
         // Assert that all updates are consumed.
         assert!(shard_iters.iter_mut().all(|iter| iter.next().is_none()));
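The net effect of this file's changes: `for_latest` is `None` whenever the last checkpoint coincides with the end of the batch, instead of an empty `BatchedStateUpdateRefs`, and callers now handle that case explicitly. A simplified model of the batching rule (counting versions rather than collecting update refs; `split` is a hypothetical stand-in for the real collection logic):

    fn split(total: usize, last_checkpoint_index: Option<usize>) -> (Option<usize>, Option<usize>) {
        let mut remaining = total;
        // Everything up to and including the last checkpoint goes into the first batch.
        let for_last_checkpoint = last_checkpoint_index.map(|idx| {
            remaining -= idx + 1;
            idx + 1
        });
        // Whatever remains past the checkpoint goes into `for_latest`, now optional.
        let for_latest = (remaining != 0).then_some(remaining);
        (for_last_checkpoint, for_latest)
    }

    fn main() {
        // Checkpoint at the very end of a 5-txn batch: nothing left for `for_latest`.
        assert_eq!(split(5, Some(4)), (Some(5), None));
        // Checkpoint in the middle: two txns remain past it.
        assert_eq!(split(5, Some(2)), (Some(3), Some(2)));
        // No checkpoint at all: the whole batch is `for_latest`.
        assert_eq!(split(5, None), (None, Some(5)));
    }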
2 changes: 1 addition & 1 deletion execution/executor/src/workflow/do_state_checkpoint.rs
@@ -79,7 +79,7 @@ impl DoStateCheckpoint {
             .to_commit
             .state_update_refs_for_last_checkpoint()
         {
-            let index = (updates.next_version() - 1) as usize;
+            let index = updates.num_versions - 1;
             out[index] = Some(state_summary.last_checkpoint().root_hash());
         }
         out[num_txns - 1] = Some(state_summary.root_hash());
7 changes: 4 additions & 3 deletions storage/aptosdb/src/state_store/buffered_state.rs
@@ -107,7 +107,7 @@ impl BufferedState {
     /// If a commit is needed, it sends a CommitMessage::Data message to the StateSnapshotCommitter thread to commit the data.
     /// If sync_commit is true, it also sends a CommitMessage::Sync message to ensure that the commit is completed before returning.
     fn maybe_commit(&mut self, sync_commit: bool) {
-        if sync_commit
+        if sync_commit && self.estimated_items > 0
             || self.estimated_items >= self.target_items
             || self.buffered_versions() >= TARGET_SNAPSHOT_INTERVAL_IN_VERSION
         {
@@ -170,10 +170,11 @@ impl BufferedState {
             .last_checkpoint()
             .make_delta(old_state.last_checkpoint())
             .count_updates_costly();
+        let version = new_state.last_checkpoint().version();
 
-        self.maybe_commit(sync_commit);
-        Self::report_last_checkpoint_version(new_state.last_checkpoint().version());
         self.current_state.lock().set(new_state);
+        self.maybe_commit(sync_commit);
+        Self::report_last_checkpoint_version(version);
         Ok(())
     }
 
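Two behavioral changes here: a `sync_commit` no longer forces a snapshot commit when nothing is buffered (`estimated_items > 0`), and the checkpoint version is captured before `new_state` is moved into `current_state`, so the commit and the metric report run only after the shared state reflects the new value. A minimal sketch of that move-then-report ordering with hypothetical types (not the real `BufferedState`):

    use std::sync::Mutex;

    struct Snapshot {
        version: u64,
    }

    struct Buffered {
        current: Mutex<Snapshot>,
    }

    impl Buffered {
        fn set_and_report(&self, new_state: Snapshot) {
            // Capture what we need before `new_state` is moved into the lock.
            let version = new_state.version;
            *self.current.lock().unwrap() = new_state;
            // Only now, with the shared state updated, commit and report.
            println!("last checkpoint version: {version}");
        }
    }

    fn main() {
        let b = Buffered {
            current: Mutex::new(Snapshot { version: 0 }),
        };
        b.set_and_report(Snapshot { version: 42 });
    }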
6 changes: 3 additions & 3 deletions storage/aptosdb/src/state_store/mod.rs
@@ -657,14 +657,14 @@ impl StateStore {
         let new_ledger_state = state.ledger_state().update(
             state.state(),
             state_update_refs.for_last_checkpoint.as_ref(),
-            &state_update_refs.for_latest,
+            state_update_refs.for_latest.as_ref(),
             &state_view.into_state_cache(),
         );
 
         let new_ledger_state_summary = state.ledger_state_summary().update(
             &ProvableStateSummary::new(state.summary().clone(), state_db.clone()),
             state_update_refs.for_last_checkpoint.as_ref(),
-            &state_update_refs.for_latest,
+            state_update_refs.for_latest.as_ref(),
         )?;
 
         Ok(LedgerStateWithSummary::from_state_and_summary(
@@ -840,7 +840,7 @@ impl StateStore {
             cache
         } else {
             // If no cache is provided, we load the old values of all keys inline.
-            _state_cache = ShardedStateCache::default();
+            _state_cache = ShardedStateCache::new_empty(current_state.next_version());
             self.prime_state_cache(current_state, latest_state, &_state_cache);
             &_state_cache
         };
1 change: 1 addition & 0 deletions storage/storage-interface/Cargo.toml
@@ -31,6 +31,7 @@ dashmap = { workspace = true }
 derive_more = { workspace = true }
 itertools = { workspace = true }
 once_cell = { workspace = true }
+once_map = { workspace = true }
 parking_lot = { workspace = true }
 proptest = { workspace = true }
 proptest-derive = { workspace = true }
15 changes: 12 additions & 3 deletions storage/storage-interface/src/state_store/state.rs
@@ -112,7 +112,12 @@ impl State {
         // 2. The cache must be at a version equal or newer than `persisted`, otherwise
         //    updates between the cached version and the persisted version are potentially
         //    missed during the usage calculation.
-        assert!(persisted.next_version() <= state_cache.next_version());
+        assert!(
+            persisted.next_version() <= state_cache.next_version(),
+            "persisted: {}, cache: {}",
+            persisted.next_version(),
+            state_cache.next_version(),
+        );
         // 3. `self` must be at a version equal or newer than the cache, because we assume
         //    it is overlayed on top of the cache.
         assert!(self.next_version() >= state_cache.next_version());
@@ -225,7 +230,7 @@ impl LedgerState {
         &self,
         persisted_snapshot: &State,
         updates_for_last_checkpoint: Option<&BatchedStateUpdateRefs<'kv>>,
-        updates_for_latest: &BatchedStateUpdateRefs<'kv>,
+        updates_for_latest: Option<&BatchedStateUpdateRefs<'kv>>,
         state_cache: &ShardedStateCache,
     ) -> LedgerState {
         let _timer = TIMER.timer_with(&["ledger_state__update"]);
@@ -242,7 +247,11 @@ impl LedgerState {
         } else {
             &last_checkpoint
         };
-        let latest = base_of_latest.update(persisted_snapshot, updates_for_latest, state_cache);
+        let latest = if let Some(updates) = updates_for_latest {
+            base_of_latest.update(persisted_snapshot, updates, state_cache)
+        } else {
+            base_of_latest.clone()
+        };
 
         LedgerState::new(latest, last_checkpoint)
     }
36 changes: 29 additions & 7 deletions storage/storage-interface/src/state_store/state_summary.rs
@@ -20,6 +20,7 @@ use aptos_types::{
 };
 use derive_more::Deref;
 use itertools::Itertools;
+use once_map::sync::OnceMap;
 use rayon::prelude::*;
 use std::sync::Arc;
 
@@ -58,6 +59,10 @@ impl StateSummary {
         self.next_version
     }
 
+    pub fn version(&self) -> Option<Version> {
+        self.next_version.checked_sub(1)
+    }
+
     pub fn is_descendant_of(&self, other: &Self) -> bool {
         self.global_state_summary
             .is_descendant_of(&other.global_state_summary)
@@ -156,7 +161,7 @@ impl LedgerStateSummary {
         &self,
         persisted: &ProvableStateSummary,
         updates_for_last_checkpoint: Option<&BatchedStateUpdateRefs<'kv>>,
-        updates_for_latest: &BatchedStateUpdateRefs<'kv>,
+        updates_for_latest: Option<&BatchedStateUpdateRefs<'kv>>,
     ) -> Result<Self> {
         let _timer = TIMER.timer_with(&["ledger_state_summary__update"]);
 
@@ -171,7 +176,11 @@ impl LedgerStateSummary {
         } else {
             &last_checkpoint
         };
-        let latest = base_of_latest.update(persisted, updates_for_latest)?;
+        let latest = if let Some(updates) = updates_for_latest {
+            base_of_latest.update(persisted, updates)?
+        } else {
+            base_of_latest.clone()
+        };
 
         Ok(Self::new(last_checkpoint, latest))
     }
@@ -181,7 +190,9 @@ impl LedgerStateSummary {
 pub struct ProvableStateSummary {
     #[deref]
     state_summary: StateSummary,
-    _db: Arc<dyn DbReader>,
+    db: Arc<dyn DbReader>,
+    // FIXME(aldenhu): avoid lock conflicts
+    memorized: OnceMap<HashValue, Box<SparseMerkleProofExt>>,
 }
 
 impl ProvableStateSummary {
@@ -192,15 +203,26 @@ impl ProvableStateSummary {
     pub fn new(state_summary: StateSummary, db: Arc<dyn DbReader>) -> Self {
         Self {
             state_summary,
-            _db: db,
+            db,
+            memorized: OnceMap::new(),
         }
     }
 }
 
 impl ProofRead for ProvableStateSummary {
-    fn get_proof(&self, _key: HashValue) -> Option<&SparseMerkleProofExt> {
-        // FIXME(change interface to return non-ref)
-        todo!()
+    // FIXME(aldenhu): return error
+    // FIXME(aldenhu): partial proof
+    // FIXME(aldenhu): ref
+    fn get_proof(&self, key: HashValue) -> Option<&SparseMerkleProofExt> {
+        self.version().map(|ver| {
+            self.memorized.insert(key, |key| {
+                Box::new(
+                    self.db
+                        .get_state_proof_by_version_ext(key, ver, 0)
+                        .expect("Failed to get account state with proof by version."),
+                )
+            })
+        })
    }
 }
 
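`ProofRead::get_proof` must hand out a `&SparseMerkleProofExt` borrowed from `&self`, so proofs fetched from the DB are boxed and memoized in a `once_map::sync::OnceMap`, whose stored boxes keep a stable address (the new `stable_deref_trait` dependency is presumably related, since `OnceMap::insert` is built around stably-dereferencing values). A minimal, self-contained sketch of the memoization pattern, assuming the `once_map` 0.4 API used in the diff:

    use once_map::sync::OnceMap;

    fn main() {
        // Keyed cache whose values are created at most once and handed out by reference.
        let memo: OnceMap<u64, Box<String>> = OnceMap::new();

        // First lookup for key 42: the closure runs and the boxed value is stored.
        let a: &String = memo.insert(42, |k| Box::new(format!("proof for key {k}")));

        // Second lookup for key 42: the stored value is returned and the closure is skipped.
        let b: &String = memo.insert(42, |_| Box::new(String::from("never built")));

        assert_eq!(a, "proof for key 42");
        assert!(std::ptr::eq(a, b)); // same boxed value, stable address
    }

Boxing the value is what lets the reference returned by `insert` stay valid while the map keeps growing.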
@@ -38,13 +38,20 @@ static IO_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
         .unwrap()
 });
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct ShardedStateCache {
+    next_version: Version,
     pub shards: [StateCacheShard; 16],
 }
 
 impl ShardedStateCache {
+    pub fn new_empty(next_version: Version) -> Self {
+        Self {
+            next_version,
+            shards: Default::default(),
+        }
+    }
+
     fn shard(&self, shard_id: u8) -> &StateCacheShard {
         &self.shards[shard_id as usize]
     }
@@ -102,8 +109,8 @@ impl CachedStateView {
         Self {
             id,
             reader,
+            memorized: ShardedStateCache::new_empty(state.next_version()),
             speculative: state.into_delta(persisted_state),
-            memorized: ShardedStateCache::default(),
         }
     }
 
1 change: 1 addition & 0 deletions types/Cargo.toml
@@ -62,6 +62,7 @@ serde_bytes = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
 serde_yaml = { workspace = true }
+stable_deref_trait = { workspace = true }
 strum = { workspace = true }
 strum_macros = { workspace = true }
 thiserror = { workspace = true }
4 changes: 4 additions & 0 deletions types/src/proof/definition.rs
@@ -200,6 +200,10 @@ impl SparseMerkleProofExt {
         }
     }
 
+    pub fn new_empty() -> Self {
+        Self::new(None, vec![])
+    }
+
     pub fn new_partial(
         leaf: Option<SparseMerkleLeafNode>,
         siblings: Vec<NodeInProof>,
