Skip to content

Commit

Permalink
fix: avoid non deterministic initialization of fragments for new crea…
Browse files Browse the repository at this point in the history
…te job
  • Loading branch information
yezizp2012 committed Feb 5, 2024
1 parent 823382b commit 7eefa67
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
35 changes: 22 additions & 13 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,21 +298,15 @@ impl CatalogController {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

// Add fragments, actors and actor dispatchers.
for (fragment, actors, actor_dispatchers) in fragment_actors {
// Add fragments.
let (fragments, actor_with_dispatchers): (Vec<_>, Vec<_>) = fragment_actors
.into_iter()
.map(|(fragment, actors, actor_dispatchers)| (fragment, (actors, actor_dispatchers)))
.unzip();
for fragment in fragments {
let fragment = fragment.into_active_model();
let fragment = fragment.insert(&txn).await?;
for actor in actors {
let actor = actor.into_active_model();
actor.insert(&txn).await?;
}
for (_, actor_dispatchers) in actor_dispatchers {
for actor_dispatcher in actor_dispatchers {
let mut actor_dispatcher = actor_dispatcher.into_active_model();
actor_dispatcher.id = NotSet;
actor_dispatcher.insert(&txn).await?;
}
}

// Update fragment id for all state tables.
if !for_replace {
for state_table_id in fragment.state_table_ids.into_inner() {
Expand All @@ -327,6 +321,21 @@ impl CatalogController {
}
}

// Add actors and actor dispatchers.
for (actors, actor_dispatchers) in actor_with_dispatchers {
for actor in actors {
let actor = actor.into_active_model();
actor.insert(&txn).await?;
}
for (_, actor_dispatchers) in actor_dispatchers {
for actor_dispatcher in actor_dispatchers {
let mut actor_dispatcher = actor_dispatcher.into_active_model();
actor_dispatcher.id = NotSet;
actor_dispatcher.insert(&txn).await?;
}
}
}

if !for_replace {
// // Update dml fragment id.
if let StreamingJob::Table(_, table, ..) = streaming_job {
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/controller/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::controller::utils::{
extract_grant_obj_id, get_referring_privileges_cascade, get_user_privilege,
list_user_info_by_ids, PartialUserPrivilege,
};
use crate::manager::NotificationVersion;
use crate::manager::{NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::{MetaError, MetaResult};

impl CatalogController {
Expand Down Expand Up @@ -402,9 +402,8 @@ impl CatalogController {
);
}
if root_user_privileges.is_empty() {
return Err(MetaError::invalid_parameter(
"no privilege to revoke".to_string(),
));
tracing::warn!("no privilege to revoke, ignore it");
return Ok(IGNORED_NOTIFICATION_VERSION);
}

// check if the user granted any privileges to other users.
Expand Down

0 comments on commit 7eefa67

Please sign in to comment.