From 7dee94c77119b2d95dfd88353a3542c9dd7ec9b5 Mon Sep 17 00:00:00 2001 From: igor-aptos <110557261+igor-aptos@users.noreply.github.com> Date: Tue, 31 Oct 2023 15:19:23 -0700 Subject: [PATCH] [agg_v2] Proper gas charging for writing to resources with aggregators (#10698) * [agg_v2] Proper gas charging for writing to resources with aggregators * fixes --------- --- .../aptos-aggregator/src/delta_change_set.rs | 24 +- aptos-move/aptos-aggregator/src/resolver.rs | 49 +++- .../aptos-aggregator/src/tests/types.rs | 25 +- aptos-move/aptos-vm-types/src/change_set.rs | 66 ++++- aptos-move/aptos-vm-types/src/output.rs | 1 + aptos-move/aptos-vm-types/src/resolver.rs | 26 +- aptos-move/aptos-vm-types/src/tests/utils.rs | 2 + aptos-move/aptos-vm/src/aptos_vm.rs | 8 +- aptos-move/aptos-vm/src/block_executor/mod.rs | 18 ++ aptos-move/aptos-vm/src/data_cache.rs | 16 ++ .../src/move_vm_ext/respawned_session.rs | 21 +- .../aptos-vm/src/move_vm_ext/session.rs | 17 +- aptos-move/aptos-vm/src/natives.rs | 20 ++ aptos-move/block-executor/src/executor.rs | 162 ++++------- .../src/proptest_types/types.rs | 19 +- aptos-move/block-executor/src/task.rs | 8 + .../src/txn_last_input_output.rs | 18 ++ aptos-move/block-executor/src/view.rs | 252 ++++++++++++++---- .../src/natives/aggregator_natives/context.rs | 53 ++-- aptos-move/mvhashmap/src/types.rs | 9 +- .../src/unit_tests/proptest_types.rs | 8 + types/src/transaction/mod.rs | 2 +- types/src/write_set.rs | 22 ++ 23 files changed, 639 insertions(+), 207 deletions(-) diff --git a/aptos-move/aptos-aggregator/src/delta_change_set.rs b/aptos-move/aptos-aggregator/src/delta_change_set.rs index a73fdd89ed5d4..706bc47495ff3 100644 --- a/aptos-move/aptos-aggregator/src/delta_change_set.rs +++ b/aptos-move/aptos-aggregator/src/delta_change_set.rs @@ -219,12 +219,20 @@ mod test { FakeAggregatorView, }; use aptos_types::{ + aggregator::PanicError, state_store::{state_key::StateKey, state_value::StateValue}, write_set::WriteOp, }; use claims::{assert_err, assert_matches, assert_ok, assert_ok_eq}; - use move_core_types::vm_status::{StatusCode, VMStatus}; + use move_core_types::{ + value::MoveTypeLayout, + vm_status::{StatusCode, VMStatus}, + }; use once_cell::sync::Lazy; + use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, + }; fn delta_add_with_history(v: u128, max_value: u128, max: u128, min: u128) -> DeltaOp { let mut delta = delta_add(v, max_value); @@ -501,6 +509,9 @@ mod test { impl TDelayedFieldView for BadStorage { type Identifier = (); + type ResourceGroupTag = (); + type ResourceKey = (); + type ResourceValue = (); fn is_delayed_field_optimization_capable(&self) -> bool { unimplemented!("Irrelevant for the test") @@ -526,6 +537,17 @@ mod test { fn generate_delayed_field_id(&self) -> Self::Identifier { unimplemented!("Irrelevant for the test") } + + fn get_reads_needing_exchange( + &self, + _delayed_write_set_keys: &HashSet, + _skip: &HashSet, + ) -> Result< + BTreeMap)>, + PanicError, + > { + unimplemented!("Irrelevant for the test") + } } #[test] diff --git a/aptos-move/aptos-aggregator/src/resolver.rs b/aptos-move/aptos-aggregator/src/resolver.rs index 7424d553e582c..95227f792a04b 100644 --- a/aptos-move/aptos-aggregator/src/resolver.rs +++ b/aptos-move/aptos-aggregator/src/resolver.rs @@ -12,6 +12,7 @@ use crate::{ }; use aptos_state_view::StateView; use aptos_types::{ + aggregator::PanicError, state_store::{ state_key::StateKey, state_value::{StateValue, StateValueMetadataKind}, @@ -23,9 +24,14 @@ use move_core_types::{ account_address::AccountAddress, ident_str, identifier::IdentStr, - language_storage::{ModuleId, CORE_CODE_ADDRESS}, + language_storage::{ModuleId, StructTag, CORE_CODE_ADDRESS}, + value::MoveTypeLayout, vm_status::{StatusCode, VMStatus}, }; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; /// We differentiate between deprecated way to interact with aggregators (TAggregatorV1View), /// and new, more general, TDelayedFieldView. @@ -140,6 +146,9 @@ where /// from the state storage. pub trait TDelayedFieldView { type Identifier; + type ResourceKey; + type ResourceGroupTag; + type ResourceValue; fn is_delayed_field_optimization_capable(&self) -> bool; @@ -174,17 +183,42 @@ pub trait TDelayedFieldView { /// Returns a unique per-block identifier that can be used when creating a /// new aggregator V2. fn generate_delayed_field_id(&self) -> Self::Identifier; + + fn get_reads_needing_exchange( + &self, + delayed_write_set_keys: &HashSet, + skip: &HashSet, + ) -> Result)>, PanicError>; } -pub trait DelayedFieldResolver: TDelayedFieldView {} +pub trait DelayedFieldResolver: + TDelayedFieldView< + Identifier = DelayedFieldID, + ResourceKey = StateKey, + ResourceGroupTag = StructTag, + ResourceValue = WriteOp, +> +{ +} -impl DelayedFieldResolver for T where T: TDelayedFieldView {} +impl DelayedFieldResolver for T where + T: TDelayedFieldView< + Identifier = DelayedFieldID, + ResourceKey = StateKey, + ResourceGroupTag = StructTag, + ResourceValue = WriteOp, + > +{ +} impl TDelayedFieldView for S where S: StateView, { type Identifier = DelayedFieldID; + type ResourceGroupTag = StructTag; + type ResourceKey = StateKey; + type ResourceValue = WriteOp; fn is_delayed_field_optimization_capable(&self) -> bool { // For resolvers that are not capable, it cannot be enabled @@ -213,4 +247,13 @@ where fn generate_delayed_field_id(&self) -> Self::Identifier { unimplemented!("generate_delayed_field_id not implemented") } + + fn get_reads_needing_exchange( + &self, + _delayed_write_set_keys: &HashSet, + _skip: &HashSet, + ) -> Result)>, PanicError> + { + unimplemented!("get_reads_needing_exchange not implemented") + } } diff --git a/aptos-move/aptos-aggregator/src/tests/types.rs b/aptos-move/aptos-aggregator/src/tests/types.rs index 11ad355ef8974..6ed0068d9fd5e 100644 --- a/aptos-move/aptos-aggregator/src/tests/types.rs +++ b/aptos-move/aptos-aggregator/src/tests/types.rs @@ -8,8 +8,17 @@ use crate::{ resolver::{TAggregatorV1View, TDelayedFieldView}, types::{expect_ok, DelayedFieldID, DelayedFieldValue, DelayedFieldsSpeculativeError, PanicOr}, }; -use aptos_types::state_store::{state_key::StateKey, state_value::StateValue}; -use std::{cell::RefCell, collections::HashMap}; +use aptos_types::{ + aggregator::PanicError, + state_store::{state_key::StateKey, state_value::StateValue}, + write_set::WriteOp, +}; +use move_core_types::{language_storage::StructTag, value::MoveTypeLayout}; +use std::{ + cell::RefCell, + collections::{BTreeMap, HashMap, HashSet}, + sync::Arc, +}; pub fn aggregator_v1_id_for_test(key: u128) -> AggregatorID { AggregatorID(aggregator_v1_state_key_for_test(key)) @@ -66,6 +75,9 @@ impl TAggregatorV1View for FakeAggregatorView { impl TDelayedFieldView for FakeAggregatorView { type Identifier = DelayedFieldID; + type ResourceGroupTag = StructTag; + type ResourceKey = StateKey; + type ResourceValue = WriteOp; fn is_delayed_field_optimization_capable(&self) -> bool { true @@ -100,4 +112,13 @@ impl TDelayedFieldView for FakeAggregatorView { *counter += 1; id } + + fn get_reads_needing_exchange( + &self, + _delayed_write_set_keys: &HashSet, + _skip: &HashSet, + ) -> Result)>, PanicError> + { + unimplemented!(); + } } diff --git a/aptos-move/aptos-vm-types/src/change_set.rs b/aptos-move/aptos-vm-types/src/change_set.rs index be601c5f5977a..a41931554a654 100644 --- a/aptos-move/aptos-vm-types/src/change_set.rs +++ b/aptos-move/aptos-vm-types/src/change_set.rs @@ -113,6 +113,7 @@ pub struct VMChangeSet { aggregator_v1_write_set: BTreeMap, aggregator_v1_delta_set: BTreeMap, delayed_field_change_set: BTreeMap>, + reads_needing_delayed_field_exchange: BTreeMap)>, events: Vec<(ContractEvent, Option)>, } @@ -141,6 +142,7 @@ impl VMChangeSet { aggregator_v1_write_set: BTreeMap::new(), aggregator_v1_delta_set: BTreeMap::new(), delayed_field_change_set: BTreeMap::new(), + reads_needing_delayed_field_exchange: BTreeMap::new(), events: vec![], } } @@ -152,6 +154,7 @@ impl VMChangeSet { aggregator_v1_write_set: BTreeMap, aggregator_v1_delta_set: BTreeMap, delayed_field_change_set: BTreeMap>, + reads_needing_delayed_field_exchange: BTreeMap)>, events: Vec<(ContractEvent, Option)>, checker: &dyn CheckChangeSet, ) -> anyhow::Result { @@ -162,6 +165,7 @@ impl VMChangeSet { aggregator_v1_write_set, aggregator_v1_delta_set, delayed_field_change_set, + reads_needing_delayed_field_exchange, events, }; @@ -221,6 +225,7 @@ impl VMChangeSet { aggregator_v1_write_set: BTreeMap::new(), aggregator_v1_delta_set: BTreeMap::new(), delayed_field_change_set: BTreeMap::new(), + reads_needing_delayed_field_exchange: BTreeMap::new(), events, }; checker.check_change_set(&change_set)?; @@ -236,6 +241,7 @@ impl VMChangeSet { // that knows how to deal with it. assert!(self.delayed_field_change_set().is_empty()); assert!(self.resource_group_write_set().is_empty()); + assert!(self.reads_needing_delayed_field_exchange().is_empty()); let Self { resource_write_set, @@ -244,6 +250,7 @@ impl VMChangeSet { aggregator_v1_write_set, aggregator_v1_delta_set: _, delayed_field_change_set: _, + reads_needing_delayed_field_exchange: _, events, } = self; @@ -364,8 +371,8 @@ impl VMChangeSet { pub(crate) fn drain_delayed_field_change_set( &mut self, - ) -> impl Iterator)> + '_ { - std::mem::take(&mut self.delayed_field_change_set).into_iter() + ) -> BTreeMap> { + std::mem::take(&mut self.delayed_field_change_set) } pub fn aggregator_v1_write_set(&self) -> &BTreeMap { @@ -382,6 +389,18 @@ impl VMChangeSet { &self.delayed_field_change_set } + pub fn reads_needing_delayed_field_exchange( + &self, + ) -> &BTreeMap)> { + &self.reads_needing_delayed_field_exchange + } + + pub(crate) fn drain_reads_needing_delayed_field_exchange( + &mut self, + ) -> BTreeMap)> { + std::mem::take(&mut self.reads_needing_delayed_field_exchange) + } + pub fn events(&self) -> &[(ContractEvent, Option)] { &self.events } @@ -399,6 +418,7 @@ impl VMChangeSet { mut aggregator_v1_write_set, aggregator_v1_delta_set, delayed_field_change_set, + reads_needing_delayed_field_exchange, events, } = self; @@ -424,6 +444,7 @@ impl VMChangeSet { aggregator_v1_write_set, aggregator_v1_delta_set: BTreeMap::new(), delayed_field_change_set, + reads_needing_delayed_field_exchange, events, }) } @@ -658,6 +679,33 @@ impl VMChangeSet { Ok(()) } + fn squash_additional_reads_needing_exchange( + reads_needing_exchange: &mut BTreeMap)>, + additional_reads_needing_exchange: BTreeMap)>, + skip: &BTreeMap, + ) -> anyhow::Result<(), VMStatus> { + for key in skip.keys() { + reads_needing_exchange.remove(key); + } + + for (key, additional_value) in additional_reads_needing_exchange.into_iter() { + if skip.contains_key(&key) { + continue; + } + match reads_needing_exchange.entry(key) { + Occupied(entry) => { + // When squashing, reads should always be identical. + // TODO[agg_v2](fix) remove asssertion, as this should always hold. + assert_eq!(entry.get(), &additional_value); + }, + Vacant(entry) => { + entry.insert(additional_value); + }, + } + } + Ok(()) + } + pub fn squash_additional_change_set( &mut self, additional_change_set: Self, @@ -670,6 +718,7 @@ impl VMChangeSet { aggregator_v1_write_set: additional_aggregator_write_set, aggregator_v1_delta_set: additional_aggregator_delta_set, delayed_field_change_set: additional_delayed_field_change_set, + reads_needing_delayed_field_exchange: additional_reads_needing_delayed_field_exchange, events: additional_events, } = additional_change_set; @@ -679,10 +728,6 @@ impl VMChangeSet { additional_aggregator_write_set, additional_aggregator_delta_set, )?; - Self::squash_additional_delayed_field_changes( - &mut self.delayed_field_change_set, - additional_delayed_field_change_set, - )?; Self::squash_additional_resource_writes( &mut self.resource_write_set, additional_resource_write_set, @@ -695,6 +740,15 @@ impl VMChangeSet { &mut self.module_write_set, additional_module_write_set, )?; + Self::squash_additional_delayed_field_changes( + &mut self.delayed_field_change_set, + additional_delayed_field_change_set, + )?; + Self::squash_additional_reads_needing_exchange( + &mut self.reads_needing_delayed_field_exchange, + additional_reads_needing_delayed_field_exchange, + &self.resource_write_set, + )?; self.events.extend(additional_events); checker.check_change_set(self) diff --git a/aptos-move/aptos-vm-types/src/output.rs b/aptos-move/aptos-vm-types/src/output.rs index 3ea921320db5a..1505138b8f5ba 100644 --- a/aptos-move/aptos-vm-types/src/output.rs +++ b/aptos-move/aptos-vm-types/src/output.rs @@ -169,6 +169,7 @@ impl VMOutput { self.change_set.set_events(patched_events.into_iter()); // TODO[agg_v2](cleanup) move drain to happen when getting what to materialize. let _ = self.change_set.drain_delayed_field_change_set(); + let _ = self.change_set.drain_reads_needing_delayed_field_exchange(); let (vm_change_set, gas_used, status) = self.unpack(); let (write_set, events) = vm_change_set diff --git a/aptos-move/aptos-vm-types/src/resolver.rs b/aptos-move/aptos-vm-types/src/resolver.rs index 135731d99f2ad..0133df60c1d02 100644 --- a/aptos-move/aptos-vm-types/src/resolver.rs +++ b/aptos-move/aptos-vm-types/src/resolver.rs @@ -6,10 +6,13 @@ use aptos_aggregator::{ types::DelayedFieldID, }; use aptos_state_view::{StateView, StateViewId}; -use aptos_types::state_store::{ - state_key::StateKey, - state_storage_usage::StateStorageUsage, - state_value::{StateValue, StateValueMetadataKind}, +use aptos_types::{ + state_store::{ + state_key::StateKey, + state_storage_usage::StateStorageUsage, + state_value::{StateValue, StateValueMetadataKind}, + }, + write_set::WriteOp, }; use bytes::Bytes; use move_core_types::{language_storage::StructTag, value::MoveTypeLayout}; @@ -191,28 +194,31 @@ pub trait StateStorageView { /// TODO: audit and reconsider the default implementation (e.g. should not /// resolve AggregatorV2 via the state-view based default implementation, as it /// doesn't provide a value exchange functionality). -pub trait TExecutorView: +pub trait TExecutorView: TResourceView + TModuleView + TAggregatorV1View - + TDelayedFieldView + + TDelayedFieldView + StateStorageView { } -impl TExecutorView for A where +impl TExecutorView for A where A: TResourceView + TModuleView + TAggregatorV1View - + TDelayedFieldView + + TDelayedFieldView + StateStorageView { } -pub trait ExecutorView: TExecutorView {} +pub trait ExecutorView: + TExecutorView +{ +} impl ExecutorView for T where - T: TExecutorView + T: TExecutorView { } diff --git a/aptos-move/aptos-vm-types/src/tests/utils.rs b/aptos-move/aptos-vm-types/src/tests/utils.rs index 72f5673cd2e96..db2bfa7f25644 100644 --- a/aptos-move/aptos-vm-types/src/tests/utils.rs +++ b/aptos-move/aptos-vm-types/src/tests/utils.rs @@ -156,6 +156,8 @@ pub(crate) fn build_change_set( BTreeMap::from_iter(aggregator_v1_write_set), BTreeMap::from_iter(aggregator_v1_delta_set), BTreeMap::from_iter(delayed_field_change_set), + // TODO[agg_v2](fix) add to the caller. + BTreeMap::new(), vec![], &MockChangeSetChecker, ) diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index c992dd6c1588d..c1ae1e8897293 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -539,6 +539,12 @@ impl AptosVM { for (key, op) in change_set.write_set_iter() { gas_meter.charge_io_gas_for_write(key, op)?; } + // TODO[agg_v2](fix): Charge SnapshotDerived (string concat) based on lenght, + // as charge below charges based on non-exchanged writes (i.e. identifier being in the read_op) + // Do we want to charge delayed field changes also? + for (key, (read_op, _)) in change_set.reads_needing_delayed_field_exchange().iter() { + gas_meter.charge_io_gas_for_write(key, read_op)?; + } for (key, group_write) in change_set.resource_group_write_set().iter() { gas_meter.charge_io_gas_for_group_write(key, group_write)?; } @@ -552,7 +558,7 @@ impl AptosVM { storage_refund = 0.into(); } - // TODO(Gas): Charge for aggregator writes + // TODO[agg_v1](fix): Charge for aggregator writes let session_id = SessionId::epilogue_meta(txn_data); RespawnedSession::spawn(self, session_id, resolver, change_set, storage_refund) } diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index d9788332d12bc..cd0ce4d6d480a 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -181,6 +181,24 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { .clone() } + fn reads_needing_delayed_field_exchange( + &self, + ) -> BTreeMap< + ::Key, + ( + ::Value, + Arc, + ), + > { + self.vm_output + .lock() + .as_ref() + .expect("Output to be set to get reads") + .change_set() + .reads_needing_delayed_field_exchange() + .clone() + } + /// Should never be called after incorporating materialized output, as that consumes vm_output. fn get_events(&self) -> Vec<(ContractEvent, Option)> { self.vm_output diff --git a/aptos-move/aptos-vm/src/data_cache.rs b/aptos-move/aptos-vm/src/data_cache.rs index b273d00eea382..9d5cd4db33206 100644 --- a/aptos-move/aptos-vm/src/data_cache.rs +++ b/aptos-move/aptos-vm/src/data_cache.rs @@ -21,12 +21,14 @@ use aptos_state_view::{StateView, StateViewId}; use aptos_table_natives::{TableHandle, TableResolver}; use aptos_types::{ access_path::AccessPath, + aggregator::PanicError, on_chain_config::{ConfigStorage, Features, OnChainConfig}, state_store::{ state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::{StateValue, StateValueMetadataKind}, }, + write_set::WriteOp, }; use aptos_vm_types::{ resolver::{ @@ -48,6 +50,7 @@ use move_core_types::{ use std::{ cell::RefCell, collections::{BTreeMap, HashMap, HashSet}, + sync::Arc, }; pub(crate) fn get_resource_group_from_metadata( @@ -287,6 +290,9 @@ impl<'e, E: ExecutorView> TAggregatorV1View for StorageAdapter<'e, E> { impl<'e, E: ExecutorView> TDelayedFieldView for StorageAdapter<'e, E> { type Identifier = DelayedFieldID; + type ResourceGroupTag = StructTag; + type ResourceKey = StateKey; + type ResourceValue = WriteOp; fn is_delayed_field_optimization_capable(&self) -> bool { self.executor_view.is_delayed_field_optimization_capable() @@ -313,6 +319,16 @@ impl<'e, E: ExecutorView> TDelayedFieldView for StorageAdapter<'e, E> { fn generate_delayed_field_id(&self) -> Self::Identifier { self.executor_view.generate_delayed_field_id() } + + fn get_reads_needing_exchange( + &self, + delayed_write_set_keys: &HashSet, + skip: &HashSet, + ) -> Result)>, PanicError> + { + self.executor_view + .get_reads_needing_exchange(delayed_write_set_keys, skip) + } } impl<'e, E: ExecutorView> ConfigStorage for StorageAdapter<'e, E> { diff --git a/aptos-move/aptos-vm/src/move_vm_ext/respawned_session.rs b/aptos-move/aptos-vm/src/move_vm_ext/respawned_session.rs index da48e1c97dfce..3f51d9eb245b6 100644 --- a/aptos-move/aptos-vm/src/move_vm_ext/respawned_session.rs +++ b/aptos-move/aptos-vm/src/move_vm_ext/respawned_session.rs @@ -19,10 +19,11 @@ use aptos_aggregator::{ use aptos_gas_algebra::Fee; use aptos_state_view::StateViewId; use aptos_types::{ + aggregator::PanicError, state_store::{ state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, }, - write_set::TransactionWrite, + write_set::{TransactionWrite, WriteOp}, }; use aptos_vm_types::{ change_set::VMChangeSet, @@ -38,6 +39,10 @@ use move_core_types::{ value::MoveTypeLayout, vm_status::{err_msg, StatusCode, VMStatus}, }; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; /// We finish the session after the user transaction is done running to get the change set and /// charge gas and storage fee based on it before running storage refunds and the transaction @@ -162,6 +167,9 @@ impl<'r> TAggregatorV1View for ExecutorViewWithChangeSet<'r> { impl<'r> TDelayedFieldView for ExecutorViewWithChangeSet<'r> { type Identifier = DelayedFieldID; + type ResourceGroupTag = StructTag; + type ResourceKey = StateKey; + type ResourceValue = WriteOp; fn is_delayed_field_optimization_capable(&self) -> bool { self.base_executor_view @@ -235,6 +243,16 @@ impl<'r> TDelayedFieldView for ExecutorViewWithChangeSet<'r> { fn generate_delayed_field_id(&self) -> Self::Identifier { self.base_executor_view.generate_delayed_field_id() } + + fn get_reads_needing_exchange( + &self, + delayed_write_set_keys: &HashSet, + skip: &HashSet, + ) -> Result)>, PanicError> + { + self.base_executor_view + .get_reads_needing_exchange(delayed_write_set_keys, skip) + } } impl<'r> TResourceView for ExecutorViewWithChangeSet<'r> { @@ -474,6 +492,7 @@ mod test { aggregator_v1_write_set, aggregator_v1_delta_set, BTreeMap::new(), + BTreeMap::new(), vec![], &NoOpChangeSetChecker, ) diff --git a/aptos-move/aptos-vm/src/move_vm_ext/session.rs b/aptos-move/aptos-vm/src/move_vm_ext/session.rs index fc6698a6300ef..bc39d8f4b2267 100644 --- a/aptos-move/aptos-vm/src/move_vm_ext/session.rs +++ b/aptos-move/aptos-vm/src/move_vm_ext/session.rs @@ -188,7 +188,9 @@ impl<'r, 'l> SessionExt<'r, 'l> { .map_err(|e| e.finish(Location::Undefined))?; let aggregator_context: NativeAggregatorContext = extensions.remove(); - let aggregator_change_set = aggregator_context.into_change_set(); + let aggregator_change_set = aggregator_context + .into_change_set() + .map_err(|e| PartialVMError::from(e).finish(Location::Undefined))?; let event_context: NativeEventContext = extensions.remove(); let events = event_context.into_events(); @@ -408,7 +410,6 @@ impl<'r, 'l> SessionExt<'r, 'l> { let mut module_write_set = BTreeMap::new(); let mut aggregator_v1_write_set = BTreeMap::new(); let mut aggregator_v1_delta_set = BTreeMap::new(); - let mut delayed_field_change_set = BTreeMap::new(); for (addr, account_changeset) in change_set.into_inner() { let (modules, resources) = account_changeset.into_inner(); @@ -471,9 +472,12 @@ impl<'r, 'l> SessionExt<'r, 'l> { } } - for (id, change) in aggregator_change_set.delayed_field_changes { - delayed_field_change_set.insert(id, change); - } + // We need to remove values that are already in the writes. + let reads_needing_exchange = aggregator_change_set + .reads_needing_exchange + .into_iter() + .filter(|(state_key, _)| !resource_write_set.contains_key(state_key)) + .collect(); VMChangeSet::new( resource_write_set, @@ -481,7 +485,8 @@ impl<'r, 'l> SessionExt<'r, 'l> { module_write_set, aggregator_v1_write_set, aggregator_v1_delta_set, - delayed_field_change_set, + aggregator_change_set.delayed_field_changes, + reads_needing_exchange, events, configs, ) diff --git a/aptos-move/aptos-vm/src/natives.rs b/aptos-move/aptos-vm/src/natives.rs index a1e5ebbff3c4b..4104a5e3243b2 100644 --- a/aptos-move/aptos-vm/src/natives.rs +++ b/aptos-move/aptos-vm/src/natives.rs @@ -24,7 +24,9 @@ use aptos_native_interface::SafeNativeBuilder; use aptos_table_natives::{TableHandle, TableResolver}; use aptos_types::{ account_config::CORE_CODE_ADDRESS, + aggregator::PanicError, on_chain_config::{Features, TimedFeatures, TimedFeaturesBuilder}, + write_set::WriteOp, }; #[cfg(feature = "testing")] use aptos_types::{ @@ -34,8 +36,14 @@ use aptos_types::{ #[cfg(feature = "testing")] use bytes::Bytes; #[cfg(feature = "testing")] +use move_core_types::language_storage::StructTag; +#[cfg(feature = "testing")] use move_core_types::value::MoveTypeLayout; use move_vm_runtime::native_functions::NativeFunctionTable; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; #[cfg(feature = "testing")] use { aptos_framework::natives::{ @@ -78,6 +86,9 @@ impl TAggregatorV1View for AptosBlankStorage { #[cfg(feature = "testing")] impl TDelayedFieldView for AptosBlankStorage { type Identifier = DelayedFieldID; + type ResourceGroupTag = StructTag; + type ResourceKey = StateKey; + type ResourceValue = WriteOp; fn is_delayed_field_optimization_capable(&self) -> bool { false @@ -103,6 +114,15 @@ impl TDelayedFieldView for AptosBlankStorage { fn generate_delayed_field_id(&self) -> Self::Identifier { (self.counter.fetch_add(1, Ordering::SeqCst) as u64).into() } + + fn get_reads_needing_exchange( + &self, + _delayed_write_set_keys: &HashSet, + _skip: &HashSet, + ) -> Result)>, PanicError> + { + unimplemented!() + } } #[cfg(feature = "testing")] diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index daa6fae1d72c3..b27600bff5470 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - captured_reads::DataRead, counters, counters::{ PARALLEL_EXECUTION_SECONDS, RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS, @@ -614,15 +613,13 @@ where fn map_id_to_values_in_write_set( resource_write_set: Option>)>>, latest_view: &LatestView, - ) -> (BTreeMap, HashSet) { - let mut write_set_keys = HashSet::new(); + ) -> BTreeMap { let mut patched_resource_write_set = BTreeMap::new(); if let Some(resource_write_set) = resource_write_set { for (key, (write_op, layout)) in resource_write_set.iter() { // layout is Some(_) if it contains a delayed field if let Some(layout) = layout { if !write_op.is_deletion() { - write_set_keys.insert(key.clone()); let patched_bytes = match latest_view .replace_identifiers_with_values(write_op.bytes().unwrap(), layout) { @@ -636,103 +633,32 @@ where } } } - (patched_resource_write_set, write_set_keys) + patched_resource_write_set } // Parse the input `value` and replace delayed field identifiers with // corresponding values fn replace_ids_with_values( - value: Arc, + value: &T::Value, layout: &MoveTypeLayout, latest_view: &LatestView, - delayed_field_keys: &HashSet, - ) -> Option { - if let Some(value_bytes) = value.bytes() { - match latest_view.replace_identifiers_with_values(value_bytes, layout) { - Ok((patched_bytes, delayed_field_keys_in_resource)) => { - if !delayed_field_keys.is_disjoint(&delayed_field_keys_in_resource) { - let mut patched_value = value.as_ref().clone(); - patched_value.set_bytes(patched_bytes); - Some(patched_value) - } else { - None - } - }, - Err(_) => unreachable!("Failed to replace identifiers with values in read set"), + ) -> T::Value { + if let Some(mut value) = value.convert_read_to_modification() { + if let Some(value_bytes) = value.bytes() { + let (patched_bytes, _) = latest_view + .replace_identifiers_with_values(value_bytes, layout) + .unwrap(); + value.set_bytes(patched_bytes); + value + } else { + unreachable!("Value to be exchanged doesn't have bytes: {:?}", value) } } else { - // TODO[agg_v2](fix): Is this unreachable? - unreachable!("Data read value must exist"); - } - } - - // For each resource that satisfies the following conditions, - // 1. Resource is in read set - // 2. Resource is not in write set - // replace the delayed field identifiers in the resource with corresponding values. - // If any of the delayed field identifiers in the resource are part of delayed_field_write_set, - // then include the resource in the write set. - fn map_id_to_values_in_read_set_parallel( - txn_idx: TxnIndex, - delayed_field_keys: Option>, - write_set_keys: HashSet, - last_input_output: &TxnLastInputOutput, - latest_view: &LatestView, - ) -> BTreeMap { - let mut patched_resource_write_set = BTreeMap::new(); - if let Some(delayed_field_keys) = delayed_field_keys { - let delayed_field_keys = delayed_field_keys.collect::>(); - let read_set = last_input_output.read_set(txn_idx); - if let Some(read_set) = read_set { - for (key, data_read) in read_set.get_read_values_with_delayed_fields() { - if write_set_keys.contains(key) { - continue; - } - // layout is Some(_) if it contains an delayed field - if let DataRead::Versioned(_version, value, Some(layout)) = data_read { - if let Some(patched_value) = Self::replace_ids_with_values( - value.clone(), - layout, - latest_view, - &delayed_field_keys, - ) { - patched_resource_write_set.insert(key.clone(), patched_value); - } - } - } - } - } - patched_resource_write_set - } - - fn map_id_to_values_in_read_set_sequential( - delayed_field_keys: Option>, - write_set_keys: HashSet, - read_set: RefCell>, - unsync_map: &UnsyncMap, - latest_view: &LatestView, - ) -> HashMap { - let mut patched_resource_write_set = HashMap::new(); - if let Some(delayed_field_keys) = delayed_field_keys { - let delayed_field_keys = delayed_field_keys.collect::>(); - for key in read_set.borrow().iter() { - if write_set_keys.contains(key) { - continue; - } - // layout is Some(_) if it contains an delayed field - if let Some((value, Some(layout))) = unsync_map.fetch_data(key) { - if let Some(patched_value) = Self::replace_ids_with_values( - value.clone(), - &layout, - latest_view, - &delayed_field_keys, - ) { - patched_resource_write_set.insert(key.clone(), patched_value); - } - } - } + unreachable!( + "Value to be exchanged cannot be transformed to modification: {:?}", + value + ); } - patched_resource_write_set } // For each delayed field in the event, replace delayed field identifier with value. @@ -849,18 +775,21 @@ where let parallel_state = ParallelState::::new(versioned_cache, scheduler, shared_counter); let latest_view = LatestView::new(base_view, ViewState::Sync(parallel_state), txn_idx); let resource_write_set = last_input_output.resource_write_set(txn_idx); - let delayed_field_keys = last_input_output.delayed_field_keys(txn_idx); let finalized_groups = last_input_output.take_finalized_group(txn_idx); - let (mut patched_resource_write_set, write_set_keys) = + let mut patched_resource_write_set = Self::map_id_to_values_in_write_set(resource_write_set, &latest_view); - patched_resource_write_set.extend(Self::map_id_to_values_in_read_set_parallel( - txn_idx, - delayed_field_keys, - write_set_keys, - last_input_output, - &latest_view, - )); + + if let Some(reads_needing_delayed_field_exchange) = + last_input_output.reads_needing_delayed_field_exchange(txn_idx) + { + for (key, (value, layout)) in reads_needing_delayed_field_exchange.into_iter() { + patched_resource_write_set.insert( + key, + Self::replace_ids_with_values(&value, layout.as_ref(), &latest_view), + ); + } + } let events = last_input_output.events(txn_idx); let patched_events = Self::map_id_to_values_events(events, &latest_view); @@ -1262,22 +1191,29 @@ where if dynamic_change_set_optimizations_enabled { // Replace delayed field id with values in resource write set and read set. - let delayed_field_keys = - Some(output.delayed_field_change_set().into_keys()); let resource_change_set = Some(output.resource_write_set()); - let (mut patched_resource_write_set, write_set_keys) = + let mut patched_resource_write_set = Self::map_id_to_values_in_write_set(resource_change_set, &latest_view); - let read_set = latest_view.read_set_sequential_execution(); - patched_resource_write_set.extend( - Self::map_id_to_values_in_read_set_sequential( - delayed_field_keys, - write_set_keys, - read_set, - &unsync_map, - &latest_view, - ), - ); + for (key, (value, layout)) in + output.reads_needing_delayed_field_exchange().into_iter() + { + if patched_resource_write_set + .insert( + key, + Self::replace_ids_with_values( + &value, + layout.as_ref(), + &latest_view, + ), + ) + .is_some() + { + return Err(Error::FallbackToSequential(code_invariant_error( + "reads_needing_delayed_field_exchange already in the write set for key", + ).into())); + } + } // Replace delayed field id with values in events let patched_events = Self::map_id_to_values_events( diff --git a/aptos-move/block-executor/src/proptest_types/types.rs b/aptos-move/block-executor/src/proptest_types/types.rs index bc20c636e494a..c4c195c00d76e 100644 --- a/aptos-move/block-executor/src/proptest_types/types.rs +++ b/aptos-move/block-executor/src/proptest_types/types.rs @@ -297,6 +297,13 @@ impl TransactionWrite for ValueType { fn set_bytes(&mut self, bytes: Bytes) { self.bytes = bytes.into(); } + + fn convert_read_to_modification(&self) -> Option + where + Self: Sized, + { + Some(self.clone()) + } } #[derive(Clone, Copy)] @@ -827,7 +834,7 @@ where fn execute_transaction( &self, - view: &(impl TExecutorView + view: &(impl TExecutorView + TResourceGroupView), txn: &Self::Txn, txn_idx: TxnIndex, @@ -1023,6 +1030,16 @@ where BTreeMap::new() } + fn reads_needing_delayed_field_exchange( + &self, + ) -> BTreeMap< + ::Key, + (::Value, Arc), + > { + // TODO[agg_v2](tests): add aggregators V2 to the proptest? + BTreeMap::new() + } + // TODO[agg_v2](tests): Currently, appending None to all events, which means none of the // events have aggregators. Test it with aggregators as well. fn get_events(&self) -> Vec<(E, Option)> { diff --git a/aptos-move/block-executor/src/task.rs b/aptos-move/block-executor/src/task.rs index 5ba8bf6fdfb8a..98ad5354d3665 100644 --- a/aptos-move/block-executor/src/task.rs +++ b/aptos-move/block-executor/src/task.rs @@ -66,6 +66,7 @@ pub trait ExecutorTask: Sync { ::Tag, MoveTypeLayout, ::Identifier, + ::Value, > + TResourceGroupView< GroupKey = ::Key, ResourceTag = ::Tag, @@ -113,6 +114,13 @@ pub trait TransactionOutput: Send + Sync + Debug { DelayedChange<::Identifier>, >; + fn reads_needing_delayed_field_exchange( + &self, + ) -> BTreeMap< + ::Key, + (::Value, Arc), + >; + /// Get the events of a transaction from its output. fn get_events(&self) -> Vec<(::Event, Option)>; diff --git a/aptos-move/block-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs index 88feec2dc912f..b943aece939bb 100644 --- a/aptos-move/block-executor/src/txn_last_input_output.rs +++ b/aptos-move/block-executor/src/txn_last_input_output.rs @@ -286,6 +286,24 @@ impl, E: Debug + Send + Clone> }) } + pub(crate) fn reads_needing_delayed_field_exchange( + &self, + txn_idx: TxnIndex, + ) -> Option)>> { + self.outputs[txn_idx as usize] + .load() + .as_ref() + .and_then(|txn_output| match &txn_output.output_status { + ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => { + Some(t.reads_needing_delayed_field_exchange()) + }, + ExecutionStatus::Abort(_) + | ExecutionStatus::DirectWriteSetTransactionNotCapableError + | ExecutionStatus::SpeculativeExecutionAbortError(_) + | ExecutionStatus::DelayedFieldsCodeInvariantError(_) => None, + }) + } + pub(crate) fn aggregator_v1_delta_keys(&self, txn_idx: TxnIndex) -> Vec { self.outputs[txn_idx as usize].load().as_ref().map_or( vec![], diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 3be7636ac6afb..07a212108f1d5 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -29,6 +29,7 @@ use aptos_mvhashmap::{ }; use aptos_state_view::{StateViewId, TStateView}; use aptos_types::{ + aggregator::PanicError, executable::{Executable, ModulePath}, state_store::{ state_storage_usage::StateStorageUsage, @@ -661,15 +662,6 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< } } - pub(crate) fn read_set_sequential_execution(&self) -> RefCell> { - match &self.latest_view { - ViewState::Sync(_) => unreachable!( - "Read set for sequential execution while running in parallel execution mode" - ), - ViewState::Unsync(state) => state.read_set.clone(), - } - } - fn get_base_value(&self, state_key: &T::Key) -> anyhow::Result> { let ret = self.base_view.get_state_value(state_key); @@ -741,6 +733,94 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< Ok((patched_bytes, mapping.into_inner())) } + // Given a bytes, where values were already exchanged with idnetifiers, + // return a list of identifiers present in it. + fn extract_identifiers_from_value( + &self, + bytes: &Bytes, + layout: &MoveTypeLayout, + ) -> anyhow::Result> { + let mapping = TemporaryExtractIdentifiersMapping::::new(); + // TODO[agg_v2](cleanup) rename deserialize_and_replace_values_with_ids to not be specific to mapping trait implementation + // TODO[agg_v2](cleanup) provide traversal method, that doesn't create unnecessary patched value. + let _patched_value = + deserialize_and_replace_values_with_ids(bytes.as_ref(), layout, &mapping).ok_or_else( + || anyhow::anyhow!("Failed to deserialize resource during id replacement"), + )?; + Ok(mapping.into_inner()) + } + + fn does_value_need_exchange( + &self, + value: &T::Value, + layout: &Arc, + delayed_write_set_keys: &HashSet, + key: &T::Key, + ) -> Option)), PanicError>> { + if let Some(bytes) = value.bytes() { + let identifiers_in_read_result = self.extract_identifiers_from_value(bytes, layout); + + match identifiers_in_read_result { + Ok(identifiers_in_read) => { + if !delayed_write_set_keys.is_disjoint(&identifiers_in_read) { + return Some(Ok((key.clone(), (value.clone(), layout.clone())))); + } + }, + Err(e) => { + return Some(Err(code_invariant_error(format!("Cannot extract identifiers from value that identifiers were exchanged into before {:?}", e)))) + } + } + } + None + } + + fn get_reads_needing_exchange_parallel( + &self, + read_set: &CapturedReads, + delayed_write_set_keys: &HashSet, + skip: &HashSet, + ) -> Result)>, PanicError> { + read_set + .get_read_values_with_delayed_fields() + .filter(|(key, _)| !skip.contains(key)) + .flat_map(|(key, data_read)| { + if let DataRead::Versioned(_version, value, Some(layout)) = data_read { + return self.does_value_need_exchange( + value, + layout, + delayed_write_set_keys, + key, + ); + } + None + }) + .collect() + } + + fn get_reads_needing_exchange_sequential( + &self, + read_set: &HashSet, + unsync_map: &UnsyncMap, + delayed_write_set_keys: &HashSet, + skip: &HashSet, + ) -> Result)>, PanicError> { + read_set + .iter() + .filter(|key| !skip.contains(key)) + .flat_map(|key| { + if let Some((value, Some(layout))) = unsync_map.fetch_data(key) { + return self.does_value_need_exchange( + &value, + &layout, + delayed_write_set_keys, + key, + ); + } + None + }) + .collect() + } + fn get_resource_state_value_impl( &self, state_key: &T::Key, @@ -825,48 +905,46 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< }, None => { let from_storage = self.get_base_value(state_key)?; - - Ok( - match ( - kind.clone(), - from_storage, - maybe_layout, - state.dynamic_change_set_optimizations_enabled, - ) { - (ReadKind::Value, Some(state_value), Some(layout), true) => { - let res = - self.replace_values_with_identifiers(state_value, layout); - let patched_state_value = match res { - Ok((patched_state_value, _)) => { - state.read_set.borrow_mut().insert(state_key.clone()); - state.unsync_map.write( - state_key.clone(), - TransactionWrite::from_state_value(Some( - patched_state_value.clone(), - )), - maybe_layout.map(|layout| Arc::new(layout.clone())), - ); - Some(patched_state_value) - }, - Err(err) => { - let log_context = AdapterLogSchema::new( - self.base_view.id(), - self.txn_idx as usize, - ); - alert!( + let maybe_patched_from_storage = match ( + kind.clone(), + from_storage, + maybe_layout, + state.dynamic_change_set_optimizations_enabled, + ) { + (ReadKind::Value, Some(state_value), Some(layout), true) => { + let res = self + .replace_values_with_identifiers(state_value.clone(), layout); + let patched_state_value = match res { + Ok((patched_state_value, _)) => { + state.read_set.borrow_mut().insert(state_key.clone()); + Some(patched_state_value) + }, + Err(err) => { + let log_context = AdapterLogSchema::new( + self.base_view.id(), + self.txn_idx as usize, + ); + alert!( log_context, "[VM, ResourceView] Error during value to id replacement for {:?}: {}", state_key, err ); - None - }, - }; - patched_state_value - }, - (_, maybe_state_value, _, _) => maybe_state_value, + None + }, + }; + patched_state_value }, - ) + (_, maybe_state_value, _, _) => maybe_state_value, + }; + + state.unsync_map.write( + state_key.clone(), + TransactionWrite::from_state_value(maybe_patched_from_storage.clone()), + maybe_layout.cloned().map(Arc::new), + ); + + Ok(maybe_patched_from_storage) }, }; ret.map(|maybe_state_value| match kind { @@ -1106,6 +1184,9 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TDelayedFie for LatestView<'a, T, S, X> { type Identifier = T::Identifier; + type ResourceGroupTag = T::Tag; + type ResourceKey = T::Key; + type ResourceValue = T::Value; fn is_delayed_field_optimization_capable(&self) -> bool { match &self.latest_view { @@ -1174,6 +1255,40 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TDelayedFie }, } } + + // TODO - update comment. + // For each resource that satisfies the following conditions, + // 1. Resource is in read set + // 2. Resource is not in write set + // replace the delayed field identifiers in the resource with corresponding values. + // If any of the delayed field identifiers in the resource are part of delayed_field_write_set, + // then include the resource in the write set. + fn get_reads_needing_exchange( + &self, + delayed_write_set_keys: &HashSet, + skip: &HashSet, + ) -> Result)>, PanicError> + { + match &self.latest_view { + ViewState::Sync(state) => { + let captured_reads = state.captured_reads.borrow(); + self.get_reads_needing_exchange_parallel( + &captured_reads, + delayed_write_set_keys, + skip, + ) + }, + ViewState::Unsync(state) => { + let read_set = state.read_set.borrow(); + self.get_reads_needing_exchange_sequential( + &read_set, + state.unsync_map, + delayed_write_set_keys, + skip, + ) + }, + } + } } struct TemporaryValueToIdentifierMapping< @@ -1205,7 +1320,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> } pub fn into_inner(self) -> HashSet { - self.delayed_field_keys.borrow().clone() + self.delayed_field_keys.into_inner() } } @@ -1257,6 +1372,51 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> ValueToIden } } +struct TemporaryExtractIdentifiersMapping { + // These are the delayed field keys that were touched when utilizing this mapping + // to replace ids with values or values with ids + delayed_field_keys: RefCell>, +} + +impl TemporaryExtractIdentifiersMapping { + pub fn new() -> Self { + Self { + delayed_field_keys: RefCell::new(HashSet::new()), + } + } + + pub fn into_inner(self) -> HashSet { + self.delayed_field_keys.into_inner() + } +} + +impl ValueToIdentifierMapping for TemporaryExtractIdentifiersMapping { + fn value_to_identifier( + &self, + _kind: &IdentifierMappingKind, + layout: &MoveTypeLayout, + value: Value, + ) -> TransformationResult { + let id = T::Identifier::try_from_move_value(layout, value, &()) + .map_err(|e| TransformationError(format!("{:?}", e)))?; + self.delayed_field_keys.borrow_mut().insert(id); + id.try_into_move_value(layout) + .map_err(|e| TransformationError(format!("{:?}", e))) + } + + fn identifier_to_value( + &self, + layout: &MoveTypeLayout, + identifier_value: Value, + ) -> TransformationResult { + let id = T::Identifier::try_from_move_value(layout, identifier_value, &()) + .map_err(|e| TransformationError(format!("{:?}", e)))?; + self.delayed_field_keys.borrow_mut().insert(id); + id.try_into_move_value(layout) + .map_err(|e| TransformationError(format!("{:?}", e))) + } +} + #[cfg(test)] mod test { use super::LatestView; diff --git a/aptos-move/framework/src/natives/aggregator_natives/context.rs b/aptos-move/framework/src/natives/aggregator_natives/context.rs index 79210eb19a3e3..7a79f3f0e459e 100644 --- a/aptos-move/framework/src/natives/aggregator_natives/context.rs +++ b/aptos-move/framework/src/natives/aggregator_natives/context.rs @@ -10,9 +10,14 @@ use aptos_aggregator::{ resolver::{AggregatorV1Resolver, DelayedFieldResolver}, types::DelayedFieldID, }; -use aptos_types::state_store::state_key::StateKey; +use aptos_types::{aggregator::PanicError, state_store::state_key::StateKey, write_set::WriteOp}; use better_any::{Tid, TidAble}; -use std::{cell::RefCell, collections::HashMap}; +use move_core_types::value::MoveTypeLayout; +use std::{ + cell::RefCell, + collections::{BTreeMap, HashSet}, + sync::Arc, +}; /// Represents a single aggregator change. #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -29,8 +34,9 @@ pub enum AggregatorChangeV1 { /// set can be converted into appropriate `WriteSet` and `DeltaChangeSet` by the /// user, e.g. VM session. pub struct AggregatorChangeSet { - pub aggregator_v1_changes: HashMap, - pub delayed_field_changes: HashMap>, + pub aggregator_v1_changes: BTreeMap, + pub delayed_field_changes: BTreeMap>, + pub reads_needing_exchange: BTreeMap)>, } /// Native context that can be attached to VM `NativeContextExtensions`. @@ -69,7 +75,7 @@ impl<'a> NativeAggregatorContext<'a> { /// Returns all changes made within this context (i.e. by a single /// transaction). - pub fn into_change_set(self) -> AggregatorChangeSet { + pub fn into_change_set(self) -> Result { let NativeAggregatorContext { aggregator_v1_data, delayed_field_data, @@ -77,7 +83,7 @@ impl<'a> NativeAggregatorContext<'a> { } = self; let (_, destroyed_aggregators, aggregators) = aggregator_v1_data.into_inner().into(); - let mut aggregator_v1_changes = HashMap::new(); + let mut aggregator_v1_changes = BTreeMap::new(); // First, process all writes and deltas. for (id, aggregator) in aggregators { @@ -107,11 +113,31 @@ impl<'a> NativeAggregatorContext<'a> { } let delayed_field_changes = delayed_field_data.into_inner().into(); - - AggregatorChangeSet { + let delayed_write_set_keys = delayed_field_changes + .keys() + .cloned() + .collect::>(); + Ok(AggregatorChangeSet { aggregator_v1_changes, - delayed_field_changes: HashMap::from_iter(delayed_field_changes), - } + delayed_field_changes, + // is_empty check covers both whether delayed fields are enabled or not, as well as whether there + // are any changes that would require computing reads needing exchange. + // TODO[agg_v2](optimize) we only later compute the the write set, so cannot pass the correct skip values here. + reads_needing_exchange: if delayed_write_set_keys.is_empty() { + BTreeMap::new() + } else { + self.delayed_field_resolver + .get_reads_needing_exchange(&delayed_write_set_keys, &HashSet::new())? + }, + }) + } + + #[cfg(test)] + fn into_delayed_fields(self) -> BTreeMap> { + let NativeAggregatorContext { + delayed_field_data, .. + } = self; + delayed_field_data.into_inner().into() } } @@ -193,7 +219,7 @@ mod test { let AggregatorChangeSet { aggregator_v1_changes, .. - } = context.into_change_set(); + } = context.into_change_set().unwrap(); assert!(!aggregator_v1_changes.contains_key(&aggregator_v1_state_key_for_test(100))); assert_matches!( @@ -418,10 +444,7 @@ mod test { let resolver = get_test_resolver_v2(); let context = NativeAggregatorContext::new([0; 32], &resolver, &resolver); test_set_up_v2(&context); - let AggregatorChangeSet { - delayed_field_changes, - .. - } = context.into_change_set(); + let delayed_field_changes = context.into_delayed_fields(); assert!(!delayed_field_changes.contains_key(&DelayedFieldID::new(1000))); assert_some_eq!( delayed_field_changes.get(&DelayedFieldID::new(900)), diff --git a/aptos-move/mvhashmap/src/types.rs b/aptos-move/mvhashmap/src/types.rs index a086cd58a2ada..0014de735d83d 100644 --- a/aptos-move/mvhashmap/src/types.rs +++ b/aptos-move/mvhashmap/src/types.rs @@ -218,7 +218,7 @@ pub(crate) mod test { // group base values (used in some tests), and most tests do not care about // the kind. Otherwise, there are specific constructors that initialize kind // for the tests that care (testing group commit logic in parallel). - #[derive(Debug, PartialEq, Eq)] + #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct TestValue { bytes: Bytes, kind: WriteOpKind, @@ -292,6 +292,13 @@ pub(crate) mod test { fn set_bytes(&mut self, bytes: Bytes) { self.bytes = bytes; } + + fn convert_read_to_modification(&self) -> Option + where + Self: Sized, + { + Some(self.clone()) + } } // Generate a Vec deterministically based on txn_idx and incarnation. diff --git a/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs b/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs index 882830ca0fa16..937620183bcc2 100644 --- a/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs +++ b/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs @@ -85,6 +85,14 @@ impl> + Clone + Debug> TransactionWrite for Value { fn set_bytes(&mut self, bytes: Bytes) { self.maybe_bytes = Some(bytes); } + + fn convert_read_to_modification(&self) -> Option + where + Self: Sized, + { + // If we have no bytes, no modification can be created. + self.maybe_bytes.as_ref().map(|_| self.clone()) + } } enum Data { diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index d2ba98e07f592..785f9fd751ff2 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -1872,6 +1872,6 @@ pub trait BlockExecutableTransaction: Sync + Send + Clone + 'static { + From + TryIntoMoveValue + TryFromMoveValue; - type Value: Send + Sync + Clone + TransactionWrite; + type Value: Send + Sync + Debug + Clone + TransactionWrite; type Event: Send + Sync + Debug + Clone + ReadWriteEvent; } diff --git a/types/src/write_set.rs b/types/src/write_set.rs index bdff407a59aa4..fcb8544af8ed5 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -192,6 +192,13 @@ pub trait TransactionWrite: Debug { } fn set_bytes(&mut self, bytes: Bytes); + + /// Convert a `self`, which was read (containing DelayedField exchanges) in a current + /// transaction, to a modification write, in which we can then exchange DelayedField + /// identifiers into their final values, to produce a write operation. + fn convert_read_to_modification(&self) -> Option + where + Self: Sized; } impl TransactionWrite for WriteOp { @@ -242,6 +249,21 @@ impl TransactionWrite for WriteOp { Deletion | DeletionWithMetadata { .. } => (), } } + + fn convert_read_to_modification(&self) -> Option { + use WriteOp::*; + + match self { + Creation(data) | Modification(data) => Some(Modification(data.clone())), + CreationWithMetadata { data, metadata } + | ModificationWithMetadata { data, metadata } => Some(ModificationWithMetadata { + data: data.clone(), + metadata: metadata.clone(), + }), + // Deletion don't have data to become modification. + Deletion | DeletionWithMetadata { .. } => None, + } + } } impl std::fmt::Debug for WriteOp {