Skip to content

Commit

Permalink
only allow cancel if table is not yet created
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 27, 2023
1 parent 407a301 commit a99d25c
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
use crate::MetaResult;
use crate::{MetaError, MetaResult};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in some fragment, like scaling or migrating.
Expand Down Expand Up @@ -668,6 +668,14 @@ impl CommandContext {
}

Command::CancelStreamingJob(table_fragments) => {
let table_id = table_fragments.table_id().table_id;
if self.catalog_manager.table_is_created(table_id).await {
return Err(MetaError::invalid_parameter(format!(
"table is already created id={:#?}",
table_id
)));
}
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
let node_actors = table_fragments.worker_actor_ids();
self.clean_up(node_actors).await?;

Expand All @@ -680,7 +688,6 @@ impl CommandContext {
// It won't clean the tables on failure,
// since the failure could be recoverable.
// As such it needs to be handled here.
let table_id = table_fragments.table_id().table_id;
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
if let Err(e) = self.hummock_manager.unregister_table_ids(&table_ids).await {
Expand Down

0 comments on commit a99d25c

Please sign in to comment.