From 4dadf10e4e365754c2256fafe0b89aa632b1c5c4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 8 Oct 2024 18:07:00 +0800 Subject: [PATCH 1/8] add a new type for user, update build function Signed-off-by: Bugen Zhao --- proto/expr.proto | 1 + src/expr/core/src/expr_context.rs | 11 +++-- src/expr/impl/src/scalar/vnode.rs | 43 +++++++++++++++++-- .../binder/expr/function/builtin_scalar.rs | 2 +- src/frontend/src/expr/pure.rs | 1 + src/frontend/src/expr/type_inference/func.rs | 5 ++- .../src/optimizer/plan_expr_visitor/strong.rs | 1 + 7 files changed, 51 insertions(+), 13 deletions(-) diff --git a/proto/expr.proto b/proto/expr.proto index 808f402a77aa8..29421f46961e9 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -302,6 +302,7 @@ message ExprNode { // Internal functions VNODE = 1101; TEST_PAID_TIER = 1102; + VNODE_USER = 1103; // Non-deterministic functions PROCTIME = 2023; PG_SLEEP = 2024; diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index d6798a861315f..e66f6322706f9 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -14,7 +14,6 @@ use std::future::Future; -use risingwave_common::hash::VirtualNode; use risingwave_expr::{define_context, Result as ExprResult}; use risingwave_pb::plan_common::ExprContext; @@ -30,11 +29,11 @@ pub fn capture_expr_context() -> ExprResult { Ok(ExprContext { time_zone }) } -/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set. -// TODO(var-vnode): the only case where this is not set is for batch queries, is it still -// necessary to support `rw_vnode` expression in batch queries? -pub fn vnode_count() -> usize { - VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT) +/// Get the vnode count from the context. +/// +/// Always returns `Ok` in streaming mode and `Err` in batch mode. +pub fn vnode_count() -> ExprResult { + VNODE_COUNT::try_with(|&x| x) } pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 1cb92cb036097..aefedf8203ea7 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -14,16 +14,19 @@ use std::sync::Arc; +use anyhow::Context; +use itertools::Itertools; use risingwave_common::array::{ArrayBuilder, ArrayImpl, ArrayRef, DataChunk, I16ArrayBuilder}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_expr::expr::{BoxedExpression, Expression}; -use risingwave_expr::expr_context::vnode_count; -use risingwave_expr::{build_function, Result}; +use risingwave_expr::{build_function, expr_context, Result}; #[derive(Debug)] struct VnodeExpression { + vnode_count: Option, + /// A list of expressions to get the distribution key columns. Typically `InputRef`. children: Vec, @@ -36,6 +39,28 @@ struct VnodeExpression { #[build_function("vnode(...) -> int2")] fn build(_: DataType, children: Vec) -> Result { Ok(Box::new(VnodeExpression { + vnode_count: None, + all_indices: (0..children.len()).collect(), + children, + })) +} + +#[build_function("vnode_user(...) -> int2")] +fn build_user(_: DataType, children: Vec) -> Result { + let mut children = children.into_iter(); + + let vnode_count = children + .next() + .unwrap() // always exist, enforced in binder + .eval_const() // required to be constant + .ok() + .flatten() // required to be non-null + .context("rw_vnode expects the first argument to be a non-null constant")? + .into_int16() as usize; + let children = children.collect_vec(); + + Ok(Box::new(VnodeExpression { + vnode_count: Some(vnode_count), all_indices: (0..children.len()).collect(), children, })) @@ -54,7 +79,7 @@ impl Expression for VnodeExpression { } let input = DataChunk::new(arrays, input.visibility().clone()); - let vnodes = VirtualNode::compute_chunk(&input, &self.all_indices, vnode_count()); + let vnodes = VirtualNode::compute_chunk(&input, &self.all_indices, self.vnode_count()?); let mut builder = I16ArrayBuilder::new(input.capacity()); vnodes .into_iter() @@ -70,13 +95,23 @@ impl Expression for VnodeExpression { let input = OwnedRow::new(datums); Ok(Some( - VirtualNode::compute_row(input, &self.all_indices, vnode_count()) + VirtualNode::compute_row(input, &self.all_indices, self.vnode_count()?) .to_scalar() .into(), )) } } +impl VnodeExpression { + fn vnode_count(&self) -> Result { + if let Some(vnode_count) = self.vnode_count { + Ok(vnode_count) + } else { + expr_context::vnode_count() + } + } +} + #[cfg(test)] mod tests { use risingwave_common::array::{DataChunk, DataChunkTestExt}; diff --git a/src/frontend/src/binder/expr/function/builtin_scalar.rs b/src/frontend/src/binder/expr/function/builtin_scalar.rs index d46681c51ab3e..7781140432577 100644 --- a/src/frontend/src/binder/expr/function/builtin_scalar.rs +++ b/src/frontend/src/binder/expr/function/builtin_scalar.rs @@ -662,7 +662,7 @@ impl Binder { ("pg_is_in_recovery", raw_call(ExprType::PgIsInRecovery)), ("rw_recovery_status", raw_call(ExprType::RwRecoveryStatus)), // internal - ("rw_vnode", raw_call(ExprType::Vnode)), + ("rw_vnode", raw_call(ExprType::VnodeUser)), ("rw_test_paid_tier", raw_call(ExprType::TestPaidTier)), // for testing purposes // TODO: choose which pg version we should return. ("version", raw_literal(ExprImpl::literal_varchar(current_cluster_version()))), diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index d47cc3851f641..c79309572cc18 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -271,6 +271,7 @@ impl ExprVisitor for ImpureAnalyzer { } // expression output is not deterministic Type::Vnode + | Type::VnodeUser | Type::TestPaidTier | Type::Proctime | Type::PgSleep diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index ada1ac2367b5c..8dfe08e8b6dfa 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -660,8 +660,9 @@ fn infer_type_for_special( .into()), } } - ExprType::Vnode => { - ensure_arity!("vnode", 1 <= | inputs |); + ExprType::VnodeUser => { + ensure_arity!("vnode", 2 <= | inputs |); + inputs[0].cast_implicit_mut(DataType::Int16)?; Ok(Some(VirtualNode::RW_TYPE)) } ExprType::Greatest | ExprType::Least => { diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index 673a5f41746bb..890152f00e337 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -304,6 +304,7 @@ impl Strong { | ExprType::MapInsert | ExprType::MapLength | ExprType::Vnode + | ExprType::VnodeUser | ExprType::TestPaidTier | ExprType::Proctime | ExprType::PgSleep From dff451e50b3856629868c5be9c15cecde6d47289 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 8 Oct 2024 18:16:43 +0800 Subject: [PATCH 2/8] refine Signed-off-by: Bugen Zhao --- src/expr/impl/src/scalar/vnode.rs | 8 +++++++- src/frontend/src/expr/type_inference/func.rs | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index aefedf8203ea7..ce8d992aecbfd 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -25,6 +25,8 @@ use risingwave_expr::{build_function, expr_context, Result}; #[derive(Debug)] struct VnodeExpression { + /// `Some` if it's from the first argument of user-facing function `VnodeUser` (`rw_vnode`), + /// `None` if it's from the internal function `Vnode`. vnode_count: Option, /// A list of expressions to get the distribution key columns. Typically `InputRef`. @@ -55,8 +57,12 @@ fn build_user(_: DataType, children: Vec) -> Result { ensure_arity!("vnode", 2 <= | inputs |); - inputs[0].cast_implicit_mut(DataType::Int16)?; + inputs[0].cast_explicit_mut(DataType::Int16)?; Ok(Some(VirtualNode::RW_TYPE)) } ExprType::Greatest | ExprType::Least => { From 288a4381cc2283564c143964f7083c84f0243466 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 9 Oct 2024 14:24:36 +0800 Subject: [PATCH 3/8] minor refine Signed-off-by: Bugen Zhao --- src/expr/impl/src/scalar/vnode.rs | 9 ++++----- src/frontend/src/expr/type_inference/func.rs | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index ce8d992aecbfd..e5208e30ff553 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -53,12 +53,11 @@ fn build_user(_: DataType, children: Vec) -> Result { - ensure_arity!("vnode", 2 <= | inputs |); + ensure_arity!("rw_vnode", 2 <= | inputs |); inputs[0].cast_explicit_mut(DataType::Int16)?; Ok(Some(VirtualNode::RW_TYPE)) } From 671668efdb3ea0b05eba32173136a84f61d48ff7 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 9 Oct 2024 14:55:17 +0800 Subject: [PATCH 4/8] add unit tests Signed-off-by: Bugen Zhao --- e2e_test/batch/functions/internal.slt.part | 20 ------------- e2e_test/batch/functions/vnode.slt.part | 33 ++++++++++++++++++++++ 2 files changed, 33 insertions(+), 20 deletions(-) delete mode 100644 e2e_test/batch/functions/internal.slt.part create mode 100644 e2e_test/batch/functions/vnode.slt.part diff --git a/e2e_test/batch/functions/internal.slt.part b/e2e_test/batch/functions/internal.slt.part deleted file mode 100644 index 52ddbcbc87a58..0000000000000 --- a/e2e_test/batch/functions/internal.slt.part +++ /dev/null @@ -1,20 +0,0 @@ -statement ok -SET RW_IMPLICIT_FLUSH TO true; - -statement ok -create table t(v1 varchar, v2 int, v3 int) - -statement ok -select rw_vnode(_row_id) as vnode, _row_id from t; - -statement ok -insert into t values ('aaa', 1, 1), ('bbb', 0, 2), ('ccc', 0, 5), ('ddd', 1, 4) - -statement ok -select rw_vnode(_row_id) as vnode, _row_id from t; - -statement ok -select rw_vnode(v2 + 114), rw_vnode(514) from t; - -statement ok -drop table t diff --git a/e2e_test/batch/functions/vnode.slt.part b/e2e_test/batch/functions/vnode.slt.part new file mode 100644 index 0000000000000..e5b65626dce65 --- /dev/null +++ b/e2e_test/batch/functions/vnode.slt.part @@ -0,0 +1,33 @@ +query error Function `rw_vnode` takes at least 2 arguments (0 given) +select rw_vnode(); + +query error Function `rw_vnode` takes at least 2 arguments (1 given) +select rw_vnode(256); + +query I +select rw_vnode(256, 114, 514); +---- +97 + +query I +select rw_vnode(4096, 114, 514); +---- +1377 + +query error the first argument (vnode count) must not be NULL +select rw_vnode(NULL, 114, 514); + +statement ok +create table vnodes (vnode int); + +statement ok +insert into vnodes values (256), (4096); + +statement ok +flush; + +query error the first argument (vnode count) must be a constant +select rw_vnode(vnode, 114, 514) from vnodes; + +statement ok +drop table vnodes; From 5f948a03572f4567a4e7fba697243af2277c8a82 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 9 Oct 2024 15:14:29 +0800 Subject: [PATCH 5/8] use i32 instead Signed-off-by: Bugen Zhao --- e2e_test/batch/functions/vnode.slt.part | 20 ++++++++++++++++---- src/expr/impl/src/scalar/vnode.rs | 13 +++++++++---- src/frontend/src/expr/type_inference/func.rs | 2 +- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/e2e_test/batch/functions/vnode.slt.part b/e2e_test/batch/functions/vnode.slt.part index e5b65626dce65..9f955c289d483 100644 --- a/e2e_test/batch/functions/vnode.slt.part +++ b/e2e_test/batch/functions/vnode.slt.part @@ -1,7 +1,7 @@ -query error Function `rw_vnode` takes at least 2 arguments (0 given) +query error takes at least 2 arguments \(0 given\) select rw_vnode(); -query error Function `rw_vnode` takes at least 2 arguments (1 given) +query error takes at least 2 arguments \(1 given\) select rw_vnode(256); query I @@ -14,9 +14,21 @@ select rw_vnode(4096, 114, 514); ---- 1377 -query error the first argument (vnode count) must not be NULL +# VirtualNode::MAX_COUNT +query I +select rw_vnode(32768, 114, 514); +---- +21857 + +query error the first argument \(vnode count\) must not be NULL select rw_vnode(NULL, 114, 514); +query error the first argument \(vnode count\) must be in range 1..=32768 +select rw_vnode(0, 114, 514); + +query error the first argument \(vnode count\) must be in range 1..=32768 +select rw_vnode(32769, 114, 514); + statement ok create table vnodes (vnode int); @@ -26,7 +38,7 @@ insert into vnodes values (256), (4096); statement ok flush; -query error the first argument (vnode count) must be a constant +query error the first argument \(vnode count\) must be a constant select rw_vnode(vnode, 114, 514) from vnodes; statement ok diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index e5208e30ff553..f466fd4189e16 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -57,15 +57,20 @@ fn build_user(_: DataType, children: Vec) -> Result { ensure_arity!("rw_vnode", 2 <= | inputs |); - inputs[0].cast_explicit_mut(DataType::Int16)?; + inputs[0].cast_explicit_mut(DataType::Int32)?; // vnode count Ok(Some(VirtualNode::RW_TYPE)) } ExprType::Greatest | ExprType::Least => { From 8a25e77436d93901c0dd458c960c0e9d18436ed5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 9 Oct 2024 15:21:16 +0800 Subject: [PATCH 6/8] vnode_user is deterministic Signed-off-by: Bugen Zhao --- src/frontend/src/expr/pure.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index c79309572cc18..5e3bd968a46b1 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -261,7 +261,8 @@ impl ExprVisitor for ImpureAnalyzer { | Type::MapContains | Type::MapDelete | Type::MapInsert - | Type::MapLength => + | Type::MapLength + | Type::VnodeUser => // expression output is deterministic(same result for the same input) { func_call @@ -270,8 +271,7 @@ impl ExprVisitor for ImpureAnalyzer { .for_each(|expr| self.visit_expr(expr)); } // expression output is not deterministic - Type::Vnode - | Type::VnodeUser + Type::Vnode // obtain vnode count from the context | Type::TestPaidTier | Type::Proctime | Type::PgSleep From 9bf37c765665c09eadcac2798394bda32ceea831 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 9 Oct 2024 17:32:21 +0800 Subject: [PATCH 7/8] add back type inference for internally used vnode function Signed-off-by: Bugen Zhao --- src/frontend/src/expr/type_inference/func.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9470cfa31039c..99392ad87b971 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -660,6 +660,9 @@ fn infer_type_for_special( .into()), } } + // internal use only + ExprType::Vnode => Ok(Some(VirtualNode::RW_TYPE)), + // user-facing `rw_vnode` ExprType::VnodeUser => { ensure_arity!("rw_vnode", 2 <= | inputs |); inputs[0].cast_explicit_mut(DataType::Int32)?; // vnode count From 5505393bc8a3ce7d48796fd8bbd9c9817a2e9e8c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 10 Oct 2024 13:05:38 +0800 Subject: [PATCH 8/8] replace impure cse test with `pg_sleep` Signed-off-by: Bugen Zhao --- .../tests/testdata/input/cse_expr.yaml | 4 ++-- .../tests/testdata/output/cse_expr.yaml | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/input/cse_expr.yaml index baf9135a4200f..66ca41a518c8a 100644 --- a/src/frontend/planner_test/tests/testdata/input/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/input/cse_expr.yaml @@ -14,8 +14,8 @@ - stream_plan - name: Common sub expression shouldn't extract impure function sql: | - create table t(v1 varchar, v2 int, v3 int); - select rw_vnode(v2) + 1 as vnode, rw_vnode(v2) + 1 as vnode2, v2 + 1 x, v2 + 1 y from t; + create table t(v1 varchar, v2 double, v3 double); + select pg_sleep(v2) + 1 as a, pg_sleep(v2) + 1 as b, v2 + 1 x, v2 + 1 y from t; expected_outputs: - batch_plan - stream_plan diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index abbc0aae184e0..9372c837324f6 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -29,17 +29,17 @@ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression shouldn't extract impure function sql: | - create table t(v1 varchar, v2 int, v3 int); - select rw_vnode(v2) + 1 as vnode, rw_vnode(v2) + 1 as vnode2, v2 + 1 x, v2 + 1 y from t; + create table t(v1 varchar, v2 double, v3 double); + select pg_sleep(v2) + 1 as a, pg_sleep(v2) + 1 as b, v2 + 1 x, v2 + 1 y from t; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchProject { exprs: [(Vnode(t.v2) + 1:Int32) as $expr2, (Vnode(t.v2) + 1:Int32) as $expr3, $expr1, $expr1] } - └─BatchProject { exprs: [t.v2, (t.v2 + 1:Int32) as $expr1] } + └─BatchProject { exprs: [(PgSleep(t.v2) + 1:Int32) as $expr2, (PgSleep(t.v2) + 1:Int32) as $expr3, $expr1, $expr1] } + └─BatchProject { exprs: [t.v2, (t.v2 + 1:Float64) as $expr1] } └─BatchScan { table: t, columns: [t.v2], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [vnode, vnode2, x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } - └─StreamProject { exprs: [(Vnode(t.v2) + 1:Int32) as $expr2, (Vnode(t.v2) + 1:Int32) as $expr3, $expr1, $expr1, t._row_id] } - └─StreamProject { exprs: [t.v2, (t.v2 + 1:Int32) as $expr1, t._row_id] } + StreamMaterialize { columns: [a, b, x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [(PgSleep(t.v2) + 1:Int32) as $expr2, (PgSleep(t.v2) + 1:Int32) as $expr3, $expr1, $expr1, t._row_id] } + └─StreamProject { exprs: [t.v2, (t.v2 + 1:Float64) as $expr1, t._row_id] } └─StreamTableScan { table: t, columns: [t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression shouldn't extract const sql: |