diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index a29ceedc81b86..37c270da30c78 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -704,12 +704,21 @@ impl GlobalBarrierManagerContext { let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); + if info.actor_location_map.is_empty() { + debug!("empty cluster, skipping"); + return Ok(true); + } + let current_parallelism = info .node_map .values() .flat_map(|worker_node| worker_node.parallel_units.iter()) .count(); + if current_parallelism == 0 { + return Err(anyhow!("no available parallel units for auto scaling").into()); + } + /// We infer the new parallelism strategy based on the prior level of parallelism of the table. /// If the parallelism strategy is Fixed or Auto, we won't make any modifications. /// For Custom, we'll assess the parallelism of the core fragment; diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 0f96c6aa95a52..38e1ff3ca8f33 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -112,6 +112,7 @@ impl ClusterManager { let mut core = self.core.write().await; if let Some(worker) = core.get_worker_by_host_mut(host_address.clone()) { + tracing::info!("worker {} re-joined the cluster", worker.worker_id()); worker.update_resource(Some(resource)); worker.update_started_at(timestamp_now_sec()); if let Some(property) = &mut property { @@ -233,12 +234,25 @@ impl ClusterManager { worker.insert(self.env.meta_store_checked()).await?; // Update core. core.add_worker_node(worker); + + tracing::info!( + "new worker {} from {}:{} joined cluster", + worker_id, + host_address.get_host(), + host_address.get_port() + ); + Ok(worker_node) } pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> { let mut core = self.core.write().await; let mut worker = core.get_worker_by_host_checked(host_address.clone())?; + + let worker_id = worker.worker_id(); + + tracing::info!("worker {} activating", worker_id); + if worker.worker_node.state != State::Running as i32 { worker.worker_node.state = State::Running as i32; worker.insert(self.env.meta_store_checked()).await?; @@ -258,6 +272,8 @@ impl ClusterManager { .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker.worker_node)) .await; + tracing::info!("worker {} activated", worker_id); + Ok(()) }