Skip to content

Commit

Permalink
feat(expr): support any children in rw_vnode (#18405)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Oct 8, 2024
1 parent c174d91 commit 419c853
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
3 changes: 3 additions & 0 deletions e2e_test/batch/functions/internal.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,8 @@ insert into t values ('aaa', 1, 1), ('bbb', 0, 2), ('ccc', 0, 5), ('ddd', 1, 4)
statement ok
select rw_vnode(_row_id) as vnode, _row_id from t;

statement ok
select rw_vnode(v2 + 114), rw_vnode(514) from t;

statement ok
drop table t
34 changes: 25 additions & 9 deletions src/expr/impl/src/scalar/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ use risingwave_expr::{build_function, Result};

#[derive(Debug)]
struct VnodeExpression {
dist_key_indices: Vec<usize>,
/// A list of expressions to get the distribution key columns. Typically `InputRef`.
children: Vec<BoxedExpression>,

/// Normally, we pass the distribution key indices to `VirtualNode::compute_xx` functions.
/// But in this case, all children columns are used to compute vnode. So we cache a vector of
/// all indices here and pass it later to reduce allocation.
all_indices: Vec<usize>,
}

#[build_function("vnode(...) -> int2")]
fn build(_: DataType, children: Vec<BoxedExpression>) -> Result<BoxedExpression> {
let dist_key_indices = children
.into_iter()
.map(|child| child.input_ref_index().unwrap())
.collect();

Ok(Box::new(VnodeExpression { dist_key_indices }))
Ok(Box::new(VnodeExpression {
all_indices: (0..children.len()).collect(),
children,
}))
}

#[async_trait::async_trait]
Expand All @@ -44,7 +48,13 @@ impl Expression for VnodeExpression {
}

async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, vnode_count());
let mut arrays = Vec::with_capacity(self.children.len());
for child in &self.children {
arrays.push(child.eval(input).await?);
}
let input = DataChunk::new(arrays, input.visibility().clone());

let vnodes = VirtualNode::compute_chunk(&input, &self.all_indices, vnode_count());
let mut builder = I16ArrayBuilder::new(input.capacity());
vnodes
.into_iter()
Expand All @@ -53,8 +63,14 @@ impl Expression for VnodeExpression {
}

async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
let mut datums = Vec::with_capacity(self.children.len());
for child in &self.children {
datums.push(child.eval_row(input).await?);
}
let input = OwnedRow::new(datums);

Ok(Some(
VirtualNode::compute_row(input, &self.dist_key_indices, vnode_count())
VirtualNode::compute_row(input, &self.all_indices, vnode_count())
.to_scalar()
.into(),
))
Expand Down

0 comments on commit 419c853

Please sign in to comment.