Skip to content

Commit

Permalink
reaches
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Apr 12, 2024
1 parent 98b7618 commit 0effb4b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
48 changes: 41 additions & 7 deletions src/common/src/util/recursive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,75 @@ use std::cell::RefCell;
const RED_ZONE: usize = 128 * 1024; // 128KiB
const STACK_SIZE: usize = 16 * RED_ZONE; // 2MiB

/// Recursion depth.
struct Depth {
/// The current depth.
current: usize,
/// The max depth reached so far, not considering the current depth.
last_max: usize,
}

impl Depth {
const fn new() -> Self {
Self {
current: 0,
last_max: 0,
}
}

fn reset(&mut self) {
*self = Self::new();
}
}

/// The tracker for a recursive function.
pub struct Tracker {
depth: RefCell<usize>,
depth: RefCell<Depth>,
}

impl Tracker {
/// Create a new tracker.
pub const fn new() -> Self {
Self {
depth: RefCell::new(0),
depth: RefCell::new(Depth::new()),
}
}

/// Retrieve the current depth of the recursion. Starts from 1 once the
/// recursive function is called.
pub fn depth(&self) -> usize {
*self.depth.borrow()
self.depth.borrow().current
}

/// Check if the current depth reaches the given depth **for the first time**.
///
/// This is useful for logging without any duplication.
pub fn depth_reaches(&self, depth: usize) -> bool {
let d = self.depth.borrow();
d.current == depth && d.current > d.last_max
}

/// Run a recursive function. Grow the stack if necessary.
fn recurse<T>(&self, f: impl FnOnce() -> T) -> T {
struct DepthGuard<'a> {
depth: &'a RefCell<usize>,
depth: &'a RefCell<Depth>,
}

impl<'a> DepthGuard<'a> {
fn new(depth: &'a RefCell<usize>) -> Self {
*depth.borrow_mut() += 1;
fn new(depth: &'a RefCell<Depth>) -> Self {
depth.borrow_mut().current += 1;
Self { depth }
}
}

impl<'a> Drop for DepthGuard<'a> {
fn drop(&mut self) {
*self.depth.borrow_mut() -= 1;
let mut d = self.depth.borrow_mut();
d.last_max = d.last_max.max(d.current); // update the last max depth
d.current -= 1; // restore the current depth
if d.current == 0 {
d.reset(); // reset state if the recursion is finished
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ impl dyn PlanNode {
state: &mut BuildFragmentGraphState,
) -> SchedulerResult<StreamPlanPb> {
recursive::tracker!().recurse(|t| {
if t.depth() == PLAN_DEPTH_THRESHOLD {
if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
}

Expand Down Expand Up @@ -762,7 +762,7 @@ impl dyn PlanNode {
/// (for testing).
pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<BatchPlanPb> {
recursive::tracker!().recurse(|t| {
if t.depth() == PLAN_DEPTH_THRESHOLD {
if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
}

Expand Down

0 comments on commit 0effb4b

Please sign in to comment.