Skip to content

Commit

Permalink
resolve comments, add cluster version info and fix some corner cases
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Apr 1, 2024
1 parent fb2e799 commit 57da886
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
5 changes: 5 additions & 0 deletions e2e_test/sql_migration/check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public
rw_catalog
schema1

query T
SELECT setting FROM pg_catalog.pg_settings where name = 'max_concurrent_creating_streaming_jobs';
----
4

query T rowsort
select name, relation_type from rw_relations where relation_type != 'system table' AND relation_type != 'view';
----
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sql_migration/prepare.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ create database db1;
statement ok
create schema schema1;

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4;

statement ok
create source src (v int) with (
connector = 'datagen',
Expand Down
8 changes: 1 addition & 7 deletions src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@ risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_storage = { workspace = true }
risingwave_stream = { workspace = true }
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"runtime-tokio-native-tls",
"macros",
] }
sea-orm = { workspace = true }
serde = "1"
serde_json = "1"
serde_yaml = "0.9.25"
Expand Down
14 changes: 11 additions & 3 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
owner_id: Set(object.owner as _),
database_id: Set(Some(*db_rewrite.get(&object.database_id).unwrap() as _)),
schema_id: Set(Some(*schema_rewrite.get(&object.schema_id).unwrap() as _)),
initialized_at_cluster_version: Set(object
.initialized_at_cluster_version
.clone()),
created_at_cluster_version: Set(object.created_at_cluster_version.clone()),
..Default::default()
};
if let Some(epoch) = object.initialized_at_epoch.map(Epoch::from) {
Expand Down Expand Up @@ -354,6 +358,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
owner_id: Set(table.owner as _),
database_id: Set(Some(*db_rewrite.get(&table.database_id).unwrap() as _)),
schema_id: Set(Some(*schema_rewrite.get(&table.schema_id).unwrap() as _)),
initialized_at_cluster_version: Set(table.initialized_at_cluster_version.clone()),
created_at_cluster_version: Set(table.created_at_cluster_version.clone()),
..Default::default()
};
if let Some(epoch) = table.initialized_at_epoch.map(Epoch::from) {
Expand Down Expand Up @@ -567,9 +573,11 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("subscriptions migrated");

// object_dependency
ObjectDependency::insert_many(object_dependencies)
.exec(&meta_store_sql.conn)
.await?;
if !object_dependencies.is_empty() {
ObjectDependency::insert_many(object_dependencies)
.exec(&meta_store_sql.conn)
.await?;
}
println!("object dependencies migrated");

// user privilege
Expand Down

0 comments on commit 57da886

Please sign in to comment.