Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): add storage key for upcoming scheduler and manage it #4269

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethexe/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
[dependencies]
gear-core.workspace = true
gprimitives = { workspace = true, features = ["serde"] }
common.workspace = true

parity-scale-codec.workspace = true
derive_more.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use gear_core::{
use gprimitives::H256;
use parity_scale_codec::{Decode, Encode};

pub type ScheduledTask = common::scheduler::ScheduledTask<ActorId>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)]
pub struct BlockHeader {
pub height: u32,
Expand Down Expand Up @@ -73,6 +75,12 @@ pub trait BlockMetaStorage: Send + Sync {

fn latest_valid_block(&self) -> Option<(H256, BlockHeader)>;
fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader);

fn block_start_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>>;
fn set_block_start_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>);

fn block_end_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>>;
fn set_block_end_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>);
}

pub trait CodesStorage: Send + Sync {
Expand Down
38 changes: 37 additions & 1 deletion ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
CASDatabase, KVDatabase,
};
use ethexe_common::{
db::{BlockHeader, BlockMetaStorage, CodesStorage},
db::{BlockHeader, BlockMetaStorage, CodesStorage, ScheduledTask},
router::StateTransition,
BlockRequestEvent,
};
Expand Down Expand Up @@ -56,6 +56,8 @@ enum KeyPrefix {
LatestValidBlock = 8,
BlockHeader = 9,
CodeValid = 10,
BlockStartSchedule = 11,
BlockEndSchedule = 12,
}

impl KeyPrefix {
Expand Down Expand Up @@ -271,6 +273,40 @@ impl BlockMetaStorage for Database {
(block_hash, header).encode(),
);
}

fn block_start_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>> {
self.kv
.get(&KeyPrefix::BlockStartSchedule.two(self.router_address, block_hash))
.map(|data| {
BTreeMap::decode(&mut data.as_slice())
.expect("Failed to decode data into `BTreeMap`")
})
}

fn set_block_start_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>) {
log::trace!(target: LOG_TARGET, "For block {block_hash} set block start schedule: {map:?}");
self.kv.put(
&KeyPrefix::BlockStartSchedule.two(self.router_address, block_hash),
map.encode(),
);
}

fn block_end_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>> {
self.kv
.get(&KeyPrefix::BlockEndSchedule.two(self.router_address, block_hash))
.map(|data| {
BTreeMap::decode(&mut data.as_slice())
.expect("Failed to decode data into `BTreeMap`")
})
}

fn set_block_end_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>) {
log::trace!(target: LOG_TARGET, "For block {block_hash} set block end schedule: {map:?}");
self.kv.put(
&KeyPrefix::BlockEndSchedule.two(self.router_address, block_hash),
map.encode(),
);
}
}

impl CodesStorage for Database {
Expand Down
9 changes: 9 additions & 0 deletions ethexe/observer/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ impl Query {
self.database.set_block_is_empty(hash, true);
self.database
.set_block_end_program_states(hash, Default::default());
self.database
.set_block_end_schedule(hash, Default::default());

// set latest valid if empty.
if self.database.latest_valid_block().is_none() {
Expand Down Expand Up @@ -275,6 +277,13 @@ impl Query {
self.database
.set_block_start_program_states(block_hash, program_state_hashes);

// Propagate scheduled tasks
let schedule = self
.database
.block_end_schedule(parent)
.ok_or_else(|| anyhow!("parent block schedule not found"))?;
self.database.set_block_start_schedule(block_hash, schedule);

// Propagate `wait for commitment` blocks queue
let queue = self
.database
Expand Down
1 change: 1 addition & 0 deletions ethexe/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ gprimitives.workspace = true
gear-runtime-interface = { workspace = true, features = ["std"] }
gear-lazy-pages.workspace = true
core-processor.workspace = true
common = { workspace = true, features = ["std"] }

anyhow = { workspace = true, features = ["std"] }
wasmtime.workspace = true
Expand Down
18 changes: 18 additions & 0 deletions ethexe/processor/src/handling/events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use anyhow::Result;
use ethexe_common::{
mirror::RequestEvent as MirrorEvent,
Expand Down
19 changes: 19 additions & 0 deletions ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::Processor;
use anyhow::Result;
use ethexe_db::CodesStorage;
Expand All @@ -6,6 +24,7 @@ use gprimitives::{CodeId, H256};

pub(crate) mod events;
pub(crate) mod run;
pub(crate) mod tasks;

impl Processor {
pub(crate) fn handle_message_queueing(
Expand Down
99 changes: 99 additions & 0 deletions ethexe/processor/src/handling/tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{LocalOutcome, Processor};
use anyhow::Result;
use common::{
scheduler::{ScheduledTask, TaskHandler},
CodeId, Gas, MessageId, ProgramId, ReservationId,
};
use ethexe_db::BlockMetaStorage;
use gear_core::message::Message;
use gprimitives::{ActorId, H256};
use std::collections::BTreeMap;

impl Processor {
pub fn run_tasks(
&mut self,
block_hash: H256,
states: &mut BTreeMap<ProgramId, H256>,
tasks: &mut BTreeMap<u32, Vec<ScheduledTask<ActorId>>>,
) -> Result<Vec<LocalOutcome>> {
let mut handler = TasksHandler {
states,
results: Default::default(),
to_users_messages: Default::default(),
};

let block_meta = self
.db
.block_header(block_hash)
.ok_or_else(|| anyhow::anyhow!("block header for chain head wasn't found"))?;

let tasks = tasks.remove(&block_meta.height).unwrap_or_default();

for task in tasks {
let _gas = task.process_with(&mut handler);
}

Ok(vec![])
}
}

#[allow(unused)]
pub struct TasksHandler<'a> {
pub states: &'a mut BTreeMap<ProgramId, H256>,
pub results: BTreeMap<ActorId, H256>,
pub to_users_messages: Vec<Message>,
}

impl<'a> TaskHandler<ActorId> for TasksHandler<'a> {
fn remove_from_mailbox(&mut self, _user_id: ActorId, _message_id: MessageId) -> Gas {
unimplemented!("TODO (breathx)")
}
fn remove_from_waitlist(&mut self, _program_id: ProgramId, _message_id: MessageId) -> Gas {
unimplemented!("TODO (breathx)")
}
fn send_dispatch(&mut self, _stashed_message_id: MessageId) -> Gas {
unimplemented!("TODO (breathx)")
}
fn send_user_message(&mut self, _stashed_message_id: MessageId, _to_mailbox: bool) -> Gas {
unimplemented!("TODO (breathx)")
}
fn wake_message(&mut self, _program_id: ProgramId, _message_id: MessageId) -> Gas {
// TODO (breathx): consider deprecation of delayed wakes + non-concrete waits.
unimplemented!("TODO (breathx)")
}

/* Deprecated APIs */
fn pause_program(&mut self, _: ProgramId) -> Gas {
unreachable!("deprecated")
}
fn remove_code(&mut self, _: CodeId) -> Gas {
unreachable!("deprecated")
}
fn remove_gas_reservation(&mut self, _: ProgramId, _: ReservationId) -> Gas {
unreachable!("deprecated")
}
fn remove_paused_program(&mut self, _: ProgramId) -> Gas {
unreachable!("deprecated")
}
fn remove_resume_session(&mut self, _: u32) -> Gas {
unreachable!("deprecated")
}
}
8 changes: 7 additions & 1 deletion ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ impl Processor {
let mut states = self
.db
.block_start_program_states(block_hash)
.unwrap_or_default();
.unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?

let mut schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?

let mut all_value_claims = Default::default();

Expand All @@ -104,6 +106,9 @@ impl Processor {
}
}

// TODO (breathx): handle outcomes.
let mut _outcomes = self.run_tasks(block_hash, &mut states, &mut schedule)?;

let mut outcomes = self.run(block_hash, &mut states)?;

for outcome in &mut outcomes {
Expand All @@ -118,6 +123,7 @@ impl Processor {
}

self.db.set_block_end_program_states(block_hash, states);
self.db.set_block_end_schedule(block_hash, schedule);

Ok(outcomes)
}
Expand Down
26 changes: 25 additions & 1 deletion ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ fn init_new_block(processor: &mut Processor, meta: BlockHeader) -> H256 {
chain_head
}

#[track_caller]
fn init_new_block_from_parent(processor: &mut Processor, parent_hash: H256) -> H256 {
let parent_block_header = processor.db.block_header(parent_hash).unwrap_or_default();
let height = parent_block_header.height + 1;
Expand All @@ -48,13 +49,35 @@ fn init_new_block_from_parent(processor: &mut Processor, parent_hash: H256) -> H
parent_hash,
},
);

let parent_out_program_hashes = processor
.db
.block_end_program_states(parent_hash)
.unwrap_or_default();
.unwrap_or_else(|| {
if parent_hash.is_zero() {
Default::default()
} else {
panic!("process block events before new block; start states not found")
}
});
processor
.db
.set_block_start_program_states(chain_head, parent_out_program_hashes);

let parent_out_schedule = processor
.db
.block_end_schedule(parent_hash)
.unwrap_or_else(|| {
if parent_hash.is_zero() {
Default::default()
} else {
panic!("process block events before new block; start schedule not found")
}
});
processor
.db
.set_block_start_schedule(chain_head, parent_out_schedule);

chain_head
}

Expand Down Expand Up @@ -83,6 +106,7 @@ fn process_observer_event() {
}]
);

let _ = processor.process_block_events(ch0, vec![]).unwrap();
let ch1 = init_new_block_from_parent(&mut processor, ch0);

let actor_id = ActorId::from(42);
Expand Down
Loading