diff --git a/e2e_test/batch/functions/internal.slt.part b/e2e_test/batch/functions/internal.slt.part index fdb240c4c371a..52ddbcbc87a58 100644 --- a/e2e_test/batch/functions/internal.slt.part +++ b/e2e_test/batch/functions/internal.slt.part @@ -13,5 +13,8 @@ insert into t values ('aaa', 1, 1), ('bbb', 0, 2), ('ccc', 0, 5), ('ddd', 1, 4) statement ok select rw_vnode(_row_id) as vnode, _row_id from t; +statement ok +select rw_vnode(v2 + 114), rw_vnode(514) from t; + statement ok drop table t diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 7d44dfb0e03b1..1cb92cb036097 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -24,17 +24,21 @@ use risingwave_expr::{build_function, Result}; #[derive(Debug)] struct VnodeExpression { - dist_key_indices: Vec, + /// A list of expressions to get the distribution key columns. Typically `InputRef`. + children: Vec, + + /// Normally, we pass the distribution key indices to `VirtualNode::compute_xx` functions. + /// But in this case, all children columns are used to compute vnode. So we cache a vector of + /// all indices here and pass it later to reduce allocation. + all_indices: Vec, } #[build_function("vnode(...) -> int2")] fn build(_: DataType, children: Vec) -> Result { - let dist_key_indices = children - .into_iter() - .map(|child| child.input_ref_index().unwrap()) - .collect(); - - Ok(Box::new(VnodeExpression { dist_key_indices })) + Ok(Box::new(VnodeExpression { + all_indices: (0..children.len()).collect(), + children, + })) } #[async_trait::async_trait] @@ -44,7 +48,13 @@ impl Expression for VnodeExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, vnode_count()); + let mut arrays = Vec::with_capacity(self.children.len()); + for child in &self.children { + arrays.push(child.eval(input).await?); + } + let input = DataChunk::new(arrays, input.visibility().clone()); + + let vnodes = VirtualNode::compute_chunk(&input, &self.all_indices, vnode_count()); let mut builder = I16ArrayBuilder::new(input.capacity()); vnodes .into_iter() @@ -53,8 +63,14 @@ impl Expression for VnodeExpression { } async fn eval_row(&self, input: &OwnedRow) -> Result { + let mut datums = Vec::with_capacity(self.children.len()); + for child in &self.children { + datums.push(child.eval_row(input).await?); + } + let input = OwnedRow::new(datums); + Ok(Some( - VirtualNode::compute_row(input, &self.dist_key_indices, vnode_count()) + VirtualNode::compute_row(input, &self.all_indices, vnode_count()) .to_scalar() .into(), ))