Skip to content

Commit

Permalink
transit to log store in collect
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 12, 2024
1 parent f645580 commit 62106c4
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 135 deletions.
2 changes: 2 additions & 0 deletions src/meta/src/barrier/creating_job/barrier_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ impl CreatingStreamingJobBarrierControl {
/// Return Some((epoch, resps, `is_first_commit`))
///
/// Only epoch within the `epoch_end_bound` can be started.
/// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
/// the creating job won't have higher committed epoch than the upstream.
pub(super) fn start_completing(
&mut self,
epoch_end_bound: Bound<u64>,
Expand Down
124 changes: 73 additions & 51 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::ops::Bound::{Excluded, Unbounded};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -168,43 +169,32 @@ impl CreatingStreamingJobControl {
))
}

pub(super) fn may_inject_fake_barrier(
&mut self,
fn inject_barrier(
table_id: TableId,
control_stream_manager: &mut ControlStreamManager,
is_checkpoint: bool,
barrier_control: &mut CreatingStreamingJobBarrierControl,
pre_applied_graph_info: &InflightGraphInfo,
applied_graph_info: Option<&InflightGraphInfo>,
CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors,
mutation,
}: CreatingJobInjectBarrierInfo,
) -> MetaResult<()> {
if let Some(barriers_to_inject) = self.status.may_inject_fake_barrier(is_checkpoint) {
let graph_info = self
.status
.active_graph_info()
.expect("must exist when having barriers to inject");
let table_id = self.info.table_fragments.table_id();
for CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors,
mutation,
} in barriers_to_inject
{
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
mutation,
(&curr_epoch, &prev_epoch),
&kind,
graph_info,
Some(graph_info),
new_actors,
vec![],
vec![],
)?;
self.barrier_control.enqueue_epoch(
prev_epoch.value().0,
node_to_collect,
kind.is_checkpoint(),
);
}
}
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
mutation,
(&curr_epoch, &prev_epoch),
&kind,
pre_applied_graph_info,
applied_graph_info,
new_actors,
vec![],
vec![],
)?;
barrier_control.enqueue_epoch(prev_epoch.value().0, node_to_collect, kind.is_checkpoint());
Ok(())
}

Expand Down Expand Up @@ -237,35 +227,52 @@ impl CreatingStreamingJobControl {
);
match &mut self.status {
CreatingStreamingJobStatus::ConsumingSnapshot {
pending_commands, ..
pending_commands,
prev_epoch_fake_physical_time,
pending_non_checkpoint_barriers,
initial_barrier_info,
ref graph_info,
..
} => {
assert!(
!start_consume_upstream,
"should not start consuming upstream for a job that are consuming snapshot"
);
let new_barrier = CreatingStreamingJobStatus::new_fake_barrier(
prev_epoch_fake_physical_time,
pending_non_checkpoint_barriers,
initial_barrier_info,
command_ctx.kind.is_checkpoint(),
);
pending_commands.push(command_ctx.clone());
Self::inject_barrier(
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
graph_info,
Some(graph_info),
new_barrier,
)?;
}
CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => {
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
None,
(&command_ctx.curr_epoch, &command_ctx.prev_epoch),
&command_ctx.kind,
Self::inject_barrier(
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
graph_info,
if start_consume_upstream {
None
} else {
Some(graph_info)
},
None,
vec![],
vec![],
CreatingJobInjectBarrierInfo {
curr_epoch: command_ctx.curr_epoch.clone(),
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
mutation: None,
},
)?;
self.barrier_control.enqueue_epoch(
command_ctx.prev_epoch.value().0,
node_to_collect,
command_ctx.kind.is_checkpoint(),
);
let prev_epoch = command_ctx.prev_epoch.value().0;
if start_consume_upstream {
info!(
Expand All @@ -291,9 +298,24 @@ impl CreatingStreamingJobControl {
epoch: u64,
worker_id: WorkerId,
resp: BarrierCompleteResponse,
) {
self.status.update_progress(&resp.create_mview_progress);
control_stream_manager: &mut ControlStreamManager,
) -> MetaResult<()> {
let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
self.barrier_control.collect(epoch, worker_id, resp);
if let Some((prev_barriers_to_inject, graph_info)) = prev_barriers_to_inject {
let table_id = self.info.table_fragments.table_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
table_id,
control_stream_manager,
&mut self.barrier_control,
graph_info,
Some(graph_info),
info,
)?;
}
}
Ok(())
}

pub(super) fn should_merge_to_upstream(&self) -> Option<InflightGraphInfo> {
Expand Down
106 changes: 55 additions & 51 deletions src/meta/src/barrier/creating_job/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::mem::take;
use std::sync::Arc;

use risingwave_common::hash::ActorId;
use risingwave_common::must_match;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
Expand Down Expand Up @@ -147,59 +148,40 @@ impl CreatingStreamingJobStatus {
pub(super) fn update_progress(
&mut self,
create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
) {
) -> Option<(Vec<CreatingJobInjectBarrierInfo>, &InflightGraphInfo)> {
match self {
Self::ConsumingSnapshot {
create_mview_tracker,
ref version_stats,
prev_epoch_fake_physical_time,
pending_commands,
ref graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
initial_barrier_info,
ref snapshot_backfill_actors,
..
} => {
create_mview_tracker.update_tracking_jobs(
None,
create_mview_progress,
version_stats,
);
}
CreatingStreamingJobStatus::ConsumingLogStore {
log_store_progress_tracker,
..
} => {
log_store_progress_tracker.update(create_mview_progress);
}
CreatingStreamingJobStatus::Finishing(_) => {}
}
}

/// return
/// - Some(vec[(`curr_epoch`, `prev_epoch`, `barrier_kind`)]) of barriers to newly inject
pub(super) fn may_inject_fake_barrier(
&mut self,
is_checkpoint: bool,
) -> Option<Vec<CreatingJobInjectBarrierInfo>> {
if let CreatingStreamingJobStatus::ConsumingSnapshot {
prev_epoch_fake_physical_time,
pending_commands,
create_mview_tracker,
ref graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
initial_barrier_info,
ref snapshot_backfill_actors,
..
} = self
{
if create_mview_tracker.has_pending_finished_jobs() {
assert!(initial_barrier_info.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);
if create_mview_tracker.has_pending_finished_jobs() {
let (new_actors, mutation) = match initial_barrier_info.take() {
Some((new_actors, mutation)) => (Some(new_actors), Some(mutation)),
None => (None, None),
};
assert!(initial_barrier_info.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);

let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
let barriers_to_inject: Vec<_> =
[CreatingJobInjectBarrierInfo {
let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
let barriers_to_inject: Vec<_> = [CreatingJobInjectBarrierInfo {
curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
prev_epoch: TracedEpoch::new(prev_epoch),
kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
new_actors: None,
mutation: None,
new_actors,
mutation,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
Expand All @@ -213,15 +195,39 @@ impl CreatingStreamingJobStatus {
}))
.collect();

*self = CreatingStreamingJobStatus::ConsumingLogStore {
graph_info: graph_info.clone(),
log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
snapshot_backfill_actors.iter().cloned(),
barriers_to_inject.len(),
),
};
Some(barriers_to_inject)
} else {
*self = CreatingStreamingJobStatus::ConsumingLogStore {
graph_info: graph_info.clone(),
log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
snapshot_backfill_actors.iter().cloned(),
barriers_to_inject.len(),
),
};
let graph_info = must_match!(self,
CreatingStreamingJobStatus::ConsumingLogStore {graph_info, ..} => graph_info);
Some((barriers_to_inject, graph_info))
} else {
None
}
}
CreatingStreamingJobStatus::ConsumingLogStore {
log_store_progress_tracker,
..
} => {
log_store_progress_tracker.update(create_mview_progress);
None
}
CreatingStreamingJobStatus::Finishing(_) => None,
}
}

pub(super) fn new_fake_barrier(
prev_epoch_fake_physical_time: &mut u64,
pending_non_checkpoint_barriers: &mut Vec<u64>,
initial_barrier_info: &mut Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
is_checkpoint: bool,
) -> CreatingJobInjectBarrierInfo {
{
{
let prev_epoch =
TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
*prev_epoch_fake_physical_time += 1;
Expand All @@ -239,16 +245,14 @@ impl CreatingStreamingJobStatus {
} else {
Default::default()
};
Some(vec![CreatingJobInjectBarrierInfo {
CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors,
mutation,
}])
}
}
} else {
None
}
}
}
26 changes: 9 additions & 17 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,11 @@ impl CheckpointControl {

/// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
/// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
fn barrier_collected(&mut self, resp: BarrierCompleteResponse) {
fn barrier_collected(
&mut self,
resp: BarrierCompleteResponse,
control_stream_manager: &mut ControlStreamManager,
) -> MetaResult<()> {
let worker_id = resp.worker_id;
let prev_epoch = resp.epoch;
tracing::trace!(
Expand All @@ -351,8 +355,9 @@ impl CheckpointControl {
self.creating_streaming_job_controls
.get_mut(&creating_table_id)
.expect("should exist")
.collect(prev_epoch, worker_id, resp);
.collect(prev_epoch, worker_id, resp, control_stream_manager)?;
}
Ok(())
}

/// Pause inject barrier until True.
Expand Down Expand Up @@ -817,12 +822,8 @@ impl GlobalBarrierManager {
}
}
(worker_id, resp_result) = self.control_stream_manager.next_complete_barrier_response() => {
match resp_result {
Ok(resp) => {
self.checkpoint_control.barrier_collected(resp);

}
Err(e) => {
if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) {
{
let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id);
if failed_command.is_some()
|| self.state.inflight_graph_info.contains_worker(worker_id)
Expand Down Expand Up @@ -962,15 +963,6 @@ impl GlobalBarrierManager {
);
}

// may inject fake barrier
for creating_job in self
.checkpoint_control
.creating_streaming_job_controls
.values_mut()
{
creating_job.may_inject_fake_barrier(&mut self.control_stream_manager, checkpoint)?
}

self.pending_non_checkpoint_barriers
.push(prev_epoch.value().0);
let kind = if checkpoint {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/barrier/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ impl BarrierManagerState {

/// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
/// will be removed from the state after the info get resolved.
///
/// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`)
pub fn apply_command(
&mut self,
command: &Command,
Expand Down
Loading

0 comments on commit 62106c4

Please sign in to comment.