From 59e80194d90051f0c8d4bf8dc1f08e3fdda1b52f Mon Sep 17 00:00:00 2001 From: August Date: Mon, 16 Dec 2024 20:59:48 +0800 Subject: [PATCH 1/4] fix: fix missing rewrite of table fragments during migration to sql backend --- src/ctl/src/cmd_impl/meta/migration.rs | 73 ++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 619065543a59b..6930195c85fa1 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::time::Duration; use anyhow::Context; @@ -59,7 +59,7 @@ use risingwave_meta_model_v2::{ use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, - PbSubscription, PbTable, PbView, + PbStreamSourceInfo, PbSubscription, PbTable, PbView, }; use risingwave_pb::common::WorkerType; use risingwave_pb::hummock::{ @@ -67,6 +67,9 @@ use risingwave_pb::hummock::{ }; use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PbSystemParams; +use risingwave_pb::secret::PbSecretRef; +use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use risingwave_pb::stream_plan::PbStreamNode; use risingwave_pb::user::grant_privilege::PbObject as GrantObject; use risingwave_pb::user::PbUserInfo; use sea_orm::ActiveValue::Set; @@ -468,6 +471,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an table.database_id = *db_rewrite.get(&table.database_id).unwrap(); table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap(); }); + // rewrite secret ids. + visit_secret_ref_mut(&mut stream_node, &secret_rewrite); + let mut fragment = fragment.into_active_model(); fragment.stream_node = Set((&stream_node).into()); Fragment::insert(fragment) @@ -561,7 +567,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an if let Some(info) = &mut src.info { for secret_ref in info.format_encode_secret_refs.values_mut() { secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); - dependent_secret_ids.insert(secret_ref.secret_id); + } + if let Some(table) = &mut info.external_table { + for secret_ref in table.secret_refs.values_mut() { + secret_ref.secret_id = + *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + } } } object_dependencies.extend(dependent_secret_ids.into_iter().map(|secret_id| { @@ -968,6 +979,62 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an Ok(()) } +fn visit_secret_ref_mut(stream_node: &mut PbStreamNode, secret_rewrite: &HashMap) { + let visit_map_secret_refs = |refs: &mut BTreeMap| { + for secret_ref in refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + } + }; + let visit_info = |info: &mut PbStreamSourceInfo| { + for secret_ref in info.format_encode_secret_refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + } + if let Some(table) = &mut info.external_table { + for secret_ref in table.secret_refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + } + } + }; + + let visit_body = |body: &mut PbNodeBody| match body { + PbNodeBody::Source(node) => { + if let Some(inner) = &mut node.source_inner { + visit_map_secret_refs(&mut inner.secret_refs); + inner.info.as_mut().map(visit_info); + } + } + PbNodeBody::Sink(node) => { + if let Some(desc) = &mut node.sink_desc { + visit_map_secret_refs(&mut desc.secret_refs); + desc.format_desc + .as_mut() + .map(|desc| visit_map_secret_refs(&mut desc.secret_refs)); + } + } + PbNodeBody::StreamFsFetch(node) => { + if let Some(inner) = &mut node.node_inner { + visit_map_secret_refs(&mut inner.secret_refs); + inner.info.as_mut().map(visit_info); + } + } + PbNodeBody::StreamCdcScan(node) => { + node.cdc_table_desc + .as_mut() + .map(|desc| visit_map_secret_refs(&mut desc.secret_refs)); + } + PbNodeBody::CdcFilter(_) => {} + PbNodeBody::SourceBackfill(node) => { + visit_map_secret_refs(&mut node.secret_refs); + node.info.as_mut().map(visit_info); + } + _ => {} + }; + visit_body(stream_node.node_body.as_mut().unwrap()); + for input in &mut stream_node.input { + visit_secret_ref_mut(input, secret_rewrite); + } +} + async fn load_current_id(meta_store: &MetaStoreRef, category: &str, start: Option) -> u64 { let category_gen_key = format!("{}_id_next_generator", category); let res = meta_store From 6b3785a69b15bacabac502e0c2a557d398409dfd Mon Sep 17 00:00:00 2001 From: August Date: Tue, 17 Dec 2024 18:47:51 +0800 Subject: [PATCH 2/4] clippy --- src/ctl/src/cmd_impl/meta/migration.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 6930195c85fa1..4b1d88ce02a63 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -1006,9 +1006,9 @@ fn visit_secret_ref_mut(stream_node: &mut PbStreamNode, secret_rewrite: &HashMap PbNodeBody::Sink(node) => { if let Some(desc) = &mut node.sink_desc { visit_map_secret_refs(&mut desc.secret_refs); - desc.format_desc - .as_mut() - .map(|desc| visit_map_secret_refs(&mut desc.secret_refs)); + if let Some(desc) = &mut desc.format_desc { + visit_map_secret_refs(&mut desc.secret_refs) + } } } PbNodeBody::StreamFsFetch(node) => { @@ -1018,11 +1018,10 @@ fn visit_secret_ref_mut(stream_node: &mut PbStreamNode, secret_rewrite: &HashMap } } PbNodeBody::StreamCdcScan(node) => { - node.cdc_table_desc - .as_mut() - .map(|desc| visit_map_secret_refs(&mut desc.secret_refs)); + if let Some(desc) = &mut node.cdc_table_desc { + visit_map_secret_refs(&mut desc.secret_refs) + } } - PbNodeBody::CdcFilter(_) => {} PbNodeBody::SourceBackfill(node) => { visit_map_secret_refs(&mut node.secret_refs); node.info.as_mut().map(visit_info); From 6f29822aec17312a6bd0ec58cd4aceacd62fd153 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 17 Dec 2024 19:15:31 +0800 Subject: [PATCH 3/4] fix schema id not found --- src/ctl/src/cmd_impl/meta/migration.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 4b1d88ce02a63..8d36881c47231 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -469,7 +469,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an let mut stream_node = fragment.stream_node.to_protobuf(); visit_stream_node_tables(&mut stream_node, |table, _| { table.database_id = *db_rewrite.get(&table.database_id).unwrap(); - table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap(); + // Note: The schema could be altered and not updated here. + // It's safe to leave it as 0 if not found. + table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap_or(0); }); // rewrite secret ids. visit_secret_ref_mut(&mut stream_node, &secret_rewrite); @@ -718,9 +720,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an for user in users { for gp in user.grant_privileges { let id = match gp.get_object()? { - GrantObject::DatabaseId(id) => *db_rewrite.get(id).unwrap(), - GrantObject::SchemaId(id) => *schema_rewrite.get(id).unwrap(), - GrantObject::FunctionId(id) => *function_rewrite.get(id).unwrap(), + GrantObject::DatabaseId(id) => *db_rewrite.get(id).unwrap_or(0), + GrantObject::SchemaId(id) => *schema_rewrite.get(id).unwrap_or(0), + GrantObject::FunctionId(id) => *function_rewrite.get(id).unwrap_or(0), GrantObject::TableId(id) | GrantObject::SourceId(id) | GrantObject::SinkId(id) @@ -728,6 +730,14 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an | GrantObject::SubscriptionId(id) => *id, ty => unreachable!("invalid object type: {:?}", ty), }; + if id == 0 { + tracing::warn!("grant object not found"); + continue; + } else if !inuse_obj_ids.contains(&id) { + tracing::warn!("grant object id {} not found", id); + continue; + } + for action_with_opt in &gp.action_with_opts { privileges.push(user_privilege::ActiveModel { user_id: Set(user.id as _), From daba68828571b16bf53bde6f1f3d47be7a32d00f Mon Sep 17 00:00:00 2001 From: August Date: Tue, 17 Dec 2024 19:55:00 +0800 Subject: [PATCH 4/4] fmt --- src/ctl/src/cmd_impl/meta/migration.rs | 30 +++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 8d36881c47231..e23aee944e188 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -201,15 +201,15 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an .chain(subscriptions.iter().map(|s| s.id)) .collect::>(); - // Helper function to get next available id. - let mut next_available_id = || -> u32 { - let id = inuse_obj_ids + // Helper function to get the next available id. + let next_available_id = |objs: &mut BTreeSet| -> u32 { + let id = objs .iter() .enumerate() .find(|(i, id)| i + 1 != **id as usize) .map(|(i, _)| i + 1) - .unwrap_or(inuse_obj_ids.len() + 1) as u32; - inuse_obj_ids.insert(id); + .unwrap_or(objs.len() + 1) as u32; + objs.insert(id); id }; @@ -236,7 +236,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // database let mut db_rewrite = HashMap::new(); for mut db in databases { - let id = next_available_id(); + let id = next_available_id(&mut inuse_obj_ids); db_rewrite.insert(db.id, id); db.id = id as _; @@ -256,7 +256,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // schema let mut schema_rewrite = HashMap::new(); for mut schema in schemas { - let id = next_available_id(); + let id = next_available_id(&mut inuse_obj_ids); schema_rewrite.insert(schema.id, id); schema.id = id as _; @@ -277,7 +277,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // function let mut function_rewrite = HashMap::new(); for mut function in functions { - let id = next_available_id(); + let id = next_available_id(&mut inuse_obj_ids); function_rewrite.insert(function.id, id); function.id = id as _; @@ -299,7 +299,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // connection mapping let mut connection_rewrite = HashMap::new(); for mut connection in connections { - let id = next_available_id(); + let id = next_available_id(&mut inuse_obj_ids); connection_rewrite.insert(connection.id, id); connection.id = id as _; @@ -323,7 +323,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // secret mapping let mut secret_rewrite = HashMap::new(); for mut secret in secrets { - let id = next_available_id(); + let id = next_available_id(&mut inuse_obj_ids); secret_rewrite.insert(secret.id, id); secret.id = id as _; @@ -471,7 +471,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an table.database_id = *db_rewrite.get(&table.database_id).unwrap(); // Note: The schema could be altered and not updated here. // It's safe to leave it as 0 if not found. - table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap_or(0); + table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap_or(&0); }); // rewrite secret ids. visit_secret_ref_mut(&mut stream_node, &secret_rewrite); @@ -720,9 +720,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an for user in users { for gp in user.grant_privileges { let id = match gp.get_object()? { - GrantObject::DatabaseId(id) => *db_rewrite.get(id).unwrap_or(0), - GrantObject::SchemaId(id) => *schema_rewrite.get(id).unwrap_or(0), - GrantObject::FunctionId(id) => *function_rewrite.get(id).unwrap_or(0), + GrantObject::DatabaseId(id) => *db_rewrite.get(id).unwrap_or(&0), + GrantObject::SchemaId(id) => *schema_rewrite.get(id).unwrap_or(&0), + GrantObject::FunctionId(id) => *function_rewrite.get(id).unwrap_or(&0), GrantObject::TableId(id) | GrantObject::SourceId(id) | GrantObject::SinkId(id) @@ -944,7 +944,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an format!("ALTER TABLE worker AUTO_INCREMENT = {next_worker_id};"), )) .await?; - let next_object_id = next_available_id(); + let next_object_id = next_available_id(&mut inuse_obj_ids); meta_store_sql .conn .execute(Statement::from_string(