diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs index 2f799d5b4e..ae0e8a8488 100644 --- a/sled-agent/src/instance.rs +++ b/sled-agent/src/instance.rs @@ -255,8 +255,7 @@ impl InstanceMonitorRunner { gen, }) .send() - .await - .map_err(|e| anyhow!(e))? + .await? .into_inner(); let observed_gen = response.gen; @@ -268,10 +267,9 @@ impl InstanceMonitorRunner { let (tx, rx) = oneshot::channel(); self.tx_monitor .send(InstanceMonitorRequest::Update { state: response, tx }) - .await - .map_err(|e| anyhow!(e))?; + .await?; - if let Reaction::Terminate = rx.await.map_err(|e| anyhow!(e))? { + if let Reaction::Terminate = rx.await? { return Ok(()); } @@ -353,7 +351,7 @@ struct InstanceRunner { } impl InstanceRunner { - async fn run(mut self) -> Result<(), Error> { + async fn run(mut self) { while !self.should_terminate { tokio::select! { biased; @@ -367,17 +365,21 @@ impl InstanceRunner { self.state.instance(), &state, ); - let reaction = self.observe_state(&observed).await?; + let reaction = self.observe_state(&observed).await; self.publish_state_to_nexus().await; // NOTE: If we fail to send here, the - // InstanceMonitorRunner has stopped running for some - // reason. We may want to handle that case. + // InstanceMonitorRunner has stopped running for + // some reason. We'd presumably handle that on the + // next iteration of the loop. let _ = tx.send(reaction); }, // NOTE: This case shouldn't really happen, as we keep a copy // of the sender alive in "self.tx_monitor". - None => continue, + None => { + warn!(self.log, "Instance 'VMM monitor' channel closed; shutting down"); + self.terminate().await; + }, } }, @@ -405,10 +407,9 @@ impl InstanceRunner { ); }, Some(Terminate { tx }) => { - let _ = tx.send(self.terminate().await - .map(|r| InstanceUnregisterResponse { updated_runtime: Some(r) }) - .map_err(|e| e.into()) - ); + let _ = tx.send(Ok(InstanceUnregisterResponse { + updated_runtime: Some(self.terminate().await) + })); }, Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => { let _ = tx.send( @@ -424,13 +425,16 @@ impl InstanceRunner { Some(DeleteExternalIp { ip, tx }) => { let _ = tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into())); }, - None => continue, + None => { + warn!(self.log, "Instance request channel closed; shutting down"); + self.terminate().await; + break; + }, } } } } self.publish_state_to_nexus().await; - return Ok(()); } /// Yields this instance's ID. @@ -528,7 +532,7 @@ impl InstanceRunner { async fn observe_state( &mut self, state: &ObservedPropolisState, - ) -> Result { + ) -> Reaction { info!(self.log, "Observing new propolis state: {:?}", state); // This instance may no longer have a Propolis zone if it was rudely @@ -546,7 +550,7 @@ impl InstanceRunner { // Return the Terminate action so that the caller will cleanly // cease to monitor this Propolis. Note that terminating an instance // that's already terminated is a no-op. - return Ok(Reaction::Terminate); + return Reaction::Terminate; } // Update the Sled Agent's internal state machine. @@ -569,10 +573,10 @@ impl InstanceRunner { info!(self.log, "terminating VMM that has exited"; "instance_id" => %self.id()); - self.terminate().await?; - Ok(Reaction::Terminate) + self.terminate().await; + Reaction::Terminate } - None => Ok(Reaction::Continue), + None => Reaction::Continue, } } @@ -697,7 +701,7 @@ impl InstanceRunner { /// /// This routine is safe to call even if the instance's zone was never /// started. It is also safe to call multiple times on a single instance. - async fn terminate_inner(&mut self) -> Result<(), Error> { + async fn terminate_inner(&mut self) { let zname = propolis_zone_name(self.propolis_id()); // First fetch the running state. @@ -715,7 +719,7 @@ impl InstanceRunner { // Ensure the instance is removed from the instance manager's table // so that a new instance can take its place. self.instance_ticket.deregister(); - return Ok(()); + return; }; // Take a zone bundle whenever this instance stops. @@ -753,8 +757,6 @@ impl InstanceRunner { // Remove any OPTE ports from the port manager. running_state.running_zone.release_opte_ports(); - - Ok(()) } async fn add_external_ip_inner( @@ -867,7 +869,7 @@ pub struct Instance { tx: mpsc::Sender, #[allow(dead_code)] - runner_handle: tokio::task::JoinHandle>, + runner_handle: tokio::task::JoinHandle<()>, } #[derive(Debug)] @@ -1213,7 +1215,7 @@ impl InstanceRunner { // this happens, generate an instance record bearing the // "Destroyed" state and return it to the caller. if self.running_state.is_none() { - self.terminate().await?; + self.terminate().await; (None, None) } else { ( @@ -1410,13 +1412,13 @@ impl InstanceRunner { Ok(PropolisSetup { client, running_zone }) } - async fn terminate(&mut self) -> Result { - self.terminate_inner().await?; + async fn terminate(&mut self) -> SledInstanceState { + self.terminate_inner().await; self.state.terminate_rudely(); // This causes the "run" task to exit on the next iteration. self.should_terminate = true; - Ok(self.state.sled_instance_state()) + self.state.sled_instance_state() } async fn issue_snapshot_request( diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs index 491007c22d..408383a726 100644 --- a/sled-agent/src/instance_manager.rs +++ b/sled-agent/src/instance_manager.rs @@ -89,13 +89,13 @@ struct InstanceManagerInternal { tx: mpsc::Sender, // NOTE: Arguably, this field could be "owned" by the InstanceManagerRunner. // It was not moved there, and the reservoir functions were not converted to - // use the message-apssing interface (see: "InstanceManagerRequest") because + // use the message-passing interface (see: "InstanceManagerRequest") because // callers of "get/set reservoir size" are not async, and (in the case of // getting the size) they also do not expect a "Result" type. reservoir_size: Mutex, #[allow(dead_code)] - runner_handle: tokio::task::JoinHandle>, + runner_handle: tokio::task::JoinHandle<()>, } /// All instances currently running on the sled. @@ -472,7 +472,7 @@ impl InstanceManagerRunner { /// Run the main loop of the InstanceManager. /// /// This should be spawned in a tokio task. - pub async fn run(mut self) -> Result<(), Error> { + pub async fn run(mut self) { use InstanceManagerRequest::*; loop { tokio::select! { @@ -486,8 +486,11 @@ impl InstanceManagerRunner { match request { Some(request) => { self.instances.remove(&request.id); - } - None => break, + }, + None => { + warn!(self.log, "InstanceManager's 'instance terminate' channel closed; shutting down"); + break; + }, } }, request = self.rx.recv() => { @@ -516,7 +519,7 @@ impl InstanceManagerRunner { let _ = self.instance_issue_disk_snapshot_request(tx, instance_id, disk_id, snapshot_id).await; }, Some(CreateZoneBundle{ name, tx }) => { - let _ = self.create_zone_bundle(tx, &name).await; + let _ = self.create_zone_bundle(tx, &name).await.map_err(Error::from); }, Some(InstanceAddExternalIp{ instance_id, ip, tx }) => { let _ = self.add_external_ip(tx, instance_id, &ip).await; @@ -524,12 +527,18 @@ impl InstanceManagerRunner { Some(InstanceDeleteExternalIp{ instance_id, ip, tx }) => { let _ = self.delete_external_ip(tx, instance_id, &ip).await; }, - None => break, + None => { + warn!(self.log, "InstanceManager's request channel closed; shutting down"); + break; + }, } } } } - Ok(()) + } + + fn get_instance(&self, instance_id: Uuid) -> Option<&Instance> { + self.instances.get(&instance_id).map(|(_id, v)| v) } /// Ensures that the instance manager contains a registered instance with @@ -641,7 +650,7 @@ impl InstanceManagerRunner { instance_id: Uuid, ) -> Result<(), Error> { // If the instance does not exist, we response immediately. - let Some((_, instance)) = self.instances.get(&instance_id) else { + let Some(instance) = self.get_instance(instance_id) else { tx.send(Ok(InstanceUnregisterResponse { updated_runtime: None })) .map_err(|_| Error::FailedSendChannelClosed)?; return Ok(()); @@ -661,34 +670,28 @@ impl InstanceManagerRunner { instance_id: Uuid, target: InstanceStateRequested, ) -> Result<(), Error> { - let instance = { - let instance = self.instances.get(&instance_id); - - if let Some((_, instance)) = instance { - instance - } else { - match target { - // If the instance isn't registered, then by definition it - // isn't running here. Allow requests to stop or destroy the - // instance to succeed to provide idempotency. This has to - // be handled here (that is, on the "instance not found" - // path) to handle the case where a stop request arrived, - // Propolis handled it, sled agent unregistered the - // instance, and only then did a second stop request - // arrive. - InstanceStateRequested::Stopped => { - tx.send(Ok(InstancePutStateResponse { - updated_runtime: None, - })) + let Some(instance) = self.get_instance(instance_id) else { + match target { + // If the instance isn't registered, then by definition it + // isn't running here. Allow requests to stop or destroy the + // instance to succeed to provide idempotency. This has to + // be handled here (that is, on the "instance not found" + // path) to handle the case where a stop request arrived, + // Propolis handled it, sled agent unregistered the + // instance, and only then did a second stop request + // arrive. + InstanceStateRequested::Stopped => { + tx.send(Ok(InstancePutStateResponse { + updated_runtime: None, + })) + .map_err(|_| Error::FailedSendChannelClosed)?; + } + _ => { + tx.send(Err(Error::NoSuchInstance(instance_id))) .map_err(|_| Error::FailedSendChannelClosed)?; - } - _ => { - tx.send(Err(Error::NoSuchInstance(instance_id))) - .map_err(|_| Error::FailedSendChannelClosed)?; - } } - return Ok(()); } + return Ok(()); }; instance.put_state(tx, target).await?; Ok(()) @@ -756,12 +759,9 @@ impl InstanceManagerRunner { instance_id: Uuid, ip: &InstanceExternalIpBody, ) -> Result<(), Error> { - let instance = { self.instances.get(&instance_id).map(|(_id, v)| v) }; - - let Some(instance) = instance else { + let Some(instance) = self.get_instance(instance_id) else { return Err(Error::NoSuchInstance(instance_id)); }; - instance.add_external_ip(tx, ip).await?; Ok(()) } @@ -772,9 +772,7 @@ impl InstanceManagerRunner { instance_id: Uuid, ip: &InstanceExternalIpBody, ) -> Result<(), Error> { - let instance = { self.instances.get(&instance_id).map(|(_id, v)| v) }; - - let Some(instance) = instance else { + let Some(instance) = self.get_instance(instance_id) else { return Err(Error::NoSuchInstance(instance_id)); };