Skip to content

Commit

Permalink
restore 'many_waits' test
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Oct 1, 2024
1 parent 07e1ad9 commit bf38e9b
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 108 deletions.
2 changes: 2 additions & 0 deletions ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ impl Processor {
pub fn run_schedule(&mut self, in_block_transitions: &mut InBlockTransitions) {
let tasks = in_block_transitions.take_actual_tasks();

log::debug!("Running schedule for #{}: tasks are {tasks:?}", in_block_transitions.block_number());

let mut handler = ScheduleHandler {
in_block_transitions,
storage: &self.db,
Expand Down
253 changes: 145 additions & 108 deletions ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ use wabt::wat2wasm;
fn init_new_block(processor: &mut Processor, meta: BlockHeader) -> H256 {
let chain_head = H256::random();
processor.db.set_block_header(chain_head, meta);
processor
.db
.set_block_start_program_states(chain_head, Default::default());
processor
.db
.set_block_start_schedule(chain_head, Default::default());
processor.creator.set_chain_head(chain_head);
chain_head
}
Expand Down Expand Up @@ -438,114 +444,145 @@ fn async_and_ping() {
assert_eq!(message.payload, wait_for_reply_to.into_bytes().as_slice());
}

// TODO (breathx).
// #[test]
// fn many_waits() {
// init_logger();

// let threads_amount = 8;

// let wat = r#"
// (module
// (import "env" "memory" (memory 1))
// (import "env" "gr_reply" (func $reply (param i32 i32 i32 i32)))
// (import "env" "gr_wait_for" (func $wait_for (param i32)))
// (export "handle" (func $handle))
// (func $handle
// (if
// (i32.eqz (i32.load (i32.const 0x200)))
// (then
// (i32.store (i32.const 0x200) (i32.const 1))
// (call $wait_for (i32.const 10))
// )
// (else
// (call $reply (i32.const 0) (i32.const 13) (i32.const 0x400) (i32.const 0x600))
// )
// )
// )
// (data (i32.const 0) "Hello, world!")
// )
// "#;

// let (_, code) = wat_to_wasm(wat);

// let db = MemDb::default();
// let mut processor = Processor::new(Database::from_one(&db, Default::default())).unwrap();

// init_new_block(&mut processor, Default::default());

// let code_id = processor
// .handle_new_code(code)
// .expect("failed to call runtime api")
// .expect("code failed verification or instrumentation");

// let amount = 10000;
// let mut programs = BTreeMap::new();
// for i in 0..amount {
// let program_id = ProgramId::from(i);

// processor
// .handle_new_program(program_id, code_id)
// .expect("failed to create new program");

// let state_hash = processor
// .handle_executable_balance_top_up(H256::zero(), 10_000_000_000)
// .expect("failed to top up balance");

// let message = create_message(&mut processor, DispatchKind::Init, b"");
// let state_hash = processor
// .handle_message_queueing(state_hash, message)
// .expect("failed to populate message queue");

// programs.insert(program_id, state_hash);
// }

// let (to_users, _) = run::run(
// threads_amount,
// processor.db.clone(),
// processor.creator.clone(),
// &mut programs,
// );
// assert_eq!(to_users.len(), amount as usize);

// for (_pid, state_hash) in programs.iter_mut() {
// let message = create_message(&mut processor, DispatchKind::Handle, b"");
// let new_state_hash = processor
// .handle_message_queueing(*state_hash, message)
// .expect("failed to populate message queue");
// *state_hash = new_state_hash;
// }

// let (to_users, _) = run::run(
// threads_amount,
// processor.db.clone(),
// processor.creator.clone(),
// &mut programs,
// );
// assert_eq!(to_users.len(), 0);

// init_new_block(
// &mut processor,
// BlockHeader {
// height: 11,
// timestamp: 11,
// ..Default::default()
// },
// );

// let (to_users, _) = run::run(
// threads_amount,
// processor.db.clone(),
// processor.creator.clone(),
// &mut programs,
// );

// assert_eq!(to_users.len(), amount as usize);

// for message in to_users {
// assert_eq!(message.payload_bytes(), b"Hello, world!");
// }
// }
#[test]
fn many_waits() {
init_logger();

let threads_amount = 8;

let wat = r#"
(module
(import "env" "memory" (memory 1))
(import "env" "gr_reply" (func $reply (param i32 i32 i32 i32)))
(import "env" "gr_wait_for" (func $wait_for (param i32)))
(export "handle" (func $handle))
(func $handle
(if
(i32.eqz (i32.load (i32.const 0x200)))
(then
(i32.store (i32.const 0x200) (i32.const 1))
(call $wait_for (i32.const 10))
)
(else
(call $reply (i32.const 0) (i32.const 13) (i32.const 0x400) (i32.const 0x600))
)
)
)
(data (i32.const 0) "Hello, world!")
)
"#;

let (_, code) = wat_to_wasm(wat);

let db = MemDb::default();
let mut processor = Processor::new(Database::from_one(&db, Default::default())).unwrap();

let ch0 = init_new_block(&mut processor, BlockHeader { height: 1, timestamp: 1, parent_hash: Default::default() });

let code_id = processor
.handle_new_code(code)
.expect("failed to call runtime api")
.expect("code failed verification or instrumentation");

let amount = 10000;
let mut states = BTreeMap::new();
for i in 0..amount {
let program_id = ProgramId::from(i);

processor
.handle_new_program(program_id, code_id)
.expect("failed to create new program");

let state_hash = processor
.handle_executable_balance_top_up(H256::zero(), 10_000_000_000)
.expect("failed to top up balance");

let message = create_message(&mut processor, DispatchKind::Init, b"");
let state_hash = processor
.handle_message_queueing(state_hash, message)
.expect("failed to populate message queue");

states.insert(program_id, state_hash);
}

let mut in_block_transitions = InBlockTransitions::new(1, states, Default::default());

processor.run_schedule(&mut in_block_transitions);
run::run(
threads_amount,
processor.db.clone(),
processor.creator.clone(),
&mut in_block_transitions,
);
assert_eq!(
in_block_transitions.current_messages().len(),
amount as usize
);

let mut changes = BTreeMap::new();

for (pid, state_hash) in in_block_transitions.states_iter().clone() {
let message = create_message(&mut processor, DispatchKind::Handle, b"");
let new_state_hash = processor
.handle_message_queueing(*state_hash, message)
.expect("failed to populate message queue");
changes.insert(*pid, new_state_hash);
}

for (pid, state_hash) in changes {
in_block_transitions.modify_state(pid, state_hash).unwrap();
}

processor.run_schedule(&mut in_block_transitions);
run::run(
threads_amount,
processor.db.clone(),
processor.creator.clone(),
&mut in_block_transitions,
);
// unchanged
assert_eq!(
in_block_transitions.current_messages().len(),
amount as usize
);

let (_outcomes, states, schedule) = in_block_transitions.finalize();
processor.db.set_block_end_program_states(ch0, states);
processor.db.set_block_end_schedule(ch0, schedule);

let mut parent = ch0;
for _ in 0..10 {
parent = init_new_block_from_parent(&mut processor, parent);
let states = processor.db.block_start_program_states(parent).unwrap();
processor.db.set_block_end_program_states(parent, states);
let schedule = processor.db.block_start_schedule(parent).unwrap();
processor.db.set_block_end_schedule(parent, schedule);
}

let ch11 = init_new_block_from_parent(&mut processor, parent);

let states = processor.db.block_start_program_states(ch11).unwrap();
let schedule = processor.db.block_start_schedule(ch11).unwrap();

let mut in_block_transitions = InBlockTransitions::new(11, states, schedule);

processor.run_schedule(&mut in_block_transitions);
run::run(
threads_amount,
processor.db.clone(),
processor.creator.clone(),
&mut in_block_transitions,
);

assert_eq!(
in_block_transitions.current_messages().len(),
amount as usize
);

for (_pid, message) in in_block_transitions.current_messages() {
assert_eq!(message.payload, b"Hello, world!");
}
}

mod utils {
use super::*;
Expand Down
2 changes: 2 additions & 0 deletions ethexe/runtime/common/src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ impl<'a, S: Storage> TaskHandler<ActorId> for Handler<'a, S> {
}
// TODO (breathx): consider deprecation of delayed wakes + non-concrete waits.
fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> u64 {
log::trace!("Running scheduled task wake message {message_id} to {program_id}");

// TODO (breathx): don't update state if not changed?
self.update_state_with_storage(program_id, |storage, state| {
let Some((dispatch, new_waitlist_hash)) = storage
Expand Down
4 changes: 4 additions & 0 deletions ethexe/runtime/common/src/transitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ impl InBlockTransitions {
}
}

pub fn block_number(&self) -> u32 {
self.current_bn
}

pub fn state_of(&self, actor_id: &ActorId) -> Option<H256> {
self.states.get(actor_id).cloned()
}
Expand Down

0 comments on commit bf38e9b

Please sign in to comment.