refactor(meta): merge drop and cancel streaming job command (#19369)
wenym1 authored Nov 14, 2024
1 parent c5c2119 · commit 9aded71
Showing 6 changed files with 52 additions and 72 deletions.
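In essence, the separate cancel path disappears: cancelling a streaming job is now expressed as dropping it. Below is a condensed sketch of the resulting command shape, assembled from the hunks that follow; `TableId`, `ActorId`, `FragmentId`, and `TableFragments` are simplified placeholders rather than the real meta-service types, and `main` is only illustrative.

```rust
// Condensed sketch of the refactor, assembled from the hunks below: the
// dedicated `CancelStreamingJob(TableFragments)` variant is removed, and a new
// `Command::cancel` constructor expresses cancellation as a `DropStreamingJobs`
// command. All types here are simplified stand-ins, not the real meta code.
use std::collections::HashSet;

type TableId = u32;
type ActorId = u32;
type FragmentId = u32;

/// Placeholder standing in for the meta model's `TableFragments`.
struct TableFragments {
    table_id: TableId,
    actors: Vec<ActorId>,
    state_table_ids: Vec<TableId>,
    fragment_ids: Vec<FragmentId>,
}

#[allow(dead_code)]
enum Command {
    /// Dropping and cancelling streaming jobs now share this single variant.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },
}

impl Command {
    /// Replaces the removed `Command::CancelStreamingJob(..)`: cancelling a
    /// creating job is expressed as dropping everything it owns.
    fn cancel(job: &TableFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([job.table_id]),
            actors: job.actors.clone(),
            unregistered_state_table_ids: job.state_table_ids.iter().copied().collect(),
            unregistered_fragment_ids: job.fragment_ids.iter().copied().collect(),
        }
    }
}

fn main() {
    let job = TableFragments {
        table_id: 42,
        actors: vec![1, 2],
        state_table_ids: vec![42, 43],
        fragment_ids: vec![7],
    };
    // Call sites that previously scheduled `Command::CancelStreamingJob(job)`
    // now schedule this instead (see `stream_manager.rs` below).
    let _cmd = Command::cancel(&job);
}
```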
28 changes: 17 additions & 11 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -758,20 +758,26 @@ impl DatabaseCheckpointControl {
(None, vec![])
};

if let Some(table_to_cancel) = command.as_ref().and_then(Command::table_to_cancel)
&& self
for table_to_cancel in command
.as_ref()
.map(Command::tables_to_drop)
.into_iter()
.flatten()
{
if self
.creating_streaming_job_controls
.contains_key(&table_to_cancel)
{
warn!(
table_id = table_to_cancel.table_id,
"ignore cancel command on creating streaming job"
);
for notifier in notifiers {
notifier
.notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
{
warn!(
table_id = table_to_cancel.table_id,
"ignore cancel command on creating streaming job"
);
for notifier in notifiers {
notifier
.notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
}
return Ok(());
}
return Ok(());
}

if let Some(Command::RescheduleFragment { .. }) = &command {
39 changes: 20 additions & 19 deletions src/meta/src/barrier/command.rs
@@ -232,6 +232,7 @@ pub enum Command {
/// After the barrier is collected, it notifies the local stream manager of compute nodes to
/// drop actors, and then delete the table fragments info from meta store.
DropStreamingJobs {
table_fragments_ids: HashSet<TableId>,
actors: Vec<ActorId>,
unregistered_state_table_ids: HashSet<TableId>,
unregistered_fragment_ids: HashSet<FragmentId>,
@@ -253,11 +254,6 @@ pub enum Command {
MergeSnapshotBackfillStreamingJobs(
HashMap<TableId, (SnapshotBackfillInfo, InflightStreamingJobInfo)>,
),
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
/// table fragment.
///
/// The collecting and cleaning part works exactly the same as `DropStreamingJobs` command.
CancelStreamingJob(TableFragments),

/// `Reschedule` command generates a `Update` barrier by the [`Reschedule`] of each fragment.
/// Mainly used for scaling and migration.
@@ -313,6 +309,18 @@ impl Command {
Self::Resume(reason)
}

pub fn cancel(table_fragments: &TableFragments) -> Self {
Self::DropStreamingJobs {
table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]),
actors: table_fragments.actor_ids(),
unregistered_state_table_ids: table_fragments
.all_table_ids()
.map(TableId::new)
.collect(),
unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
}
}

pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
match self {
Command::Flush => None,
@@ -352,13 +360,6 @@ impl Command {

Some(changes)
}
Command::CancelStreamingJob(table_fragments) => Some(
table_fragments
.fragments
.values()
.map(|fragment| (fragment.fragment_id, CommandFragmentChanges::RemoveFragment))
.collect(),
),
Command::RescheduleFragment { reschedules, .. } => Some(
reschedules
.iter()
@@ -726,11 +727,6 @@ impl Command {
}))
}

Command::CancelStreamingJob(table_fragments) => {
let actors = table_fragments.actor_ids();
Some(Mutation::Stop(StopMutation { actors }))
}

Command::ReplaceTable(ReplaceTablePlan {
old_table_fragments,
merge_updates,
@@ -1013,10 +1009,15 @@ impl Command {
}

/// For `CancelStreamingJob`, returns the table id of the target table.
pub fn table_to_cancel(&self) -> Option<TableId> {
pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
match self {
Command::CancelStreamingJob(table_fragments) => Some(table_fragments.table_id()),
Command::DropStreamingJobs {
table_fragments_ids,
..
} => Some(table_fragments_ids.iter().cloned()),
_ => None,
}
.into_iter()
.flatten()
}
}
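The caller-facing change in this file is the replacement of `table_to_cancel(&self) -> Option<TableId>` with `tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_`, built by flattening an `Option` of an iterator. A minimal, self-contained sketch of that pattern, with `Command` and `TableId` reduced to stand-ins:

```rust
// Minimal sketch of the `tables_to_drop` pattern: return an iterator even
// though only one variant yields items, by flattening `Option<impl Iterator>`.
// `Command` and `TableId` are simplified stand-ins for the real types.
use std::collections::HashSet;

type TableId = u32;

enum Command {
    DropStreamingJobs { table_fragments_ids: HashSet<TableId> },
    Flush,
}

impl Command {
    fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        // `None` contributes an empty iterator; `Some(iter)` yields its items.
        .into_iter()
        .flatten()
    }
}

fn main() {
    let cmd = Command::DropStreamingJobs {
        table_fragments_ids: HashSet::from([1, 2, 3]),
    };
    // Callers such as `checkpoint/control.rs` and `progress.rs` now loop
    // instead of matching an `Option<TableId>`, which also covers commands
    // that drop several jobs at once.
    for table_id in cmd.tables_to_drop() {
        println!("clean up job {table_id}");
    }
    // Non-drop commands simply yield nothing.
    assert_eq!(Command::Flush.tables_to_drop().count(), 0);
}
```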
19 changes: 0 additions & 19 deletions src/meta/src/barrier/context/context_impl.rs
@@ -15,7 +15,6 @@
use std::sync::Arc;

use futures::future::try_join_all;
use risingwave_common::catalog::TableId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
@@ -163,24 +162,6 @@ impl CommandContext {
.await?;
}

Command::CancelStreamingJob(table_fragments) => {
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");

// NOTE(kwannoel): At this point, meta has already registered the table ids.
// We should unregister them.
// This is required for background ddl, for foreground ddl this is a no-op.
// Foreground ddl is handled entirely by stream manager, so it will unregister
// the table ids on failure.
// On the other hand background ddl could be handled by barrier manager.
// It won't clean the tables on failure,
// since the failure could be recoverable.
// As such it needs to be handled here.
barrier_manager_context
.hummock_manager
.unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
.await?;
}

Command::CreateStreamingJob { info, job_type } => {
let CreateStreamingJobCommandInfo {
table_fragments,
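Note: with the cancel arm above removed, the hummock `unregister_table_ids` call it performed has no direct replacement in this file; `Command::cancel` (see `command.rs`) instead records the job's state tables in `unregistered_state_table_ids`, so the cleanup presumably flows through the existing `DropStreamingJobs` handling.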
2 changes: 1 addition & 1 deletion src/meta/src/barrier/progress.rs
@@ -447,7 +447,7 @@ impl CreateMviewProgressTracker {
.flat_map(|resp| resp.create_mview_progress.iter()),
version_stats,
);
if let Some(table_id) = command.and_then(Command::table_to_cancel) {
for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
// the cancelled command is possibly stashed in `finished_commands` and waiting
// for checkpoint, we should also clear it.
self.cancel_command(table_id);
25 changes: 8 additions & 17 deletions src/meta/src/barrier/schedule.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashSet, VecDeque};
use std::collections::VecDeque;
use std::iter::once;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -32,7 +32,6 @@ use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::model::ActorId;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};

@@ -106,9 +105,7 @@ impl ScheduledQueue {
if let QueueStatus::Blocked(reason) = &self.status
&& !matches!(
scheduled.command,
Command::DropStreamingJobs { .. }
| Command::CancelStreamingJob(_)
| Command::DropSubscription { .. }
Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
)
{
return Err(MetaError::unavailable(reason));
@@ -400,9 +397,7 @@ impl ScheduledBarriers {
impl ScheduledBarriers {
/// Pre buffered drop and cancel command, return true if any.
pub(super) fn pre_apply_drop_cancel(&self) -> bool {
let (dropped_actors, cancelled) = self.pre_apply_drop_cancel_scheduled();

!dropped_actors.is_empty() || !cancelled.is_empty()
self.pre_apply_drop_cancel_scheduled()
}

/// Mark command scheduler as blocked and abort all queued scheduled command and notify with
@@ -425,22 +420,18 @@

/// Try to pre apply drop and cancel scheduled command and return them if any.
/// It should only be called in recovery.
pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec<ActorId>, HashSet<TableId>) {
pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> bool {
let mut queue = self.inner.queue.lock();
assert_matches!(queue.status, QueueStatus::Blocked(_));
let (mut dropped_actors, mut cancel_table_ids) = (vec![], HashSet::new());
let mut applied = false;

while let Some(ScheduledQueueItem {
notifiers, command, ..
}) = queue.queue.pop_front()
{
match command {
Command::DropStreamingJobs { actors, .. } => {
dropped_actors.extend(actors);
}
Command::CancelStreamingJob(table_fragments) => {
let table_id = table_fragments.table_id();
cancel_table_ids.insert(table_id);
Command::DropStreamingJobs { .. } => {
applied = true;
}
Command::DropSubscription { .. } => {}
_ => {
@@ -451,7 +442,7 @@
notify.notify_collected();
});
}
(dropped_actors, cancel_table_ids)
applied
}
}
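With cancellation folded into `DropStreamingJobs`, this queue scan no longer hands back dropped actor ids and cancelled table ids; it only reports whether any drop command was pre-applied, hence the return type change from `(Vec<ActorId>, HashSet<TableId>)` to `bool`.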

11 changes: 6 additions & 5 deletions src/meta/src/stream/stream_manager.rs
@@ -299,10 +299,7 @@ impl GlobalStreamManager {
.await?;

self.barrier_scheduler
.run_command(
database_id,
Command::CancelStreamingJob(table_fragments),
)
.run_command(database_id, Command::cancel(&table_fragments))
.await?;
} else {
// streaming job is already completed.
@@ -514,6 +511,10 @@ impl GlobalStreamManager {
.run_command(
database_id,
Command::DropStreamingJobs {
table_fragments_ids: streaming_job_ids
.iter()
.map(|job_id| TableId::new(*job_id as _))
.collect(),
actors: removed_actors,
unregistered_state_table_ids: state_table_ids
.into_iter()
@@ -576,7 +577,7 @@

if let Some(database_id) = database_id {
self.barrier_scheduler
.run_command(DatabaseId::new(database_id as _), Command::CancelStreamingJob(fragment))
.run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
.await?;
}
};