diff --git a/Cargo.lock b/Cargo.lock index fdd9eb3607371..624b4beb9f42f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8383,6 +8383,7 @@ dependencies = [ "cfg-or-panic", "clap", "console", + "expect-test", "fail", "futures", "glob", @@ -8398,6 +8399,7 @@ dependencies = [ "pretty_assertions", "prometheus", "rand", + "rand_chacha", "risingwave_common", "risingwave_compactor", "risingwave_compute", @@ -10236,9 +10238,9 @@ dependencies = [ [[package]] name = "thiserror-ext" -version = "0.0.7" +version = "0.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01e0feae8c8aff1b65f0bca5b24fe2b6e2331e03cd7c6daa681e43a8055f92f6" +checksum = "51b32ea5f1a980ddd27f9ad46dc18db583f508a4d148dd2cf1ee3d81bfd767cf" dependencies = [ "thiserror", "thiserror-ext-derive", @@ -10246,10 +10248,11 @@ dependencies = [ [[package]] name = "thiserror-ext-derive" -version = "0.0.7" +version = "0.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e32482e53bca3d022bdee662ea7d568835e0eb8a86ab410946104657e9e1b5b5" +checksum = "1cf8fd06431d331ee33484e2c83f0309dd2ad26606c3b639d5927665a1698043" dependencies = [ + "either", "proc-macro2", "quote", "syn 2.0.37", diff --git a/Cargo.toml b/Cargo.toml index f2fa22b19e4e6..96ab8443db0bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,7 +128,7 @@ arrow-flight = "49" arrow-select = "49" arrow-ord = "49" arrow-row = "49" -thiserror-ext = "0.0.7" +thiserror-ext = "0.0.8" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh index c5f89a2bbc7e0..b1c8937ad8e8c 100755 --- a/ci/scripts/deterministic-recovery-test.sh +++ b/ci/scripts/deterministic-recovery-test.sh @@ -19,18 +19,17 @@ export LOGDIR=.risingwave/log mkdir -p $LOGDIR -# FIXME(kwannoel): Why is 
this failing? -# echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl" -# seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' +echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl" +seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl" -seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' +seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming" -seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log' +seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch" -seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log' +seq $TEST_NUM | parallel MADSIM_TEST_SEED={} 
'./risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink" seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log' diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index ee92ccb90a7fd..e1501e908f124 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -293,7 +293,24 @@ steps: retry: *auto-retry - label: "recovery test (deterministic simulation)" - command: "TEST_NUM=12 KILL_RATE=1.0 timeout 55m ci/scripts/deterministic-recovery-test.sh" + command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 55m ci/scripts/deterministic-recovery-test.sh" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" + || build.env("CI_STEPS") =~ /(^|,)recovery-tests?-deterministic-simulation(,|$$)/ + depends_on: "build-simulation" + plugins: + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 60 + retry: *auto-retry + + # Ddl statements will randomly run with background_ddl. 
+ - label: "background_ddl recovery test (deterministic simulation)" + command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 timeout 55m ci/scripts/deterministic-recovery-test.sh" if: | !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index df4a12ba8ac8c..7b43ccb77a2b1 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -410,7 +410,7 @@ steps: retry: *auto-retry - label: "recovery test (deterministic simulation)" - command: "TEST_NUM=8 KILL_RATE=0.5 ci/scripts/deterministic-recovery-test.sh" + command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh" if: | !(build.pull_request.labels includes "ci/skip-ci") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" diff --git a/e2e_test/background_ddl/sim/basic.slt b/e2e_test/background_ddl/sim/basic.slt index b58dd36604c06..35f5814fe8b4f 100644 --- a/e2e_test/background_ddl/sim/basic.slt +++ b/e2e_test/background_ddl/sim/basic.slt @@ -13,6 +13,9 @@ INSERT INTO t select * from generate_series(1, 200000); statement ok FLUSH; +statement ok +SET STREAMING_RATE_LIMIT=4000; + statement ok CREATE MATERIALIZED VIEW m1 as SELECT * FROM t; @@ -26,23 +29,20 @@ CREATE MATERIALIZED VIEW m3 as SELECT * FROM t; statement error CREATE MATERIALIZED VIEW m3 as SELECT * FROM t; -# Wait for background ddl to finish -sleep 30s - query I select count(*) from m1; ---- -10000000 +200000 query I select count(*) from m2; ---- -10000000 +200000 query I select count(*) from m3; ---- -10000000 +200000 statement ok DROP MATERIALIZED VIEW m1; diff --git a/e2e_test/batch/basic/null_range_scan.slt.part b/e2e_test/batch/basic/null_range_scan.slt.part index 85c71f143c28d..761c63c1f1788 100644 --- 
a/e2e_test/batch/basic/null_range_scan.slt.part +++ b/e2e_test/batch/basic/null_range_scan.slt.part @@ -16,6 +16,43 @@ query II rowsort SELECT * FROM t0 WHERE (c1 > 1); ---- + +query II rowsort +SELECT * FROM t0 WHERE (c1 > 9223372036854775808) IS NULL; +---- +1 NULL + +query II rowsort +SELECT * FROM t0 WHERE (c1 < 9223372036854775808) IS NULL; +---- +1 NULL + +query II rowsort +SELECT * FROM t0 WHERE (c1 >= -9223372036854775808) IS NULL; +---- +1 NULL + +query II rowsort +SELECT * FROM t0 WHERE (c1 <= -9223372036854775808) IS NULL; +---- +1 NULL + +query II rowsort +SELECT * FROM t0 WHERE c1 > 9223372036854775808; +---- + +query II rowsort +SELECT * FROM t0 WHERE c1 < 9223372036854775808; +---- + +query II rowsort +SELECT * FROM t0 WHERE c1 >= -9223372036854775808; +---- + +query II rowsort +SELECT * FROM t0 WHERE c1 <= -9223372036854775808; +---- + statement ok create materialized view mv as select * from t0 order by c1 desc nulls last; diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt index 65b2338cc7fce..0104cb27e2ab8 100644 --- a/e2e_test/error_ui/simple/main.slt +++ b/e2e_test/error_ui/simple/main.slt @@ -58,3 +58,26 @@ db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): 1: Expr error 2: Division by zero + + +statement error +set rw_implicit_flush to maybe; +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: Failed to get/set session config + 2: Invalid value `maybe` for `rw_implicit_flush` + 3: provided string was not `true` or `false` + + +statement error +set transaction_isolation_level to read_committed; +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: Failed to get/set session config + 2: Invalid value `read_committed` for `transaction_isolation_level` + 3: Feature is not yet implemented: isolation level +Tracking issue: 
https://github.com/risingwavelabs/risingwave/issues/10736 diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 2e1e0be5b343b..d97fbdc0c73e9 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -23,6 +23,7 @@ use memcomparable::Error as MemComparableError; use risingwave_error::tonic::{ToTonicStatus, TonicStatusWrapper}; use risingwave_pb::PbFieldNotFound; use thiserror::Error; +use thiserror_ext::Macro; use tokio::task::JoinError; use crate::array::ArrayError; @@ -36,7 +37,7 @@ pub type BoxedError = Box; pub use anyhow::anyhow as anyhow_error; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub struct TrackingIssue(Option); impl TrackingIssue { @@ -70,6 +71,15 @@ impl Display for TrackingIssue { } } +#[derive(Error, Debug, Macro)] +#[error("Feature is not yet implemented: {feature}\n{issue}")] +#[thiserror_ext(macro(path = "crate::error"))] +pub struct NotImplemented { + #[message] + pub feature: String, + pub issue: TrackingIssue, +} + #[derive(Error, Debug)] pub enum ErrorCode { #[error("internal error: {0}")] @@ -87,8 +97,8 @@ pub enum ErrorCode { #[backtrace] BoxedError, ), - #[error("Feature is not yet implemented: {0}\n{1}")] - NotImplemented(String, TrackingIssue), + #[error(transparent)] + NotImplemented(#[from] NotImplemented), // Tips: Use this only if it's intended to reject the query #[error("Not supported: {0}\nHINT: {1}")] NotSupported(String, String), @@ -200,6 +210,13 @@ pub enum ErrorCode { ), } +// TODO(error-handling): automatically generate this impl. +impl From for RwError { + fn from(value: NotImplemented) -> Self { + ErrorCode::from(value).into() + } +} + pub fn internal_error(msg: impl Into) -> RwError { ErrorCode::InternalError(msg.into()).into() } @@ -479,14 +496,8 @@ macro_rules! ensure_eq { #[macro_export] macro_rules! bail { - ($msg:literal $(,)?) => { - return Err($crate::error::anyhow_error!($msg).into()) - }; - ($err:expr $(,)?) 
=> { - return Err($crate::error::anyhow_error!($err).into()) - }; - ($fmt:expr, $($arg:tt)*) => { - return Err($crate::error::anyhow_error!($fmt, $($arg)*).into()) + ($($arg:tt)*) => { + return Err($crate::error::anyhow_error!($($arg)*).into()) }; } @@ -616,7 +627,7 @@ mod tests { check_grpc_error(ErrorCode::TaskNotFound, Code::Internal); check_grpc_error(ErrorCode::InternalError(String::new()), Code::Internal); check_grpc_error( - ErrorCode::NotImplemented(String::new(), None.into()), + ErrorCode::NotImplemented(not_implemented!("test")), Code::Internal, ); } diff --git a/src/common/src/session_config/transaction_isolation_level.rs b/src/common/src/session_config/transaction_isolation_level.rs index 5f122075f41f8..81b94ccd99393 100644 --- a/src/common/src/session_config/transaction_isolation_level.rs +++ b/src/common/src/session_config/transaction_isolation_level.rs @@ -15,6 +15,8 @@ use std::fmt::Formatter; use std::str::FromStr; +use crate::error::{bail_not_implemented, NotImplemented}; + #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] // Some variants are never constructed so allow dead code here. #[allow(dead_code)] @@ -27,10 +29,10 @@ pub enum IsolationLevel { } impl FromStr for IsolationLevel { - type Err = &'static str; + type Err = NotImplemented; fn from_str(_s: &str) -> Result { - Err("isolation level is not yet supported") + bail_not_implemented!(issue = 10736, "isolation level"); } } diff --git a/src/common/src/types/to_binary.rs b/src/common/src/types/to_binary.rs index 46b5a61589493..91f7257db1cc1 100644 --- a/src/common/src/types/to_binary.rs +++ b/src/common/src/types/to_binary.rs @@ -16,7 +16,7 @@ use bytes::{Bytes, BytesMut}; use postgres_types::{ToSql, Type}; use super::{DataType, DatumRef, ScalarRefImpl, F32, F64}; -use crate::error::TrackingIssue; +use crate::error::NotImplemented; /// Error type for [`ToBinary`] trait. 
#[derive(thiserror::Error, Debug)] @@ -24,8 +24,8 @@ pub enum ToBinaryError { #[error(transparent)] ToSql(Box), - #[error("Feature is not yet implemented: {0}\n{1}")] - NotImplemented(String, TrackingIssue), + #[error(transparent)] + NotImplemented(#[from] NotImplemented), } pub type Result = std::result::Result; @@ -87,15 +87,10 @@ impl ToBinary for ScalarRefImpl<'_> { ScalarRefImpl::Time(v) => v.to_binary_with_type(ty), ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty), ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty), - ScalarRefImpl::Struct(_) | ScalarRefImpl::List(_) => { - Err(ToBinaryError::NotImplemented( - format!( - "the pgwire extended-mode encoding for {} is unsupported", - ty - ), - Some(7949).into(), - )) - } + ScalarRefImpl::Struct(_) | ScalarRefImpl::List(_) => bail_not_implemented!( + issue = 7949, + "the pgwire extended-mode encoding for {ty} is unsupported" + ), } } } diff --git a/src/compute/src/memory/config.rs b/src/compute/src/memory/config.rs index 2bf9e9ba9a95f..3d959e19aa75b 100644 --- a/src/compute/src/memory/config.rs +++ b/src/compute/src/memory/config.rs @@ -21,7 +21,7 @@ pub const MIN_COMPUTE_MEMORY_MB: usize = 512; /// overhead, network buffer, etc.) in megabytes. 
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512; -const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2; +const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.3; const STORAGE_MEMORY_PROPORTION: f64 = 0.3; @@ -157,14 +157,14 @@ mod tests { #[test] fn test_reserve_memory_bytes() { // at least 512 MB - let (reserved, non_reserved) = reserve_memory_bytes(2 << 30); + let (reserved, non_reserved) = reserve_memory_bytes(1536 << 20); assert_eq!(reserved, 512 << 20); - assert_eq!(non_reserved, 1536 << 20); + assert_eq!(non_reserved, 1024 << 20); // reserve based on proportion let (reserved, non_reserved) = reserve_memory_bytes(10 << 30); - assert_eq!(reserved, 2 << 30); - assert_eq!(non_reserved, 8 << 30); + assert_eq!(reserved, 3 << 30); + assert_eq!(non_reserved, 7 << 30); } #[test] diff --git a/src/frontend/planner_test/tests/testdata/output/range_scan.yaml b/src/frontend/planner_test/tests/testdata/output/range_scan.yaml index fb66ddb3f399b..d892c2d2417b9 100644 --- a/src/frontend/planner_test/tests/testdata/output/range_scan.yaml +++ b/src/frontend/planner_test/tests/testdata/output/range_scan.yaml @@ -339,19 +339,26 @@ SELECT * FROM mv WHERE x < 60000; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchScan { table: mv, columns: [mv.x], distribution: UpstreamHashShard(mv.x) } + └─BatchFilter { predicate: (mv.x < 60000:Int32) } + └─BatchScan { table: mv, columns: [mv.x], distribution: UpstreamHashShard(mv.x) } - name: When the constant with larger type is out of the upper bound of the column's type, we can convert > as false condition. 
before: - create_small sql: | SELECT * FROM mv WHERE x > 60000; - batch_plan: 'BatchValues { rows: [] }' + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchFilter { predicate: (mv.x > 60000:Int32) } + └─BatchScan { table: mv, columns: [mv.x], distribution: UpstreamHashShard(mv.x) } - name: When the constant with larger type is out of the lower bound of the column's type, we can convert < as false condition. before: - create_small sql: | SELECT * FROM mv WHERE x < -60000; - batch_plan: 'BatchValues { rows: [] }' + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchFilter { predicate: (mv.x < -60000:Int32) } + └─BatchScan { table: mv, columns: [mv.x], distribution: UpstreamHashShard(mv.x) } - name: When the constant with larger type is out of the lower bound of the column's type, we can convert > as true condition. before: - create_small @@ -359,7 +366,8 @@ SELECT * FROM mv WHERE x > -60000; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchScan { table: mv, columns: [mv.x], distribution: UpstreamHashShard(mv.x) } + └─BatchFilter { predicate: (mv.x > -60000:Int32) } + └─BatchScan { table: mv, columns: [mv.x], distribution: UpstreamHashShard(mv.x) } - name: When the constant with larger type is in range of the column's type, we can convert it. before: - create_small diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index 5143c7fdee853..d79ace454a704 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -450,7 +450,7 @@ create table c (v1 int, v2 int); select * from a left outer join b on a.v1 = b.v1 and a.v2 = (select v2 from c where v1 = 1 limit 1); planner_error: |- - Feature is not yet implemented: Subquery in join on condition is unsupported + Feature is not yet implemented: Subquery in join on condition No tracking issue yet. 
Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml - sql: | create table auction (date_time date); diff --git a/src/frontend/src/binder/expr/binary_op.rs b/src/frontend/src/binder/expr/binary_op.rs index 856906f13c04c..275d4f152636f 100644 --- a/src/frontend/src/binder/expr/binary_op.rs +++ b/src/frontend/src/binder/expr/binary_op.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{BinaryOperator, Expr}; @@ -186,11 +187,7 @@ impl Binder { func_types.push(ExprType::Not); ExprType::RegexpEq } - _ => { - return Err( - ErrorCode::NotImplemented(format!("binary op: {:?}", op), 112.into()).into(), - ) - } + _ => bail_not_implemented!(issue = 112, "binary op: {:?}", op), }; func_types.push(final_type); Ok(func_types) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 184d5427176a6..4a08945b4d4eb 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -24,7 +24,7 @@ use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHE use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; -use risingwave_common::{GIT_SHA, RW_VERSION}; +use risingwave_common::{bail_not_implemented, not_implemented, GIT_SHA, RW_VERSION}; use risingwave_expr::aggregate::{agg_kinds, AggKind}; use risingwave_expr::window_function::{ Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind, @@ -68,28 +68,22 @@ impl Binder { // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the 
search path. let function_name = name.real_value(); if function_name != "_pg_expandarray" { - return Err(ErrorCode::NotImplemented( - format!("Unsupported function name under schema: {}", schema_name), - 12422.into(), - ) - .into()); + bail_not_implemented!( + issue = 12422, + "Unsupported function name under schema: {}", + schema_name + ); } function_name } else { - return Err(ErrorCode::NotImplemented( - format!("Unsupported function name under schema: {}", schema_name), - 12422.into(), - ) - .into()); + bail_not_implemented!( + issue = 12422, + "Unsupported function name under schema: {}", + schema_name + ); } } - _ => { - return Err(ErrorCode::NotImplemented( - format!("qualified function: {}", f.name), - 112.into(), - ) - .into()); - } + _ => bail_not_implemented!(issue = 112, "qualified function: {}", f.name), }; // agg calls @@ -141,11 +135,11 @@ impl Binder { )) .into()); } else if f.over.is_some() { - return Err(ErrorCode::NotImplemented( - format!("Unrecognized window function: {}", function_name), - 8961.into(), - ) - .into()); + bail_not_implemented!( + issue = 8961, + "Unrecognized window function: {}", + function_name + ); } // table function @@ -272,25 +266,13 @@ impl Binder { .and_then(|expr| expr.enforce_bool_clause("FILTER"))?; self.context.clause = clause; if expr.has_subquery() { - return Err(ErrorCode::NotImplemented( - "subquery in filter clause".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("subquery in filter clause"); } if expr.has_agg_call() { - return Err(ErrorCode::NotImplemented( - "aggregation function in filter clause".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("aggregation function in filter clause"); } if expr.has_table_function() { - return Err(ErrorCode::NotImplemented( - "table function in filter clause".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("table function in filter clause"); } Condition::with_expr(expr) } @@ -348,11 +330,7 @@ impl Binder { .flatten_ok()
.try_collect()?; if args.iter().any(|arg| arg.as_literal().is_none()) { - return Err(ErrorCode::NotImplemented( - "non-constant direct arguments for ordered-set aggregation is not supported now".to_string(), - None.into() - ) - .into()); + bail_not_implemented!("non-constant direct arguments for ordered-set aggregation is not supported now"); } args }; @@ -470,12 +448,7 @@ impl Binder { // restrict arguments[1..] to be constant because we don't support multiple distinct key // indices for now if args.iter().skip(1).any(|arg| arg.as_literal().is_none()) { - return Err(ErrorCode::NotImplemented( - "non-constant arguments other than the first one for DISTINCT aggregation is not supported now" - .to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("non-constant arguments other than the first one for DISTINCT aggregation is not supported now"); } // restrict ORDER BY to align with PG, which says: @@ -520,14 +493,11 @@ impl Binder { match exclusion { WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow, WindowFrameExclusion::Group | WindowFrameExclusion::Ties => { - return Err(ErrorCode::NotImplemented( - format!( - "window frame exclusion `{}` is not supported yet", - exclusion - ), - 9124.into(), - ) - .into()); + bail_not_implemented!( + issue = 9124, + "window frame exclusion `{}` is not supported yet", + exclusion + ); } WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers, } @@ -556,14 +526,11 @@ impl Binder { FrameBounds::Rows(start, end) } WindowFrameUnits::Range | WindowFrameUnits::Groups => { - return Err(ErrorCode::NotImplemented( - format!( - "window frame in `{}` mode is not supported yet", - frame.units - ), - 9124.into(), - ) - .into()); + bail_not_implemented!( + issue = 9124, + "window frame in `{}` mode is not supported yet", + frame.units + ); } }; if !bounds.is_valid() { @@ -974,10 +941,7 @@ impl Binder { .map_err(|_| no_match_err)?; let ExprImpl::Literal(literal) = &input else { - return Err(ErrorCode::NotImplemented( 
- "Only boolean literals are supported in `current_schemas`.".to_string(), None.into() - ) - .into()); + bail_not_implemented!("Only boolean literals are supported in `current_schemas`."); }; let Some(bool) = literal.get_data().as_ref().map(|bool| bool.clone().into_bool()) else { @@ -1279,7 +1243,7 @@ impl Binder { ) }; - ErrorCode::NotImplemented(err_msg, 112.into()).into() + not_implemented!(issue = 112, "{}", err_msg).into() }), } } diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 3297fa5071350..c52c42dbc973d 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -17,6 +17,7 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::zip_eq_fast; +use risingwave_common::{bail_not_implemented, not_implemented}; use risingwave_sqlparser::ast::{ Array, BinaryOperator, DataType as AstDataType, Expr, Function, JsonPredicateType, ObjectName, Query, StructField, TrimWhereField, UnaryOperator, @@ -168,11 +169,7 @@ impl Binder { } => self.bind_overlay(*expr, *new_substring, *start, count), Expr::Parameter { index } => self.bind_parameter(index), Expr::Collate { expr, collation } => self.bind_collate(*expr, collation), - _ => Err(ErrorCode::NotImplemented( - format!("unsupported expression {:?}", expr), - 112.into(), - ) - .into()), + _ => bail_not_implemented!(issue = 112, "unsupported expression {:?}", expr), } } @@ -184,12 +181,11 @@ impl Binder { vec![self.bind_string(field.clone())?.into(), arg], ) .map_err(|_| { - ErrorCode::NotImplemented( - format!( - "function extract({} from {:?}) doesn't exist", - field, arg_type - ), - 112.into(), + not_implemented!( + issue = 112, + "function extract({} from {:?}) doesn't exist", + field, + arg_type ) })? 
.into()) @@ -291,13 +287,7 @@ impl Binder { } UnaryOperator::PGSquareRoot => ExprType::Sqrt, UnaryOperator::PGCubeRoot => ExprType::Cbrt, - _ => { - return Err(ErrorCode::NotImplemented( - format!("unsupported unary expression: {:?}", op), - 112.into(), - ) - .into()) - } + _ => bail_not_implemented!(issue = 112, "unsupported unary expression: {:?}", op), }; let expr = self.bind_expr_inner(expr)?; FunctionCall::new(func_type, vec![expr]).map(|f| f.into()) @@ -539,11 +529,7 @@ impl Binder { pub fn bind_collate(&mut self, expr: Expr, collation: ObjectName) -> Result { if !["C", "POSIX"].contains(&collation.real_value().as_str()) { - return Err(ErrorCode::NotImplemented( - "Collate collation other than `C` or `POSIX` is not implemented".into(), - None.into(), - ) - .into()); + bail_not_implemented!("Collate collation other than `C` or `POSIX` is not implemented"); } let bound_inner = self.bind_expr_inner(expr)?; @@ -592,12 +578,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { } pub fn bind_data_type(data_type: &AstDataType) -> Result { - let new_err = || { - ErrorCode::NotImplemented( - format!("unsupported data type: {:}", data_type), - None.into(), - ) - }; + let new_err = || not_implemented!("unsupported data type: {:}", data_type); let data_type = match data_type { AstDataType::Boolean => DataType::Boolean, AstDataType::SmallInt => DataType::Int16, @@ -615,11 +596,7 @@ pub fn bind_data_type(data_type: &AstDataType) -> Result { AstDataType::Interval => DataType::Interval, AstDataType::Array(datatype) => DataType::List(Box::new(bind_data_type(datatype)?)), AstDataType::Char(..) 
=> { - return Err(ErrorCode::NotImplemented( - "CHAR is not supported, please use VARCHAR instead\n".to_string(), - None.into(), - ) - .into()) + bail_not_implemented!("CHAR is not supported, please use VARCHAR instead") } AstDataType::Struct(types) => DataType::new_struct( types diff --git a/src/frontend/src/binder/expr/value.rs b/src/frontend/src/binder/expr/value.rs index 54559266a136f..82c36f9c1973e 100644 --- a/src/frontend/src/binder/expr/value.rs +++ b/src/frontend/src/binder/expr/value.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, DateTimeField, Decimal, Interval, ScalarImpl}; use risingwave_sqlparser::ast::{DateTimeField as AstDateTimeField, Expr, Value}; @@ -38,7 +39,7 @@ impl Binder { last_field: None, fractional_seconds_precision: None, } => self.bind_interval(value, leading_field), - _ => Err(ErrorCode::NotImplemented(format!("value: {:?}", value), None.into()).into()), + _ => bail_not_implemented!("value: {:?}", value), } } diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index a3b78343c6041..349894a30bf9d 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::rc::Rc; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; @@ -278,7 +279,7 @@ impl Binder { fn bind_with(&mut self, with: With) -> Result<()> { if with.recursive { - Err(ErrorCode::NotImplemented("recursive cte".into(), None.into()).into()) + bail_not_implemented!("recursive cte"); } else { for cte_table in with.cte_tables { let Cte { alias, query, .. 
} = cte_table; diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index 032791bfab30c..1d2c8491de657 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -15,6 +15,7 @@ use std::str::FromStr; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ Field, Schema, PG_CATALOG_SCHEMA_NAME, RW_INTERNAL_TABLE_FUNCTION_NAME, }; @@ -49,14 +50,10 @@ impl Binder { { if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) { if with_ordinality { - return Err(ErrorCode::NotImplemented( - format!( - "WITH ORDINALITY for internal/system table function {}", - func_name - ), - None.into(), - ) - .into()); + bail_not_implemented!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ); } return self.bind_internal_table(args, alias); } @@ -66,14 +63,10 @@ impl Binder { ) { if with_ordinality { - return Err(ErrorCode::NotImplemented( - format!( - "WITH ORDINALITY for internal/system table function {}", - func_name - ), - None.into(), - ) - .into()); + bail_not_implemented!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ); } return self.bind_relation_by_name_inner( Some(PG_CATALOG_SCHEMA_NAME), diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index cd2d2ef45efab..2302d3730360a 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -16,6 +16,7 @@ use std::ops::Deref; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{is_system_schema, Field}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; @@ -100,19 +101,17 @@ impl Binder { { 
self.resolve_view_relation(&view_catalog.clone())? } else { - return Err(ErrorCode::NotImplemented( - format!( - r###"{}.{} is not supported, please use `SHOW` commands for now. + bail_not_implemented!( + issue = 1695, + r###"{}.{} is not supported, please use `SHOW` commands for now. `SHOW TABLES`, `SHOW MATERIALIZED VIEWS`, `DESCRIBE `, `SHOW COLUMNS FROM [table]` "###, - schema_name, table_name - ), - 1695.into(), - ) - .into()); + schema_name, + table_name + ); } } else if let Ok((table_catalog, schema_name)) = self.catalog diff --git a/src/frontend/src/binder/set_expr.rs b/src/frontend/src/binder/set_expr.rs index a91f6774b8c1d..91085a4084ee4 100644 --- a/src/frontend/src/binder/set_expr.rs +++ b/src/frontend/src/binder/set_expr.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::iter_util::ZipEqFast; @@ -198,11 +199,7 @@ impl Binder { match op { SetOperator::Union => {} SetOperator::Intersect | SetOperator::Except => { - return Err(ErrorCode::NotImplemented( - format!("{} all", op), - None.into(), - ) - .into()) + bail_not_implemented!("{} all", op); } } } diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index 027f78e7705a9..4eef5e7f7dd41 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Field; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::Result; use risingwave_sqlparser::ast::Statement; use super::delete::BoundDelete; @@ -82,11 +83,7 @@ impl Binder { Statement::Query(q) => Ok(BoundStatement::Query(self.bind_query(*q)?.into())), - _ => Err(ErrorCode::NotImplemented( - format!("unsupported statement {:?}", stmt), - None.into(), - ) - .into()), + _ => bail_not_implemented!("unsupported statement {:?}", stmt), } } } diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index aabe2a5bd43ca..e2ea54eb77e77 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Schema, TableVersionId}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::iter_util::ZipEqFast; @@ -135,11 +136,7 @@ impl Binder { // (col1, col2) = (subquery) (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { - return Err(ErrorCode::NotImplemented( - "subquery on the right side of multi-assignment".to_owned(), - None.into(), - ) - .into()) + bail_not_implemented!("subquery on the right side of multi-assignment"); } // (col1, col2) = (expr1, expr2) // TODO: support `DEFAULT` in multiple assignments diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index 5ce2527adebb7..d1286c1c01375 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; @@ -142,14 +143,10 @@ impl Binder { .flatten() .any(|expr| expr.has_subquery()) { - return Err(ErrorCode::NotImplemented("Subquery in VALUES".into(), None.into()).into()); + bail_not_implemented!("Subquery in VALUES"); } if bound_values.is_correlated(1) { - return Err(ErrorCode::NotImplemented( - "CorrelatedInputRef in VALUES".into(), - None.into(), - ) - .into()); + bail_not_implemented!("CorrelatedInputRef in VALUES"); } Ok(bound_values) } diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index d760ee039499a..b58935e810176 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -14,6 +14,7 @@ use itertools::Itertools as _; use num_integer::Integer as _; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; @@ -629,15 +630,12 @@ fn infer_type_name<'a>( let mut candidates = top_matches(&candidates, inputs); if candidates.is_empty() { - return Err(ErrorCode::NotImplemented( - format!( - "{:?}{:?}", - func_type, - inputs.iter().map(TypeDebug).collect_vec() - ), - 112.into(), - ) - .into()); + bail_not_implemented!( + issue = 112, + "{:?}{:?}", + func_type, + inputs.iter().map(TypeDebug).collect_vec() + ); } // After this line `candidates` will never be empty, as the narrow rules will retain original diff --git a/src/frontend/src/expr/window_function.rs b/src/frontend/src/expr/window_function.rs index 371a00dc6b62a..d82cf4fb788d0 100644 --- a/src/frontend/src/expr/window_function.rs +++ b/src/frontend/src/expr/window_function.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::types::DataType; use risingwave_expr::window_function::{Frame, WindowFuncKind}; @@ -72,21 +73,16 @@ impl WindowFunction { .into()); } if !offset.is_const() { - return Err(ErrorCode::NotImplemented( - format!("non-const `offset` of `{kind}` function is not supported yet"), - None.into(), - ) - .into()); + bail_not_implemented!( + "non-const `offset` of `{kind}` function is not supported yet" + ); } Ok(value.return_type()) } (Lag | Lead, [_value, _offset, _default]) => { - Err(RwError::from(ErrorCode::NotImplemented( - format!( - "`{kind}` window function with `default` argument is not supported yet" - ), - None.into(), - ))) + bail_not_implemented!( + "`{kind}` window function with `default` argument is not supported yet" + ); } (Aggregate(agg_kind), args) => { diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 95e9c3bf7e597..d24beefdc56f7 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,6 +14,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::ColumnId; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; @@ -62,16 +63,10 @@ pub async fn handle_alter_source_column( let SourceStruct { encode, .. 
} = extract_source_struct(&catalog.info)?; match encode { SourceEncode::Avro | SourceEncode::Protobuf => { - return Err(RwError::from(ErrorCode::NotImplemented( - "Alter source with schema registry".into(), - None.into(), - ))); + bail_not_implemented!("Alter source with schema registry") } SourceEncode::Json if catalog.info.use_schema_registry => { - return Err(RwError::from(ErrorCode::NotImplemented( - "Alter source with schema registry".into(), - None.into(), - ))); + bail_not_implemented!("Alter source with schema registry") } SourceEncode::Invalid | SourceEncode::Native => { return Err(RwError::from(ErrorCode::NotSupported( diff --git a/src/frontend/src/handler/alter_system.rs b/src/frontend/src/handler/alter_system.rs index 91f8cff23c0d4..b040474d98c49 100644 --- a/src/frontend/src/handler/alter_system.rs +++ b/src/frontend/src/handler/alter_system.rs @@ -14,8 +14,9 @@ use pgwire::pg_response::StatementType; use risingwave_common::error::Result; -use risingwave_sqlparser::ast::{Ident, SetVariableValue, Value}; +use risingwave_sqlparser::ast::{Ident, SetVariableValue}; +use super::variable::set_var_to_param_str; use super::{HandlerArgs, RwPgResponse}; // Warn user if barrier_interval_ms is set above 5mins. 
@@ -28,12 +29,7 @@ pub async fn handle_alter_system( param: Ident, value: SetVariableValue, ) -> Result { - let value = match value { - SetVariableValue::Literal(Value::DoubleQuotedString(s)) - | SetVariableValue::Literal(Value::SingleQuotedString(s)) => Some(s), - SetVariableValue::Default => None, - _ => Some(value.to_string()), - }; + let value = set_var_to_param_str(&value); let params = handler_args .session .env() diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 2d51bcc48f8cc..649fade132e0d 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use anyhow::Context; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; @@ -97,10 +98,7 @@ pub async fn handle_alter_table_column( if let Some(source_schema) = &source_schema { if schema_has_schema_registry(source_schema) { - return Err(RwError::from(ErrorCode::NotImplemented( - "Alter table with source having schema registry".into(), - None.into(), - ))); + bail_not_implemented!("Alter table with source having schema registry"); } } @@ -140,10 +138,7 @@ pub async fn handle_alter_table_column( cascade, } => { if cascade { - Err(ErrorCode::NotImplemented( - "drop column cascade".to_owned(), - 6903.into(), - ))? + bail_not_implemented!(issue = 6903, "drop column cascade"); } // Locate the column by name and remove it. 
diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 9d9db08204e49..67f617ed5546d 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -39,18 +39,10 @@ pub async fn handle_create_function( params: CreateFunctionBody, ) -> Result { if or_replace { - return Err(ErrorCode::NotImplemented( - "CREATE OR REPLACE FUNCTION".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE OR REPLACE FUNCTION"); } if temporary { - return Err(ErrorCode::NotImplemented( - "CREATE TEMPORARY FUNCTION".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE TEMPORARY FUNCTION"); } let language = match params.language { Some(lang) => { diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index f42bb1d9bb284..bea68d1bf4d8d 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -20,6 +20,7 @@ use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_pb::catalog::{CreateType, PbTable}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; +use risingwave_pb::stream_plan::StreamScanType; use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; use super::privilege::resolve_relation_privileges; @@ -164,7 +165,7 @@ pub async fn handle_create_mv( return Ok(resp); } - let (mut table, graph) = { + let (mut table, graph, can_run_in_background) = { let context = OptimizerContext::from_handler_args(handler_args); if !context.with_options().is_empty() { // get other useful fields by `remove`, the logic here is to reject unknown options. 
@@ -183,6 +184,20 @@ It only indicates the physical clustering of the data, which may improve the per let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?; + // All leaf nodes must be stream table scan, no other scan operators support recovery. + fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { + if plan.inputs().is_empty() { + if let Some(scan) = plan.as_stream_table_scan() { + scan.stream_scan_type() == StreamScanType::Backfill + } else { + false + } + } else { + assert!(!plan.inputs().is_empty()); + plan.inputs().iter().all(plan_has_backfill_leaf_nodes) + } + } + let can_run_in_background = plan_has_backfill_leaf_nodes(&plan); let context = plan.plan_base().ctx().clone(); let mut graph = build_graph(plan); graph.parallelism = @@ -196,7 +211,7 @@ It only indicates the physical clustering of the data, which may improve the per let env = graph.env.as_mut().unwrap(); env.timezone = context.get_session_timezone(); - (table, graph) + (table, graph, can_run_in_background) }; // Ensure writes to `StreamJobTracker` are atomic. 
@@ -212,7 +227,7 @@ It only indicates the physical clustering of the data, which may improve the per )); let run_in_background = session.config().background_ddl(); - let create_type = if run_in_background { + let create_type = if run_in_background && can_run_in_background { CreateType::Background } else { CreateType::Foreground diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 6eceaa5825266..5bfa3240e7f6b 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -20,6 +20,7 @@ use either::Either; use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, @@ -137,13 +138,7 @@ fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> { ColumnOption::GeneratedColumns(_) => {} ColumnOption::DefaultColumns(_) => {} ColumnOption::Unique { is_primary: true } => {} - _ => { - return Err(ErrorCode::NotImplemented( - format!("column constraints \"{}\"", option_def), - None.into(), - ) - .into()) - } + _ => bail_not_implemented!("column constraints \"{}\"", option_def), } } Ok(()) @@ -177,11 +172,9 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result> // // But we don't support real collation, we simply ignore it here. 
if !["C", "POSIX"].contains(&collation.real_value().as_str()) { - return Err(ErrorCode::NotImplemented( - "Collate collation other than `C` or `POSIX` is not implemented".into(), - None.into(), - ) - .into()); + bail_not_implemented!( + "Collate collation other than `C` or `POSIX` is not implemented" + ); } match data_type { @@ -359,13 +352,7 @@ pub fn ensure_table_constraints_supported(table_constraints: &[TableConstraint]) columns: _, is_primary: true, } => {} - _ => { - return Err(ErrorCode::NotImplemented( - format!("table constraint \"{}\"", constraint), - None.into(), - ) - .into()) - } + _ => bail_not_implemented!("table constraint \"{}\"", constraint), } } Ok(()) diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs index 56ac4d787a716..684f1a368260b 100644 --- a/src/frontend/src/handler/drop_function.rs +++ b/src/frontend/src/handler/drop_function.rs @@ -27,11 +27,7 @@ pub async fn handle_drop_function( _option: Option, ) -> Result { if func_desc.len() != 1 { - return Err(ErrorCode::NotImplemented( - "only support dropping 1 function".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("only support dropping 1 function"); } let func_desc = func_desc.remove(0); diff --git a/src/frontend/src/handler/drop_schema.rs b/src/frontend/src/handler/drop_schema.rs index c0c71b97d2de8..90b1aa4ba3a53 100644 --- a/src/frontend/src/handler/drop_schema.rs +++ b/src/frontend/src/handler/drop_schema.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::is_system_schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_sqlparser::ast::{DropMode, ObjectName}; @@ -81,11 +82,7 @@ pub async fn handle_drop_schema( } } Some(DropMode::Cascade) => { - return Err(ErrorCode::NotImplemented( - "drop schema with cascade mode".to_string(), - 6773.into(), - ) - .into()) + bail_not_implemented!(issue = 6773, "drop schema with cascade mode"); } }; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 78441c774f383..6bec02b7bd4db 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -16,6 +16,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement}; @@ -138,13 +139,7 @@ async fn do_handle_explain( gen_batch_plan_by_statement(&session, context.clone(), stmt).map(|x| x.plan) } - _ => { - return Err(ErrorCode::NotImplemented( - format!("unsupported statement {:?}", stmt), - None.into(), - ) - .into()) - } + _ => bail_not_implemented!("unsupported statement {:?}", stmt), }; (plan, context) @@ -221,7 +216,7 @@ pub async fn handle_explain( analyze: bool, ) -> Result { if analyze { - return Err(ErrorCode::NotImplemented("explain analyze".to_string(), 4856.into()).into()); + bail_not_implemented!(issue = 4856, "explain analyze"); } let context = OptimizerContext::new(handler_args.clone(), options.clone()); diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index d6f22984f404e..47fefc5700b3d 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ 
b/src/frontend/src/handler/extended_handle.rs @@ -18,7 +18,8 @@ use std::sync::Arc; use bytes::Bytes; use pgwire::types::Format; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail_not_implemented; +use risingwave_common::error::Result; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{CreateSink, Query, Statement}; @@ -97,11 +98,7 @@ pub fn handle_parse( } Statement::CreateView { query, .. } => { if have_parameter_in_query(query) { - return Err(ErrorCode::NotImplemented( - "CREATE VIEW with parameters".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE VIEW with parameters"); } Ok(PrepareStatement::PureStatement(statement)) } @@ -109,11 +106,7 @@ pub fn handle_parse( if let Some(query) = query && have_parameter_in_query(query) { - Err(ErrorCode::NotImplemented( - "CREATE TABLE AS SELECT with parameters".to_string(), - None.into(), - ) - .into()) + bail_not_implemented!("CREATE TABLE AS SELECT with parameters"); } else { Ok(PrepareStatement::PureStatement(statement)) } @@ -122,11 +115,7 @@ pub fn handle_parse( if let CreateSink::AsQuery(query) = &stmt.sink_from && have_parameter_in_query(query) { - Err(ErrorCode::NotImplemented( - "CREATE SINK AS SELECT with parameters".to_string(), - None.into(), - ) - .into()) + bail_not_implemented!("CREATE SINK AS SELECT with parameters"); } else { Ok(PrepareStatement::PureStatement(statement)) } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index c682fa6d9133c..24cfb7c177e16 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -22,6 +22,7 @@ use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, S use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult}; use pgwire::pg_server::BoxedError; use pgwire::types::{Format, Row}; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_sqlparser::ast::*; 
@@ -230,18 +231,10 @@ pub async fn handle( cdc_table_info, } => { if or_replace { - return Err(ErrorCode::NotImplemented( - "CREATE OR REPLACE TABLE".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE OR REPLACE TABLE"); } if temporary { - return Err(ErrorCode::NotImplemented( - "CREATE TEMPORARY TABLE".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE TEMPORARY TABLE"); } if let Some(query) = query { return create_table_as::handle_create_as( @@ -313,11 +306,7 @@ pub async fn handle( | ObjectType::Database | ObjectType::User | ObjectType::Connection => { - return Err(ErrorCode::NotImplemented( - "DROP CASCADE".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("DROP CASCADE"); } }; }; @@ -397,11 +386,7 @@ pub async fn handle( emit_mode, } => { if or_replace { - return Err(ErrorCode::NotImplemented( - "CREATE OR REPLACE VIEW".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE OR REPLACE VIEW"); } if materialized { create_mv::handle_create_mv( @@ -437,9 +422,7 @@ pub async fn handle( if_not_exists, } => { if unique { - return Err( - ErrorCode::NotImplemented("create unique index".into(), None.into()).into(), - ); + bail_not_implemented!("create unique index"); } create_index::handle_create_index( @@ -704,8 +687,6 @@ pub async fn handle( object_name, comment, } => comment::handle_comment(handler_args, object_type, object_name, comment).await, - _ => Err( - ErrorCode::NotImplemented(format!("Unhandled statement: {}", stmt), None.into()).into(), - ), + _ => bail_not_implemented!("Unhandled statement: {}", stmt), } } diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 3c6016a6a610e..42ba3b6ad968d 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -20,8 +20,9 @@ use pgwire::pg_protocol::truncated_fmt; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::pg_server::Session; use pgwire::types::Row; 
+use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, DEFAULT_SCHEMA_NAME}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::Result; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; @@ -116,11 +117,7 @@ pub async fn handle_show_object( let session = handler_args.session; if let Some(ShowStatementFilter::Where(..)) = filter { - return Err(ErrorCode::NotImplemented( - "WHERE clause in SHOW statement".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("WHERE clause in SHOW statement"); } let row_desc = infer_show_object(&command); @@ -412,11 +409,7 @@ pub fn handle_show_create_object( index.create_sql() } ShowCreateType::Function => { - return Err(ErrorCode::NotImplemented( - format!("show create on: {}", show_create_type), - None.into(), - ) - .into()); + bail_not_implemented!("show create on: {}", show_create_type); } }; let name = format!("{}.{}", schema_name, object_name); diff --git a/src/frontend/src/handler/transaction.rs b/src/frontend/src/handler/transaction.rs index d17b1f4183ce4..ba1f8dc7845c4 100644 --- a/src/frontend/src/handler/transaction.rs +++ b/src/frontend/src/handler/transaction.rs @@ -13,7 +13,8 @@ // limitations under the License. use pgwire::pg_response::StatementType; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail_not_implemented; +use risingwave_common::error::Result; use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value}; use super::{HandlerArgs, RwPgResponse}; @@ -21,7 +22,7 @@ use crate::session::transaction::AccessMode; macro_rules! 
not_impl { ($body:expr) => { - Err(ErrorCode::NotImplemented($body.into(), 10736.into())) + bail_not_implemented!(issue = 10736, "{}", $body) }; } @@ -40,7 +41,7 @@ pub async fn handle_begin( TransactionMode::AccessMode(mode) => { let _ = access_mode.replace(mode); } - TransactionMode::IsolationLevel(_) => not_impl!("ISOLATION LEVEL")?, + TransactionMode::IsolationLevel(_) => not_impl!("ISOLATION LEVEL"), } } @@ -74,7 +75,7 @@ pub async fn handle_commit( let HandlerArgs { session, .. } = handler_args; if chain { - not_impl!("COMMIT AND CHAIN")?; + not_impl!("COMMIT AND CHAIN"); } session.txn_commit_explicit(); @@ -91,7 +92,7 @@ pub async fn handle_rollback( let HandlerArgs { session, .. } = handler_args; if chain { - not_impl!("ROLLBACK AND CHAIN")?; + not_impl!("ROLLBACK AND CHAIN"); } session.txn_rollback_explicit(); diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index 112107e725318..d7c8695040a2d 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -26,15 +26,16 @@ use super::RwPgResponse; use crate::handler::HandlerArgs; use crate::utils::infer_stmt_row_desc::infer_show_variable; -fn set_var_to_guc_str(value: &SetVariableValue) -> String { +/// Convert a `SetVariableValue` to a string, removing the quotes on literals. 
+pub(crate) fn set_var_to_param_str(value: &SetVariableValue) -> Option { match value { - SetVariableValue::Literal(Value::DoubleQuotedString(s)) - | SetVariableValue::Literal(Value::SingleQuotedString(s)) => s.clone(), - SetVariableValue::List(list) => list - .iter() - .map(set_var_to_guc_str) - .join(SESSION_CONFIG_LIST_SEP), - _ => value.to_string(), + SetVariableValue::Single(var) => Some(var.to_string_unquoted()), + SetVariableValue::List(list) => Some( + list.iter() + .map(|var| var.to_string_unquoted()) + .join(SESSION_CONFIG_LIST_SEP), + ), + SetVariableValue::Default => None, } } @@ -44,7 +45,9 @@ pub fn handle_set( value: SetVariableValue, ) -> Result { // Strip double and single quotes - let string_val = set_var_to_guc_str(&value); + let string_val = set_var_to_param_str(&value).ok_or(ErrorCode::InternalError( + "SET TO DEFAULT is not supported yet".to_string(), + ))?; let mut status = ParameterStatus::default(); diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 3d648d00e5c13..d58becfedd46f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -17,6 +17,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common::{bail_not_implemented, not_implemented}; use risingwave_expr::aggregate::{agg_kinds, AggKind}; use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder}; @@ -297,12 +298,7 @@ impl LogicalAggBuilder { set.into_iter() .map(|expr| input_proj_builder.add_expr(&expr)) .try_collect() - .map_err(|err| { - ErrorCode::NotImplemented( - format!("{err} inside GROUP BY"), - None.into(), - ) - }) + .map_err(|err| not_implemented!("{err} inside GROUP BY")) }) .try_collect()?; @@ -322,9 +318,7 @@ impl LogicalAggBuilder { .into_iter() 
.map(|expr| input_proj_builder.add_expr(&expr)) .try_collect() - .map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside GROUP BY"), None.into()) - })?; + .map_err(|err| not_implemented!("{err} inside GROUP BY"))?; (group_key, vec![]) } GroupBy::GroupingSets(grouping_sets) => gen_group_key_and_grouping_sets(grouping_sets)?, @@ -477,9 +471,7 @@ impl LogicalAggBuilder { Ok(InputRef::new(index, expr.return_type())) }) .try_collect() - .map_err(|err: &'static str| { - ErrorCode::NotImplemented(format!("{err} inside aggregation calls"), None.into()) - })?; + .map_err(|err: &'static str| not_implemented!("{err} inside aggregation calls"))?; let order_by: Vec<_> = order_by .sort_exprs @@ -490,10 +482,7 @@ impl LogicalAggBuilder { }) .try_collect() .map_err(|err: &'static str| { - ErrorCode::NotImplemented( - format!("{err} inside aggregation calls order by"), - None.into(), - ) + not_implemented!("{err} inside aggregation calls order by") })?; match agg_kind { @@ -786,10 +775,13 @@ impl ExprRewriter for LogicalAggBuilder { fn rewrite_subquery(&mut self, subquery: crate::expr::Subquery) -> ExprImpl { if subquery.is_correlated(0) { - self.error = Some(ErrorCode::NotImplemented( - "correlated subquery in HAVING or SELECT with agg".into(), - 2275.into(), - )); + self.error = Some( + not_implemented!( + issue = 2275, + "correlated subquery in HAVING or SELECT with agg", + ) + .into(), + ); } subquery.into() } @@ -1146,11 +1138,7 @@ impl ToStream for LogicalAgg { for agg_call in self.agg_calls() { if matches!(agg_call.agg_kind, agg_kinds::unimplemented_in_stream!()) { - return Err(ErrorCode::NotImplemented( - format!("{} aggregation in materialized view", agg_call.agg_kind), - None.into(), - ) - .into()); + bail_not_implemented!("{} aggregation in materialized view", agg_call.agg_kind); } } let eowc = ctx.emit_on_window_close(); diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 
f865efc70e44a..440070923eb1d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -17,6 +17,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_common::{bail_not_implemented, not_implemented}; use risingwave_expr::aggregate::AggKind; use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind}; @@ -303,23 +304,23 @@ impl<'a> OverWindowProjectBuilder<'a> { FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(), ); self.builder.add_expr(&squared_input_expr).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside args"), None.into()) + not_implemented!("{err} inside args") })?; } for arg in &window_function.args { - self.builder.add_expr(arg).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside args"), None.into()) - })?; + self.builder + .add_expr(arg) + .map_err(|err| not_implemented!("{err} inside args"))?; } for partition_by in &window_function.partition_by { - self.builder.add_expr(partition_by).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside partition_by"), None.into()) - })?; + self.builder + .add_expr(partition_by) + .map_err(|err| not_implemented!("{err} inside partition_by"))?; } for order_by in window_function.order_by.sort_exprs.iter().map(|e| &e.expr) { - self.builder.add_expr(order_by).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside order_by"), None.into()) - })?; + self.builder + .add_expr(order_by) + .map_err(|err| not_implemented!("{err} inside order_by"))?; } Ok(()) } @@ -355,9 +356,7 @@ impl LogicalOverWindow { for (idx, field) in input.schema().fields().iter().enumerate() { input_proj_builder .add_expr(&InputRef::new(idx, field.data_type()).into()) - .map_err(|err| { - 
ErrorCode::NotImplemented(format!("{err} inside input"), None.into()) - })?; + .map_err(|err| not_implemented!("{err} inside input"))?; } let mut build_input_proj_visitor = OverWindowProjectBuilder::new(&mut input_proj_builder); for expr in select_exprs { @@ -453,11 +452,9 @@ impl LogicalOverWindow { if const_offset.is_none() { // should already be checked in `WindowFunction::infer_return_type`, // but just in case - return Err(ErrorCode::NotImplemented( - "non-const `offset` of `lag`/`lead` is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!( + "non-const `offset` of `lag`/`lead` is not supported yet" + ); } const_offset.unwrap()?.map(|v| *v.as_int64()).unwrap_or(1) } else { @@ -722,11 +719,7 @@ impl ToBatch for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - return Err(ErrorCode::NotImplemented( - "Window function with empty PARTITION BY is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("Window function with empty PARTITION BY is not supported yet"); } let input = self.input().to_batch()?; @@ -779,11 +772,9 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - return Err(ErrorCode::NotImplemented( - "Window function with empty PARTITION BY is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!( + "Window function with empty PARTITION BY is not supported yet" + ); } let sort_input = @@ -812,11 +803,9 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - return Err(ErrorCode::NotImplemented( - "Window function with empty PARTITION BY is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!( + "Window function with empty PARTITION BY is not supported yet" + ); } let new_input = diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs 
b/src/frontend/src/optimizer/plan_node/logical_share.rs index 5b736b5367004..eb8061bb0b0a5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_share.rs +++ b/src/frontend/src/optimizer/plan_node/logical_share.rs @@ -15,7 +15,7 @@ use std::cell::RefCell; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::error::ErrorCode::NotImplemented; +use risingwave_common::bail_not_implemented; use risingwave_common::error::Result; use super::utils::{childless_record, Distill}; @@ -131,11 +131,7 @@ impl PredicatePushdown for LogicalShare { impl ToBatch for LogicalShare { fn to_batch(&self) -> Result { - Err(NotImplemented( - "batch query doesn't support share operator for now".into(), - None.into(), - ) - .into()) + bail_not_implemented!("batch query doesn't support share operator for now"); } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 1cf7aa691a381..74d0a50ffcf77 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -20,10 +20,11 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, }; -use risingwave_common::error::{ErrorCode, Result, RwError, TrackingIssue}; +use risingwave_common::error::Result; use risingwave_connector::source::{ConnectorProperties, DataType}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; @@ -546,10 +547,7 @@ impl ToBatch for LogicalSource { &self.core.catalog.as_ref().unwrap().properties, ) { - return Err(RwError::from(ErrorCode::NotImplemented( - "New S3 connector for batch".to_string(), - TrackingIssue::from(None), - ))); + bail_not_implemented!("New S3 connector for batch"); } let source 
= self.wrap_with_optional_generated_columns_batch_proj()?; Ok(source) diff --git a/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs b/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs index f2d8be84e275c..4a5c3ec6a936c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs @@ -16,8 +16,9 @@ use std::rc::Rc; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ColumnDesc, TableDesc}; -use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::error::Result; use super::generic::{GenericPlanNode, GenericPlanRef}; use super::utils::{childless_record, Distill}; @@ -367,19 +368,13 @@ impl ToBatch for LogicalSysScan { impl ToStream for LogicalSysScan { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Err(RwError::from(ErrorCode::NotImplemented( - "streaming on system table is not allowed".to_string(), - None.into(), - ))) + bail_not_implemented!("streaming on system table"); } fn logical_rewrite_for_stream( &self, _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - Err(RwError::from(ErrorCode::NotImplemented( - "streaming on system table is not allowed".to_string(), - None.into(), - ))) + bail_not_implemented!("streaming on system table"); } } diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index 3db5c8c65a273..f487928a0adca 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -14,6 +14,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::ColumnOrder; @@ -71,11 +72,7 @@ impl LogicalTopN { group_key: Vec, ) -> Result { if with_ties && 
offset > 0 { - return Err(ErrorCode::NotImplemented( - "WITH TIES is not supported with OFFSET".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("WITH TIES is not supported with OFFSET"); } Ok(Self::new(input, limit, offset, with_ties, order, group_key).into()) } @@ -121,10 +118,7 @@ impl LogicalTopN { Distribution::Single | Distribution::SomeShard => { self.gen_single_stream_top_n_plan(stream_input) } - Distribution::Broadcast => Err(RwError::from(ErrorCode::NotImplemented( - "topN does not support Broadcast".to_string(), - None.into(), - ))), + Distribution::Broadcast => bail_not_implemented!("topN does not support Broadcast"), Distribution::HashShard(dists) | Distribution::UpstreamHashShard(dists, _) => { self.gen_vnode_two_phase_stream_top_n_plan(stream_input, &dists) } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 7ff5806eceb08..20a682bf7fb69 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -15,6 +15,7 @@ use std::rc::Rc; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, Interval, ScalarImpl}; @@ -91,11 +92,7 @@ impl Planner { let join_type = join.join_type; let on_clause = join.cond; if on_clause.has_subquery() { - Err(ErrorCode::NotImplemented( - "Subquery in join on condition is unsupported".into(), - None.into(), - ) - .into()) + bail_not_implemented!("Subquery in join on condition"); } else { Ok(LogicalJoin::create(left, right, join_type, on_clause)) } @@ -105,11 +102,7 @@ impl Planner { let join_type = join.join_type; let on_clause = join.cond; if on_clause.has_subquery() { - return Err(ErrorCode::NotImplemented( - "Subquery in join on condition is unsupported".into(), - None.into(), - ) - .into()); + bail_not_implemented!("Subquery in join on condition"); } let correlated_id = 
self.ctx.next_correlated_id(); diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index a6b578dddb0dc..fa0e08d4f0217 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; @@ -297,13 +298,7 @@ impl Planner { let right_expr = InputRef::new(input.schema().len(), output_column_type); FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into() } - kind => { - return Err(ErrorCode::NotImplemented( - format!("Not supported subquery kind: {:?}", kind), - 1343.into(), - ) - .into()) - } + kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind), }; *input = Self::create_apply( correlated_id, @@ -378,13 +373,7 @@ impl Planner { SubqueryKind::Existential => { right = self.create_exists(right)?; } - _ => { - return Err(ErrorCode::NotImplemented( - format!("{:?}", subquery.kind), - 1343.into(), - ) - .into()) - } + _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind), } root = Self::create_apply( diff --git a/src/frontend/src/utils/condition.rs b/src/frontend/src/utils/condition.rs index d078e06bcea6c..105d45dbc5153 100644 --- a/src/frontend/src/utils/condition.rs +++ b/src/frontend/src/utils/condition.rs @@ -622,23 +622,7 @@ impl Condition { op, ) { Ok(ResultForCmp::Success(expr)) => expr, - Ok(ResultForCmp::OutUpperBound) => { - if op == ExprType::GreaterThan || op == ExprType::GreaterThanOrEqual { - return Ok(None); - } - // op == < and <= means result is always true, don't need any extra - // work. 
- continue; - } - Ok(ResultForCmp::OutLowerBound) => { - if op == ExprType::LessThan || op == ExprType::LessThanOrEqual { - return Ok(None); - } - // op == > and >= means result is always true, don't need any extra - // work. - continue; - } - Err(_) => { + _ => { other_conds.push(expr); continue; } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 570813a7ab53a..22983b3a89824 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -33,6 +33,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::{ DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode, + StreamScanType, }; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -177,13 +178,14 @@ impl FragmentManager { let map = &self.core.read().await.table_fragments; let mut table_map = HashMap::new(); // TODO(kwannoel): Can this be unified with `PlanVisitor`? 
- fn has_stream_scan(stream_node: &StreamNode) -> bool { - let is_node_scan = if let Some(node) = &stream_node.node_body { - node.is_stream_scan() + fn has_backfill(stream_node: &StreamNode) -> bool { + let is_backfill = if let Some(node) = &stream_node.node_body + && let Some(node) = node.as_stream_scan() { + node.stream_scan_type == StreamScanType::Backfill as i32 } else { false }; - is_node_scan || stream_node.get_input().iter().any(has_stream_scan) + is_backfill || stream_node.get_input().iter().any(has_backfill) } for table_id in table_ids { if let Some(table_fragment) = map.get(table_id) { @@ -191,7 +193,7 @@ impl FragmentManager { for fragment in table_fragment.fragments.values() { for actor in &fragment.actors { if let Some(node) = &actor.nodes - && has_stream_scan(node) + && has_backfill(node) { actors.insert(actor.actor_id); } else { diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index a4867ba9d5ae8..a57a6a9175ebd 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2652,24 +2652,51 @@ impl fmt::Display for CreateFunctionUsing { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum SetVariableValue { - Ident(Ident), - Literal(Value), - List(Vec), + Single(SetVariableValueSingle), + List(Vec), Default, } +impl From for SetVariableValue { + fn from(value: SetVariableValueSingle) -> Self { + SetVariableValue::Single(value) + } +} + impl fmt::Display for SetVariableValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use SetVariableValue::*; + match self { + Single(val) => write!(f, "{}", val), + List(list) => write!(f, "{}", display_comma_separated(list),), + Default => write!(f, "DEFAULT"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum SetVariableValueSingle { + Ident(Ident), + Literal(Value), +} + +impl SetVariableValueSingle { + pub fn 
to_string_unquoted(&self) -> String { + match self { + Self::Literal(Value::SingleQuotedString(s)) + | Self::Literal(Value::DoubleQuotedString(s)) => s.clone(), + _ => self.to_string(), + } + } +} + +impl fmt::Display for SetVariableValueSingle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use SetVariableValueSingle::*; match self { Ident(ident) => write!(f, "{}", ident), Literal(literal) => write!(f, "{}", literal), - List(list) => write!( - f, - "{}", - list.iter().map(|value| value.to_string()).join(", ") - ), - Default => write!(f, "DEFAULT"), } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 439aed4a18e7f..4db8b3ebdfff6 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3228,16 +3228,22 @@ impl Parser { loop { let token = self.peek_token(); let value = match (self.parse_value(), token.token) { - (Ok(value), _) => SetVariableValue::Literal(value), + (Ok(value), _) => SetVariableValueSingle::Literal(value), (Err(_), Token::Word(w)) => { if w.keyword == Keyword::DEFAULT { - SetVariableValue::Default + if !values.is_empty() { + self.expected( + "parameter list value", + Token::Word(w).with_location(token.location), + )? + } + return Ok(SetVariableValue::Default); } else { - SetVariableValue::Ident(w.to_ident()?) + SetVariableValueSingle::Ident(w.to_ident()?) } } (Err(_), unexpected) => { - self.expected("variable value", unexpected.with_location(token.location))? + self.expected("parameter value", unexpected.with_location(token.location))? 
} }; values.push(value); @@ -3246,7 +3252,7 @@ impl Parser { } } if values.len() == 1 { - Ok(values[0].clone()) + Ok(SetVariableValue::Single(values[0].clone())) } else { Ok(SetVariableValue::List(values)) } diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index 94eb2d53fbfa5..2d97834ad23b5 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -392,7 +392,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: "a".into(), - value: SetVariableValue::Ident("b".into()), + value: SetVariableValueSingle::Ident("b".into()).into(), } ); @@ -402,7 +402,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: "a".into(), - value: SetVariableValue::Literal(Value::SingleQuotedString("b".into())), + value: SetVariableValueSingle::Literal(Value::SingleQuotedString("b".into())).into(), } ); @@ -412,7 +412,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: "a".into(), - value: SetVariableValue::Literal(number("0")), + value: SetVariableValueSingle::Literal(number("0")).into(), } ); @@ -432,7 +432,7 @@ fn parse_set() { Statement::SetVariable { local: true, variable: "a".into(), - value: SetVariableValue::Ident("b".into()), + value: SetVariableValueSingle::Ident("b".into()).into(), } ); @@ -441,7 +441,7 @@ fn parse_set() { for (sql, err_msg) in [ ("SET", "Expected identifier, found: EOF"), ("SET a b", "Expected equals sign or TO, found: b"), - ("SET a =", "Expected variable value, found: EOF"), + ("SET a =", "Expected parameter value, found: EOF"), ] { let res = parse_sql_statements(sql); assert!(format!("{}", res.unwrap_err()).contains(err_msg)); diff --git a/src/sqlparser/tests/testdata/set.yaml b/src/sqlparser/tests/testdata/set.yaml index 309ffc5213aee..947bbea7056c9 100644 --- a/src/sqlparser/tests/testdata/set.yaml +++ b/src/sqlparser/tests/testdata/set.yaml @@ -13,3 +13,13 @@ formatted_sql: SET TIME ZONE UTC - input: set time 
= '1'; formatted_sql: SET time = '1' +- input: set search_path to 'default', 'my_path'; + formatted_sql: SET search_path = 'default', 'my_path' +- input: set search_path to default, 'my_path'; + error_msg: |- + sql parser error: Expected end of statement, found: , at line:1, column:28 + Near "set search_path to default" +- input: set search_path to 'my_path', default; + error_msg: |- + sql parser error: Expected parameter list value, found: default at line:1, column:36 + Near "set search_path to 'my_path', default" diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index fc2f02f0aede0..b4efcac1702f4 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -15,7 +15,7 @@ use std::backtrace::Backtrace; use risingwave_common::array::ArrayError; -use risingwave_common::error::{BoxedError, Error, TrackingIssue}; +use risingwave_common::error::{BoxedError, Error, NotImplemented}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -109,8 +109,8 @@ enum ErrorKind { BoxedError, ), - #[error("Feature is not yet implemented: {0}, {1}")] - NotImplemented(String, TrackingIssue), + #[error(transparent)] + NotImplemented(#[from] NotImplemented), #[error(transparent)] Internal( @@ -137,10 +137,6 @@ impl StreamExecutorError { ErrorKind::ConnectorError(error.into()).into() } - pub fn not_implemented(error: impl Into, issue: impl Into) -> Self { - ErrorKind::NotImplemented(error.into(), issue.into()).into() - } - pub fn dml_error(error: impl Error) -> Self { ErrorKind::DmlError(error.into()).into() } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 72eeca9837bf0..7783cfbcf0aba 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -18,6 +18,7 @@ cfg-or-panic = "0.2" clap = { version = "4", features = ["derive"] } console = "0.15" etcd-client = 
{ workspace = true } +expect-test = "1" fail = { version = "0.5" } futures = { version = "0.3", default-features = false, features = ["alloc"] } glob = "0.3" @@ -29,6 +30,7 @@ pin-project = "1.1" pretty_assertions = "1" prometheus = { version = "0.13" } rand = "0.8" +rand_chacha = { version = "0.3.1" } rdkafka = { workspace = true } risingwave_common = { workspace = true } risingwave_compactor = { workspace = true } diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs index 6cf880d7d66fb..5afd68c4f97c8 100644 --- a/src/tests/simulation/src/lib.rs +++ b/src/tests/simulation/src/lib.rs @@ -15,6 +15,8 @@ #![feature(trait_alias)] #![feature(lint_reasons)] #![feature(lazy_cell)] +#![feature(let_chains)] +#![feature(try_blocks)] pub mod client; pub mod cluster; diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index a3aa4ca056415..c0c996dc05e5d 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -139,6 +139,11 @@ pub struct Args { #[arg(short, long)] e2e_extended_test: bool, + + /// Background ddl + /// The probability of background ddl for a ddl query. 
+ #[clap(long, default_value = "0.0")] + background_ddl_rate: f64, } #[tokio::main] @@ -245,7 +250,7 @@ async fn main() { if let Some(jobs) = args.jobs { run_parallel_slt_task(glob, jobs).await.unwrap(); } else { - run_slt_task(cluster0, glob, &kill_opts).await; + run_slt_task(cluster0, glob, &kill_opts, args.background_ddl_rate).await; } }) .await; diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 9df28c1ee24ff..b38b382cb47a0 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -16,8 +16,11 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; -use rand::{thread_rng, Rng}; -use sqllogictest::ParallelTestError; +use anyhow::{bail, Result}; +use itertools::Itertools; +use rand::{thread_rng, Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use sqllogictest::{ParallelTestError, Record}; use crate::client::RisingWave; use crate::cluster::{Cluster, KillOpts}; @@ -29,9 +32,20 @@ fn is_create_table_as(sql: &str) -> bool { parts.len() >= 4 && parts[0] == "create" && parts[1] == "table" && parts[3] == "as" } -#[derive(PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq)] enum SqlCmd { - Create { is_create_table_as: bool }, + /// Other create statements. 
+ Create { + is_create_table_as: bool, + }, + /// Create Materialized views + CreateMaterializedView { + name: String, + }, + /// Set background ddl + SetBackgroundDdl { + enable: bool, + }, Drop, Dml, Flush, @@ -59,16 +73,54 @@ impl SqlCmd { } fn extract_sql_command(sql: &str) -> SqlCmd { - let cmd = sql - .trim_start() - .split_once(' ') - .unwrap_or_default() - .0 - .to_lowercase(); - match cmd.as_str() { - "create" => SqlCmd::Create { - is_create_table_as: is_create_table_as(sql), - }, + let sql = sql.to_lowercase(); + let tokens = sql.split_whitespace(); + let mut tokens = tokens.multipeek(); + let first_token = tokens.next().unwrap_or(""); + + match first_token { + // NOTE(kwannoel): + // It's entirely possible for a malformed command to be parsed as `SqlCmd::Create`. + // BUT an error should be expected for such a test. + // So we don't need to handle this case. + // Eventually if there are too many edge cases, we can opt to use our parser. + "create" => { + let result: Option = try { + match tokens.next()? 
{ + "materialized" => { + // view + tokens.next()?; + + // if not exists | name + let next = *tokens.peek()?; + if "if" == next + && let Some("not") = tokens.peek().cloned() + && let Some("exists") = tokens.peek().cloned() { + tokens.next(); + tokens.next(); + tokens.next(); + let name = tokens.next()?.to_string(); + SqlCmd::CreateMaterializedView { name } + } else { + let name = next.to_string(); + SqlCmd::CreateMaterializedView { name } + } + } + _ => SqlCmd::Create { + is_create_table_as: is_create_table_as(&sql), + }, + } + }; + result.unwrap_or(SqlCmd::Others) + } + "set" => { + if sql.contains("background_ddl") { + let enable = sql.contains("true"); + SqlCmd::SetBackgroundDdl { enable } + } else { + SqlCmd::Others + } + } "drop" => SqlCmd::Drop, "insert" | "update" | "delete" => SqlCmd::Dml, "flush" => SqlCmd::Flush, @@ -90,8 +142,46 @@ const KILL_IGNORE_FILES: &[&str] = &[ "transaction/tolerance.slt", ]; +/// Wait for background mv to finish creating +async fn wait_background_mv_finished(mview_name: &str) -> Result<()> { + let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else { + bail!("failed to connect to frontend for {mview_name}"); + }; + let client = rw.pg_client(); + if client.simple_query("WAIT;").await.is_err() { + bail!("failed to wait for background mv to finish creating for {mview_name}"); + } + + let Ok(result) = client + .query( + "select count(*) from pg_matviews where matviewname=$1;", + &[&mview_name], + ) + .await + else { + bail!("failed to query pg_matviews for {mview_name}"); + }; + + match result[0].try_get::<_, i64>(0) { + Ok(1) => Ok(()), + r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"), + } +} + /// Run the sqllogictest files in `glob`. -pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { +pub async fn run_slt_task( + cluster: Arc, + glob: &str, + opts: &KillOpts, + // Probability of background_ddl being set to true per ddl record. 
+ background_ddl_rate: f64, +) { + tracing::info!("background_ddl_rate: {}", background_ddl_rate); + let seed = std::env::var("MADSIM_TEST_SEED") + .unwrap_or("0".to_string()) + .parse::() + .unwrap(); + let mut rng = ChaChaRng::seed_from_u64(seed); let kill = opts.kill_compute || opts.kill_meta || opts.kill_frontend || opts.kill_compactor; let files = glob::glob(glob).expect("failed to read glob pattern"); for file in files { @@ -109,6 +199,10 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt")) .then(|| hack_kafka_test(path)); let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path); + + // NOTE(kwannoel): For background ddl + let mut background_ddl_enabled = false; + for record in sqllogictest::parse_file(path).expect("failed to parse file") { // uncomment to print metrics for task counts // let metrics = madsim::runtime::Handle::current().metrics(); @@ -138,6 +232,32 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { | sqllogictest::Record::Query { sql, .. } => extract_sql_command(sql), _ => SqlCmd::Others, }; + tracing::debug!(?cmd, "Running"); + + if matches!(cmd, SqlCmd::SetBackgroundDdl { .. }) && background_ddl_rate > 0.0 { + panic!("We cannot run background_ddl statement with background_ddl_rate > 0.0, since it could be reset"); + } + + // For each background ddl compatible statement, provide a chance for background_ddl=true. + if let Record::Statement { + loc, + conditions, + connection, + .. + } = &record + && matches!(cmd, SqlCmd::CreateMaterializedView { .. 
}) { + let background_ddl_setting = rng.gen_bool(background_ddl_rate); + let set_background_ddl = Record::Statement { + loc: loc.clone(), + conditions: conditions.clone(), + connection: connection.clone(), + expected_error: None, + sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"), + expected_count: None, + }; + tester.run_async(set_background_ddl).await.unwrap(); + background_ddl_enabled = background_ddl_setting; + }; if cmd.ignore_kill() { for i in 0usize.. { @@ -179,9 +299,15 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { } else { None }; + // retry up to 5 times until it succeed + let max_retry = 5; for i in 0usize.. { + tracing::debug!(iteration = i, "retry count"); let delay = Duration::from_secs(1 << i); + if i > 0 { + tokio::time::sleep(delay).await; + } match tester .run_async(record.clone()) .timed(|_res, elapsed| { @@ -189,34 +315,93 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { }) .await { - Ok(_) => break, - // allow 'table exists' error when retry CREATE statement - Err(e) - if matches!( - cmd, - SqlCmd::Create { - is_create_table_as: false + Ok(_) => { + // For background ddl + if let SqlCmd::CreateMaterializedView { ref name } = cmd && background_ddl_enabled + && matches!(record, Record::Statement { expected_error: None, .. 
} | Record::Query { expected_error: None, ..}) + { + tracing::debug!(iteration=i, "Retry for background ddl"); + match wait_background_mv_finished(name).await { + Ok(_) => { + tracing::debug!(iteration=i, "Record with background_ddl {:?} finished", record); + break; + } + Err(err) => { + tracing::error!(iteration=i, ?err, "failed to wait for background mv to finish creating"); + if i >= max_retry { + panic!("failed to run test after retry {i} times, error={err:#?}"); + } + continue; + } } - ) && i != 0 - && e.to_string().contains("exists") - && e.to_string().contains("Catalog error") => - { - break + } + break; } - // allow 'not found' error when retry DROP statement - Err(e) - if cmd == SqlCmd::Drop - && i != 0 - && e.to_string().contains("not found") - && e.to_string().contains("Catalog error") => - { - break + Err(e) => { + match cmd { + // allow 'table exists' error when retry CREATE statement + SqlCmd::Create { + is_create_table_as: false, + } + | SqlCmd::CreateMaterializedView { .. } + if i != 0 + && e.to_string().contains("exists") + && e.to_string().contains("Catalog error") => + { + break + } + // allow 'not found' error when retry DROP statement + SqlCmd::Drop + if i != 0 + && e.to_string().contains("not found") + && e.to_string().contains("Catalog error") => + { + break + } + + // Keep i >= max_retry for other errors. Since these errors indicate that the MV might not yet be created. 
+ _ if i >= max_retry => { + panic!("failed to run test after retry {i} times: {e}") + } + SqlCmd::CreateMaterializedView { ref name } + if i != 0 + && e.to_string().contains("table is in creating procedure") + && background_ddl_enabled => + { + tracing::debug!(iteration = i, name, "Retry for background ddl"); + match wait_background_mv_finished(name).await { + Ok(_) => { + tracing::debug!( + iteration = i, + "Record with background_ddl {:?} finished", + record + ); + break; + } + Err(err) => { + tracing::error!( + iteration = i, + ?err, + "failed to wait for background mv to finish creating" + ); + if i >= max_retry { + panic!("failed to run test after retry {i} times, error={err:#?}"); + } + continue; + } + } + } + _ => tracing::error!( + iteration = i, + "failed to run test: {e}\nretry after {delay:?}" + ), + } } - Err(e) if i >= 5 => panic!("failed to run test after retry {i} times: {e}"), - Err(e) => tracing::error!("failed to run test: {e}\nretry after {delay:?}"), } - tokio::time::sleep(delay).await; } + if let SqlCmd::SetBackgroundDdl { enable } = cmd { + background_ddl_enabled = enable; + }; if let Some(handle) = handle { handle.await.unwrap(); } @@ -278,7 +463,17 @@ fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile { #[cfg(test)] mod tests { + use std::fmt::Debug; + + use expect_test::{expect, Expect}; + use super::*; + + fn check(actual: impl Debug, expect: Expect) { + let actual = format!("{:#?}", actual); + expect.assert_eq(&actual); + } + #[test] fn test_is_create_table_as() { assert!(is_create_table_as(" create table xx as select 1;")); @@ -287,4 +482,50 @@ mod tests { )); assert!(!is_create_table_as(" create view xx as select 1;")); } + + #[test] + fn test_extract_sql_command() { + check( + extract_sql_command("create table t as select 1;"), + expect![[r#" + Create { + is_create_table_as: true, + }"#]], + ); + check( + extract_sql_command(" create table t (a int);"), + expect![[r#" + Create { + is_create_table_as: false, + }"#]], + ); + 
check( + extract_sql_command(" create materialized view m_1 as select 1;"), + expect![[r#" + CreateMaterializedView { + name: "m_1", + }"#]], + ); + check( + extract_sql_command("set background_ddl= true;"), + expect![[r#" + SetBackgroundDdl { + enable: true, + }"#]], + ); + check( + extract_sql_command("SET BACKGROUND_DDL=true;"), + expect![[r#" + SetBackgroundDdl { + enable: true, + }"#]], + ); + check( + extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"), + expect![[r#" + CreateMaterializedView { + name: "m_1", + }"#]], + ) + } }