Skip to content

Commit

Permalink
Merge branch 'main' into xxchan/welcome-armadillo
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Aug 20, 2024
2 parents 5750227 + ed73a52 commit 8bd8bd9
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ do
fi
done
if (( "$(du -sk ${PREFIX_LOG} | cut -f1)" > 3000 )) ; then
if (( "$(du -sk ${PREFIX_LOG} | cut -f1)" > 4000 )) ; then
echo "$(tput setaf 1)ERROR: log size is significantly large ($(du -sh ${PREFIX_LOG} | cut -f1)).$(tput sgr0) Please disable unnecessary logs."
exit 1
fi
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Caused by these errors (recent errors listed first):
2: invalid IPv4 address


statement error Failed to run the query
statement error failed to send requests to UDF service
create function int_42() returns int as int_42 using link '55.55.55.55:5555';


Expand Down
11 changes: 1 addition & 10 deletions src/expr/core/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,7 @@ pub fn build(agg: &AggCall, prefer_append_only: bool) -> Result<BoxedAggregateFu
};

// find the signature for builtin aggregation
let sig = crate::sig::FUNCTION_REGISTRY
.get(*kind, agg.args.arg_types(), &agg.return_type)
.ok_or_else(|| {
ExprError::UnsupportedFunction(format!(
"{}({}) -> {}",
kind.as_str_name().to_ascii_lowercase(),
agg.args.arg_types().iter().format(", "),
agg.return_type,
))
})?;
let sig = crate::sig::FUNCTION_REGISTRY.get(*kind, agg.args.arg_types(), &agg.return_type)?;

if let FuncBuilder::Aggregate {
append_only: Some(f),
Expand Down
13 changes: 2 additions & 11 deletions src/expr/core/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::expr::{
BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
};
use crate::sig::FUNCTION_REGISTRY;
use crate::{bail, ExprError, Result};
use crate::{bail, Result};

/// Build an expression from protobuf.
pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
Expand Down Expand Up @@ -188,16 +188,7 @@ pub fn build_func(
children: Vec<BoxedExpression>,
) -> Result<BoxedExpression> {
let args = children.iter().map(|c| c.return_type()).collect_vec();
let desc = FUNCTION_REGISTRY
.get(func, &args, &ret_type)
.ok_or_else(|| {
ExprError::UnsupportedFunction(format!(
"{}({}) -> {}",
func.as_str_name().to_ascii_lowercase(),
args.iter().format(", "),
ret_type,
))
})?;
let desc = FUNCTION_REGISTRY.get(func, &args, &ret_type)?;
desc.build_scalar(ret_type, children)
}

Expand Down
35 changes: 32 additions & 3 deletions src/expr/core/src/sig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,38 @@ impl FunctionRegistry {
name: impl Into<FuncName>,
args: &[DataType],
ret: &DataType,
) -> Option<&FuncSign> {
let v = self.0.get(&name.into())?;
v.iter().find(|d| d.match_args_ret(args, ret))
) -> Result<&FuncSign, ExprError> {
let name = name.into();
let err = |candidates: &Vec<FuncSign>| {
// Note: if we return error here, it probably means there is a bug in frontend type inference,
// because such error should be caught in the frontend.
ExprError::UnsupportedFunction(format!(
"{}({}) -> {}{}",
name,
args.iter().format(", "),
ret,
if candidates.is_empty() {
"".to_string()
} else {
format!(
"\nHINT: Supported functions:\n{}",
candidates
.iter()
.map(|d| format!(
" {}({}) -> {}",
d.name,
d.inputs_type.iter().format(", "),
d.ret_type
))
.format("\n")
)
}
))
};
let v = self.0.get(&name).ok_or_else(|| err(&vec![]))?;
v.iter()
.find(|d| d.match_args_ret(args, ret))
.ok_or_else(|| err(v))
}

/// Returns all function signatures with the same type and number of arguments.
Expand Down
11 changes: 1 addition & 10 deletions src/expr/core/src/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,7 @@ pub fn build(
) -> Result<BoxedTableFunction> {
use itertools::Itertools;
let args = children.iter().map(|t| t.return_type()).collect_vec();
let desc = crate::sig::FUNCTION_REGISTRY
.get(func, &args, &return_type)
.ok_or_else(|| {
ExprError::UnsupportedFunction(format!(
"{}({}) -> setof {}",
func.as_str_name().to_ascii_lowercase(),
args.iter().format(", "),
return_type,
))
})?;
let desc = crate::sig::FUNCTION_REGISTRY.get(func, &args, &return_type)?;
desc.build_table(return_type, chunk_size, children)
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/cse_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id] }
└─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id], noop_update_hint: true }
└─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id], noop_update_hint: true }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: Common sub expression extract2
sql: |
Expand All @@ -25,7 +25,7 @@
stream_plan: |-
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [$expr1, $expr1, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id], noop_update_hint: true }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: Common sub expression shouldn't extract impure function
sql: |
Expand Down
35 changes: 35 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,41 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
})
.collect::<Option<Vec<_>>>()
}

pub(crate) fn likely_produces_noop_updates(&self) -> bool {
struct HasJsonbAccess {
has: bool,
}

impl ExprVisitor for HasJsonbAccess {
fn visit_function_call(&mut self, func_call: &FunctionCall) {
if matches!(
func_call.func_type(),
ExprType::JsonbAccess
| ExprType::JsonbAccessStr
| ExprType::JsonbExtractPath
| ExprType::JsonbExtractPathVariadic
| ExprType::JsonbExtractPathText
| ExprType::JsonbExtractPathTextVariadic
| ExprType::JsonbPathExists
| ExprType::JsonbPathMatch
| ExprType::JsonbPathQueryArray
| ExprType::JsonbPathQueryFirst
) {
self.has = true;
}
}
}

self.exprs.iter().any(|expr| {
// When there's a jsonb access in the `Project`, it's very likely that the query is
// extracting some fields from a jsonb payload column. In this case, a change from the
// input jsonb payload may not change the output of the `Project`.
let mut visitor = HasJsonbAccess { has: false };
visitor.visit_expr(expr);
visitor.has
})
}
}

/// Construct a `Project` and dedup expressions.
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ impl Distill for StreamProject {

impl StreamProject {
pub fn new(core: generic::Project<PlanRef>) -> Self {
Self::new_inner(core, false)
let noop_update_hint = core.likely_produces_noop_updates();
Self::new_inner(core, noop_update_hint)
}

/// Set the `noop_update_hint` flag to the given value.
Expand Down

0 comments on commit 8bd8bd9

Please sign in to comment.