feat(meta): maintain snapshot of active streaming compute node in worker loop
wenym1 committed Feb 6, 2024
1 parent 2a98c6e commit 8f19930
Showing 6 changed files with 388 additions and 139 deletions.
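At a high level, the commit stops re-listing the streaming compute nodes from the metadata manager on every barrier and instead keeps a snapshot that the barrier worker loop owns and updates as membership notifications arrive. Below is a minimal, self-contained Rust sketch of that pattern; `WorkerSnapshot`, `WorkerChange`, and the plain `String` host value are illustrative stand-ins rather than the real `ActiveStreamingWorkerNodes` API, which is driven by meta-store notifications instead of a bare channel.

use std::collections::HashMap;

use tokio::sync::mpsc::UnboundedReceiver;

type WorkerId = u32;

// Hypothetical membership event; the real code receives notifications about
// `WorkerNode` additions, updates, and deletions from the meta store.
#[derive(Debug)]
enum WorkerChange {
    Add(WorkerId, String), // worker id plus host, standing in for `WorkerNode`
    Remove(WorkerId),
}

// Illustrative stand-in for `ActiveStreamingWorkerNodes`: the current set of
// active streaming compute nodes plus the change stream that keeps it fresh.
struct WorkerSnapshot {
    current: HashMap<WorkerId, String>,
    changes: UnboundedReceiver<WorkerChange>,
}

impl WorkerSnapshot {
    // Read the snapshot without another round trip to the metadata manager.
    fn current(&self) -> &HashMap<WorkerId, String> {
        &self.current
    }

    // Wait for the next membership change, fold it into the snapshot, and
    // hand the event back so the worker loop can log or act on it.
    async fn changed(&mut self) -> Option<WorkerChange> {
        let change = self.changes.recv().await?;
        match &change {
            WorkerChange::Add(id, host) => {
                self.current.insert(*id, host.clone());
            }
            WorkerChange::Remove(id) => {
                self.current.remove(id);
            }
        }
        Some(change)
    }
}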
103 changes: 74 additions & 29 deletions src/meta/src/barrier/mod.rs
@@ -32,6 +32,7 @@ use risingwave_hummock_sdk::table_watermark::{
};
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
@@ -41,7 +42,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
-use tracing::Instrument;
+use tracing::{info, warn, Instrument};

use self::command::CommandContext;
use self::notifier::Notifier;
@@ -54,7 +55,9 @@ use crate::barrier::state::BarrierManagerState;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::manager::sink_coordination::SinkCoordinatorManager;
-use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
+use crate::manager::{
+    ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId,
+};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef};
@@ -183,6 +186,8 @@ pub struct GlobalBarrierManager {
checkpoint_control: CheckpointControl,

rpc_manager: BarrierRpcManager,

active_streaming_nodes: ActiveStreamingWorkerNodes,
}

/// Controls the concurrent execution of commands.
@@ -396,6 +401,9 @@ impl GlobalBarrierManager {
);
let checkpoint_control = CheckpointControl::new(metrics.clone());

let active_streaming_nodes =
ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

let tracker = CreateMviewProgressTracker::new();

let scale_controller = Arc::new(ScaleController::new(
@@ -429,6 +437,7 @@ impl GlobalBarrierManager {
state: initial_invalid_state,
checkpoint_control,
rpc_manager,
active_streaming_nodes,
}
}
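The constructor hunks above create the snapshot in an uninitialized state from a clone of the `MetadataManager` and store it on `GlobalBarrierManager`; it is presumably populated once the loop has a consistent view of the cluster, e.g. during recovery. Continuing the hypothetical `WorkerSnapshot` sketch from above (again, not the real constructor signature):

impl WorkerSnapshot {
    // Start empty; the loop fills in the map before the snapshot is consulted.
    fn uninitialized(changes: UnboundedReceiver<WorkerChange>) -> Self {
        Self {
            current: HashMap::new(),
            changes,
        }
    }
}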

@@ -504,7 +513,7 @@ impl GlobalBarrierManager {
}
}

-self.state = {
+{
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
assert_eq!(
latest_snapshot.committed_epoch, latest_snapshot.current_epoch,
@@ -524,11 +533,10 @@
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);

-self.context
-    .recovery(prev_epoch, paused_reason, &self.scheduled_barriers)
+self.recovery(prev_epoch, paused_reason)
.instrument(span)
-.await
-};
+.await;
+}

self.context.set_status(BarrierManagerStatus::Running).await;

@@ -551,6 +559,58 @@
tracing::info!("Barrier manager is stopped");
break;
}

changed_worker = self.active_streaming_nodes.changed() => {
#[cfg(debug_assertions)]
{
match self
.context
.metadata_manager
.list_active_streaming_compute_nodes()
.await
{
Ok(worker_nodes) => {
let ignore_irrelevant_info = |node: &WorkerNode| {
(
node.id,
WorkerNode {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
parallel_units: node.parallel_units.clone(),
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
},
)
};
let worker_nodes: HashMap<_, _> =
worker_nodes.iter().map(ignore_irrelevant_info).collect();
let curr_worker_nodes: HashMap<_, _> = self
.active_streaming_nodes
.current()
.values()
.map(ignore_irrelevant_info)
.collect();
if worker_nodes != curr_worker_nodes {
warn!(
?worker_nodes,
?curr_worker_nodes,
"different to global snapshot"
);
}
}
Err(e) => {
warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
}
}
}

info!(?changed_worker, "worker changed");

// TODO: may apply the changed worker to state
}

// Checkpoint frequency changes.
notification = local_notification_rx.recv() => {
let notification = notification.unwrap();
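The new `changed_worker` arm above folds membership changes into the loop's own snapshot and, in debug builds, cross-checks it against a fresh `list_active_streaming_compute_nodes` call with volatile fields masked out. A condensed sketch of how such an arm sits next to the loop's other branches, again using the hypothetical `WorkerSnapshot`; `shutdown_rx` and `barrier_tick` are placeholders for the real loop's shutdown signal and scheduled barriers, and tokio's `time` and `macros` features plus the `tracing` crate are assumed:

async fn worker_loop(
    mut snapshot: WorkerSnapshot,
    mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
) {
    let mut barrier_tick = tokio::time::interval(std::time::Duration::from_millis(250));
    loop {
        tokio::select! {
            biased;
            // Stop the loop when the manager shuts down.
            _ = &mut shutdown_rx => break,
            // Keep the local view of the cluster current.
            change = snapshot.changed() => {
                let Some(change) = change else { break };
                tracing::info!(?change, "worker changed");
                // Debug builds could re-list the nodes from the metadata
                // manager here and compare them with `snapshot.current()`.
            }
            // Inject barriers using the snapshot instead of a fresh RPC listing.
            _ = barrier_tick.tick() => {
                let workers: Vec<String> = snapshot.current().values().cloned().collect();
                tracing::info!(worker_count = workers.len(), "injecting barrier");
            }
        }
    }
}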
Expand Down Expand Up @@ -601,13 +661,8 @@ impl GlobalBarrierManager {
span,
} = self.scheduled_barriers.pop_or_default().await;

-let all_nodes = self
-    .context
-    .metadata_manager
-    .list_active_streaming_compute_nodes()
-    .await
-    .unwrap();
-self.state.resolve_worker_nodes(all_nodes);
+self.state
+    .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
let info = self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -740,11 +795,7 @@

// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
-self.state = self
-    .context
-    .recovery(prev_epoch, None, &self.scheduled_barriers)
-    .instrument(span)
-    .await;
+self.recovery(prev_epoch, None).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running).await;
} else {
panic!("failed to execute barrier: {}", err.as_report());
@@ -920,23 +971,17 @@ impl GlobalBarrierManagerContext {
/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
/// will be created or dropped before this barrier flows through them.
-async fn resolve_actor_info(&self) -> MetaResult<InflightActorInfo> {
+async fn resolve_actor_info(
+    &self,
+    all_nodes: Vec<WorkerNode>,
+) -> MetaResult<InflightActorInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
-let all_nodes = mgr
-    .cluster_manager
-    .list_active_streaming_compute_nodes()
-    .await;
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
}
MetadataManager::V2(mgr) => {
-let all_nodes = mgr
-    .cluster_controller
-    .list_active_streaming_workers()
-    .await
-    .unwrap();
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
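The signature change above moves node discovery out of `resolve_actor_info`: the caller now passes in the worker list it already holds (its snapshot), and the function only joins it with the actor information loaded from the V1 or V2 metadata backend. A generic sketch of that inversion, reusing the hypothetical types from the earlier sketches; `ActorInfo` and `resolve_actors` are illustrative, not the real `InflightActorInfo::resolve`:

#[derive(Clone, Debug)]
struct ActorInfo {
    actor_id: u32,
    worker_id: WorkerId,
}

// The caller supplies the node list, so the resolver becomes a pure join of
// its two inputs and never has to call back into the metadata store.
fn resolve_actors(
    all_nodes: &HashMap<WorkerId, String>,
    all_actors: &[ActorInfo],
) -> HashMap<WorkerId, Vec<ActorInfo>> {
    let mut by_worker: HashMap<WorkerId, Vec<ActorInfo>> = all_nodes
        .keys()
        .map(|id| (*id, Vec::new()))
        .collect();
    for actor in all_actors {
        if let Some(actors) = by_worker.get_mut(&actor.worker_id) {
            actors.push(actor.clone());
        }
    }
    by_worker
}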