diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 0523e534c5d8a..3ee3bea8bbabf 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -39,6 +39,7 @@ updates:
     google-cloud:
       patterns:
         - "google-cloud*"
+# Don't update these directories
 - package-ecosystem: cargo
   directory: /integration_tests/feature-store
@@ -46,3 +47,21 @@ updates:
     interval: "daily"
   ignore:
     - dependency-name: "*"
+
+- package-ecosystem: maven
+  directory: /java
+  schedule:
+    interval: "weekly"
+  open-pull-requests-limit: 5
+  # Disable auto rebase to reduce cost. Use `@dependabot rebase` manually instead.
+  rebase-strategy: "disabled"
+  ignore:
+    # Do not bump Debezium because we have hacked its source code e.g. #18760
+    - dependency-name: "io.debezium:*"
+      update-types:
+        ["version-update:semver-minor", "version-update:semver-major"]
+  groups:
+    # Group all dependencies together because Java libraries are quite stable
+    all:
+      patterns:
+        - "*"
diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index e8a0fa32f1010..946f6c355efd4 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -367,6 +367,28 @@ steps:
     timeout_in_minutes: 120
     retry: *auto-retry

+  - label: "end-to-end test (madsim, random vnode count)"
+    key: "e2e-test-deterministic-random-vnode-count"
+    command: "TEST_NUM=32 RW_SIM_RANDOM_VNODE_COUNT=true timeout 120m ci/scripts/deterministic-e2e-test.sh"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
+      || build.env("CI_STEPS") =~ /(^|,)e2e-tests?-deterministic-simulation(,|$$)/
+    depends_on: "build-simulation"
+    plugins:
+      - seek-oss/aws-sm#v2.3.1:
+          env:
+            GITHUB_TOKEN: github-token
+      - docker-compose#v5.1.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+          environment:
+            - GITHUB_TOKEN
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 120
+    retry: *auto-retry
+
   - label: "recovery test (madsim)"
     key: "recovery-test-deterministic"
     command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 65m ci/scripts/deterministic-recovery-test.sh"
diff --git a/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt b/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt
index f23ff824f249f..1a313a761b683 100644
--- a/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt
+++ b/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt
@@ -4,7 +4,7 @@ control substitution on
 system ok
 psql -c "
     DROP TABLE IF EXISTS tt1;
-    CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
+    CREATE TABLE tt1 (v1 int primary key, v2 timestamptz);
     INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');"

 statement ok
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java
index 00a5f9fc03ad8..b74654b5f348e 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java
@@ -292,7 +292,6 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam
                         && val <= Data.DataType.TypeName.INT64_VALUE;
             case "bigint":
                 return val == Data.DataType.TypeName.INT64_VALUE;
-            case "float":
             case "real":
                 return val == Data.DataType.TypeName.FLOAT_VALUE
@@ -303,6 +302,12 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam
                 return val == Data.DataType.TypeName.DECIMAL_VALUE;
             case "varchar":
                 return val == Data.DataType.TypeName.VARCHAR_VALUE;
+            case "date":
+                return val == Data.DataType.TypeName.DATE_VALUE;
+            case "time":
+                return val == Data.DataType.TypeName.TIME_VALUE;
+            case "datetime":
+                return val == Data.DataType.TypeName.TIMESTAMP_VALUE;
             case "timestamp":
                 return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE;
             default:
"float": case "real": return val == Data.DataType.TypeName.FLOAT_VALUE @@ -303,6 +302,12 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam return val == Data.DataType.TypeName.DECIMAL_VALUE; case "varchar": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "datetime": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; case "timestamp": return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; default: diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index b3da549798f43..2199dd6168d62 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -679,7 +679,25 @@ private boolean isDataTypeCompatible(String pgDataType, Data.DataType.TypeName t || val == Data.DataType.TypeName.VARCHAR_VALUE; case "varchar": case "character varying": + case "uuid": + case "enum": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "bytea": + return val == Data.DataType.TypeName.BYTEA_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "timestamp": + case "timestamp without time zone": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; + case "timestamptz": + case "timestamp with time zone": + return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; + case "interval": + return val == Data.DataType.TypeName.INTERVAL_VALUE; + case "jsonb": + return val == Data.DataType.TypeName.JSONB_VALUE; default: return true; // true for other uncovered types } diff --git a/proto/catalog.proto b/proto/catalog.proto index 6d54903cc2592..9771d31fba006 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -325,10 +325,10 @@ message Function { // The zstd-compressed binary of the function. optional bytes compressed_binary = 17; bool always_retry_on_network_error = 16; - // The runtime used when javascript is used as the language. Could be "quickjs" or "deno". + // The runtime selected when multiple runtimes are available for the language. Now is not used. optional string runtime = 18; - // The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator" - optional string function_type = 19; + reserved 19; + reserved "function_type"; oneof kind { ScalarFunction scalar = 11; diff --git a/proto/expr.proto b/proto/expr.proto index 5330843512849..c6d8e2082fa73 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -610,10 +610,10 @@ message UserDefinedFunction { // - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary. optional bytes compressed_binary = 10; bool always_retry_on_network_error = 9; - // The runtime used when javascript is used as the language. Could be "quickjs" or "deno". + // The runtime selected when multiple runtimes are available for the language. Now is not used. optional string runtime = 11; - // The function type, which is used to execute the function. 
Could be "sync", "async", "generator" or "async_generator" - optional string function_type = 12; + reserved 12; + reserved "function_type"; } // Additional information for user defined table/aggregate functions. @@ -627,5 +627,6 @@ message UserDefinedFunctionMetadata { optional string body = 7; optional bytes compressed_binary = 10; optional string runtime = 11; - optional string function_type = 12; + reserved 12; + reserved "function_type"; } diff --git a/src/expr/core/src/aggregate/user_defined.rs b/src/expr/core/src/aggregate/user_defined.rs index 2f4fdc5f9f9c5..cba83d4f439ed 100644 --- a/src/expr/core/src/aggregate/user_defined.rs +++ b/src/expr/core/src/aggregate/user_defined.rs @@ -138,7 +138,6 @@ pub fn new_user_defined( arg_names: &udf.arg_names, return_type, always_retry_on_network_error: false, - function_type: udf.function_type.as_deref(), }) .context("failed to build UDF runtime")?; diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index 6ae27dabb2458..41c9257036e7d 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -185,7 +185,6 @@ impl Build for UserDefinedFunction { arg_names: &udf.arg_names, return_type: &return_type, always_retry_on_network_error: udf.always_retry_on_network_error, - function_type: udf.function_type.as_deref(), }) .context("failed to build UDF runtime")?; diff --git a/src/expr/core/src/sig/udf.rs b/src/expr/core/src/sig/udf.rs index 047879b9192b8..e8aa1c3efdf1f 100644 --- a/src/expr/core/src/sig/udf.rs +++ b/src/expr/core/src/sig/udf.rs @@ -105,7 +105,6 @@ pub struct UdfOptions<'a> { pub arg_names: &'a [String], pub return_type: &'a DataType, pub always_retry_on_network_error: bool, - pub function_type: Option<&'a str>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)] diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index b490e9b023af1..8263696669449 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -140,7 +140,6 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result, pub return_type: DataType, pub language: String, + pub runtime: Option, pub identifier: Option, pub body: Option, pub link: Option, pub compressed_binary: Option>, pub always_retry_on_network_error: bool, - pub function_type: Option, - pub runtime: Option, } #[derive(Clone, Display, PartialEq, Eq, Hash, Debug, EnumAsInner)] @@ -71,13 +70,12 @@ impl From<&PbFunction> for FunctionCatalog { arg_types: prost.arg_types.iter().map(|arg| arg.into()).collect(), return_type: prost.return_type.as_ref().expect("no return type").into(), language: prost.language.clone(), + runtime: prost.runtime.clone(), identifier: prost.identifier.clone(), body: prost.body.clone(), link: prost.link.clone(), compressed_binary: prost.compressed_binary.clone(), always_retry_on_network_error: prost.always_retry_on_network_error, - function_type: prost.function_type.clone(), - runtime: prost.runtime.clone(), } } } @@ -89,12 +87,11 @@ impl From<&FunctionCatalog> for PbUserDefinedFunctionMetadata { arg_types: c.arg_types.iter().map(|t| t.to_protobuf()).collect(), return_type: Some(c.return_type.to_protobuf()), language: c.language.clone(), + runtime: c.runtime.clone(), link: c.link.clone(), identifier: c.identifier.clone(), body: c.body.clone(), compressed_binary: c.compressed_binary.clone(), - function_type: c.function_type.clone(), - runtime: c.runtime.clone(), } } } diff 
diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs
index 44abfa1859c42..084fe7387d766 100644
--- a/src/frontend/src/expr/user_defined_function.rs
+++ b/src/frontend/src/expr/user_defined_function.rs
@@ -47,21 +47,20 @@ impl UserDefinedFunction {
         let catalog = FunctionCatalog {
             // FIXME(yuhao): function id is not in udf proto.
             id: FunctionId::placeholder(),
-            name: udf.get_name().clone(),
+            name: udf.name.clone(),
             // FIXME(yuhao): owner is not in udf proto.
             owner: u32::MAX - 1,
             kind: FunctionKind::Scalar,
             arg_names: udf.arg_names.clone(),
             arg_types,
             return_type,
-            language: udf.get_language().clone(),
+            language: udf.language.clone(),
+            runtime: udf.runtime.clone(),
             identifier: udf.identifier.clone(),
             body: udf.body.clone(),
             link: udf.link.clone(),
             compressed_binary: udf.compressed_binary.clone(),
             always_retry_on_network_error: udf.always_retry_on_network_error,
-            function_type: udf.function_type.clone(),
-            runtime: udf.runtime.clone(),
         };

         Ok(Self {
@@ -93,13 +92,12 @@ impl Expr for UserDefinedFunction {
                     .map(|t| t.to_protobuf())
                     .collect(),
                 language: self.catalog.language.clone(),
+                runtime: self.catalog.runtime.clone(),
                 identifier: self.catalog.identifier.clone(),
                 link: self.catalog.link.clone(),
                 body: self.catalog.body.clone(),
                 compressed_binary: self.catalog.compressed_binary.clone(),
                 always_retry_on_network_error: self.catalog.always_retry_on_network_error,
-                function_type: self.catalog.function_type.clone(),
-                runtime: self.catalog.runtime.clone(),
             })),
         }
     }
diff --git a/src/frontend/src/handler/create_aggregate.rs b/src/frontend/src/handler/create_aggregate.rs
index b9b8a391eaffe..fe7460ff09973 100644
--- a/src/frontend/src/handler/create_aggregate.rs
+++ b/src/frontend/src/handler/create_aggregate.rs
@@ -52,6 +52,16 @@ pub async fn handle_create_aggregate(
         None => return Err(ErrorCode::InvalidParameterValue("no language".into()).into()),
     };

+    let runtime = match params.runtime {
+        Some(_) => {
+            return Err(ErrorCode::InvalidParameterValue(
+                "runtime selection is currently not supported".to_string(),
+            )
+            .into());
+        }
+        None => None,
+    };
+
     let return_type = bind_data_type(&returns)?;

     let mut arg_names = vec![];
@@ -94,13 +104,6 @@ pub async fn handle_create_aggregate(
         }
         _ => None,
     };
-    let function_type = match params.function_type {
-        Some(CreateFunctionType::Sync) => Some("sync".to_string()),
-        Some(CreateFunctionType::Async) => Some("async".to_string()),
-        Some(CreateFunctionType::Generator) => Some("generator".to_string()),
-        Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()),
-        None => None,
-    };

     let create_fn = risingwave_expr::sig::find_udf_impl(&language, None, link)?.create_fn;
     let output = create_fn(CreateFunctionOptions {
@@ -124,14 +127,13 @@ pub async fn handle_create_aggregate(
         arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
         return_type: Some(return_type.into()),
         language,
+        runtime,
        identifier: Some(output.identifier),
         link: link.map(|s| s.to_string()),
         body: output.body,
         compressed_binary: output.compressed_binary,
         owner: session.user_id(),
         always_retry_on_network_error: false,
-        runtime: None,
-        function_type,
     };

     let catalog_writer = session.catalog_writer()?;
diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs
index ccd83a13ed81c..b81d2b4514edf 100644
--- a/src/frontend/src/handler/create_function.rs
+++ b/src/frontend/src/handler/create_function.rs
@@ -60,15 +60,11 @@ pub async fn handle_create_function(
     };
     let runtime = match params.runtime {
-        Some(runtime) => {
-            if language == "javascript" {
-                Some(runtime.real_value())
-            } else {
-                return Err(ErrorCode::InvalidParameterValue(
-                    "runtime is only supported for javascript".to_string(),
-                )
-                .into());
-            }
+        Some(_) => {
+            return Err(ErrorCode::InvalidParameterValue(
+                "runtime selection is currently not supported".to_string(),
+            )
+            .into());
         }
         None => None,
     };
@@ -141,13 +137,6 @@ pub async fn handle_create_function(
         }
         _ => None,
     };
-    let function_type = match params.function_type {
-        Some(CreateFunctionType::Sync) => Some("sync".to_string()),
-        Some(CreateFunctionType::Async) => Some("async".to_string()),
-        Some(CreateFunctionType::Generator) => Some("generator".to_string()),
-        Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()),
-        None => None,
-    };

     let create_fn =
         risingwave_expr::sig::find_udf_impl(&language, runtime.as_deref(), link)?.create_fn;
@@ -176,6 +165,7 @@ pub async fn handle_create_function(
         arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
         return_type: Some(return_type.into()),
         language,
+        runtime,
         identifier: Some(output.identifier),
         link: link.map(|s| s.to_string()),
         body: output.body,
@@ -184,8 +174,6 @@ pub async fn handle_create_function(
         always_retry_on_network_error: with_options
             .always_retry_on_network_error
             .unwrap_or_default(),
-        runtime,
-        function_type,
     };

     let catalog_writer = session.catalog_writer()?;
diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs
index 9b5d34c34abe8..c733f603a3c44 100644
--- a/src/frontend/src/handler/create_sql_function.rs
+++ b/src/frontend/src/handler/create_sql_function.rs
@@ -336,14 +336,13 @@ pub async fn handle_create_sql_function(
         arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
         return_type: Some(return_type.into()),
         language,
+        runtime: None,
         identifier: None,
         body: Some(body),
         compressed_binary: None,
         link: None,
         owner: session.user_id(),
         always_retry_on_network_error: false,
-        runtime: None,
-        function_type: None,
     };

     let catalog_writer = session.catalog_writer()?;
diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs
index 6cffc3413f3e2..ac9d7abcf19a7 100644
--- a/src/meta/model/migration/src/lib.rs
+++ b/src/meta/model/migration/src/lib.rs
@@ -24,7 +24,8 @@ mod m20240820_081248_add_time_travel_per_table_epoch;
 mod m20240911_083152_variable_vnode_count;
 mod m20241016_065621_hummock_gc_history;
 mod m20241025_062548_singleton_vnode_count;
-mod m20241103_043732_connection_params;
+mod m20241115_085007_remove_function_type;
+mod m20241118_043732_connection_params;
 mod utils;

 pub struct Migrator;
@@ -87,7 +88,8 @@ impl MigratorTrait for Migrator {
             Box::new(m20240911_083152_variable_vnode_count::Migration),
             Box::new(m20241016_065621_hummock_gc_history::Migration),
             Box::new(m20241025_062548_singleton_vnode_count::Migration),
-            Box::new(m20241103_043732_connection_params::Migration),
+            Box::new(m20241115_085007_remove_function_type::Migration),
+            Box::new(m20241118_043732_connection_params::Migration),
         ]
     }
 }
diff --git a/src/meta/model/migration/src/m20241115_085007_remove_function_type.rs b/src/meta/model/migration/src/m20241115_085007_remove_function_type.rs
new file mode 100644
index 0000000000000..b74382991c889
--- /dev/null
+++ b/src/meta/model/migration/src/m20241115_085007_remove_function_type.rs
@@ -0,0 +1,35 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Function::Table)
+                    .drop_column(Function::FunctionType)
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Function::Table)
+                    .add_column(ColumnDef::new(Function::FunctionType).string())
+                    .to_owned(),
+            )
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+enum Function {
+    Table,
+    FunctionType,
+}
diff --git a/src/meta/model/migration/src/m20241103_043732_connection_params.rs b/src/meta/model/migration/src/m20241118_043732_connection_params.rs
similarity index 100%
rename from src/meta/model/migration/src/m20241103_043732_connection_params.rs
rename to src/meta/model/migration/src/m20241118_043732_connection_params.rs
diff --git a/src/meta/model/src/function.rs b/src/meta/model/src/function.rs
index 0fea52c6c3488..48e9812999d67 100644
--- a/src/meta/model/src/function.rs
+++ b/src/meta/model/src/function.rs
@@ -42,14 +42,13 @@ pub struct Model {
     pub arg_types: DataTypeArray,
     pub return_type: DataType,
     pub language: String,
+    pub runtime: Option<String>,
     pub link: Option<String>,
     pub identifier: Option<String>,
     pub body: Option<String>,
     pub compressed_binary: Option<Vec<u8>>,
     pub kind: FunctionKind,
     pub always_retry_on_network_error: bool,
-    pub runtime: Option<String>,
-    pub function_type: Option<String>,
 }

 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -101,14 +100,13 @@ impl From<PbFunction> for ActiveModel {
             arg_types: Set(DataTypeArray::from(function.arg_types)),
             return_type: Set(DataType::from(&function.return_type.unwrap())),
             language: Set(function.language),
+            runtime: Set(function.runtime),
             link: Set(function.link),
             identifier: Set(function.identifier),
             body: Set(function.body),
             compressed_binary: Set(function.compressed_binary),
             kind: Set(function.kind.unwrap().into()),
             always_retry_on_network_error: Set(function.always_retry_on_network_error),
-            runtime: Set(function.runtime),
-            function_type: Set(function.function_type),
         }
     }
 }
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index 3f89c6e42ef63..138f7f6530cf9 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -360,14 +360,13 @@ impl From<ObjectModel<function::Model>> for PbFunction {
             arg_types: value.0.arg_types.to_protobuf(),
             return_type: Some(value.0.return_type.to_protobuf()),
             language: value.0.language,
+            runtime: value.0.runtime,
             link: value.0.link,
             identifier: value.0.identifier,
             body: value.0.body,
             compressed_binary: value.0.compressed_binary,
             kind: Some(value.0.kind.into()),
             always_retry_on_network_error: value.0.always_retry_on_network_error,
-            runtime: value.0.runtime,
-            function_type: value.0.function_type,
         }
     }
 }
diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs
index cf2c448f10021..6b8df20120528 100644
--- a/src/meta/src/hummock/manager/compaction/mod.rs
+++ b/src/meta/src/hummock/manager/compaction/mod.rs
@@ -643,6 +643,11 @@ impl HummockManager {
             self.env.notification_manager(),
             &self.metrics,
         );
+        // Apply stats changes.
+        let mut version_stats = HummockVersionStatsTransaction::new(
+            &mut versioning.version_stats,
+            self.env.notification_manager(),
+        );

         if deterministic_mode {
             version.disable_apply_to_txn();
@@ -808,6 +813,10 @@ impl HummockManager {
                     .sorted_output_ssts
                     .clone_from(&compact_task.input_ssts[0].table_infos);
             }
+            update_table_stats_for_vnode_watermark_trivial_reclaim(
+                &mut version_stats.table_stats,
+                &compact_task,
+            );
             self.metrics
                 .compact_frequency
                 .with_label_values(&[
@@ -878,7 +887,8 @@ impl HummockManager {
                 self.meta_store_ref(),
                 compaction_statuses,
                 compact_task_assignment,
-                version
+                version,
+                version_stats
             )?;
             self.metrics
                 .compact_task_batch_count
@@ -1674,3 +1684,34 @@ pub struct CompactionGroupStatistic {
     pub table_statistic: BTreeMap<StateTableId, u64>,
     pub compaction_group_config: CompactionGroup,
 }
+
+/// Updates table stats caused by vnode watermark trivial reclaim compaction.
+fn update_table_stats_for_vnode_watermark_trivial_reclaim(
+    table_stats: &mut PbTableStatsMap,
+    task: &CompactTask,
+) {
+    if task.task_type != TaskType::VnodeWatermark {
+        return;
+    }
+    let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
+    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
+        assert_eq!(s.table_ids.len(), 1);
+        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
+        *e += s.total_key_count;
+    }
+    for (table_id, delete_count) in deleted_table_keys {
+        let Some(stats) = table_stats.get_mut(&table_id) else {
+            continue;
+        };
+        if stats.total_key_count == 0 {
+            continue;
+        }
+        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
+        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
+        // total_key_count is updated accurately.
+        stats.total_key_count = new_total_key_count;
+        // others are updated approximately.
+        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
+        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
+    }
+}
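The arithmetic in update_table_stats_for_vnode_watermark_trivial_reclaim can be checked in isolation: the key count is decremented exactly, while key and value sizes are scaled by the surviving-key ratio, rounded up. The standalone sketch below mirrors that arithmetic; the TableStats struct here is a simplified stand-in for the protobuf-generated stats type, not the real definition.

// Standalone sketch of the ratio-based stats update.
// `TableStats` is a stand-in for the generated stats type used by the meta node.
struct TableStats {
    total_key_count: i64,
    total_key_size: i64,
    total_value_size: i64,
}

fn apply_trivial_reclaim(stats: &mut TableStats, delete_count: u64) {
    let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
    let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
    stats.total_key_count = new_total_key_count; // exact
    stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64; // approximate
    stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64; // approximate
}

fn main() {
    let mut stats = TableStats {
        total_key_count: 1000,
        total_key_size: 8000,
        total_value_size: 100_000,
    };
    // A vnode-watermark trivial reclaim drops 250 of the 1000 keys,
    // so sizes are scaled by the remaining ratio 750/1000 = 0.75.
    apply_trivial_reclaim(&mut stats, 250);
    assert_eq!(stats.total_key_count, 750);
    assert_eq!(stats.total_key_size, 6000); // 8000 * 0.75
    assert_eq!(stats.total_value_size, 75_000); // 100000 * 0.75
}

Rounding up keeps small non-zero sizes from collapsing to zero while the key count is still positive.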
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index 829edde25e028..7b232ac6ce5c9 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -3098,8 +3098,6 @@ pub struct CreateFunctionBody {
     pub return_: Option<Expr>,
     /// USING ...
     pub using: Option<CreateFunctionUsing>,
-
-    pub function_type: Option<CreateFunctionType>,
 }

 impl fmt::Display for CreateFunctionBody {
@@ -3122,9 +3120,6 @@ impl fmt::Display for CreateFunctionBody {
         if let Some(using) = &self.using {
             write!(f, " {using}")?;
         }
-        if let Some(function_type) = &self.function_type {
-            write!(f, " {function_type}")?;
-        }
         Ok(())
     }
 }
@@ -3197,26 +3192,6 @@ impl fmt::Display for CreateFunctionUsing {
     }
 }

-#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum CreateFunctionType {
-    Sync,
-    Async,
-    Generator,
-    AsyncGenerator,
-}
-
-impl fmt::Display for CreateFunctionType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            CreateFunctionType::Sync => write!(f, "SYNC"),
-            CreateFunctionType::Async => write!(f, "ASYNC"),
-            CreateFunctionType::Generator => write!(f, "SYNC GENERATOR"),
-            CreateFunctionType::AsyncGenerator => write!(f, "ASYNC GENERATOR"),
-        }
-    }
-}
-
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum SetVariableValue {
@@ -3460,12 +3435,11 @@ mod tests {
             returns: Some(CreateFunctionReturns::Value(DataType::Int)),
             params: CreateFunctionBody {
                 language: Some(Ident::new_unchecked("python")),
+                runtime: None,
                 behavior: Some(FunctionBehavior::Immutable),
                 as_: Some(FunctionDefinition::SingleQuotedDef("SELECT 1".to_string())),
                 return_: None,
                 using: None,
-                runtime: None,
-                function_type: None,
             },
             with_options: CreateFunctionWithOptions {
                 always_retry_on_network_error: None,
@@ -3483,12 +3457,11 @@ mod tests {
             returns: Some(CreateFunctionReturns::Value(DataType::Int)),
             params: CreateFunctionBody {
                 language: Some(Ident::new_unchecked("python")),
+                runtime: None,
                 behavior: Some(FunctionBehavior::Immutable),
                 as_: Some(FunctionDefinition::SingleQuotedDef("SELECT 1".to_string())),
                 return_: None,
                 using: None,
-                runtime: None,
-                function_type: None,
             },
             with_options: CreateFunctionWithOptions {
                 always_retry_on_network_error: Some(true),
@@ -3498,29 +3471,5 @@ mod tests {
             "CREATE FUNCTION foo(INT) RETURNS INT LANGUAGE python IMMUTABLE AS 'SELECT 1' WITH ( ALWAYS_RETRY_NETWORK_ERRORS = true )",
             format!("{}", create_function)
         );
-
-        let create_function = Statement::CreateFunction {
-            temporary: false,
-            or_replace: false,
-            name: ObjectName(vec![Ident::new_unchecked("foo")]),
-            args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]),
-            returns: Some(CreateFunctionReturns::Value(DataType::Int)),
-            params: CreateFunctionBody {
-                language: Some(Ident::new_unchecked("javascript")),
-                behavior: None,
-                as_: Some(FunctionDefinition::SingleQuotedDef("SELECT 1".to_string())),
-                return_: None,
-                using: None,
-                runtime: Some(Ident::new_unchecked("deno")),
-                function_type: Some(CreateFunctionType::AsyncGenerator),
-            },
-            with_options: CreateFunctionWithOptions {
-                always_retry_on_network_error: None,
-            },
-        };
-        assert_eq!(
-            "CREATE FUNCTION foo(INT) RETURNS INT LANGUAGE javascript RUNTIME deno AS 'SELECT 1' ASYNC GENERATOR",
-            format!("{}", create_function)
-        );
     }
 }
diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs
index ead0bec453f0c..d3b196d3bcb06 100644
--- a/src/sqlparser/src/keywords.rs
+++ b/src/sqlparser/src/keywords.rs
@@ -90,7 +90,6 @@ define_keywords!(
     ASENSITIVE,
     ASOF,
     ASYMMETRIC,
-    ASYNC,
     AT,
     ATOMIC,
     AUTHORIZATION,
@@ -252,7 +251,6 @@ define_keywords!(
     FUNCTIONS,
     FUSION,
     GAP,
-    GENERATOR,
     GET,
     GLOBAL,
     GRANT,
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 41c4702e21ed7..c5403658773fb 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -2365,15 +2365,6 @@ impl Parser<'_> {
             } else if self.parse_keyword(Keyword::USING) {
                 ensure_not_set(&body.using, "USING")?;
                 body.using = Some(self.parse_create_function_using()?);
-            } else if self.parse_keyword(Keyword::SYNC) {
-                ensure_not_set(&body.function_type, "SYNC | ASYNC")?;
-                body.function_type = Some(self.parse_function_type(false, false)?);
-            } else if self.parse_keyword(Keyword::ASYNC) {
-                ensure_not_set(&body.function_type, "SYNC | ASYNC")?;
-                body.function_type = Some(self.parse_function_type(true, false)?);
-            } else if self.parse_keyword(Keyword::GENERATOR) {
-                ensure_not_set(&body.function_type, "SYNC | ASYNC")?;
-                body.function_type = Some(self.parse_function_type(false, true)?);
             } else {
                 return Ok(body);
             }
@@ -2396,25 +2387,6 @@ impl Parser<'_> {
         }
     }

-    fn parse_function_type(
-        &mut self,
-        is_async: bool,
-        is_generator: bool,
-    ) -> PResult<CreateFunctionType> {
-        let is_generator = if is_generator {
-            true
-        } else {
-            self.parse_keyword(Keyword::GENERATOR)
-        };
-
-        match (is_async, is_generator) {
-            (false, false) => Ok(CreateFunctionType::Sync),
-            (true, false) => Ok(CreateFunctionType::Async),
-            (false, true) => Ok(CreateFunctionType::Generator),
-            (true, true) => Ok(CreateFunctionType::AsyncGenerator),
-        }
-    }
-
     // CREATE USER name [ [ WITH ] option [ ... ] ]
     // where option can be:
     //     SUPERUSER | NOSUPERUSER
diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs
index 311b2ba213c45..549920d1c7585 100644
--- a/src/sqlparser/tests/sqlparser_postgres.rs
+++ b/src/sqlparser/tests/sqlparser_postgres.rs
@@ -874,31 +874,6 @@ fn parse_create_function() {
             with_options: Default::default(),
         }
     );
-
-    let sql = "CREATE FUNCTION add(INT, INT) RETURNS INT LANGUAGE SQL IMMUTABLE AS 'select $1 + $2;' ASYNC";
-    assert_eq!(
-        verified_stmt(sql),
-        Statement::CreateFunction {
-            or_replace: false,
-            temporary: false,
-            name: ObjectName(vec![Ident::new_unchecked("add")]),
-            args: Some(vec![
-                OperateFunctionArg::unnamed(DataType::Int),
-                OperateFunctionArg::unnamed(DataType::Int),
-            ]),
-            returns: Some(CreateFunctionReturns::Value(DataType::Int)),
-            params: CreateFunctionBody {
-                language: Some("SQL".into()),
-                behavior: Some(FunctionBehavior::Immutable),
-                as_: Some(FunctionDefinition::SingleQuotedDef(
-                    "select $1 + $2;".into()
-                )),
-                function_type: Some(CreateFunctionType::Async),
-                ..Default::default()
-            },
-            with_options: Default::default(),
-        }
-    );
 }

 #[test]
diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs
index 4d3122c486d94..3b8e6d7b24afa 100644
--- a/src/tests/simulation/src/main.rs
+++ b/src/tests/simulation/src/main.rs
@@ -133,10 +133,13 @@ pub struct Args {
     /// Use arrangement backfill
     #[clap(long, default_value = "false")]
     use_arrangement_backfill: bool,
+
+    /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
+    #[clap(long, env = "RW_SIM_RANDOM_VNODE_COUNT")]
+    random_vnode_count: bool,
 }

 #[tokio::main]
-#[cfg_or_panic(madsim)]
 async fn main() {
     use std::sync::Arc;

@@ -165,7 +168,6 @@ async fn main() {
         } else {
             vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".to_string()].into()
         },
-        ..Default::default()
     };
     let kill_opts = KillOpts {
         kill_meta: false,
@@ -186,7 +188,7 @@ async fn main() {
         cluster.create_kafka_producer(&datadir).await;
     }

-    let seed = madsim::runtime::Handle::current().seed();
+    let seed = sqlsmith_seed();
     if let Some(count) = args.sqlsmith {
         cluster
             .run_on_client(async move {
@@ -248,7 +250,12 @@ async fn main() {
             if let Some(jobs) = args.jobs {
                 run_parallel_slt_task(glob, jobs).await.unwrap();
             } else {
-                run_slt_task(cluster0, glob, &kill_opts, args.background_ddl_rate).await;
+                let opts = Opts {
+                    kill_opts,
+                    background_ddl_rate: args.background_ddl_rate,
+                    random_vnode_count: args.random_vnode_count,
+                };
+                run_slt_task(cluster0, glob, opts).await;
             }
         })
         .await;
@@ -270,3 +277,8 @@ async fn main() {

     cluster.graceful_shutdown().await;
 }
+
+#[cfg_or_panic(madsim)]
+fn sqlsmith_seed() -> u64 {
+    madsim::runtime::Handle::current().seed()
+}
diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs
index 7bf9d62d19649..ede789792d1af 100644
--- a/src/tests/simulation/src/slt.rs
+++ b/src/tests/simulation/src/slt.rs
@@ -19,6 +19,7 @@ use std::time::Duration;

 use anyhow::{bail, Result};
 use itertools::Itertools;
+use rand::seq::IteratorRandom;
 use rand::{thread_rng, Rng, SeedableRng};
 use rand_chacha::ChaChaRng;
 use sqllogictest::{Condition, ParallelTestError, QueryExpect, Record, StatementExpect};
@@ -85,6 +86,15 @@ impl SqlCmd {
         // are not transactional, we can't kill during `alter table add/drop columns` for now, will
         // remove it until transactional commit of table fragment and catalog is supported.
     }
+
+    fn is_create(&self) -> bool {
+        matches!(
+            self,
+            SqlCmd::Create { .. }
+                | SqlCmd::CreateSink { .. }
+                | SqlCmd::CreateMaterializedView { .. }
+        )
+    }
 }

 fn extract_sql_command(sql: &str) -> SqlCmd {
@@ -189,13 +199,23 @@ async fn wait_background_mv_finished(mview_name: &str) -> Result<()> {
     }
 }

+pub struct Opts {
+    pub kill_opts: KillOpts,
+    /// Probability of `background_ddl` being set to true per ddl record.
+    pub background_ddl_rate: f64,
+    /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
+    pub random_vnode_count: bool,
+}
+
 /// Run the sqllogictest files in `glob`.
 pub async fn run_slt_task(
     cluster: Arc<Cluster>,
     glob: &str,
-    opts: &KillOpts,
-    // Probability of background_ddl being set to true per ddl record.
-    background_ddl_rate: f64,
+    Opts {
+        kill_opts,
+        background_ddl_rate,
+        random_vnode_count,
+    }: Opts,
 ) {
     tracing::info!("background_ddl_rate: {}", background_ddl_rate);
     let seed = std::env::var("MADSIM_TEST_SEED")
@@ -203,7 +223,10 @@ pub async fn run_slt_task(
         .parse::<u64>()
         .unwrap();
     let mut rng = ChaChaRng::seed_from_u64(seed);
-    let kill = opts.kill_compute || opts.kill_meta || opts.kill_frontend || opts.kill_compactor;
+    let kill = kill_opts.kill_compute
+        || kill_opts.kill_meta
+        || kill_opts.kill_frontend
+        || kill_opts.kill_compactor;
     let files = glob::glob(glob).expect("failed to read glob pattern");
     for file in files {
         // use a session per file
@@ -229,7 +252,22 @@ pub async fn run_slt_task(
         // We can revert it back to false only if we encounter a record that sets background_ddl to false.
         let mut manual_background_ddl_enabled = false;

-        for record in sqllogictest::parse_file(path).expect("failed to parse file") {
+        let records = sqllogictest::parse_file(path).expect("failed to parse file");
+        let random_vnode_count = random_vnode_count
+            // Skip using random vnode count if the test case cares about parallelism, including
+            // setting parallelism manually or checking the parallelism with system tables.
+            && records.iter().all(|record| {
+                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
+                    && sql.to_lowercase().contains("parallelism")
+                {
+                    println!("[RANDOM VNODE COUNT] skip: {}", path.display());
+                    false
+                } else {
+                    true
+                }
+            });
+
+        for record in records {
             // uncomment to print metrics for task counts
             // let metrics = madsim::runtime::Handle::current().metrics();
             // println!("{:#?}", metrics);
@@ -238,8 +276,42 @@ pub async fn run_slt_task(
                 break;
             }

+            let cmd = match &record {
+                sqllogictest::Record::Statement { sql, .. }
+                | sqllogictest::Record::Query { sql, .. } => extract_sql_command(sql),
+                _ => SqlCmd::Others,
+            };
+
             // For normal records.
             if !kill {
+                // Set random vnode count if needed.
+                if random_vnode_count
+                    && cmd.is_create()
+                    && let Record::Statement {
+                        loc,
+                        conditions,
+                        connection,
+                        ..
+                    } = &record
+                {
+                    let vnode_count = (2..=64) // small
+                        .chain(224..=288) // normal
+                        .chain(992..=1056) // 1024 affects row id gen behavior
+                        .choose(&mut thread_rng())
+                        .unwrap();
+                    let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
+                    println!("[RANDOM VNODE COUNT] set: {vnode_count}");
+                    let set_random_vnode_count = Record::Statement {
+                        loc: loc.clone(),
+                        conditions: conditions.clone(),
+                        connection: connection.clone(),
+                        sql,
+                        expected: StatementExpect::Ok,
+                    };
+                    tester.run_async(set_random_vnode_count).await.unwrap();
+                    println!("[RANDOM VNODE COUNT] run: {record}");
+                }
+
                 match tester
                     .run_async(record.clone())
                     .timed(|_res, elapsed| {
@@ -253,11 +325,6 @@ pub async fn run_slt_task(
             }

             // For kill enabled.
-            let cmd = match &record {
-                sqllogictest::Record::Statement { sql, .. }
-                | sqllogictest::Record::Query { sql, .. } => extract_sql_command(sql),
-                _ => SqlCmd::Others,
-            };
             tracing::debug!(?cmd, "Running");

             if background_ddl_rate > 0.0
@@ -329,11 +396,11 @@ pub async fn run_slt_task(
                 continue;
             }

-            let should_kill = thread_rng().gen_bool(opts.kill_rate as f64);
+            let should_kill = thread_rng().gen_bool(kill_opts.kill_rate as f64);
             // spawn a background task to kill nodes
             let handle = if should_kill {
                 let cluster = cluster.clone();
-                let opts = *opts;
+                let opts = kill_opts;
                 Some(tokio::spawn(async move {
                     let t = thread_rng().gen_range(Duration::default()..Duration::from_secs(1));
                     tokio::time::sleep(t).await;
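The randomization above is driven end to end by one switch: setting RW_SIM_RANDOM_VNODE_COUNT=true (as the new main-cron step does) populates Args::random_vnode_count via clap's env attribute, which is forwarded through Opts into run_slt_task and becomes a SET STREAMING_MAX_PARALLELISM statement before each create-like record. A standalone sketch of just the sampling step follows, using the same ranges as slt.rs and assuming the rand 0.8 crate that slt.rs already depends on; IteratorRandom::choose picks uniformly across all chained values, so each range is weighted by its length.

use rand::seq::IteratorRandom;
use rand::thread_rng;

fn main() {
    // Same candidate vnode counts as in slt.rs: small, normal, and around 1024
    // (which affects row id generation behavior).
    let vnode_count = (2..=64)
        .chain(224..=288)
        .chain(992..=1056)
        .choose(&mut thread_rng())
        .expect("ranges are non-empty");
    println!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
}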