From 20ac111a84252b88bdcff15e38d18c35fbcd2d17 Mon Sep 17 00:00:00 2001 From: Dmitrii Novikov Date: Thu, 12 Dec 2024 16:14:59 +0400 Subject: [PATCH] feat(ethexe): sparse memory pages (#4390) --- Makefile | 2 +- ethexe/db/src/database.rs | 15 +++- ethexe/processor/src/host/threads.rs | 58 +++++++----- ethexe/runtime/common/src/journal.rs | 4 +- ethexe/runtime/common/src/lib.rs | 15 +--- ethexe/runtime/common/src/state.rs | 127 +++++++++++++++++++++++++-- ethexe/runtime/src/wasm/storage.rs | 18 ++-- 7 files changed, 191 insertions(+), 48 deletions(-) diff --git a/Makefile b/Makefile index 4a9cbb907ed..cad8f050225 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ ethexe-pre-commit: ethexe-contracts-pre-commit ethexe-pre-commit-no-contracts ethexe-pre-commit-no-contracts: @ echo " > Formatting ethexe" && cargo +nightly fmt --all -- --config imports_granularity=Crate,edition=2021 @ echo " >> Clippy checking ethexe" && cargo clippy -p "ethexe-*" --all-targets --all-features -- --no-deps -D warnings - @ echo " >>> Testing ethexe" && cargo test -p "ethexe-*" + @ echo " >>> Testing ethexe" && cargo nextest run -p "ethexe-*" # Building ethexe contracts .PHONY: ethexe-contracts-pre-commit diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index 11e7bff7756..ef60d9b150c 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -28,8 +28,8 @@ use ethexe_common::{ gear::StateTransition, }; use ethexe_runtime_common::state::{ - Allocations, DispatchStash, HashOf, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, - Waitlist, + Allocations, DispatchStash, HashOf, Mailbox, MemoryPages, MemoryPagesRegion, MessageQueue, + ProgramState, Storage, Waitlist, }; use gear_core::{ code::InstrumentedCode, @@ -531,10 +531,21 @@ impl Storage for Database { }) } + fn read_pages_region(&self, hash: HashOf) -> Option { + self.cas.read(&hash.hash()).map(|data| { + MemoryPagesRegion::decode(&mut &data[..]) + .expect("Failed to decode data into 
`MemoryPagesRegion`") + }) + } + fn write_pages(&self, pages: MemoryPages) -> HashOf { unsafe { HashOf::new(self.cas.write(&pages.encode())) } } + fn write_pages_region(&self, pages_region: MemoryPagesRegion) -> HashOf { + unsafe { HashOf::new(self.cas.write(&pages_region.encode())) } + } + fn read_allocations(&self, hash: HashOf) -> Option { self.cas.read(&hash.hash()).map(|data| { Allocations::decode(&mut &data[..]).expect("Failed to decode data into `Allocations`") diff --git a/ethexe/processor/src/host/threads.rs b/ethexe/processor/src/host/threads.rs index 1f2370f01ef..5aab0d896f3 100644 --- a/ethexe/processor/src/host/threads.rs +++ b/ethexe/processor/src/host/threads.rs @@ -22,7 +22,10 @@ use crate::Database; use core::fmt; use ethexe_db::BlockMetaStorage; use ethexe_runtime_common::{ - state::{ActiveProgram, HashOf, Program, ProgramState, Storage}, + state::{ + ActiveProgram, HashOf, MemoryPages, MemoryPagesInner, MemoryPagesRegionInner, Program, + ProgramState, RegionIdx, Storage, + }, BlockInfo, }; use gear_core::{ids::ProgramId, memory::PageBuf, pages::GearPage}; @@ -42,7 +45,8 @@ pub struct ThreadParams { pub db: Database, pub block_info: BlockInfo, pub state_hash: H256, - pub pages: Option>>, + pages_registry_cache: Option, + pages_regions_cache: Option>, } impl fmt::Debug for ThreadParams { @@ -52,19 +56,38 @@ impl fmt::Debug for ThreadParams { } impl ThreadParams { - pub fn pages(&mut self) -> &BTreeMap> { - self.pages.get_or_insert_with(|| { + pub fn get_page_region( + &mut self, + page: GearPage, + ) -> Option<&BTreeMap>> { + let pages_registry = self.pages_registry_cache.get_or_insert_with(|| { let ProgramState { program: Program::Active(ActiveProgram { pages_hash, .. }), .. } = self.db.read_state(self.state_hash).expect(UNKNOWN_STATE) else { - // TODO: consider me. 
- panic!("Couldn't get pages hash for inactive program!") + unreachable!("program that is currently running can't be inactive"); }; pages_hash.query(&self.db).expect(UNKNOWN_STATE).into() - }) + }); + + let region_idx = MemoryPages::page_region(page); + + let region_hash = pages_registry.get(&region_idx)?; + + let pages_regions = self + .pages_regions_cache + .get_or_insert_with(Default::default); + + let page_region = pages_regions.entry(region_idx).or_insert_with(|| { + self.db + .read_pages_region(*region_hash) + .expect("Pages region not found") + .into() + }); + + Some(&*page_region) } } @@ -92,11 +115,11 @@ pub fn set(db: Database, chain_head: H256, state_hash: H256) { timestamp: header.timestamp, }, state_hash, - pages: None, + pages_registry_cache: None, + pages_regions_cache: None, })) } -// TODO: consider Database mutability. pub fn with_db(f: impl FnOnce(&Database) -> T) -> T { PARAMS.with_borrow(|v| { let params = v.as_ref().expect(UNSET_PANIC); @@ -113,16 +136,6 @@ pub fn chain_head_info() -> BlockInfo { }) } -// TODO: consider Database mutability. 
-#[allow(unused)] -pub fn with_db_and_state_hash(f: impl FnOnce(&Database, H256) -> T) -> T { - PARAMS.with_borrow(|v| { - let params = v.as_ref().expect(UNSET_PANIC); - - f(&params.db, params.state_hash) - }) -} - pub fn with_params(f: impl FnOnce(&mut ThreadParams) -> T) -> T { PARAMS.with_borrow_mut(|v| { let params = v.as_mut().expect(UNSET_PANIC); @@ -139,7 +152,7 @@ impl LazyPagesStorage for EthexeHostLazyPages { with_params(|params| { let page = PageKey::page_from_buf(key); - let page_hash = params.pages().get(&page).cloned()?; + let page_hash = params.get_page_region(page)?.get(&page).cloned()?; let data = params.db.read_page_data(page_hash).expect("Page not found"); @@ -153,7 +166,10 @@ impl LazyPagesStorage for EthexeHostLazyPages { with_params(|params| { let page = PageKey::page_from_buf(key); - params.pages().contains_key(&page) + params + .get_page_region(page) + .map(|region| region.contains_key(&page)) + .unwrap_or(false) }) } } diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index af4b6e73729..c6fafedfc6a 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -323,7 +323,7 @@ impl JournalHandler for Handler<'_, S> { }; pages_hash.modify_pages(storage, |pages| { - pages.update(storage.write_pages_data(pages_data)); + pages.update_and_store_regions(storage, storage.write_pages_data(pages_data)); }); Ok(()) @@ -351,7 +351,7 @@ impl JournalHandler for Handler<'_, S> { if !removed_pages.is_empty() { pages_hash.modify_pages(storage, |pages| { - pages.remove(&removed_pages); + pages.remove_and_store_regions(storage, &removed_pages); }) } }); diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 612adacd046..c9af5daa61a 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -22,7 +22,7 @@ extern crate alloc; -use alloc::{collections::BTreeMap, vec::Vec}; +use alloc::vec::Vec; use core_processor::{ 
common::{ExecutableActorData, JournalNote}, configs::{BlockConfig, SyscallName}, @@ -31,14 +31,12 @@ use core_processor::{ use gear_core::{ code::{InstrumentedCode, MAX_WASM_PAGES_AMOUNT}, ids::ProgramId, - memory::PageBuf, message::{DispatchKind, IncomingDispatch, IncomingMessage}, - pages::GearPage, }; use gear_lazy_pages_common::LazyPagesInterface; use gprimitives::CodeId; use gsys::{GasMultiplier, Percent}; -use state::{Dispatch, HashOf, ProgramState, Storage}; +use state::{Dispatch, ProgramState, Storage}; pub use core_processor::configs::BlockInfo; pub use journal::Handler as JournalHandler; @@ -59,7 +57,7 @@ pub trait RuntimeInterface { type LazyPages: LazyPagesInterface + 'static; fn block_info(&self) -> BlockInfo; - fn init_lazy_pages(&self, pages_map: BTreeMap>); + fn init_lazy_pages(&self); fn random_data(&self) -> (Vec, u32); fn storage(&self) -> &S; } @@ -236,11 +234,6 @@ where Err(journal) => return journal, }; - let pages_map = active_state.pages_hash.with_hash_or_default(|hash| { - ri.storage() - .read_pages(hash) - .expect("Cannot get memory pages") - }); let actor_data = ExecutableActorData { allocations: allocations.into(), code_id, @@ -271,7 +264,7 @@ where let random_data = ri.random_data(); - ri.init_lazy_pages(pages_map.into()); + ri.init_lazy_pages(); core_processor::process::>(&block_config, execution_context, random_data) .unwrap_or_else(|err| unreachable!("{err}")) diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 553e23b4d66..596add3a492 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -55,6 +55,7 @@ mod private { impl Sealed for DispatchStash {} impl Sealed for Mailbox {} impl Sealed for MemoryPages {} + impl Sealed for MemoryPagesRegion {} impl Sealed for MessageQueue {} impl Sealed for Payload {} impl Sealed for PageBuf {} @@ -813,18 +814,111 @@ impl Mailbox { #[derive(Clone, Default, Debug, Encode, Decode, PartialEq, Eq, derive_more::Into)] 
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] -pub struct MemoryPages(BTreeMap>); +pub struct MemoryPages(MemoryPagesInner); + +// TODO (breathx): replace BTreeMap here with fixed size array. +pub type MemoryPagesInner = BTreeMap>; impl MemoryPages { - pub fn update(&mut self, new_pages: BTreeMap>) { + /// Granularity parameter of how memory pages hashes are stored. + /// + /// Instead of a single huge map of GearPage to HashOf, memory is + /// stored in page regions. Each region represents the same map, + /// but with a specific range of GearPage as keys. + /// + /// # Safety + /// Be careful adjusting this value, as it affects the storage invariants. + /// In case of a change, not only should the database be migrated, but + /// necessary changes should also be applied in the ethexe lazy pages + /// host implementation: see the `ThreadParams` struct. + pub const REGIONS_AMOUNT: usize = 16; + + pub fn page_region(page: GearPage) -> RegionIdx { + RegionIdx((u32::from(page) as usize / Self::REGIONS_AMOUNT) as u8) + } + + pub fn update_and_store_regions( + &mut self, + storage: &S, + new_pages: BTreeMap>, + ) { + let mut updated_regions = BTreeMap::new(); + + let mut current_region_idx = None; + let mut current_region_entry = None; + for (page, data) in new_pages { - self.0.insert(page, data); + let region_idx = Self::page_region(page); + + if current_region_idx != Some(region_idx) { + let region_entry = updated_regions.entry(region_idx).or_insert_with(|| { + self.0 + .remove(&region_idx) + .map(|region_hash| { + storage + .read_pages_region(region_hash) + .expect("failed to read region from storage") + }) + .unwrap_or_default() + }); + + current_region_idx = Some(region_idx); + current_region_entry = Some(region_entry); + } + + current_region_entry + .as_mut() + .expect("infallible; inserted above") + .0 + .insert(page, data); + } + + for (region_idx, region) in updated_regions { + let region_hash = region + .store(storage) + .hash() 
+ .expect("infallible; pages are only appended here, none are removed"); + + self.0.insert(region_idx, region_hash); } } - pub fn remove(&mut self, pages: &Vec) { + pub fn remove_and_store_regions(&mut self, storage: &S, pages: &Vec) { + let mut updated_regions = BTreeMap::new(); + + let mut current_region_idx = None; + let mut current_region_entry = None; + for page in pages { - self.0.remove(page); + let region_idx = Self::page_region(*page); + + if current_region_idx != Some(region_idx) { + let region_entry = updated_regions.entry(region_idx).or_insert_with(|| { + self.0 + .remove(&region_idx) + .map(|region_hash| { + storage + .read_pages_region(region_hash) + .expect("failed to read region from storage") + }) + .unwrap_or_default() + }); + + current_region_idx = Some(region_idx); + current_region_entry = Some(region_entry); + } + + current_region_entry + .as_mut() + .expect("infallible; inserted above") + .0 + .remove(page); + } + + for (region_idx, region) in updated_regions { + if let Some(region_hash) = region.store(storage).hash() { + self.0.insert(region_idx, region_hash); + } } } @@ -833,6 +927,22 @@ impl MemoryPages { } } +#[derive(Clone, Default, Debug, Encode, Decode, PartialEq, Eq, derive_more::Into)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] +pub struct MemoryPagesRegion(MemoryPagesRegionInner); + +pub type MemoryPagesRegionInner = BTreeMap>; + +impl MemoryPagesRegion { + pub fn store(self, storage: &S) -> MaybeHashOf { + MaybeHashOf((!self.0.is_empty()).then(|| storage.write_pages_region(self))) + } +} + +#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord, derive_more::Into)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] +pub struct RegionIdx(u8); + #[derive(Default, Debug, Encode, Decode, PartialEq, Eq, derive_more::Into)] pub struct Allocations { inner: IntervalsTree, @@ -904,13 +1014,18 @@ pub trait Storage { /// Writes mailbox and returns its hash. 
fn write_mailbox(&self, mailbox: Mailbox) -> HashOf; - // TODO: #4355. /// Reads memory pages by pages hash. fn read_pages(&self, hash: HashOf) -> Option; + /// Reads memory pages region by pages region hash. + fn read_pages_region(&self, hash: HashOf) -> Option; + /// Writes memory pages and returns its hash. fn write_pages(&self, pages: MemoryPages) -> HashOf; + /// Writes memory pages region and returns its hash. + fn write_pages_region(&self, pages_region: MemoryPagesRegion) -> HashOf; + /// Reads allocations by allocations hash. fn read_allocations(&self, hash: HashOf) -> Option; diff --git a/ethexe/runtime/src/wasm/storage.rs b/ethexe/runtime/src/wasm/storage.rs index 4c00a42b793..d9edf94ffc9 100644 --- a/ethexe/runtime/src/wasm/storage.rs +++ b/ethexe/runtime/src/wasm/storage.rs @@ -17,16 +17,16 @@ // along with this program. If not, see . use super::interface::database_ri; -use alloc::{collections::BTreeMap, vec::Vec}; +use alloc::vec::Vec; use core_processor::configs::BlockInfo; use ethexe_runtime_common::{ state::{ - Allocations, DispatchStash, HashOf, Mailbox, MemoryPages, MessageQueue, ProgramState, - Storage, Waitlist, + Allocations, DispatchStash, HashOf, Mailbox, MemoryPages, MemoryPagesRegion, MessageQueue, + ProgramState, Storage, Waitlist, }, RuntimeInterface, }; -use gear_core::{memory::PageBuf, message::Payload, pages::GearPage}; +use gear_core::{memory::PageBuf, message::Payload}; use gear_lazy_pages_interface::{LazyPagesInterface, LazyPagesRuntimeInterface}; use gprimitives::H256; @@ -86,10 +86,18 @@ impl Storage for RuntimeInterfaceStorage { database_ri::read_unwrapping(&hash.hash()) } + fn read_pages_region(&self, hash: HashOf) -> Option { + database_ri::read_unwrapping(&hash.hash()) + } + fn write_pages(&self, pages: MemoryPages) -> HashOf { unsafe { HashOf::new(database_ri::write(pages)) } } + fn write_pages_region(&self, pages_region: MemoryPagesRegion) -> HashOf { + unsafe { HashOf::new(database_ri::write(pages_region)) } + } + fn 
read_allocations(&self, hash: HashOf) -> Option { database_ri::read_unwrapping(&hash.hash()) } @@ -129,7 +137,7 @@ impl RuntimeInterface for NativeRuntimeInterface { self.block_info } - fn init_lazy_pages(&self, _: BTreeMap>) { + fn init_lazy_pages(&self) { assert!(Self::LazyPages::try_to_enable_lazy_pages(Default::default())) }