From 6d8abb7bc1a115d6c386e9edba924ebf916ceb61 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 20 Aug 2024 22:54:19 +0800 Subject: [PATCH 1/4] fix: flaky udf e2e error ui test (#18132) Signed-off-by: Richard Chien Co-authored-by: xxchan --- e2e_test/error_ui/simple/main.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt index 23f136c7cc032..a59b8acfa2e88 100644 --- a/e2e_test/error_ui/simple/main.slt +++ b/e2e_test/error_ui/simple/main.slt @@ -19,7 +19,7 @@ Caused by these errors (recent errors listed first): 2: invalid IPv4 address -statement error Failed to run the query +statement error failed to send requests to UDF service create function int_42() returns int as int_42 using link '55.55.55.55:5555'; From 6b5e364873401b42d75dbe3cba59f4787ebeaae0 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 20 Aug 2024 22:57:24 +0800 Subject: [PATCH 2/4] ci: fix log size limit failure by increasing limit (#18117) Signed-off-by: xxchan --- Makefile.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.toml b/Makefile.toml index 2fe13db1ca8cc..554d2df8d3f3c 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -276,7 +276,7 @@ do fi done -if (( "$(du -sk ${PREFIX_LOG} | cut -f1)" > 3000 )) ; then +if (( "$(du -sk ${PREFIX_LOG} | cut -f1)" > 4000 )) ; then echo "$(tput setaf 1)ERROR: log size is significantly large ($(du -sh ${PREFIX_LOG} | cut -f1)).$(tput sgr0) Please disable unnecessary logs." exit 1 fi From 4b32136fe06ec284fb1466fdcadce40ffedef414 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 20 Aug 2024 23:00:28 +0800 Subject: [PATCH 3/4] refactor(expr): show candicates when function not found (#18078) --- src/expr/core/src/aggregate/mod.rs | 11 +------- src/expr/core/src/expr/build.rs | 13 ++------- src/expr/core/src/sig/mod.rs | 35 ++++++++++++++++++++++--- src/expr/core/src/table_function/mod.rs | 11 +------- 4 files changed, 36 insertions(+), 34 deletions(-) diff --git a/src/expr/core/src/aggregate/mod.rs b/src/expr/core/src/aggregate/mod.rs index 695aad482343d..e77c549a99db0 100644 --- a/src/expr/core/src/aggregate/mod.rs +++ b/src/expr/core/src/aggregate/mod.rs @@ -160,16 +160,7 @@ pub fn build(agg: &AggCall, prefer_append_only: bool) -> Result {}", - kind.as_str_name().to_ascii_lowercase(), - agg.args.arg_types().iter().format(", "), - agg.return_type, - )) - })?; + let sig = crate::sig::FUNCTION_REGISTRY.get(*kind, agg.args.arg_types(), &agg.return_type)?; if let FuncBuilder::Aggregate { append_only: Some(f), diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 91ebe67f479a9..988adbb5d8342 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -30,7 +30,7 @@ use crate::expr::{ BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, }; use crate::sig::FUNCTION_REGISTRY; -use crate::{bail, ExprError, Result}; +use crate::{bail, Result}; /// Build an expression from protobuf. pub fn build_from_prost(prost: &ExprNode) -> Result { @@ -188,16 +188,7 @@ pub fn build_func( children: Vec, ) -> Result { let args = children.iter().map(|c| c.return_type()).collect_vec(); - let desc = FUNCTION_REGISTRY - .get(func, &args, &ret_type) - .ok_or_else(|| { - ExprError::UnsupportedFunction(format!( - "{}({}) -> {}", - func.as_str_name().to_ascii_lowercase(), - args.iter().format(", "), - ret_type, - )) - })?; + let desc = FUNCTION_REGISTRY.get(func, &args, &ret_type)?; desc.build_scalar(ret_type, children) } diff --git a/src/expr/core/src/sig/mod.rs b/src/expr/core/src/sig/mod.rs index ae5af5b57c649..aae3802489d09 100644 --- a/src/expr/core/src/sig/mod.rs +++ b/src/expr/core/src/sig/mod.rs @@ -113,9 +113,38 @@ impl FunctionRegistry { name: impl Into, args: &[DataType], ret: &DataType, - ) -> Option<&FuncSign> { - let v = self.0.get(&name.into())?; - v.iter().find(|d| d.match_args_ret(args, ret)) + ) -> Result<&FuncSign, ExprError> { + let name = name.into(); + let err = |candidates: &Vec| { + // Note: if we return error here, it probably means there is a bug in frontend type inference, + // because such error should be caught in the frontend. + ExprError::UnsupportedFunction(format!( + "{}({}) -> {}{}", + name, + args.iter().format(", "), + ret, + if candidates.is_empty() { + "".to_string() + } else { + format!( + "\nHINT: Supported functions:\n{}", + candidates + .iter() + .map(|d| format!( + " {}({}) -> {}", + d.name, + d.inputs_type.iter().format(", "), + d.ret_type + )) + .format("\n") + ) + } + )) + }; + let v = self.0.get(&name).ok_or_else(|| err(&vec![]))?; + v.iter() + .find(|d| d.match_args_ret(args, ret)) + .ok_or_else(|| err(v)) } /// Returns all function signatures with the same type and number of arguments. diff --git a/src/expr/core/src/table_function/mod.rs b/src/expr/core/src/table_function/mod.rs index d2d8e291ee076..ba976bda59404 100644 --- a/src/expr/core/src/table_function/mod.rs +++ b/src/expr/core/src/table_function/mod.rs @@ -127,16 +127,7 @@ pub fn build( ) -> Result { use itertools::Itertools; let args = children.iter().map(|t| t.return_type()).collect_vec(); - let desc = crate::sig::FUNCTION_REGISTRY - .get(func, &args, &return_type) - .ok_or_else(|| { - ExprError::UnsupportedFunction(format!( - "{}({}) -> setof {}", - func.as_str_name().to_ascii_lowercase(), - args.iter().format(", "), - return_type, - )) - })?; + let desc = crate::sig::FUNCTION_REGISTRY.get(func, &args, &return_type)?; desc.build_table(return_type, chunk_size, children) } From ed73a52ccf6ee2caa2bd475efb5414eca40cd8a9 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 20 Aug 2024 23:59:04 +0800 Subject: [PATCH 4/4] perf(stream): set `noop_update_hint` when jsonb access exists (#18065) Signed-off-by: Richard Chien --- .../tests/testdata/output/cse_expr.yaml | 6 ++-- .../optimizer/plan_node/generic/project.rs | 35 +++++++++++++++++++ .../src/optimizer/plan_node/stream_project.rs | 3 +- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index 0e5d72b3499a3..abbc0aae184e0 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -10,8 +10,8 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } - └─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id] } - └─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id] } + └─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id], noop_update_hint: true } + └─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id], noop_update_hint: true } └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression extract2 sql: | @@ -25,7 +25,7 @@ stream_plan: |- StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [$expr1, $expr1, t._row_id] } - └─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id] } + └─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id], noop_update_hint: true } └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression shouldn't extract impure function sql: | diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index 68c652f0e006f..5d658d404db35 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -296,6 +296,41 @@ impl Project { }) .collect::>>() } + + pub(crate) fn likely_produces_noop_updates(&self) -> bool { + struct HasJsonbAccess { + has: bool, + } + + impl ExprVisitor for HasJsonbAccess { + fn visit_function_call(&mut self, func_call: &FunctionCall) { + if matches!( + func_call.func_type(), + ExprType::JsonbAccess + | ExprType::JsonbAccessStr + | ExprType::JsonbExtractPath + | ExprType::JsonbExtractPathVariadic + | ExprType::JsonbExtractPathText + | ExprType::JsonbExtractPathTextVariadic + | ExprType::JsonbPathExists + | ExprType::JsonbPathMatch + | ExprType::JsonbPathQueryArray + | ExprType::JsonbPathQueryFirst + ) { + self.has = true; + } + } + } + + self.exprs.iter().any(|expr| { + // When there's a jsonb access in the `Project`, it's very likely that the query is + // extracting some fields from a jsonb payload column. In this case, a change from the + // input jsonb payload may not change the output of the `Project`. + let mut visitor = HasJsonbAccess { has: false }; + visitor.visit_expr(expr); + visitor.has + }) + } } /// Construct a `Project` and dedup expressions. diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index ef879627c66bd..d6ac8af7b146c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -62,7 +62,8 @@ impl Distill for StreamProject { impl StreamProject { pub fn new(core: generic::Project) -> Self { - Self::new_inner(core, false) + let noop_update_hint = core.likely_produces_noop_updates(); + Self::new_inner(core, noop_update_hint) } /// Set the `noop_update_hint` flag to the given value.