Skip to content

Commit

Permalink
vnode expr context
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 16, 2024
1 parent dd939ea commit fdf0810
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
7 changes: 7 additions & 0 deletions src/expr/core/src/expr_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@

use std::future::Future;

use risingwave_common::hash::VirtualNode;
use risingwave_expr::{define_context, Result as ExprResult};
use risingwave_pb::plan_common::ExprContext;

// For all execution mode.
define_context! {
pub TIME_ZONE: String,
pub FRAGMENT_ID: u32,
pub VNODE_COUNT: usize,
}

pub fn capture_expr_context() -> ExprResult<ExprContext> {
let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
Ok(ExprContext { time_zone })
}

/// Get the vnode count from the context, or [`VirtualNode::COUNT`] if not set.
pub fn vnode_count() -> usize {
VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT)
}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
where
Fut: Future,
Expand Down
7 changes: 3 additions & 4 deletions src/expr/impl/src/scalar/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr::{BoxedExpression, Expression};
use risingwave_expr::expr_context::vnode_count;
use risingwave_expr::{build_function, Result};

#[derive(Debug)]
Expand All @@ -43,8 +44,7 @@ impl Expression for VnodeExpression {
}

async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
// TODO(var-vnode): get vnode count from context
let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, VirtualNode::COUNT);
let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, vnode_count());
let mut builder = I16ArrayBuilder::new(input.capacity());
vnodes
.into_iter()
Expand All @@ -53,9 +53,8 @@ impl Expression for VnodeExpression {
}

async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
// TODO(var-vnode): get vnode count from context
Ok(Some(
VirtualNode::compute_row(input, &self.dist_key_indices, VirtualNode::COUNT)
VirtualNode::compute_row(input, &self.dist_key_indices, vnode_count())
.to_scalar()
.into(),
))
Expand Down
42 changes: 29 additions & 13 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ use std::sync::{Arc, LazyLock};
use anyhow::anyhow;
use await_tree::InstrumentAwait;
use futures::future::join_all;
use futures::FutureExt;
use hytra::TrAdder;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VirtualNode;
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::{IntGaugeExt, GLOBAL_ERROR_METRICS};
use risingwave_common::util::epoch::EpochPair;
use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID};
use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID, VNODE_COUNT};
use risingwave_expr::ExprError;
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::stream_plan::PbStreamActor;
Expand All @@ -44,6 +47,7 @@ use crate::task::{ActorId, LocalBarrierManager};
pub struct ActorContext {
pub id: ActorId,
pub fragment_id: u32,
pub vnode_count: usize,
pub mview_definition: String,

// TODO(eric): these seem to be useless now?
Expand Down Expand Up @@ -71,6 +75,7 @@ impl ActorContext {
Arc::new(Self {
id,
fragment_id: 0,
vnode_count: VirtualNode::COUNT_FOR_TEST,
mview_definition: "".to_string(),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
Expand All @@ -97,6 +102,9 @@ impl ActorContext {
id: stream_actor.actor_id,
fragment_id: stream_actor.fragment_id,
mview_definition: stream_actor.mview_definition.clone(),
vnode_count: (stream_actor.vnode_bitmap.as_ref())
// TODO(var-vnode): use 1 for singleton fragment
.map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
total_mem_val,
Expand Down Expand Up @@ -177,18 +185,26 @@ where

#[inline(always)]
pub async fn run(mut self) -> StreamResult<()> {
FRAGMENT_ID::scope(
self.actor_context.fragment_id,
expr_context_scope(self.expr_context.clone(), async move {
tokio::join!(
// Drive the subtasks concurrently.
join_all(std::mem::take(&mut self.subtasks)),
self.run_consumer(),
)
.1
}),
)
.await
let expr_context = self.expr_context.clone();
let fragment_id = self.actor_context.fragment_id;
let vnode_count = self.actor_context.vnode_count;

let run = async move {
tokio::join!(
// Drive the subtasks concurrently.
join_all(std::mem::take(&mut self.subtasks)),
self.run_consumer(),
)
.1
}
.boxed();

// Attach contexts to the future.
let run = expr_context_scope(expr_context, run);
let run = FRAGMENT_ID::scope(fragment_id, run);
let run = VNODE_COUNT::scope(vnode_count, run);

run.await
}

async fn run_consumer(self) -> StreamResult<()> {
Expand Down

0 comments on commit fdf0810

Please sign in to comment.