Skip to content

Commit

Permalink
impl processing for claim value and send reply
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Sep 25, 2024
1 parent 36b138a commit 8f4ce34
Showing 1 changed file with 138 additions and 9 deletions.
147 changes: 138 additions & 9 deletions ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@
use anyhow::Result;
use ethexe_common::{
mirror::RequestEvent as MirrorEvent,
router::{RequestEvent as RouterEvent, StateTransition},
router::{RequestEvent as RouterEvent, StateTransition, ValueClaim},
wvara::RequestEvent as WVaraEvent,
BlockRequestEvent,
};
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage};
use gear_core::{
ids::{prelude::CodeIdExt, ProgramId},
message::{DispatchKind, Payload, ReplyInfo},
ids::{
prelude::{CodeIdExt, MessageIdExt},
ProgramId,
},
message::{DispatchKind, Payload, ReplyDetails, ReplyInfo, SuccessReplyReason},
};
use gprimitives::{ActorId, CodeId, MessageId, H256};
use host::InstanceCreator;
Expand Down Expand Up @@ -69,6 +72,8 @@ impl OverlaidProcessor {
.block_start_program_states(block_hash)
.unwrap_or_default();

let mut value_claims = Default::default();

let Some(&state_hash) = states.get(&program_id) else {
return Err(anyhow::anyhow!("unknown program at specified block hash"));
};
Expand All @@ -85,6 +90,7 @@ impl OverlaidProcessor {

self.0.handle_mirror_event(
&mut states,
&mut value_claims,
program_id,
MirrorEvent::MessageQueueingRequested {
id: MessageId::zero(),
Expand Down Expand Up @@ -272,6 +278,93 @@ impl Processor {
Ok(self.db.write_state(state))
}

fn handle_reply_queueing(
&mut self,
state_hash: H256,
mailboxed_id: MessageId,
user_id: ActorId,
payload: Vec<u8>,
value: u128,
) -> Result<Option<(ValueClaim, H256)>> {
self.handle_mailboxed_message_impl(
state_hash,
mailboxed_id,
user_id,
payload,
value,
SuccessReplyReason::Manual,
)
}

fn handle_value_claiming(
&mut self,
state_hash: H256,
mailboxed_id: MessageId,
user_id: ActorId,
) -> Result<Option<(ValueClaim, H256)>> {
self.handle_mailboxed_message_impl(
state_hash,
mailboxed_id,
user_id,
vec![],
0,
SuccessReplyReason::Auto,
)
}

fn handle_mailboxed_message_impl(
&mut self,
state_hash: H256,
mailboxed_id: MessageId,
user_id: ActorId,
payload: Vec<u8>,
value: u128,
reply_reason: SuccessReplyReason,
) -> Result<Option<(ValueClaim, H256)>> {
let mut state = self
.db
.read_state(state_hash)
.ok_or_else(|| anyhow::anyhow!("program should exist"))?;

let mut mailbox = state.mailbox_hash.with_hash_or_default(|hash| {
self.db.read_mailbox(hash).expect("Failed to read mailbox")
});

let entry = mailbox.entry(user_id).or_default();

let Some((claimed_value, _expiration)) = entry.remove(&mailboxed_id) else {
return Ok(None);
};

let claim = ValueClaim {
message_id: mailboxed_id,
destination: user_id,
value: claimed_value,
};

let mut queue = state
.queue_hash
.with_hash_or_default(|hash| self.db.read_queue(hash).expect("Failed to read queue"));

let payload_hash = self.handle_payload(payload)?;

let dispatch = Dispatch {
id: MessageId::generate_reply(mailboxed_id),
kind: DispatchKind::Reply,
source: user_id,
payload_hash,
value,
details: Some(ReplyDetails::new(mailboxed_id, reply_reason.into()).into()),
context: None,
};

queue.push_back(dispatch);

state.queue_hash = self.db.write_queue(queue).into();

Ok(Some((claim, self.db.write_state(state))))
}

// TODO: replace LocalOutcome with Transition struct.
pub fn run(
&mut self,
Expand Down Expand Up @@ -312,21 +405,34 @@ impl Processor {
.block_start_program_states(block_hash)
.unwrap_or_default();

let mut all_value_claims = Default::default();

for event in events {
match event {
BlockRequestEvent::Router(event) => {
self.handle_router_event(&mut states, event)?;
}
BlockRequestEvent::Mirror { address, event } => {
self.handle_mirror_event(&mut states, address, event)?;
self.handle_mirror_event(&mut states, &mut all_value_claims, address, event)?;
}
BlockRequestEvent::WVara(event) => {
self.handle_wvara_event(&mut states, event)?;
}
}
}

let outcomes = self.run(block_hash, &mut states)?;
let mut outcomes = self.run(block_hash, &mut states)?;

for outcome in &mut outcomes {
if let LocalOutcome::Transition(StateTransition {
actor_id,
value_claims,
..
}) = outcome
{
value_claims.extend(all_value_claims.remove(actor_id).unwrap_or_default());
}
}

self.db.set_block_end_program_states(block_hash, states);

Expand Down Expand Up @@ -360,6 +466,7 @@ impl Processor {
fn handle_mirror_event(
&mut self,
states: &mut BTreeMap<ProgramId, H256>,
value_claims: &mut BTreeMap<ProgramId, Vec<ValueClaim>>,
actor_id: ProgramId,
event: MirrorEvent,
) -> Result<()> {
Expand Down Expand Up @@ -404,10 +511,32 @@ impl Processor {

self.handle_message_queueing(state_hash, dispatch)?
}
MirrorEvent::ReplyQueueingRequested { .. }
| MirrorEvent::ValueClaimingRequested { .. } => {
log::debug!("Handler not yet implemented: {event:?}");
return Ok(());
MirrorEvent::ReplyQueueingRequested {
replied_to,
source,
payload,
value,
} => {
let Some((value_claim, state_hash)) =
self.handle_reply_queueing(state_hash, replied_to, source, payload, value)?
else {
return Ok(());
};

value_claims.entry(actor_id).or_default().push(value_claim);

state_hash
}
MirrorEvent::ValueClaimingRequested { claimed_id, source } => {
let Some((value_claim, state_hash)) =
self.handle_value_claiming(state_hash, claimed_id, source)?
else {
return Ok(());
};

value_claims.entry(actor_id).or_default().push(value_claim);

state_hash
}
};

Expand Down

0 comments on commit 8f4ce34

Please sign in to comment.