Skip to content

Commit

Permalink
Merge branch 'yiming/finished-mv-in-tracker' into yiming/non-async-co…
Browse files Browse the repository at this point in the history
…mplete-barrier
  • Loading branch information
wenym1 committed Feb 20, 2024
2 parents 9055c21 + 2f28b42 commit abff967
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 106 deletions.
9 changes: 0 additions & 9 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,15 +709,6 @@ impl CommandContext {
}
}

/// For `CancelStreamingJob`, returns the actors of the `StreamScan` nodes. For other commands,
/// returns an empty set.
pub fn actors_to_cancel(&self) -> HashSet<ActorId> {
match &self.command {
Command::CancelStreamingJob(table_fragments) => table_fragments.backfill_actor_ids(),
_ => Default::default(),
}
}

/// For `CancelStreamingJob`, returns the table id of the target table.
pub fn table_to_cancel(&self) -> Option<TableId> {
match &self.command {
Expand Down
84 changes: 8 additions & 76 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use self::notifier::Notifier;
use self::progress::TrackingCommand;
use crate::barrier::info::InflightActorInfo;
use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::BarrierCollectFuture;
use crate::barrier::state::BarrierManagerState;
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
Expand Down Expand Up @@ -214,55 +214,7 @@ impl CheckpointControl {
context,
}
}
}

impl CreateMviewProgressTracker {
/// Stash a command to finish later.
fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
self.finished_jobs.push(finished_job);
}

/// Finish stashed jobs.
/// If checkpoint, means all jobs can be finished.
/// If not checkpoint, jobs which do not require checkpoint can be finished.
///
/// Returns whether there are still remaining stashed jobs to finish.
async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult<bool> {
for job in self
.finished_jobs
.extract_if(|job| checkpoint || !job.is_checkpoint_required())
{
// The command is ready to finish. We can now call `pre_finish`.
job.pre_finish().await?;
job.notify_finished();
}
Ok(!self.finished_jobs.is_empty())
}
}

impl CheckpointControl {
fn cancel_command(&mut self, cancelled_job: TrackingJob) {
if let TrackingJob::New(cancelled_command) = cancelled_job {
if let Some(index) = self.inflight_command_ctx_queue.iter().position(|command| {
command.command_ctx.prev_epoch.value()
== cancelled_command.context.prev_epoch.value()
}) {
self.inflight_command_ctx_queue.remove(index);
}
} else {
// Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered.
}
}
}

impl CreateMviewProgressTracker {
fn cancel_stashed_command(&mut self, id: TableId) {
self.finished_jobs
.retain(|x| x.table_to_create() != Some(id));
}
}

impl CheckpointControl {
/// Update the metrics of barrier nums.
fn update_barrier_nums_metrics(&self) {
self.context
Expand Down Expand Up @@ -357,15 +309,10 @@ struct InflightCommand {
notifiers: Vec<Notifier>,
}

struct CompleteBarrierOutput {
cancelled_job: Option<TrackingJob>,
has_remaining_job: bool,
}

struct CompletingCommand {
command_ctx: Arc<CommandContext>,

join_handle: JoinHandle<MetaResult<CompleteBarrierOutput>>,
join_handle: JoinHandle<MetaResult<bool>>,
}

/// The result of barrier collect.
Expand Down Expand Up @@ -735,7 +682,7 @@ impl GlobalBarrierManagerContext {
resps: Vec<BarrierCompleteResponse>,
mut notifiers: Vec<Notifier>,
enqueue_time: HistogramTimer,
) -> MetaResult<CompleteBarrierOutput> {
) -> MetaResult<bool> {
let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps);
if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await {
Expand Down Expand Up @@ -814,22 +761,13 @@ impl GlobalBarrierManagerContext {
notifiers: Vec<Notifier>,
command_ctx: Arc<CommandContext>,
create_mview_progress: Vec<CreateMviewProgress>,
) -> MetaResult<CompleteBarrierOutput> {
) -> MetaResult<bool> {
{
{
// Notify about collected.
let version_stats = self.hummock_manager.get_version_stats().await;

let mut tracker = self.tracker.lock().await;

// Save `cancelled_command` for Create MVs.
let actors_to_cancel = command_ctx.actors_to_cancel();
let cancelled_job = if !actors_to_cancel.is_empty() {
tracker.find_cancelled_command(actors_to_cancel)
} else {
None
};

// Save `finished_commands` for Create MVs.
let finished_commands = {
let mut commands = vec![];
Expand Down Expand Up @@ -864,17 +802,14 @@ impl GlobalBarrierManagerContext {
if let Some(table_id) = command_ctx.table_to_cancel() {
// the cancelled command is possibly stashed in `finished_commands` and waiting
// for checkpoint, we should also clear it.
tracker.cancel_stashed_command(table_id);
tracker.cancel_command(table_id);
}

let has_remaining_job = tracker
.finish_jobs(command_ctx.kind.is_checkpoint())
.await?;

Ok(CompleteBarrierOutput {
cancelled_job,
has_remaining_job,
})
Ok(has_remaining_job)
}
}
}
Expand Down Expand Up @@ -952,12 +887,9 @@ impl CheckpointControl {
})
.and_then(|result| result);
let completed_command = self.completing_command.take().expect("non-empty");
let result = join_result.map(|output| {
let result = join_result.map(|has_remaining| {
let command_ctx = completed_command.command_ctx.clone();
if let Some(job) = output.cancelled_job {
self.cancel_command(job);
}
(command_ctx, output.has_remaining_job)
(command_ctx, has_remaining)
});

Poll::Ready(result)
Expand Down
49 changes: 28 additions & 21 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -44,7 +43,7 @@ enum BackfillState {

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
struct Progress {
pub(super) struct Progress {
states: HashMap<ActorId, BackfillState>,

done_count: usize,
Expand Down Expand Up @@ -256,7 +255,7 @@ pub(super) struct CreateMviewProgressTracker {
actor_map: HashMap<ActorId, TableId>,

/// Get notified when we finished Create MV and collect a barrier(checkpoint = true)
pub(super) finished_jobs: Vec<TrackingJob>,
finished_jobs: Vec<TrackingJob>,
}

impl CreateMviewProgressTracker {
Expand Down Expand Up @@ -343,25 +342,33 @@ impl CreateMviewProgressTracker {
.collect()
}

/// Try to find the target create-streaming-job command from track.
/// Stash a command to finish later.
pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
self.finished_jobs.push(finished_job);
}

/// Finish stashed jobs.
/// If checkpoint, means all jobs can be finished.
/// If not checkpoint, jobs which do not require checkpoint can be finished.
///
/// Return the target command as it should be cancelled based on the input actors.
pub fn find_cancelled_command(
&mut self,
actors_to_cancel: HashSet<ActorId>,
) -> Option<TrackingJob> {
let epochs = actors_to_cancel
.into_iter()
.map(|actor_id| self.actor_map.get(&actor_id))
.collect_vec();
assert!(epochs.iter().all_equal());
// If the target command found in progress map, return and remove it. Note that the command
// should have finished if not found.
if let Some(Some(epoch)) = epochs.first() {
Some(self.progress_map.remove(epoch).unwrap().1)
} else {
None
/// Returns whether there are still remaining stashed jobs to finish.
pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult<bool> {
for job in self
.finished_jobs
.extract_if(|job| checkpoint || !job.is_checkpoint_required())
{
// The command is ready to finish. We can now call `pre_finish`.
job.pre_finish().await?;
job.notify_finished();
}
Ok(!self.finished_jobs.is_empty())
}

pub(super) fn cancel_command(&mut self, id: TableId) {
let _ = self.progress_map.remove(&id);
self.finished_jobs
.retain(|x| x.table_to_create() != Some(id));
self.actor_map.retain(|_, table_id| *table_id != id);
}

/// Add a new create-mview DDL command to track.
Expand Down Expand Up @@ -501,7 +508,7 @@ impl CreateMviewProgressTracker {
table_id
);

// Clean-up the mapping from actors to DDL epoch.
// Clean-up the mapping from actors to DDL table_id.
for actor in o.get().0.actors() {
self.actor_map.remove(&actor);
}
Expand Down

0 comments on commit abff967

Please sign in to comment.