From 57da88643f800b8ed505dcafee35aa92a51cd634 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 1 Apr 2024 17:11:10 +0800 Subject: [PATCH] resolve comments, add cluster version info and fix some corner cases --- e2e_test/sql_migration/check.slt | 5 +++++ e2e_test/sql_migration/prepare.slt | 3 +++ src/ctl/Cargo.toml | 8 +------- src/ctl/src/cmd_impl/meta/migration.rs | 14 +++++++++++--- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/e2e_test/sql_migration/check.slt b/e2e_test/sql_migration/check.slt index 95667adc236a0..b4c97bba50bf1 100644 --- a/e2e_test/sql_migration/check.slt +++ b/e2e_test/sql_migration/check.slt @@ -16,6 +16,11 @@ public rw_catalog schema1 +query T +SELECT setting FROM pg_catalog.pg_settings where name = 'max_concurrent_creating_streaming_jobs'; +---- +4 + query T rowsort select name, relation_type from rw_relations where relation_type != 'system table' AND relation_type != 'view'; ---- diff --git a/e2e_test/sql_migration/prepare.slt b/e2e_test/sql_migration/prepare.slt index be2fce5993929..f0669a4c6b297 100644 --- a/e2e_test/sql_migration/prepare.slt +++ b/e2e_test/sql_migration/prepare.slt @@ -7,6 +7,9 @@ create database db1; statement ok create schema schema1; +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4; + statement ok create source src (v int) with ( connector = 'datagen', diff --git a/src/ctl/Cargo.toml b/src/ctl/Cargo.toml index f95cab9b04a6a..20af56afb98e4 100644 --- a/src/ctl/Cargo.toml +++ b/src/ctl/Cargo.toml @@ -39,13 +39,7 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_storage = { workspace = true } risingwave_stream = { workspace = true } -sea-orm = { version = "0.12.14", features = [ - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", - "runtime-tokio-native-tls", - "macros", -] } +sea-orm = { workspace = true } serde = "1" serde_json = "1" serde_yaml = "0.9.25" diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 8231cd4726e9b..693d24a2cf0be 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -325,6 +325,10 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an owner_id: Set(object.owner as _), database_id: Set(Some(*db_rewrite.get(&object.database_id).unwrap() as _)), schema_id: Set(Some(*schema_rewrite.get(&object.schema_id).unwrap() as _)), + initialized_at_cluster_version: Set(object + .initialized_at_cluster_version + .clone()), + created_at_cluster_version: Set(object.created_at_cluster_version.clone()), ..Default::default() }; if let Some(epoch) = object.initialized_at_epoch.map(Epoch::from) { @@ -354,6 +358,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an owner_id: Set(table.owner as _), database_id: Set(Some(*db_rewrite.get(&table.database_id).unwrap() as _)), schema_id: Set(Some(*schema_rewrite.get(&table.schema_id).unwrap() as _)), + initialized_at_cluster_version: Set(table.initialized_at_cluster_version.clone()), + created_at_cluster_version: Set(table.created_at_cluster_version.clone()), ..Default::default() }; if let Some(epoch) = table.initialized_at_epoch.map(Epoch::from) { @@ -567,9 +573,11 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("subscriptions migrated"); // object_dependency - ObjectDependency::insert_many(object_dependencies) - .exec(&meta_store_sql.conn) - .await?; + if !object_dependencies.is_empty() { + ObjectDependency::insert_many(object_dependencies) + .exec(&meta_store_sql.conn) + .await?; + } println!("object dependencies migrated"); // user privilege