From 667db65abdefaacfaa7e0427bb0aefa3be1aa34f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=EB=83=A5=EB=83=90=EC=B1=A0?=
Date: Thu, 26 Dec 2024 06:01:21 +0900
Subject: [PATCH] feat(base): make able to trigger early drop with other resources (#465)

* fix(ci): adjust log filter

* chore: update `types/global.d.ts`

* feat(base): make able to trigger early drop with other resources

* chore: add integration tests
---
 .github/workflows/ci.yml                      |   2 +-
 crates/base/src/deno_runtime.rs               |   9 +
 crates/base/src/worker/pool.rs                |  14 +-
 .../worker/supervisor/strategy_per_worker.rs  | 210 ++++++++++++------
 .../base/test_cases/early-drop-mem/index.ts   |  26 +++
 .../test_cases/early-drop-wall-clock/index.ts |  18 ++
 crates/base/tests/integration_tests.rs        |  82 +++++++
 types/global.d.ts                             |   3 +-
 8 files changed, 283 insertions(+), 81 deletions(-)
 create mode 100644 crates/base/test_cases/early-drop-mem/index.ts
 create mode 100644 crates/base/test_cases/early-drop-wall-clock/index.ts

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index baff97d23..18bdc6787 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -13,7 +13,7 @@ env:
   CARGO_TERM_COLOR: always
   RUSTUP_MAX_RETRIES: 10
   ORT_DYLIB_PATH: /tmp/onnxruntime/lib/libonnxruntime.so
-  RUST_LOG: event_worker=trace
+  RUST_LOG: sb_event_worker=trace
 
 jobs:
   cargo-fmt:
diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs
index 743ad9830..c83b7768d 100644
--- a/crates/base/src/deno_runtime.rs
+++ b/crates/base/src/deno_runtime.rs
@@ -279,6 +279,7 @@ pub struct RuntimeState {
     pub event_loop_completed: Arc<AtomicFlag>,
     pub terminated: Arc<AtomicFlag>,
     pub found_inspector_session: Arc<AtomicFlag>,
+    pub mem_reached_half: Arc<AtomicFlag>,
 }
 
 impl RuntimeState {
@@ -1324,6 +1325,14 @@ where
             }
         }
 
+        if let Some(limit) = mem_state.limit {
+            if total_malloced_bytes >= limit / 2 {
+                state.mem_reached_half.raise();
+            } else {
+                state.mem_reached_half.lower();
+            }
+        }
+
         if let Some(threshold_bytes) = beforeunload_mem_threshold.load().as_deref().copied() {
             let total_malloced_bytes = total_malloced_bytes as u64;
 
diff --git a/crates/base/src/worker/pool.rs b/crates/base/src/worker/pool.rs
index 350d1a016..73fa7054d 100644
--- a/crates/base/src/worker/pool.rs
+++ b/crates/base/src/worker/pool.rs
@@ -460,8 +460,6 @@ impl WorkerPool {
                 if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
                     error!("main worker receiver dropped")
                 };
-
-                status.demand.fetch_add(1, Ordering::Release);
             }
             Err(err) => {
                 error!("{err:#}");
@@ -502,6 +500,8 @@ impl WorkerPool {
         let cancel = worker.cancel.clone();
         let (req_start_tx, req_end_tx) = profile.timing_tx_pair.clone();
 
+        profile.status.demand.fetch_add(1, Ordering::Release);
+
         // Create a closure to handle the request and send the response
         let request_handler = async move {
             if !policy.is_per_worker() {
@@ -648,15 +648,7 @@ impl WorkerPool {
             .get(&worker_uuid)
             .map(|it| it.status.is_retired.clone())
         {
-            Some(is_retired) if !is_retired.is_raised() => {
-                self.user_workers
-                    .get(&worker_uuid)
-                    .map(|it| it.status.demand.as_ref())
-                    .unwrap()
-                    .fetch_add(1, Ordering::Release);
-
-                Some(worker_uuid)
-            }
+            Some(is_retired) if !is_retired.is_raised() => Some(worker_uuid),
 
             _ => {
                 self.retire(&worker_uuid);
diff --git a/crates/base/src/worker/supervisor/strategy_per_worker.rs b/crates/base/src/worker/supervisor/strategy_per_worker.rs
index c72eaf04f..45a0cebe3 100644
--- a/crates/base/src/worker/supervisor/strategy_per_worker.rs
+++ b/crates/base/src/worker/supervisor/strategy_per_worker.rs
@@ -1,15 +1,24 @@
-use std::{future::pending, sync::atomic::Ordering, time::Duration};
+use std::{
+    future::pending,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
 
 #[cfg(debug_assertions)]
 use std::thread::ThreadId;
 
+use deno_core::unsync::AtomicFlag;
 use log::{error, info};
+use sb_core::PromiseMetrics;
 use sb_event_worker::events::ShutdownReason;
 use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};
 use tokio_util::sync::CancellationToken;
 
 use crate::{
-    deno_runtime::WillTerminateReason,
+    deno_runtime::{RuntimeState, WillTerminateReason},
     worker::supervisor::{
         create_wall_clock_beforeunload_alert, v8_handle_beforeunload, v8_handle_drain,
         v8_handle_early_drop_beforeunload, v8_handle_early_retire, wait_cpu_alarm, CPUUsage,
@@ -19,6 +28,77 @@ use crate::{
 
 use super::{v8_handle_termination, Arguments, CPUUsageMetrics, V8HandleTerminationData};
 
+#[derive(Debug, Default)]
+struct State {
+    is_worker_entered: bool,
+    is_wall_clock_limit_disabled: bool,
+    is_wall_clock_beforeunload_armed: bool,
+    is_cpu_time_soft_limit_reached: bool,
+    is_mem_half_reached: bool,
+    is_waiting_for_termination: bool,
+    is_retired: Arc<AtomicFlag>,
+
+    wall_clock_alerts: usize,
+
+    req_ack_count: usize,
+    req_demand: Arc<AtomicUsize>,
+
+    runtime: Arc<RuntimeState>,
+    promise: PromiseMetrics,
+}
+
+impl Drop for State {
+    fn drop(&mut self) {
+        self.is_retired.raise();
+    }
+}
+
+impl State {
+    fn update_runtime_state(&mut self) {
+        self.is_mem_half_reached = self.runtime.mem_reached_half.is_raised();
+    }
+
+    fn worker_enter(&mut self) {
+        assert!(!self.is_worker_entered);
+        self.is_worker_entered = true;
+        self.update_runtime_state();
+    }
+
+    fn worker_leave(&mut self) {
+        assert!(self.is_worker_entered);
+        self.is_worker_entered = false;
+        self.update_runtime_state();
+    }
+
+    fn req_acknowledged(&mut self) {
+        self.req_ack_count += 1;
+        self.update_runtime_state();
+    }
+
+    fn is_retired(&self) -> bool {
+        self.is_retired.is_raised()
+    }
+
+    fn has_resource_alert(&self) -> bool {
+        self.is_waiting_for_termination
+            || self.is_cpu_time_soft_limit_reached
+            || self.is_mem_half_reached
+            || self.wall_clock_alerts == 2
+    }
+
+    fn have_all_reqs_been_acknowledged(&self) -> bool {
+        self.req_ack_count == self.req_demand.load(Ordering::Acquire)
+    }
+
+    fn have_all_pending_tasks_been_resolved(&self) -> bool {
+        self.have_all_reqs_been_acknowledged() && self.promise.have_all_promises_been_resolved()
+    }
+
+    fn can_early_drop(&self) -> bool {
+        self.has_resource_alert() && self.have_all_pending_tasks_been_resolved()
+    }
+}
+
 pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
     let Arguments {
         key,
@@ -50,29 +130,24 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
     let (cpu_timer, mut cpu_alarms_rx) = cpu_timer.unzip();
     let (soft_limit_ms, hard_limit_ms) = cpu_timer_param.limits();
 
-    let guard = scopeguard::guard(is_retired.clone(), |v| {
-        v.raise();
-    });
-
     #[cfg(debug_assertions)]
     let mut current_thread_id = Option::<ThreadId>::None;
 
     let wall_clock_limit_ms = runtime_opts.worker_timeout_ms;
-    let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0;
 
-    let mut is_worker_entered = false;
-    let mut is_wall_clock_beforeunload_armed = false;
-    let mut is_cpu_time_soft_limit_reached = false;
-    let mut is_waiting_for_termination = false;
-    let mut have_all_reqs_been_acknowledged: bool;
+    let mut complete_reason = None::<ShutdownReason>;
+    let mut state = State {
+        is_wall_clock_limit_disabled: wall_clock_limit_ms == 0,
+        is_retired: is_retired.clone(),
+        req_demand: demand,
+        runtime: runtime_state,
+        promise: promise_metrics,
+        ..Default::default()
+    };
 
     let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
     let mut cpu_usage_ms = 0i64;
 
-    let mut complete_reason = None::<ShutdownReason>;
-    let mut wall_clock_alerts = 0;
-    let mut req_ack_count = 0usize;
-
     let wall_clock_limit_ms = if wall_clock_limit_ms < 2 {
         2
     } else {
@@ -94,13 +169,18 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
         flags.beforeunload_wall_clock_pct,
     );
 
-    let early_retire_fn = || {
-        // we should raise a retire signal because subsequent incoming requests are unlikely to get
-        // enough wall clock time or cpu time
-        guard.raise();
+    let early_retire_fn = {
+        let is_retired = state.is_retired.clone();
+        let thread_safe_handle = thread_safe_handle.clone();
+        let waker = waker.clone();
+        move || {
+            // we should raise a retire signal because subsequent incoming requests are unlikely to get
+            // enough wall clock time or cpu time
+            is_retired.raise();
 
-        if thread_safe_handle.request_interrupt(v8_handle_early_retire, std::ptr::null_mut()) {
-            waker.wake();
+            if thread_safe_handle.request_interrupt(v8_handle_early_retire, std::ptr::null_mut()) {
+                waker.wake();
+            }
         }
     };
 
@@ -160,14 +240,15 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                     Some(token) => token.inbound.cancelled().await,
                     None => pending().await,
                 }
-            }, if !is_waiting_for_termination => {
-                is_waiting_for_termination = true;
+            }, if !state.is_waiting_for_termination => {
+                state.is_waiting_for_termination = true;
                 early_retire_fn();
+
                 if let Some(func) = dispatch_drain_fn.take() {
                     func();
                 }
 
-                if promise_metrics.have_all_promises_been_resolved() {
+                if state.have_all_pending_tasks_been_resolved() {
                     if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
                         func();
                     }
@@ -199,8 +280,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                             current_thread_id = Some(_thread_id);
                         }
 
-                        assert!(!is_worker_entered);
-                        is_worker_entered = true;
+                        state.worker_enter();
 
                         if !cpu_timer_param.is_disabled() {
                             if let Some(Err(err)) = cpu_timer.as_ref().map(|it| it.reset()) {
@@ -210,25 +290,23 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                    }
 
                    CPUUsageMetrics::Leave(CPUUsage { accumulated, .. }) => {
-                        assert!(is_worker_entered);
+                        state.worker_leave();
 
-                        is_worker_entered = false;
                         cpu_usage_ms = accumulated / 1_000_000;
-                        have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire);
 
                         if !cpu_timer_param.is_disabled() {
                             if cpu_usage_ms >= hard_limit_ms as i64 {
                                 error!("CPU time hard limit reached: isolate: {:?}", key);
                                 complete_reason = Some(ShutdownReason::CPUTime);
-                            } else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
+                            } else if cpu_usage_ms >= soft_limit_ms as i64
+                                && !state.is_cpu_time_soft_limit_reached
+                            {
                                 early_retire_fn();
                                 error!("CPU time soft limit reached: isolate: {:?}", key);
-                                is_cpu_time_soft_limit_reached = true;
+                                state.is_cpu_time_soft_limit_reached = true;
 
-                                if have_all_reqs_been_acknowledged
-                                    && promise_metrics.have_all_promises_been_resolved()
-                                {
+                                if state.have_all_pending_tasks_been_resolved() {
                                     if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
                                         func();
                                     }
                                 }
@@ -236,10 +314,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                             }
                         }
 
-                        if (is_cpu_time_soft_limit_reached || is_waiting_for_termination)
-                            && have_all_reqs_been_acknowledged
-                            && promise_metrics.have_all_promises_been_resolved()
-                        {
+                        if state.can_early_drop() {
                             if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
                                 func();
                             }
@@ -249,17 +324,14 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
            }
 
            Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => {
-                if is_worker_entered {
-                    if !is_cpu_time_soft_limit_reached {
+                if state.is_worker_entered {
+                    if !state.is_cpu_time_soft_limit_reached {
                         early_retire_fn();
                         error!("CPU time soft limit reached: isolate: {:?}", key);
-                        is_cpu_time_soft_limit_reached = true;
-                        have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire);
+                        state.is_cpu_time_soft_limit_reached = true;
 
-                        if have_all_reqs_been_acknowledged
-                            && promise_metrics.have_all_promises_been_resolved()
-                        {
+                        if state.have_all_pending_tasks_been_resolved() {
                             if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
                                 func();
                             }
@@ -272,10 +344,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                }
            }
 
            Some(_) = req_end_rx.recv() => {
-                req_ack_count += 1;
-                have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire);
+                state.req_acknowledged();
 
-                if !is_cpu_time_soft_limit_reached {
+                if !state.has_resource_alert() {
                    if let Some(tx) = pool_msg_tx.clone() {
                        if tx.send(UserWorkerMsgs::Idle(key)).is_err() {
                            error!("failed to send idle msg to pool: {:?}", key);
@@ -283,42 +354,47 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                    }
                }
 
-                if have_all_reqs_been_acknowledged && guard.is_raised() {
+                if state.have_all_reqs_been_acknowledged() && state.is_retired() {
                    if let Some(func) = dispatch_drain_fn.take() {
                        func();
                    }
                }
-
-                if !is_cpu_time_soft_limit_reached
-                    || !have_all_reqs_been_acknowledged
-                    || !promise_metrics.have_all_promises_been_resolved()
-                {
+                if !state.can_early_drop() {
                    continue;
                }
-
                if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
                    func();
                }
            }
 
-            _ = wall_clock_duration_alert.tick(), if !is_wall_clock_limit_disabled => {
-                if wall_clock_alerts == 0 {
+            _ = wall_clock_duration_alert.tick(), if !state.is_wall_clock_limit_disabled => {
+                if state.wall_clock_alerts == 0 {
                    // first tick completes immediately
-                    wall_clock_alerts += 1;
-                } else if wall_clock_alerts == 1 {
+                    state.wall_clock_alerts += 1;
+                } else if state.wall_clock_alerts == 1 {
                    early_retire_fn();
                    error!("wall clock duration warning: isolate: {:?}", key);
-                    wall_clock_alerts += 1;
+
+                    state.wall_clock_alerts += 1;
+
+                    if state.can_early_drop() {
+                        if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+                            func();
+                        }
+                    }
                } else {
-                    let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);
+                    error!(
+                        "wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})",
+                        key,
+                        !state.have_all_reqs_been_acknowledged()
+                    );
 
-                    error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
                    complete_reason = Some(ShutdownReason::WallClockTime);
                }
            }
 
            _ = &mut wall_clock_beforeunload_alert,
-                if !is_wall_clock_limit_disabled && !is_wall_clock_beforeunload_armed
+                if !state.is_wall_clock_limit_disabled && !state.is_wall_clock_beforeunload_armed
            => {
                let data_ptr_mut = Box::into_raw(Box::new(V8HandleBeforeunloadData {
                    reason: WillTerminateReason::WallClock
@@ -332,7 +408,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                    drop(unsafe { Box::from_raw(data_ptr_mut) });
                }
 
-                is_wall_clock_beforeunload_armed = true;
+                state.is_wall_clock_beforeunload_armed = true;
            }
 
            Some(_) = memory_limit_rx.recv() => {
@@ -348,9 +424,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
 
     match complete_reason.take() {
         Some(ShutdownReason::EarlyDrop) => {
-            terminate_fn(runtime_state.is_evaluating_mod());
+            terminate_fn(state.runtime.is_evaluating_mod());
             return (
-                if is_waiting_for_termination {
+                if state.is_waiting_for_termination {
                     ShutdownReason::TerminationRequested
                 } else {
                     ShutdownReason::EarlyDrop
diff --git a/crates/base/test_cases/early-drop-mem/index.ts b/crates/base/test_cases/early-drop-mem/index.ts
new file mode 100644
index 000000000..5958db3ad
--- /dev/null
+++ b/crates/base/test_cases/early-drop-mem/index.ts
@@ -0,0 +1,26 @@
+function sleep(ms: number) {
+  return new Promise(res => {
+    setTimeout(() => {
+      res(void 0);
+    }, ms)
+  });
+}
+
+addEventListener("beforeunload", ev => {
+  console.log(ev.detail);
+});
+
+const mem = [];
+
+export default {
+  async fetch() {
+    for (const _ of [...Array(12).keys()]) {
+      const buf = new Uint8Array(1024 * 1024);
+      buf.fill(1, 0);
+      mem.push(buf);
+      await sleep(300);
+    }
+
+    return new Response();
+  }
+}
\ No newline at end of file
diff --git a/crates/base/test_cases/early-drop-wall-clock/index.ts b/crates/base/test_cases/early-drop-wall-clock/index.ts
new file mode 100644
index 000000000..f3fb406bd
--- /dev/null
+++ b/crates/base/test_cases/early-drop-wall-clock/index.ts
@@ -0,0 +1,18 @@
+function sleep(ms: number) {
+  return new Promise(res => {
+    setTimeout(() => {
+      res(void 0);
+    }, ms)
+  });
+}
+
+addEventListener("beforeunload", ev => {
+  console.log(ev.detail);
+});
+
+export default {
+  async fetch() {
+    await sleep(2000);
+    return new Response();
+  }
+}
\ No newline at end of file
diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs
index 5686083a4..b49bfdae3 100644
--- a/crates/base/tests/integration_tests.rs
+++ b/crates/base/tests/integration_tests.rs
@@ -3295,6 +3295,88 @@ async fn test_should_not_wait_for_background_tests() {
     }
 }
 
+#[tokio::test]
+#[serial]
+async fn test_should_be_able_to_trigger_early_drop_with_wall_clock() {
+    let (tx, mut rx) = mpsc::unbounded_channel();
+    let tb = TestBedBuilder::new("./test_cases/main")
+        .with_per_worker_policy(None)
+        .with_worker_event_sender(Some(tx))
+        .build()
+        .await;
+
+    let resp = tb
+        .request(|b| {
+            b.uri("/early-drop-wall-clock")
.header("x-worker-timeout-ms", HeaderValue::from_static("3000")) + .body(Body::empty()) + .context("can't make request") + }) + .await + .unwrap(); + + assert_eq!(resp.status().as_u16(), StatusCode::OK); + + sleep(Duration::from_secs(2)).await; + rx.close(); + tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; + + while let Some(ev) = rx.recv().await { + let WorkerEvents::Log(ev) = ev.event else { + continue; + }; + if ev.level != LogLevel::Info { + continue; + } + if ev.msg.contains("early_drop") { + return; + } + } + + unreachable!("test failed"); +} + +#[tokio::test] +#[serial] +async fn test_should_be_able_to_trigger_early_drop_with_mem() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let tb = TestBedBuilder::new("./test_cases/main") + .with_per_worker_policy(None) + .with_worker_event_sender(Some(tx)) + .build() + .await; + + let resp = tb + .request(|b| { + b.uri("/early-drop-mem") + .header("x-memory-limit-mb", HeaderValue::from_static("20")) + .body(Body::empty()) + .context("can't make request") + }) + .await + .unwrap(); + + assert_eq!(resp.status().as_u16(), StatusCode::OK); + + sleep(Duration::from_secs(2)).await; + rx.close(); + tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; + + while let Some(ev) = rx.recv().await { + let WorkerEvents::Log(ev) = ev.event else { + continue; + }; + if ev.level != LogLevel::Info { + continue; + } + if ev.msg.contains("early_drop") { + return; + } + } + + unreachable!("test failed"); +} + #[derive(Deserialize)] struct ErrorResponsePayload { msg: String, diff --git a/types/global.d.ts b/types/global.d.ts index ac02f3656..3433d94b8 100644 --- a/types/global.d.ts +++ b/types/global.d.ts @@ -1,10 +1,9 @@ declare type BeforeunloadReason = "cpu" | "memory" | "wall_clock" | "early_drop" | "termination"; -declare interface BeforeunloadEvent extends CustomEvent { } declare interface WindowEventMap { "load": Event; "unload": Event; - "beforeunload": BeforeunloadEvent; + "beforeunload": CustomEvent; "drain": Event; }