diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 7b9426ec78559..8afc5ab6c836c 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -298,21 +298,15 @@ impl CatalogController { let inner = self.inner.write().await; let txn = inner.db.begin().await?; - // Add fragments, actors and actor dispatchers. - for (fragment, actors, actor_dispatchers) in fragment_actors { + // Add fragments. + let (fragments, actor_with_dispatchers): (Vec<_>, Vec<_>) = fragment_actors + .into_iter() + .map(|(fragment, actors, actor_dispatchers)| (fragment, (actors, actor_dispatchers))) + .unzip(); + for fragment in fragments { let fragment = fragment.into_active_model(); let fragment = fragment.insert(&txn).await?; - for actor in actors { - let actor = actor.into_active_model(); - actor.insert(&txn).await?; - } - for (_, actor_dispatchers) in actor_dispatchers { - for actor_dispatcher in actor_dispatchers { - let mut actor_dispatcher = actor_dispatcher.into_active_model(); - actor_dispatcher.id = NotSet; - actor_dispatcher.insert(&txn).await?; - } - } + // Update fragment id for all state tables. if !for_replace { for state_table_id in fragment.state_table_ids.into_inner() { @@ -327,6 +321,21 @@ impl CatalogController { } } + // Add actors and actor dispatchers. + for (actors, actor_dispatchers) in actor_with_dispatchers { + for actor in actors { + let actor = actor.into_active_model(); + actor.insert(&txn).await?; + } + for (_, actor_dispatchers) in actor_dispatchers { + for actor_dispatcher in actor_dispatchers { + let mut actor_dispatcher = actor_dispatcher.into_active_model(); + actor_dispatcher.id = NotSet; + actor_dispatcher.insert(&txn).await?; + } + } + } + if !for_replace { // // Update dml fragment id. if let StreamingJob::Table(_, table, ..) = streaming_job { diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index 1444fa2b0e41a..19afd724a82b0 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -37,7 +37,7 @@ use crate::controller::utils::{ extract_grant_obj_id, get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids, PartialUserPrivilege, }; -use crate::manager::NotificationVersion; +use crate::manager::{NotificationVersion, IGNORED_NOTIFICATION_VERSION}; use crate::{MetaError, MetaResult}; impl CatalogController { @@ -402,9 +402,8 @@ impl CatalogController { ); } if root_user_privileges.is_empty() { - return Err(MetaError::invalid_parameter( - "no privilege to revoke".to_string(), - )); + tracing::warn!("no privilege to revoke, ignore it"); + return Ok(IGNORED_NOTIFICATION_VERSION); } // check if the user granted any privileges to other users.