Skip to content

Commit

Permalink
Merge branch 'yiming/extract-graph-info' into yiming/snapshot-backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 8, 2024
2 parents 655b07d + 262285d commit 3cb4a9a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 28 deletions.
6 changes: 0 additions & 6 deletions src/meta/src/barrier/creating_job_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
Expand Down Expand Up @@ -211,7 +210,6 @@ impl CreatingStreamingJobControl {
pub(super) fn may_inject_fake_barrier(
&mut self,
control_stream_manager: &mut ControlStreamManager,
node_map: &HashMap<WorkerId, WorkerNode>,
is_checkpoint: bool,
) -> MetaResult<()> {
if let CreatingStreamingJobStatus::ConsumingSnapshot {
Expand All @@ -230,7 +228,6 @@ impl CreatingStreamingJobControl {
let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
let node_to_collect = control_stream_manager.inject_barrier(
table_id,
node_map,
None,
(
&TracedEpoch::new(self.backfill_epoch),
Expand All @@ -248,7 +245,6 @@ impl CreatingStreamingJobControl {
for command in pending_commands {
let node_to_collect = control_stream_manager.inject_barrier(
table_id,
node_map,
command.to_mutation(),
(&command.curr_epoch, &command.prev_epoch),
&command.kind,
Expand All @@ -273,7 +269,6 @@ impl CreatingStreamingJobControl {
};
let node_to_collect = control_stream_manager.inject_barrier(
table_id,
node_map,
None,
(&curr_epoch, &prev_epoch),
&kind,
Expand Down Expand Up @@ -313,7 +308,6 @@ impl CreatingStreamingJobControl {
CreatingStreamingJobStatus::ConsumingLogStore { graph_info } => {
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
&command_ctx.node_map,
if to_finish {
// erase the mutation on upstream except the last Finish command
command_ctx.to_mutation()
Expand Down
6 changes: 1 addition & 5 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,11 +968,7 @@ impl GlobalBarrierManager {
.creating_streaming_job_controls
.values_mut()
{
creating_job.may_inject_fake_barrier(
&mut self.control_stream_manager,
self.active_streaming_nodes.current(),
checkpoint,
)?
creating_job.may_inject_fake_barrier(&mut self.control_stream_manager, checkpoint)?
}

self.pending_non_checkpoint_barriers
Expand Down
27 changes: 10 additions & 17 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ impl ControlStreamManager {
) -> MetaResult<HashSet<WorkerId>> {
self.inject_barrier(
None,
&command_ctx.node_map,
command_ctx.to_mutation(),
(&command_ctx.curr_epoch, &command_ctx.prev_epoch),
&command_ctx.kind,
Expand All @@ -269,7 +268,6 @@ impl ControlStreamManager {
pub(super) fn inject_barrier(
&mut self,
creating_table_id: Option<TableId>,
node_map: &HashMap<WorkerId, WorkerNode>,
mutation: Option<Mutation>,
(curr_epoch, prev_epoch): (&TracedEpoch, &TracedEpoch),
kind: &BarrierKind,
Expand All @@ -288,17 +286,21 @@ impl ControlStreamManager {
})
.unwrap_or(u32::MAX);

for worker_id in pre_applied_graph_info.worker_ids() {
if !node_map.contains_key(&worker_id) {
return Err(anyhow!("worker id {} not exist", worker_id).into());
for worker_id in pre_applied_graph_info.worker_ids().chain(
applied_graph_info
.into_iter()
.flat_map(|info| info.worker_ids()),
) {
if !self.nodes.contains_key(&worker_id) {
return Err(anyhow!("unconnected worker node {}", worker_id).into());
}
}

let mut node_need_collect = HashSet::new();

node_map
.iter()
.map(|(node_id, worker_node)| {
self.nodes
.iter_mut()
.map(|(node_id, node)| {
let actor_ids_to_send: Vec<_> =
pre_applied_graph_info.actor_ids_to_send(*node_id).collect();
let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
Expand All @@ -318,15 +320,6 @@ impl ControlStreamManager {
};

{
let Some(node) = self.nodes.get_mut(node_id) else {
if actor_ids_to_collect.is_empty() {
// Worker node get disconnected but has no actor to collect. Simply skip it.
return Ok(());
}
return Err(
anyhow!("unconnected worker node: {:?}", worker_node.host).into()
);
};
let mutation = mutation.clone();
let barrier = Barrier {
epoch: Some(risingwave_pb::data::Epoch {
Expand Down

0 comments on commit 3cb4a9a

Please sign in to comment.