Skip to content

Commit

Permalink
feat(ethexe): add storage key for upcoming scheduler and manage it (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx authored Oct 1, 2024
1 parent 5d67331 commit c0afe4f
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethexe/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
[dependencies]
gear-core.workspace = true
gprimitives = { workspace = true, features = ["serde"] }
common.workspace = true

parity-scale-codec.workspace = true
derive_more.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use gear_core::{
use gprimitives::H256;
use parity_scale_codec::{Decode, Encode};

pub type ScheduledTask = common::scheduler::ScheduledTask<ActorId>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)]
pub struct BlockHeader {
pub height: u32,
Expand Down Expand Up @@ -73,6 +75,12 @@ pub trait BlockMetaStorage: Send + Sync {

fn latest_valid_block(&self) -> Option<(H256, BlockHeader)>;
fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader);

fn block_start_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>>;
fn set_block_start_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>);

fn block_end_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>>;
fn set_block_end_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>);
}

pub trait CodesStorage: Send + Sync {
Expand Down
38 changes: 37 additions & 1 deletion ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
CASDatabase, KVDatabase,
};
use ethexe_common::{
db::{BlockHeader, BlockMetaStorage, CodesStorage},
db::{BlockHeader, BlockMetaStorage, CodesStorage, ScheduledTask},
router::StateTransition,
BlockRequestEvent,
};
Expand Down Expand Up @@ -56,6 +56,8 @@ enum KeyPrefix {
LatestValidBlock = 8,
BlockHeader = 9,
CodeValid = 10,
BlockStartSchedule = 11,
BlockEndSchedule = 12,
}

impl KeyPrefix {
Expand Down Expand Up @@ -271,6 +273,40 @@ impl BlockMetaStorage for Database {
(block_hash, header).encode(),
);
}

fn block_start_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>> {
self.kv
.get(&KeyPrefix::BlockStartSchedule.two(self.router_address, block_hash))
.map(|data| {
BTreeMap::decode(&mut data.as_slice())
.expect("Failed to decode data into `BTreeMap`")
})
}

fn set_block_start_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>) {
log::trace!(target: LOG_TARGET, "For block {block_hash} set block start schedule: {map:?}");
self.kv.put(
&KeyPrefix::BlockStartSchedule.two(self.router_address, block_hash),
map.encode(),
);
}

fn block_end_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>> {
self.kv
.get(&KeyPrefix::BlockEndSchedule.two(self.router_address, block_hash))
.map(|data| {
BTreeMap::decode(&mut data.as_slice())
.expect("Failed to decode data into `BTreeMap`")
})
}

fn set_block_end_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>) {
log::trace!(target: LOG_TARGET, "For block {block_hash} set block end schedule: {map:?}");
self.kv.put(
&KeyPrefix::BlockEndSchedule.two(self.router_address, block_hash),
map.encode(),
);
}
}

impl CodesStorage for Database {
Expand Down
9 changes: 9 additions & 0 deletions ethexe/observer/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ impl Query {
self.database.set_block_is_empty(hash, true);
self.database
.set_block_end_program_states(hash, Default::default());
self.database
.set_block_end_schedule(hash, Default::default());

// set latest valid if empty.
if self.database.latest_valid_block().is_none() {
Expand Down Expand Up @@ -275,6 +277,13 @@ impl Query {
self.database
.set_block_start_program_states(block_hash, program_state_hashes);

// Propagate scheduled tasks
let schedule = self
.database
.block_end_schedule(parent)
.ok_or_else(|| anyhow!("parent block schedule not found"))?;
self.database.set_block_start_schedule(block_hash, schedule);

// Propagate `wait for commitment` blocks queue
let queue = self
.database
Expand Down
1 change: 1 addition & 0 deletions ethexe/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ gprimitives.workspace = true
gear-runtime-interface = { workspace = true, features = ["std"] }
gear-lazy-pages.workspace = true
core-processor.workspace = true
common = { workspace = true, features = ["std"] }

anyhow = { workspace = true, features = ["std"] }
wasmtime.workspace = true
Expand Down
18 changes: 18 additions & 0 deletions ethexe/processor/src/handling/events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use anyhow::Result;
use ethexe_common::{
mirror::RequestEvent as MirrorEvent,
Expand Down
19 changes: 19 additions & 0 deletions ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::Processor;
use anyhow::Result;
use ethexe_db::CodesStorage;
Expand All @@ -6,6 +24,7 @@ use gprimitives::{CodeId, H256};

pub(crate) mod events;
pub(crate) mod run;
pub(crate) mod tasks;

impl Processor {
pub(crate) fn handle_message_queueing(
Expand Down
99 changes: 99 additions & 0 deletions ethexe/processor/src/handling/tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{LocalOutcome, Processor};
use anyhow::Result;
use common::{
scheduler::{ScheduledTask, TaskHandler},
CodeId, Gas, MessageId, ProgramId, ReservationId,
};
use ethexe_db::BlockMetaStorage;
use gear_core::message::Message;
use gprimitives::{ActorId, H256};
use std::collections::BTreeMap;

impl Processor {
pub fn run_tasks(
&mut self,
block_hash: H256,
states: &mut BTreeMap<ProgramId, H256>,
tasks: &mut BTreeMap<u32, Vec<ScheduledTask<ActorId>>>,
) -> Result<Vec<LocalOutcome>> {
let mut handler = TasksHandler {
states,
results: Default::default(),
to_users_messages: Default::default(),
};

let block_meta = self
.db
.block_header(block_hash)
.ok_or_else(|| anyhow::anyhow!("block header for chain head wasn't found"))?;

let tasks = tasks.remove(&block_meta.height).unwrap_or_default();

for task in tasks {
let _gas = task.process_with(&mut handler);
}

Ok(vec![])
}
}

#[allow(unused)]
pub struct TasksHandler<'a> {
pub states: &'a mut BTreeMap<ProgramId, H256>,
pub results: BTreeMap<ActorId, H256>,
pub to_users_messages: Vec<Message>,
}

impl<'a> TaskHandler<ActorId> for TasksHandler<'a> {
fn remove_from_mailbox(&mut self, _user_id: ActorId, _message_id: MessageId) -> Gas {
unimplemented!("TODO (breathx)")
}
fn remove_from_waitlist(&mut self, _program_id: ProgramId, _message_id: MessageId) -> Gas {
unimplemented!("TODO (breathx)")
}
fn send_dispatch(&mut self, _stashed_message_id: MessageId) -> Gas {
unimplemented!("TODO (breathx)")
}
fn send_user_message(&mut self, _stashed_message_id: MessageId, _to_mailbox: bool) -> Gas {
unimplemented!("TODO (breathx)")
}
fn wake_message(&mut self, _program_id: ProgramId, _message_id: MessageId) -> Gas {
// TODO (breathx): consider deprecation of delayed wakes + non-concrete waits.
unimplemented!("TODO (breathx)")
}

/* Deprecated APIs */
fn pause_program(&mut self, _: ProgramId) -> Gas {
unreachable!("deprecated")
}
fn remove_code(&mut self, _: CodeId) -> Gas {
unreachable!("deprecated")
}
fn remove_gas_reservation(&mut self, _: ProgramId, _: ReservationId) -> Gas {
unreachable!("deprecated")
}
fn remove_paused_program(&mut self, _: ProgramId) -> Gas {
unreachable!("deprecated")
}
fn remove_resume_session(&mut self, _: u32) -> Gas {
unreachable!("deprecated")
}
}
8 changes: 7 additions & 1 deletion ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ impl Processor {
let mut states = self
.db
.block_start_program_states(block_hash)
.unwrap_or_default();
.unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?

let mut schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?

let mut all_value_claims = Default::default();

Expand All @@ -104,6 +106,9 @@ impl Processor {
}
}

// TODO (breathx): handle outcomes.
let mut _outcomes = self.run_tasks(block_hash, &mut states, &mut schedule)?;

let mut outcomes = self.run(block_hash, &mut states)?;

for outcome in &mut outcomes {
Expand All @@ -118,6 +123,7 @@ impl Processor {
}

self.db.set_block_end_program_states(block_hash, states);
self.db.set_block_end_schedule(block_hash, schedule);

Ok(outcomes)
}
Expand Down
26 changes: 25 additions & 1 deletion ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ fn init_new_block(processor: &mut Processor, meta: BlockHeader) -> H256 {
chain_head
}

#[track_caller]
fn init_new_block_from_parent(processor: &mut Processor, parent_hash: H256) -> H256 {
let parent_block_header = processor.db.block_header(parent_hash).unwrap_or_default();
let height = parent_block_header.height + 1;
Expand All @@ -48,13 +49,35 @@ fn init_new_block_from_parent(processor: &mut Processor, parent_hash: H256) -> H
parent_hash,
},
);

let parent_out_program_hashes = processor
.db
.block_end_program_states(parent_hash)
.unwrap_or_default();
.unwrap_or_else(|| {
if parent_hash.is_zero() {
Default::default()
} else {
panic!("process block events before new block; start states not found")
}
});
processor
.db
.set_block_start_program_states(chain_head, parent_out_program_hashes);

let parent_out_schedule = processor
.db
.block_end_schedule(parent_hash)
.unwrap_or_else(|| {
if parent_hash.is_zero() {
Default::default()
} else {
panic!("process block events before new block; start schedule not found")
}
});
processor
.db
.set_block_start_schedule(chain_head, parent_out_schedule);

chain_head
}

Expand Down Expand Up @@ -83,6 +106,7 @@ fn process_observer_event() {
}]
);

let _ = processor.process_block_events(ch0, vec![]).unwrap();
let ch1 = init_new_block_from_parent(&mut processor, ch0);

let actor_id = ActorId::from(42);
Expand Down

0 comments on commit c0afe4f

Please sign in to comment.