diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index e809053f9f37..a61819368c68 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -265,6 +265,20 @@ test_backfill_snapshot_with_limited_storage_throughput() { kill_cluster } +# Test case where we do backfill with PK of 10 columns to measure performance impact. +test_backfill_snapshot_with_wider_rows() { + echo "--- e2e, test_backfill_snapshot_with_wider_rows, $RUNTIME_CLUSTER_PROFILE" + cargo make ci-start $RUNTIME_CLUSTER_PROFILE + sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_wide_table.slt' + sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_wide_snapshot.slt' + sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt' + sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt' + sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt' + sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt' + + kill_cluster +} + main() { set -euo pipefail test_snapshot_and_upstream_read @@ -282,6 +296,7 @@ main() { # Backfill will happen in sequence here. test_backfill_snapshot_runtime + test_backfill_snapshot_with_wider_rows test_backfill_snapshot_with_limited_storage_throughput # No upstream only tests, because if there's no snapshot, diff --git a/e2e_test/backfill/runtime/create_wide_table.slt b/e2e_test/backfill/runtime/create_wide_table.slt new file mode 100644 index 000000000000..43453ebb5d2d --- /dev/null +++ b/e2e_test/backfill/runtime/create_wide_table.slt @@ -0,0 +1,20 @@ +statement ok +CREATE TABLE t ( + v1 int, + v2 varchar, + v3 bigint, + v4 int, + v5 varchar, + v6 bigint, + v7 int, + v8 varchar, + v9 bigint, + v10 int, + v11 varchar, + v12 bigint, + v13 int, + v14 varchar, + v15 bigint, + primary key (v1, v2, v3, v4, v5, + v8, v9, v10, v11, v12) +); \ No newline at end of file diff --git a/e2e_test/backfill/runtime/insert_wide_snapshot.slt b/e2e_test/backfill/runtime/insert_wide_snapshot.slt new file mode 100644 index 000000000000..cd4d2bfadeb6 --- /dev/null +++ b/e2e_test/backfill/runtime/insert_wide_snapshot.slt @@ -0,0 +1,22 @@ +# 15 columns wide +statement ok +INSERT INTO t select + generate_series, + 'jakbj2khbe2', + 22222222222, + generate_series, + 'jakbj2khbe2', + 22222222222, + generate_series, + 'jakbj2khbe2', + 22222222222, + generate_series, + 'jakbj2khbe2', + 22222222222, + generate_series, + 'jakbj2khbe2', + 22222222222 + from generate_series(2000001, 4000000); + +statement ok +flush; \ No newline at end of file diff --git a/e2e_test/udf/sql_udf.slt b/e2e_test/udf/sql_udf.slt index 4b4d1f1c39f7..758ec43ca53f 100644 --- a/e2e_test/udf/sql_udf.slt +++ b/e2e_test/udf/sql_udf.slt @@ -13,6 +13,35 @@ create function sub(INT, INT) returns int language sql as 'select $1 - $2'; statement ok create function add_sub_binding() returns int language sql as 'select add(1, 1) + sub(2, 2)'; +# Create a named sql udf +statement ok +create function add_named(a INT, b INT) returns int language sql as 'select a + b'; + +# Create another named sql udf +statement ok +create function sub_named(a INT, b INT) returns int language sql as 'select a - b'; + +# Mixed parameter with named / anonymous parameters +statement ok +create function add_sub_mix(INT, a INT, INT) returns int language sql as 'select $1 - a + $3'; + +# Mixed parameter with calling inner sql udfs +# statement ok +# create function add_sub_mix_wrapper(INT, a INT, INT) returns 
int language sql as 'select add($1, a) + a + sub(a, $3)'; + +# Named sql udf with corner case +statement ok +create function corner_case(INT, a INT, INT) returns varchar language sql as $$select '$1 + a + $3'$$; + +# Named sql udf with invalid parameter in body definition +# Will be rejected at creation time +statement error failed to find named parameter aa +create function unknown_parameter(a INT) returns int language sql as 'select a + aa + a'; + +# Call anonymous sql udf inside named sql udf +statement ok +create function add_named_wrapper(a INT, b INT) returns int language sql as 'select add(a, b)'; + # Create an anonymous function that calls built-in functions # Note that double dollar signs should be used otherwise the parsing will fail, as illutrates below statement ok @@ -81,7 +110,20 @@ create function print_add_two(INT) returns int language sql as 'select print($1 statement error failed to conduct semantic check, please see if you are calling non-existence functions create function non_exist(INT) returns int language sql as 'select yo(114514)'; -# Call the defined sql udf +# Try to create an anonymous sql udf whose return type mismatches with the sql body definition +statement error return type mismatch detected +create function type_mismatch(INT) returns varchar language sql as 'select $1 + 114514 + $1'; + +# A valid example +statement ok +create function type_match(INT) returns varchar language sql as $$select '$1 + 114514 + $1'$$; + +query T +select type_match(114514); +---- +$1 + 114514 + $1 + +# Call the defined anonymous sql udfs query I select add(1, -1); ---- @@ -92,6 +134,32 @@ select sub(1, 1); ---- 0 +# Call the defined named sql udfs +query I +select add_named(1, -1); +---- +0 + +query I +select sub_named(1, 1); +---- +0 + +query I +select add_sub_mix(1, 2, 3); +---- +2 + +query T +select corner_case(1, 2, 3); +---- +$1 + a + $3 + +query I +select add_named_wrapper(1, -1); +---- +0 + query I select add_sub_binding(); ---- @@ -145,14 +213,30 @@ select print_add_one(1), print_add_one(114513), print_add_two(2); ---- 2 114514 4 -# Create a mock table +# Create a mock table for anonymous sql udf statement ok create table t1 (c1 INT, c2 INT); +# Create a mock table for named sql udf +statement ok +create table t3 (a INT, b INT); + # Insert some data into the mock table statement ok insert into t1 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5); +statement ok +insert into t3 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5); + +query I +select add_named(a, b) from t3 order by a asc; +---- +2 +4 +6 +8 +10 + query III select sub(c1, c2), c1, c2, add(c1, c2) from t1 order by c1 asc; ---- @@ -189,7 +273,7 @@ create function add_sub(INT, FLOAT, INT) returns float language sql as $$select # Complex types interleaving statement ok -create function add_sub_types(INT, BIGINT, FLOAT, DECIMAL, REAL) returns real language sql as 'select $1 + $2 - $3 + $4 + $5'; +create function add_sub_types(INT, BIGINT, FLOAT, DECIMAL, REAL) returns double language sql as 'select $1 + $2 - $3 + $4 + $5'; statement ok create function add_sub_return(INT, FLOAT, INT) returns float language sql return -$1 + $2 - $3; @@ -290,9 +374,30 @@ drop function print_add_two; statement ok drop function regexp_replace_wrapper; +statement ok +drop function corner_case; + +statement ok +drop function add_named; + +statement ok +drop function sub_named; + +statement ok +drop function add_sub_mix; + +statement ok +drop function add_named_wrapper; + +statement ok +drop function type_match; + # Drop the mock table statement ok 
 drop table t1;

 statement ok
 drop table t2;
+
+statement ok
+drop table t3;
\ No newline at end of file
diff --git a/src/frontend/src/binder/expr/column.rs b/src/frontend/src/binder/expr/column.rs
index dbee0b0708cb..9fb17c6e4352 100644
--- a/src/frontend/src/binder/expr/column.rs
+++ b/src/frontend/src/binder/expr/column.rs
@@ -52,6 +52,8 @@ impl Binder {
         // The reason that we directly return error here,
         // is because during a valid sql udf binding,
         // there will not exist any column identifiers
+        // And invalid cases should already be caught
+        // during semantic check phase
         return Err(ErrorCode::BindError(format!(
             "failed to find named parameter {column_name}"
         ))
diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs
index 978074e7455e..e726d7dc01d4 100644
--- a/src/frontend/src/binder/mod.rs
+++ b/src/frontend/src/binder/mod.rs
@@ -219,26 +219,29 @@ impl UdfContext {
         Ok(expr)
     }

-    /// TODO: add name related logic
-    /// NOTE: need to think of a way to prevent naming conflict
-    /// e.g., when existing column names conflict with parameter names in sql udf
     pub fn create_udf_context(
         args: &[FunctionArg],
-        _catalog: &Arc<FunctionCatalog>,
+        catalog: &Arc<FunctionCatalog>,
     ) -> Result<HashMap<String, Expr>> {
         let mut ret: HashMap<String, Expr> = HashMap::new();
         for (i, current_arg) in args.iter().enumerate() {
-            if let FunctionArg::Unnamed(arg) = current_arg {
-                let FunctionArgExpr::Expr(e) = arg else {
-                    return Err(ErrorCode::InvalidInputSyntax("invalid syntax".to_string()).into());
-                };
-                // if catalog.arg_names.is_some() {
-                //     todo!()
-                // }
-                ret.insert(format!("${}", i + 1), e.clone());
-                continue;
+            match current_arg {
+                FunctionArg::Unnamed(arg) => {
+                    let FunctionArgExpr::Expr(e) = arg else {
+                        return Err(
+                            ErrorCode::InvalidInputSyntax("invalid syntax".to_string()).into()
+                        );
+                    };
+                    if catalog.arg_names[i].is_empty() {
+                        ret.insert(format!("${}", i + 1), e.clone());
+                    } else {
+                        // The index mapping here is accurate
+                        // So that we could directly use the index
+                        ret.insert(catalog.arg_names[i].clone(), e.clone());
+                    }
+                }
+                _ => return Err(ErrorCode::InvalidInputSyntax("invalid syntax".to_string()).into()),
             }
-            return Err(ErrorCode::InvalidInputSyntax("invalid syntax".to_string()).into());
         }
         Ok(ret)
     }
diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs
index 166e940b6081..4eaa78f82533 100644
--- a/src/frontend/src/handler/create_sql_function.rs
+++ b/src/frontend/src/handler/create_sql_function.rs
@@ -29,18 +29,28 @@ use thiserror_ext::AsReport;
 use super::*;
 use crate::binder::UdfContext;
 use crate::catalog::CatalogError;
-use crate::expr::{ExprImpl, Literal};
+use crate::expr::{Expr, ExprImpl, Literal};
 use crate::{bind_data_type, Binder};

 /// Create a mock `udf_context`, which is used for semantic check
-fn create_mock_udf_context(arg_types: Vec<DataType>) -> HashMap<String, ExprImpl> {
-    (1..=arg_types.len())
+fn create_mock_udf_context(
+    arg_types: Vec<DataType>,
+    arg_names: Vec<String>,
+) -> HashMap<String, ExprImpl> {
+    let mut ret: HashMap<String, ExprImpl> = (1..=arg_types.len())
         .map(|i| {
             let mock_expr =
                 ExprImpl::Literal(Box::new(Literal::new(None, arg_types[i - 1].clone())));
-            (format!("${i}"), mock_expr.clone())
+            (format!("${i}"), mock_expr)
         })
-        .collect()
+        .collect();
+
+    for (i, arg_name) in arg_names.into_iter().enumerate() {
+        let mock_expr = ExprImpl::Literal(Box::new(Literal::new(None, arg_types[i].clone())));
+        ret.insert(arg_name, mock_expr);
+    }
+
+    ret
 }

 pub async fn handle_create_sql_function(
@@ -173,15 +183,31 @@ pub async fn handle_create_sql_function(

         binder
             .udf_context_mut()
-            .update_context(create_mock_udf_context(arg_types.clone()));
+            .update_context(create_mock_udf_context(
+                arg_types.clone(),
+                arg_names.clone(),
+            ));
+
+        binder.set_udf_binding_flag();

         if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
-            if let Err(e) = binder.bind_expr(expr) {
-                return Err(ErrorCode::InvalidInputSyntax(format!(
+            match binder.bind_expr(expr) {
+                Ok(expr) => {
+                    // Check if the return type mismatches
+                    if expr.return_type() != return_type {
+                        return Err(ErrorCode::InvalidInputSyntax(format!(
+                            "\nreturn type mismatch detected\nexpected: [{}]\nactual: [{}]\nplease adjust your function definition accordingly",
+                            return_type,
+                            expr.return_type()
+                        ))
+                        .into());
+                    }
+                }
+                Err(e) => return Err(ErrorCode::InvalidInputSyntax(format!(
                     "failed to conduct semantic check, please see if you are calling non-existence functions: {}",
                     e.as_report()
                 ))
-                .into());
+                .into()),
             }
         } else {
             return Err(ErrorCode::InvalidInputSyntax(
@@ -191,6 +217,8 @@
             )
             .into());
         }
+
+        binder.unset_udf_binding_flag();
     }

     // Create the actual function, will be stored in function catalog
diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs
index 6b8a2b01c2d4..c021942bcbad 100644
--- a/src/meta/model_v2/migration/src/lib.rs
+++ b/src/meta/model_v2/migration/src/lib.rs
@@ -16,3 +16,33 @@ impl MigratorTrait for Migrator {
         ]
     }
 }
+
+#[macro_export]
+macro_rules! assert_not_has_tables {
+    ($manager:expr, $( $table:ident ),+) => {
+        $(
+            assert!(
+                !$manager
+                    .has_table($table::Table.to_string())
+                    .await?
+            );
+        )+
+    };
+}
+
+#[macro_export]
+macro_rules! drop_tables {
+    ($manager:expr, $( $table:ident ),+) => {
+        $(
+            $manager
+                .drop_table(
+                    sea_orm_migration::prelude::Table::drop()
+                        .table($table::Table)
+                        .if_exists()
+                        .cascade()
+                        .to_owned(),
+                )
+                .await?;
+        )+
+    };
+}
diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
index bf8cb8c0fc1e..bf96b8c6227a 100644
--- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs
+++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
@@ -1,5 +1,7 @@
 use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *};

+use crate::{assert_not_has_tables, drop_tables};
+
 #[derive(DeriveMigrationName)]
 pub struct Migration;

@@ -7,38 +9,30 @@ pub struct Migration;
 impl MigrationTrait for Migration {
     async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
         // 1. check if the table exists.
-        assert!(!manager.has_table(Cluster::Table.to_string()).await?);
-        assert!(!manager.has_table(Worker::Table.to_string()).await?);
-        assert!(!manager.has_table(WorkerProperty::Table.to_string()).await?);
-        assert!(!manager.has_table(User::Table.to_string()).await?);
-        assert!(!manager.has_table(UserPrivilege::Table.to_string()).await?);
-        assert!(!manager.has_table(Database::Table.to_string()).await?);
-        assert!(!manager.has_table(Schema::Table.to_string()).await?);
-        assert!(!manager.has_table(StreamingJob::Table.to_string()).await?);
-        assert!(!manager.has_table(Fragment::Table.to_string()).await?);
-        assert!(!manager.has_table(Actor::Table.to_string()).await?);
-        assert!(
-            !manager
-                .has_table(ActorDispatcher::Table.to_string())
-                .await?
- ); - assert!(!manager.has_table(Table::Table.to_string()).await?); - assert!(!manager.has_table(Source::Table.to_string()).await?); - assert!(!manager.has_table(Sink::Table.to_string()).await?); - assert!(!manager.has_table(Connection::Table.to_string()).await?); - assert!(!manager.has_table(View::Table.to_string()).await?); - assert!(!manager.has_table(Index::Table.to_string()).await?); - assert!(!manager.has_table(Function::Table.to_string()).await?); - assert!(!manager.has_table(Object::Table.to_string()).await?); - assert!( - !manager - .has_table(ObjectDependency::Table.to_string()) - .await? - ); - assert!( - !manager - .has_table(SystemParameter::Table.to_string()) - .await? + assert_not_has_tables!( + manager, + Cluster, + Worker, + WorkerProperty, + User, + UserPrivilege, + Database, + Schema, + StreamingJob, + Fragment, + Actor, + ActorDispatcher, + Table, + Source, + Sink, + Connection, + View, + Index, + Function, + Object, + ObjectDependency, + SystemParameter, + CatalogVersion ); // 2. create tables. @@ -742,6 +736,24 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; + manager + .create_table( + crate::Table::create() + .table(CatalogVersion::Table) + .col( + ColumnDef::new(CatalogVersion::Name) + .string() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(CatalogVersion::Version) + .big_integer() + .not_null(), + ) + .to_owned(), + ) + .await?; // 3. create indexes. manager @@ -836,22 +848,6 @@ impl MigrationTrait for Migration { } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - macro_rules! drop_tables { - ($manager:expr, $( $table:ident ),+) => { - $( - $manager - .drop_table( - MigrationTable::drop() - .table($table::Table) - .if_exists() - .cascade() - .to_owned(), - ) - .await?; - )+ - }; - } - // drop tables cascade. drop_tables!( manager, @@ -875,7 +871,8 @@ impl MigrationTrait for Migration { Function, Object, ObjectDependency, - SystemParameter + SystemParameter, + CatalogVersion ); Ok(()) } @@ -1136,3 +1133,10 @@ enum SystemParameter { IsMutable, Description, } + +#[derive(DeriveIden)] +enum CatalogVersion { + Table, + Name, + Version, +} diff --git a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs index 5f1f26b3e21d..b90e088da1f1 100644 --- a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs +++ b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs @@ -1,22 +1,13 @@ use sea_orm_migration::prelude::*; +use crate::{assert_not_has_tables, drop_tables}; + #[derive(DeriveMigrationName)] pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - macro_rules! assert_not_has_tables { - ($manager:expr, $( $table:ident ),+) => { - $( - assert!( - !$manager - .has_table($table::Table.to_string()) - .await? - ); - )+ - }; - } assert_not_has_tables!( manager, CompactionTask, @@ -25,7 +16,8 @@ impl MigrationTrait for Migration { HummockPinnedVersion, HummockPinnedSnapshot, HummockVersionDelta, - HummockVersionStats + HummockVersionStats, + HummockSequence ); manager @@ -196,21 +188,6 @@ impl MigrationTrait for Migration { } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - macro_rules! 
drop_tables { - ($manager:expr, $( $table:ident ),+) => { - $( - $manager - .drop_table( - Table::drop() - .table($table::Table) - .if_exists() - .cascade() - .to_owned(), - ) - .await?; - )+ - }; - } drop_tables!( manager, CompactionTask, diff --git a/src/meta/model_v2/src/catalog_version.rs b/src/meta/model_v2/src/catalog_version.rs new file mode 100644 index 000000000000..53c6f9109635 --- /dev/null +++ b/src/meta/model_v2/src/catalog_version.rs @@ -0,0 +1,37 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum VersionCategory { + #[sea_orm(string_value = "NOTIFICATION")] + Notification, + #[sea_orm(string_value = "TABLE_REVISION")] + TableRevision, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "catalog_version")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub name: VersionCategory, + pub version: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 572e59c82035..1cec6b553c16 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -23,6 +23,7 @@ pub mod prelude; pub mod actor; pub mod actor_dispatcher; +pub mod catalog_version; pub mod cluster; pub mod compaction_config; pub mod compaction_status; diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model_v2/src/prelude.rs index b1cc5c54ff34..6a5316b0e422 100644 --- a/src/meta/model_v2/src/prelude.rs +++ b/src/meta/model_v2/src/prelude.rs @@ -14,6 +14,7 @@ pub use super::actor::Entity as Actor; pub use super::actor_dispatcher::Entity as ActorDispatcher; +pub use super::catalog_version::Entity as CatalogVersion; pub use super::cluster::Entity as Cluster; pub use super::compaction_config::Entity as CompactionConfig; pub use super::compaction_status::Entity as CompactionStatus; diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index b97bf0804a95..6802ad17e176 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -217,7 +217,7 @@ impl SystemParamsController { .await; // Sync params to worker nodes. - self.notify_workers(¶ms).await; + self.notify_workers(¶ms); Ok(params) } @@ -241,8 +241,7 @@ impl SystemParamsController { } } system_params_controller - .notify_workers(&*system_params_controller.params.read().await) - .await; + .notify_workers(&*system_params_controller.params.read().await); } }); @@ -250,16 +249,14 @@ impl SystemParamsController { } // Notify workers of parameter change. - async fn notify_workers(&self, params: &PbSystemParams) { + // TODO: add system params into snapshot to avoid periodically sync. 
+    fn notify_workers(&self, params: &PbSystemParams) {
         self.notification_manager
-            .notify_frontend(Operation::Update, Info::SystemParams(params.clone()))
-            .await;
+            .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
         self.notification_manager
-            .notify_compute(Operation::Update, Info::SystemParams(params.clone()))
-            .await;
+            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
         self.notification_manager
-            .notify_compactor(Operation::Update, Info::SystemParams(params.clone()))
-            .await;
+            .notify_compactor_without_version(Operation::Update, Info::SystemParams(params.clone()));
     }
 }
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index 26601f6af037..ee4287b1aec3 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -286,7 +286,8 @@ impl MetaSrvEnv {
         // change to sync after refactor `IdGeneratorManager::new` sync.
         let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
         let stream_client_pool = Arc::new(StreamClientPool::default());
-        let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
+        let notification_manager =
+            Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await);
         let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
         let (mut cluster_id, cluster_first_launch) =
             if let Some(id) = ClusterId::from_meta_store(&meta_store).await? {
@@ -463,7 +464,8 @@ impl MetaSrvEnv {
         let meta_store_sql = Some(SqlMetaStore::for_test().await);

         let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
-        let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
+        let notification_manager =
+            Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await);
         let stream_client_pool = Arc::new(StreamClientPool::default());
         let idle_manager = Arc::new(IdleManager::disabled());
         let (cluster_id, cluster_first_launch) = (ClusterId::new(), true);
diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs
index 4ab91c388792..24c981008425 100644
--- a/src/meta/src/manager/mod.rs
+++ b/src/meta/src/manager/mod.rs
@@ -21,6 +21,7 @@ mod id;
 mod idle;
 mod metadata;
 mod notification;
+mod notification_version;
 pub mod sink_coordination;
 mod streaming_job;
 mod system_param;
diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs
index 3132f662cd76..49f85a993bdd 100644
--- a/src/meta/src/manager/notification.rs
+++ b/src/meta/src/manager/notification.rs
@@ -28,8 +28,10 @@ use tokio::sync::mpsc::{self, UnboundedSender};
 use tokio::sync::Mutex;
 use tonic::Status;

+use crate::controller::SqlMetaStore;
 use crate::manager::cluster::WorkerKey;
-use crate::model::{FragmentId, NotificationVersion as Version};
+use crate::manager::notification_version::NotificationVersionGenerator;
+use crate::model::FragmentId;
 use crate::storage::MetaStoreRef;

 pub type MessageStatus = Status;
@@ -77,17 +79,19 @@ pub struct NotificationManager {
     core: Arc<Mutex<NotificationManagerCore>>,
     /// Sender used to add a notification into the waiting queue.
     task_tx: UnboundedSender<Task>,
-    /// The current notification version.
-    current_version: Mutex<Version>,
-    meta_store: MetaStoreRef,
+    /// The current notification version generator.
+    version_generator: Mutex<NotificationVersionGenerator>,
 }

 impl NotificationManager {
-    pub async fn new(meta_store: MetaStoreRef) -> Self {
+    pub async fn new(meta_store: MetaStoreRef, meta_store_sql: Option<SqlMetaStore>) -> Self {
         // notification waiting queue.
         let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
         let core = Arc::new(Mutex::new(NotificationManagerCore::new()));
         let core_clone = core.clone();
+        let version_generator = NotificationVersionGenerator::new(meta_store, meta_store_sql)
+            .await
+            .unwrap();

         tokio::spawn(async move {
             while let Some(task) = task_rx.recv().await {
@@ -105,8 +109,7 @@ impl NotificationManager {
         Self {
             core: core_clone,
             task_tx,
-            current_version: Mutex::new(Version::new(&meta_store).await),
-            meta_store,
+            version_generator: Mutex::new(version_generator),
         }
     }
@@ -140,9 +143,9 @@ impl NotificationManager {
         operation: Operation,
         info: Info,
     ) -> NotificationVersion {
-        let mut version_guard = self.current_version.lock().await;
-        version_guard.increase_version(&self.meta_store).await;
-        let version = version_guard.version();
+        let mut version_guard = self.version_generator.lock().await;
+        version_guard.increase_version().await;
+        let version = version_guard.current_version();
         self.notify(target, operation, info, Some(version));
         version
     }
@@ -252,6 +255,10 @@ impl NotificationManager {
         self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
     }

+    pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
+        self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
+    }
+
     #[cfg(any(test, feature = "test"))]
     pub fn notify_hummock_with_version(
         &self,
@@ -320,8 +327,8 @@ impl NotificationManager {
     }

     pub async fn current_version(&self) -> NotificationVersion {
-        let version_guard = self.current_version.lock().await;
-        version_guard.version()
+        let version_guard = self.version_generator.lock().await;
+        version_guard.current_version()
     }
 }
@@ -411,7 +418,7 @@ mod tests {
     #[tokio::test]
     async fn test_multiple_subscribers_one_worker() {
-        let mgr = NotificationManager::new(MemStore::new().into_ref()).await;
+        let mgr = NotificationManager::new(MemStore::new().into_ref(), None).await;
         let worker_key1 = WorkerKey(HostAddress {
             host: "a".to_string(),
             port: 1,
diff --git a/src/meta/src/manager/notification_version.rs b/src/meta/src/manager/notification_version.rs
new file mode 100644
index 000000000000..ea461da976ac
--- /dev/null
+++ b/src/meta/src/manager/notification_version.rs
@@ -0,0 +1,84 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_meta_model_v2::catalog_version;
+use risingwave_meta_model_v2::catalog_version::VersionCategory;
+use risingwave_meta_model_v2::prelude::CatalogVersion;
+use sea_orm::ActiveValue::Set;
+use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
+
+use crate::controller::SqlMetaStore;
+use crate::model::NotificationVersion as NotificationModelV1;
+use crate::storage::MetaStoreRef;
+use crate::MetaResult;
+
+pub enum NotificationVersionGenerator {
+    KvGenerator(NotificationModelV1, MetaStoreRef),
+    SqlGenerator(u64, DatabaseConnection),
+}
+
+// TODO: add pre-allocation if necessary
+impl NotificationVersionGenerator {
+    pub async fn new(
+        meta_store: MetaStoreRef,
+        meta_store_sql: Option<SqlMetaStore>,
+    ) -> MetaResult<Self> {
+        if let Some(sql) = meta_store_sql {
+            let txn = sql.conn.begin().await?;
+            let model = CatalogVersion::find_by_id(VersionCategory::Notification)
+                .one(&txn)
+                .await?;
+            let current_version = model.as_ref().map(|m| m.version).unwrap_or(1) as u64;
+            if model.is_none() {
+                catalog_version::ActiveModel {
+                    name: Set(VersionCategory::Notification),
+                    version: Set(1),
+                }
+                .insert(&txn)
+                .await?;
+                txn.commit().await?;
+            }
+
+            Ok(Self::SqlGenerator(current_version, sql.conn))
+        } else {
+            let current_version = NotificationModelV1::new(&meta_store).await;
+            Ok(Self::KvGenerator(current_version, meta_store))
+        }
+    }
+
+    pub fn current_version(&self) -> u64 {
+        match self {
+            NotificationVersionGenerator::KvGenerator(v, _) => v.version(),
+            NotificationVersionGenerator::SqlGenerator(v, _) => *v,
+        }
+    }
+
+    pub async fn increase_version(&mut self) {
+        match self {
+            NotificationVersionGenerator::KvGenerator(v, meta_store) => {
+                v.increase_version(meta_store).await
+            }
+            NotificationVersionGenerator::SqlGenerator(v, conn) => {
+                catalog_version::ActiveModel {
+                    name: Set(VersionCategory::Notification),
+                    version: Set((*v + 1) as i64),
+                }
+                .update(conn)
+                .await
+                .unwrap();
+                *v += 1;
+            }
+        }
+    }
+}
diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs
index a7336a10e5de..bdd54d3ed7b5 100644
--- a/src/meta/src/manager/system_param/mod.rs
+++ b/src/meta/src/manager/system_param/mod.rs
@@ -116,7 +116,7 @@ impl SystemParamsManager {
         .await;

         // Sync params to worker nodes.
-        self.notify_workers(params).await;
+        self.notify_workers(params);

         Ok(params.clone())
     }
@@ -142,9 +142,7 @@ impl SystemParamsManager {
                     return;
                 }
             }
-            system_params_manager
-                .notify_workers(&*system_params_manager.params.read().await)
-                .await;
+            system_params_manager.notify_workers(&*system_params_manager.params.read().await);
         }
     });

@@ -152,16 +150,16 @@ impl SystemParamsManager {
     // Notify workers of parameter change.
-    async fn notify_workers(&self, params: &SystemParams) {
+    // TODO: add system params into snapshot to avoid periodically sync.
+ fn notify_workers(&self, params: &SystemParams) { self.notification_manager - .notify_frontend(Operation::Update, Info::SystemParams(params.clone())) - .await; - self.notification_manager - .notify_compute(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone())); self.notification_manager - .notify_compactor(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone())); + self.notification_manager.notify_compactor_without_version( + Operation::Update, + Info::SystemParams(params.clone()), + ); } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index b0dd649c59d1..08312c0c882d 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1127,24 +1127,6 @@ where Ok(()) } - // TODO(st1page): maybe we should extract a pub struct to do it - /// just specially used by those state table read-only and after the call the data - /// in the epoch will be visible - pub fn commit_no_data_expected(&mut self, new_epoch: EpochPair) { - assert_eq!(self.epoch(), new_epoch.prev); - assert!(!self.is_dirty()); - // Tick the watermark buffer here because state table is expected to be committed once - // per epoch. - self.watermark_buffer_strategy.tick(); - self.local_store.seal_current_epoch( - new_epoch.curr, - SealCurrentEpochOptions { - table_watermarks: None, - switch_op_consistency_level: None, - }, - ); - } - /// Write to state store. async fn seal_current_epoch( &mut self, diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 76f9271862e2..fa39c87d74ea 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -374,7 +374,6 @@ where } // consume upstream buffer chunk - let upstream_chunk_buffer_is_empty = upstream_chunk_buffer.is_empty(); for chunk in upstream_chunk_buffer.drain(..) { cur_barrier_upstream_processed_rows += chunk.cardinality() as u64; // FIXME: Replace with `snapshot_is_processed` @@ -397,11 +396,7 @@ where upstream_table.write_chunk(chunk); } - if upstream_chunk_buffer_is_empty { - upstream_table.commit_no_data_expected(barrier.epoch) - } else { - upstream_table.commit(barrier.epoch).await?; - } + upstream_table.commit(barrier.epoch).await?; self.metrics .arrangement_backfill_snapshot_read_row_count diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 8937d5260774..8fc27ab5864a 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -555,10 +555,7 @@ pub(crate) async fn flush_data( ) -> StreamExecutorResult<()> { let vnodes = table.vnodes().clone(); if let Some(old_state) = old_state { - if old_state[1..] == current_partial_state[1..] { - table.commit_no_data_expected(epoch); - return Ok(()); - } else { + if old_state[1..] != current_partial_state[1..] 
{ vnodes.iter_vnodes_scalar().for_each(|vnode| { let datum = Some(vnode.into()); current_partial_state[0] = datum.clone(); @@ -727,7 +724,6 @@ pub(crate) async fn persist_state_per_vnode, ) -> StreamExecutorResult<()> { - let mut has_progress = false; for vnode in vnodes { if !backfill_state.need_commit(&vnode) { continue; @@ -762,7 +758,6 @@ pub(crate) async fn persist_state_per_vnode( flush_data(table, epoch, old_state, current_state).await?; *old_state = Some(current_state.into()); } else { - table.commit_no_data_expected(epoch); + table.commit(epoch).await?; } Ok(()) } diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index b2d17e9da638..6049b963359a 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -76,8 +76,6 @@ impl AppendOnlyDedupExecutor { // The first barrier message should be propagated. yield Message::Barrier(barrier); - let mut commit_data = false; - #[for_await] for msg in input { self.cache.evict(); @@ -127,21 +125,13 @@ impl AppendOnlyDedupExecutor { let chunk = StreamChunk::with_visibility(ops, columns, vis); self.state_table.write_chunk(chunk.clone()); - commit_data = true; - yield Message::Chunk(chunk); } self.state_table.try_flush().await?; } Message::Barrier(barrier) => { - if commit_data { - // Only commit when we have new data in this epoch. - self.state_table.commit(barrier.epoch).await?; - commit_data = false; - } else { - self.state_table.commit_no_data_expected(barrier.epoch); - } + self.state_table.commit(barrier.epoch).await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) { let (_prev_vnode_bitmap, cache_may_stale) = diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 14cf7192bd4d..d3299d99e8a3 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -462,17 +462,13 @@ impl DynamicFilterExecutor NowExecutor { last_timestamp = state_row.and_then(|row| row[0].clone()); paused = barrier.is_pause_on_startup(); initialized = true; - } else if paused { - // Assert that no data is updated. - state_table.commit_no_data_expected(barrier.epoch); } else { state_table.commit(barrier.epoch).await?; } diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index f957241a402c..b8915a070dbc 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -214,12 +214,6 @@ impl SimpleAggExecutor { this.intermediate_state_table .update_without_old_value(encoded_states); - // Commit all state tables. - futures::future::try_join_all( - this.all_state_tables_mut().map(|table| table.commit(epoch)), - ) - .await?; - // Retrieve modified states and put the changes into the builders. vars.agg_group .build_change(&this.storages, &this.agg_funcs) @@ -227,13 +221,13 @@ impl SimpleAggExecutor { .map(|change| change.to_stream_chunk(&this.info.schema.data_types())) } else { // No state is changed. - // Call commit on state table to increment the epoch. - this.all_state_tables_mut().for_each(|table| { - table.commit_no_data_expected(epoch); - }); None }; + // Commit all state tables. 
+ futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.commit(epoch))) + .await?; + vars.state_changed = false; Ok(chunk) } @@ -241,7 +235,6 @@ impl SimpleAggExecutor { async fn try_flush_data(this: &mut ExecutorInner) -> StreamExecutorResult<()> { futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.try_flush())) .await?; - Ok(()) } diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 786eabdeabbf..f07967ce4529 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -53,7 +53,6 @@ struct ExecutorInner { struct ExecutionVars { buffer: SortBuffer, - buffer_changed: bool, } impl Executor for SortExecutor { @@ -103,7 +102,6 @@ impl SortExecutor { let mut vars = ExecutionVars { buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table), - buffer_changed: false, }; // Populate the sort buffer cache on initialization. @@ -131,7 +129,6 @@ impl SortExecutor { if let Some(chunk) = chunk_builder.take() { yield Message::Chunk(chunk); } - vars.buffer_changed = true; yield Message::Watermark(watermark); } @@ -141,16 +138,10 @@ impl SortExecutor { } Message::Chunk(chunk) => { vars.buffer.apply_chunk(chunk, &mut this.buffer_table); - vars.buffer_changed = true; this.buffer_table.try_flush().await?; } Message::Barrier(barrier) => { - if vars.buffer_changed { - this.buffer_table.commit(barrier.epoch).await?; - } else { - this.buffer_table.commit_no_data_expected(barrier.epoch); - } - vars.buffer_changed = false; + this.buffer_table.commit(barrier.epoch).await?; // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) { diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 43a6ba3add1e..6e9c3a49856d 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -269,11 +269,10 @@ impl WatermarkFilterExecutor { table.insert(row); } } - table.commit(barrier.epoch).await?; - } else { - table.commit_no_data_expected(barrier.epoch); } + table.commit(barrier.epoch).await?; + if barrier.kind.is_checkpoint() { if idle_input { // Align watermark
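Two self-contained sketches follow for reviewers. Both are hedged stand-ins written from the patch above, not code from the repository; every name they introduce (`build_udf_context`, `Manager`) is illustrative.

First, the named-parameter handling added to `create_udf_context`: anonymous arguments keep their positional `$n` keys, while named arguments are keyed by the catalog's `arg_names`, whose order matches the argument list, so one index serves both cases. A minimal sketch, assuming plain `String`s stand in for the bound argument expressions:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `create_udf_context`: `args` are the call-site
// argument expressions (strings here), `arg_names` come from the function
// catalog, with "" marking an anonymous parameter.
fn build_udf_context(args: &[&str], arg_names: &[&str]) -> HashMap<String, String> {
    let mut ctx = HashMap::new();
    for (i, arg) in args.iter().enumerate() {
        if arg_names[i].is_empty() {
            // Anonymous parameter: the body refers to it as `$1`, `$2`, ...
            ctx.insert(format!("${}", i + 1), arg.to_string());
        } else {
            // Named parameter: the index mapping matches the catalog order,
            // so the same `i` is used for both kinds of lookup.
            ctx.insert(arg_names[i].to_string(), arg.to_string());
        }
    }
    ctx
}

fn main() {
    // Mirrors `create function add_sub_mix(INT, a INT, INT) ... 'select $1 - a + $3'`
    // invoked as `add_sub_mix(1, 2, 3)`.
    let ctx = build_udf_context(&["1", "2", "3"], &["", "a", ""]);
    assert_eq!(ctx["$1"], "1");
    assert_eq!(ctx["a"], "2");
    assert_eq!(ctx["$3"], "3");
}
```

Second, the migration cleanup hoists `assert_not_has_tables!` and `drop_tables!` into `migration/src/lib.rs` so each migration expands one statement per table identifier instead of copy-pasting near-identical blocks. A runnable toy version of the same repetition shape, with `Manager` as a stub for sea-orm-migration's `SchemaManager`:

```rust
struct Manager;

impl Manager {
    // Stub: a fresh store has no tables, which is what `up()` asserts.
    fn has_table(&self, _name: &str) -> bool {
        false
    }
}

// Same `$( $table:ident ),+` repetition shape as the macro added to lib.rs.
macro_rules! assert_not_has_tables {
    ($manager:expr, $( $table:ident ),+) => {
        $(
            assert!(!$manager.has_table(stringify!($table)));
        )+
    };
}

fn main() {
    let manager = Manager;
    // Expands to one assertion per identifier, mirroring the migration's usage.
    assert_not_has_tables!(manager, Cluster, Worker, WorkerProperty, CatalogVersion);
    println!("no pre-existing tables");
}
```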