Skip to content

Commit

Permalink
Improved logging/error handling for cluster management
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 1, 2024
1 parent 6b85e80 commit 9d6a072
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,12 +704,21 @@ impl GlobalBarrierManagerContext {
let mgr = self.metadata_manager.as_v1_ref();
debug!("start resetting actors distribution");

if info.actor_location_map.is_empty() {
debug!("empty cluster, skipping");
return Ok(true);
}

let current_parallelism = info
.node_map
.values()
.flat_map(|worker_node| worker_node.parallel_units.iter())
.count();

if current_parallelism == 0 {
return Err(anyhow!("no available parallel units for auto scaling").into());
}

/// We infer the new parallelism strategy based on the prior level of parallelism of the table.
/// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
/// For Custom, we'll assess the parallelism of the core fragment;
Expand Down
16 changes: 16 additions & 0 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl ClusterManager {
let mut core = self.core.write().await;

if let Some(worker) = core.get_worker_by_host_mut(host_address.clone()) {
tracing::info!("worker {} re-joined the cluster", worker.worker_id());
worker.update_resource(Some(resource));
worker.update_started_at(timestamp_now_sec());
if let Some(property) = &mut property {
Expand Down Expand Up @@ -233,12 +234,25 @@ impl ClusterManager {
worker.insert(self.env.meta_store_checked()).await?;
// Update core.
core.add_worker_node(worker);

tracing::info!(
"new worker {} from {}:{} joined cluster",
worker_id,
host_address.get_host(),
host_address.get_port()
);

Ok(worker_node)
}

pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> {
let mut core = self.core.write().await;
let mut worker = core.get_worker_by_host_checked(host_address.clone())?;

let worker_id = worker.worker_id();

tracing::info!("worker {} activating", worker_id);

if worker.worker_node.state != State::Running as i32 {
worker.worker_node.state = State::Running as i32;
worker.insert(self.env.meta_store_checked()).await?;
Expand All @@ -258,6 +272,8 @@ impl ClusterManager {
.notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker.worker_node))
.await;

tracing::info!("worker {} activated", worker_id);

Ok(())
}

Expand Down

0 comments on commit 9d6a072

Please sign in to comment.