Skip to content

Commit

Permalink
Merge branch 'yiming/extract-graph-info' into yiming/snapshot-backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 9, 2024
2 parents 3cb4a9a + d2e8828 commit adc3752
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/meta/src/barrier/creating_job_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
Expand Down Expand Up @@ -132,6 +133,16 @@ impl CreatingStreamingJobControl {
}
}

pub(super) fn on_new_worker_node_map(&self, node_map: &HashMap<WorkerId, WorkerNode>) {
match &self.status {
CreatingStreamingJobStatus::ConsumingSnapshot { graph_info, .. }
| CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => {
graph_info.on_new_worker_node_map(node_map)
}
CreatingStreamingJobStatus::Finishing(_) | CreatingStreamingJobStatus::Finished(_) => {}
}
}

fn latest_epoch(&self) -> Option<u64> {
self.inflight_barrier_queue
.last_key_value()
Expand Down
10 changes: 10 additions & 0 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::WorkerNode;
use tracing::warn;

use crate::barrier::Command;
Expand Down Expand Up @@ -80,6 +81,15 @@ impl InflightGraphInfo {
}
}

/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn on_new_worker_node_map(&self, node_map: &HashMap<WorkerId, WorkerNode>) {
for (node_id, actors) in &self.actor_map {
if !node_map.contains_key(node_id) {
warn!(node_id, ?actors, "node with running actors is deleted");
}
}
}

pub(crate) fn extend(&mut self, other: &Self) {
self.apply_add(other.fragment_infos.iter().map(|(fragment_id, info)| {
(
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,9 @@ impl GlobalBarrierManager {

info!(?changed_worker, "worker changed");

self.state.inflight_graph_info
.on_new_worker_node_map(self.active_streaming_nodes.current());
self.checkpoint_control.creating_streaming_job_controls.values().for_each(|job| job.on_new_worker_node_map(self.active_streaming_nodes.current()));
if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
self.control_stream_manager.add_worker(node).await;
}
Expand Down

0 comments on commit adc3752

Please sign in to comment.