Skip to content

Commit

Permalink
Add Stop command and test cases to verify suspend/resume/shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 17, 2024
1 parent 60a65ba commit 1607a98
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 29 deletions.
2 changes: 1 addition & 1 deletion script/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre", optional = tr
serde = { version = "1.0", features = ["derive"] }
ckb-error = { path = "../error", version = "= 0.114.0-pre" }
ckb-chain-spec = { path = "../spec", version = "= 0.114.0-pre" }
tokio = { version = "1.35.0", features = ["sync"] }
tokio = { version = "1.35.0", features = ["sync", "rt-multi-thread"] }

[dev-dependencies]
proptest = "1.0"
Expand Down
2 changes: 2 additions & 0 deletions script/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,4 +464,6 @@ pub enum ChunkCommand {
Suspend,
/// Resume the verification process
Resume,
/// Stop the verification process
Stop,
}
21 changes: 14 additions & 7 deletions script/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,16 +1410,16 @@ async fn run_vms_with_signal(
);

loop {
//eprintln!("parent wait to receive: {:?}", signal.borrow().to_owned());
tokio::select! {
_ = signal.changed() => {
match signal.borrow().to_owned() {
let command = signal.borrow().to_owned();
match command {
ChunkCommand::Suspend => {
pause.interrupt();
}
ChunkCommand::Resume => {
ChunkCommand::Resume | ChunkCommand::Stop => {
pause.free();
let _res = child_sender.send(ChunkCommand::Resume);
let _res = child_sender.send(command);
}
}
}
Expand Down Expand Up @@ -1462,9 +1462,16 @@ async fn run_vms_child(
loop {
select! {
_ = child_recv.changed() => {
let state = child_recv.borrow().to_owned();
if state != ChunkCommand::Resume {
continue;
match child_recv.borrow().to_owned() {
ChunkCommand::Stop => {
let exit = (Err(ckb_vm::Error::Unexpected("stopped".to_string())), cycles);
let _ = finished_send.send(exit);
return;
}
ChunkCommand::Suspend => {
continue;
}
ChunkCommand::Resume => {}
}
if machines.is_empty() {
finished_send.send((Ok(exit_code), cycles)).unwrap();
Expand Down
76 changes: 65 additions & 11 deletions script/src/verify/tests/ckb_latest/features_since_v2023.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ fn check_spawn_snapshot() {
assert!(chunks_count > 1);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn check_spawn_async() {
let script_version = SCRIPT_VERSION;
if script_version <= ScriptVersion::V1 {
Expand Down Expand Up @@ -705,20 +705,17 @@ async fn check_spawn_async() {
let _jt = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let res = command_tx.send(ChunkCommand::Suspend);
let _res = command_tx.send(ChunkCommand::Suspend);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
if res.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let res = command_tx.send(ChunkCommand::Resume);
if res.is_err() {
break;
}

let _res = command_tx.send(ChunkCommand::Resume);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

let _res = command_tx.send(ChunkCommand::Suspend);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

let _res = command_tx.send(ChunkCommand::Resume);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
});

Expand All @@ -729,6 +726,63 @@ async fn check_spawn_async() {
assert_eq!(cycles, cycles_once);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn check_spawn_suspend_shutdown() {
let script_version = SCRIPT_VERSION;
if script_version <= ScriptVersion::V1 {
return;
}

let (spawn_caller_cell, spawn_caller_data_hash) =
load_cell_from_path("testdata/spawn_caller_exec");
let (snapshot_cell, _) = load_cell_from_path("testdata/infinite_loop");

let spawn_caller_script = Script::new_builder()
.hash_type(script_version.data_hash_type().into())
.code_hash(spawn_caller_data_hash)
.build();
let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(100).pack())
.lock(spawn_caller_script)
.build();
let input = CellInput::new(OutPoint::null(), 0);

let transaction = TransactionBuilder::default().input(input).build();
let dummy_cell = create_dummy_cell(output);

let rtx = ResolvedTransaction {
transaction,
resolved_cell_deps: vec![spawn_caller_cell, snapshot_cell],
resolved_inputs: vec![dummy_cell],
resolved_dep_groups: vec![],
};

let verifier = TransactionScriptsVerifierWithEnv::new();
let (command_tx, mut command_rx) = watch::channel(ChunkCommand::Resume);
let _jt = tokio::spawn(async move {
loop {
let _res = command_tx.send(ChunkCommand::Suspend);
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;

let _res = command_tx.send(ChunkCommand::Resume);
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;

let _res = command_tx.send(ChunkCommand::Suspend);
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;

let _res = command_tx.send(ChunkCommand::Stop);
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
}
});

let res = verifier
.verify_complete_async(script_version, &rtx, &mut command_rx, true)
.await;
assert!(res.is_err());
let err = res.unwrap_err().to_string();
assert!(err.contains("VM Internal Error: Unexpected(\"stopped\")"));
}

#[test]
fn check_spawn_state() {
let script_version = SCRIPT_VERSION;
Expand Down
2 changes: 2 additions & 0 deletions script/testdata/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ALL_BINS := jalr_zero \
exec_caller_big_offset_length \
exec_configurable_callee \
exec_configurable_caller \
infinite_loop \
load_code_to_stack_then_reuse \
load_is_even_into_global \
load_is_even_with_snapshot \
Expand Down Expand Up @@ -120,6 +121,7 @@ cpop_lock: cpop_lock.c
mop_adc_lock: mop_adc_lock.S
current_cycles: current_cycles.c
current_cycles_with_snapshot: current_cycles_with_snapshot.c
infinite_loop: infinite_loop.c
vm_version: vm_version.c
vm_version_2: vm_version_2.c
vm_version_with_snapshot: vm_version_with_snapshot.c
Expand Down
Binary file added script/testdata/infinite_loop
Binary file not shown.
15 changes: 15 additions & 0 deletions script/testdata/infinite_loop.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "ckb_syscalls.h"

#ifdef DEBUG
#include <stdio.h>
#else
#define ckb_debug(...)
#define sprintf(...)
#endif


int main() {
for(; ;) {
}
return CKB_SUCCESS;
}
33 changes: 23 additions & 10 deletions tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Worker {
}

pub(crate) struct VerifyMgr {
workers: Vec<Worker>,
workers: Vec<(watch::Sender<ChunkCommand>, Worker)>,
join_handles: Option<Vec<JoinHandle<()>>>,
signal_exit: CancellationToken,
command_rx: watch::Receiver<ChunkCommand>,
Expand All @@ -256,15 +256,18 @@ impl VerifyMgr {
let workers: Vec<_> = (0..num_cpus::get())
.map({
let tasks = Arc::clone(&verify_queue);
let command_rx = chunk_rx.clone();
let signal_exit = signal_exit.clone();
move |_| {
Worker::new(
service.clone(),
Arc::clone(&tasks),
command_rx.clone(),
queue_rx.clone(),
signal_exit.clone(),
let (child_tx, child_rx) = watch::channel(ChunkCommand::Resume);
(
child_tx,
Worker::new(
service.clone(),
Arc::clone(&tasks),
child_rx,
queue_rx.clone(),
signal_exit.clone(),
),
)
}
})
Expand All @@ -277,21 +280,31 @@ impl VerifyMgr {
}
}

fn send_child_command(&self, command: ChunkCommand) {
for w in &self.workers {
if let Err(err) = w.0.send(command.clone()) {
info!("send worker command failed, error: {}", err);
}
}
}

async fn start_loop(&mut self) {
let mut join_handles = Vec::new();
for w in self.workers.iter_mut() {
let h = w.clone().start();
let h = w.1.clone().start();
join_handles.push(h);
}
self.join_handles.replace(join_handles);
loop {
tokio::select! {
_ = self.signal_exit.cancelled() => {
info!("TxPool chunk_command service received exit signal, exit now");
self.send_child_command(ChunkCommand::Stop);
break;
},
_ = self.command_rx.changed() => {
//eprintln!("command: {:?}", self.command_rx.borrow());
let command = self.command_rx.borrow().to_owned();
self.send_child_command(command);
}
}
}
Expand Down

0 comments on commit 1607a98

Please sign in to comment.