Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
smklein committed Feb 2, 2024
1 parent 0ac6457 commit 447bee9
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 71 deletions.
62 changes: 32 additions & 30 deletions sled-agent/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ impl InstanceMonitorRunner {
gen,
})
.send()
.await
.map_err(|e| anyhow!(e))?
.await?
.into_inner();
let observed_gen = response.gen;

Expand All @@ -268,10 +267,9 @@ impl InstanceMonitorRunner {
let (tx, rx) = oneshot::channel();
self.tx_monitor
.send(InstanceMonitorRequest::Update { state: response, tx })
.await
.map_err(|e| anyhow!(e))?;
.await?;

if let Reaction::Terminate = rx.await.map_err(|e| anyhow!(e))? {
if let Reaction::Terminate = rx.await? {
return Ok(());
}

Expand Down Expand Up @@ -353,7 +351,7 @@ struct InstanceRunner {
}

impl InstanceRunner {
async fn run(mut self) -> Result<(), Error> {
async fn run(mut self) {
while !self.should_terminate {
tokio::select! {
biased;
Expand All @@ -367,17 +365,21 @@ impl InstanceRunner {
self.state.instance(),
&state,
);
let reaction = self.observe_state(&observed).await?;
let reaction = self.observe_state(&observed).await;
self.publish_state_to_nexus().await;

// NOTE: If we fail to send here, the
// InstanceMonitorRunner has stopped running for some
// reason. We may want to handle that case.
// InstanceMonitorRunner has stopped running for
// some reason. We'd presumably handle that on the
// next iteration of the loop.
let _ = tx.send(reaction);
},
// NOTE: This case shouldn't really happen, as we keep a copy
// of the sender alive in "self.tx_monitor".
None => continue,
None => {
warn!(self.log, "Instance 'VMM monitor' channel closed; shutting down");
self.terminate().await;
},
}

},
Expand Down Expand Up @@ -405,10 +407,9 @@ impl InstanceRunner {
);
},
Some(Terminate { tx }) => {
let _ = tx.send(self.terminate().await
.map(|r| InstanceUnregisterResponse { updated_runtime: Some(r) })
.map_err(|e| e.into())
);
let _ = tx.send(Ok(InstanceUnregisterResponse {
updated_runtime: Some(self.terminate().await)
}));
},
Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
let _ = tx.send(
Expand All @@ -424,13 +425,16 @@ impl InstanceRunner {
Some(DeleteExternalIp { ip, tx }) => {
let _ = tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()));
},
None => continue,
None => {
warn!(self.log, "Instance request channel closed; shutting down");
self.terminate().await;
break;
},
}
}
}
}
self.publish_state_to_nexus().await;
return Ok(());
}

/// Yields this instance's ID.
Expand Down Expand Up @@ -528,7 +532,7 @@ impl InstanceRunner {
async fn observe_state(
&mut self,
state: &ObservedPropolisState,
) -> Result<Reaction, Error> {
) -> Reaction {
info!(self.log, "Observing new propolis state: {:?}", state);

// This instance may no longer have a Propolis zone if it was rudely
Expand All @@ -546,7 +550,7 @@ impl InstanceRunner {
// Return the Terminate action so that the caller will cleanly
// cease to monitor this Propolis. Note that terminating an instance
// that's already terminated is a no-op.
return Ok(Reaction::Terminate);
return Reaction::Terminate;
}

// Update the Sled Agent's internal state machine.
Expand All @@ -569,10 +573,10 @@ impl InstanceRunner {
info!(self.log, "terminating VMM that has exited";
"instance_id" => %self.id());

self.terminate().await?;
Ok(Reaction::Terminate)
self.terminate().await;
Reaction::Terminate
}
None => Ok(Reaction::Continue),
None => Reaction::Continue,
}
}

Expand Down Expand Up @@ -697,7 +701,7 @@ impl InstanceRunner {
///
/// This routine is safe to call even if the instance's zone was never
/// started. It is also safe to call multiple times on a single instance.
async fn terminate_inner(&mut self) -> Result<(), Error> {
async fn terminate_inner(&mut self) {
let zname = propolis_zone_name(self.propolis_id());

// First fetch the running state.
Expand All @@ -715,7 +719,7 @@ impl InstanceRunner {
// Ensure the instance is removed from the instance manager's table
// so that a new instance can take its place.
self.instance_ticket.deregister();
return Ok(());
return;
};

// Take a zone bundle whenever this instance stops.
Expand Down Expand Up @@ -753,8 +757,6 @@ impl InstanceRunner {

// Remove any OPTE ports from the port manager.
running_state.running_zone.release_opte_ports();

Ok(())
}

async fn add_external_ip_inner(
Expand Down Expand Up @@ -867,7 +869,7 @@ pub struct Instance {
tx: mpsc::Sender<InstanceRequest>,

#[allow(dead_code)]
runner_handle: tokio::task::JoinHandle<Result<(), Error>>,
runner_handle: tokio::task::JoinHandle<()>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -1213,7 +1215,7 @@ impl InstanceRunner {
// this happens, generate an instance record bearing the
// "Destroyed" state and return it to the caller.
if self.running_state.is_none() {
self.terminate().await?;
self.terminate().await;
(None, None)
} else {
(
Expand Down Expand Up @@ -1410,13 +1412,13 @@ impl InstanceRunner {
Ok(PropolisSetup { client, running_zone })
}

async fn terminate(&mut self) -> Result<SledInstanceState, Error> {
self.terminate_inner().await?;
async fn terminate(&mut self) -> SledInstanceState {
self.terminate_inner().await;
self.state.terminate_rudely();

// This causes the "run" task to exit on the next iteration.
self.should_terminate = true;
Ok(self.state.sled_instance_state())
self.state.sled_instance_state()
}

async fn issue_snapshot_request(
Expand Down
80 changes: 39 additions & 41 deletions sled-agent/src/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ struct InstanceManagerInternal {
tx: mpsc::Sender<InstanceManagerRequest>,
// NOTE: Arguably, this field could be "owned" by the InstanceManagerRunner.
// It was not moved there, and the reservoir functions were not converted to
// use the message-apssing interface (see: "InstanceManagerRequest") because
// use the message-passing interface (see: "InstanceManagerRequest") because
// callers of "get/set reservoir size" are not async, and (in the case of
// getting the size) they also do not expect a "Result" type.
reservoir_size: Mutex<ByteCount>,

#[allow(dead_code)]
runner_handle: tokio::task::JoinHandle<Result<(), Error>>,
runner_handle: tokio::task::JoinHandle<()>,
}

/// All instances currently running on the sled.
Expand Down Expand Up @@ -472,7 +472,7 @@ impl InstanceManagerRunner {
/// Run the main loop of the InstanceManager.
///
/// This should be spawned in a tokio task.
pub async fn run(mut self) -> Result<(), Error> {
pub async fn run(mut self) {
use InstanceManagerRequest::*;
loop {
tokio::select! {
Expand All @@ -486,8 +486,11 @@ impl InstanceManagerRunner {
match request {
Some(request) => {
self.instances.remove(&request.id);
}
None => break,
},
None => {
warn!(self.log, "InstanceManager's 'instance terminate' channel closed; shutting down");
break;
},
}
},
request = self.rx.recv() => {
Expand Down Expand Up @@ -516,20 +519,26 @@ impl InstanceManagerRunner {
let _ = self.instance_issue_disk_snapshot_request(tx, instance_id, disk_id, snapshot_id).await;
},
Some(CreateZoneBundle{ name, tx }) => {
let _ = self.create_zone_bundle(tx, &name).await;
let _ = self.create_zone_bundle(tx, &name).await.map_err(Error::from);
},
Some(InstanceAddExternalIp{ instance_id, ip, tx }) => {
let _ = self.add_external_ip(tx, instance_id, &ip).await;
},
Some(InstanceDeleteExternalIp{ instance_id, ip, tx }) => {
let _ = self.delete_external_ip(tx, instance_id, &ip).await;
},
None => break,
None => {
warn!(self.log, "InstanceManager's request channel closed; shutting down");
break;
},
}
}
}
}
Ok(())
}

fn get_instance(&self, instance_id: Uuid) -> Option<&Instance> {
self.instances.get(&instance_id).map(|(_id, v)| v)
}

/// Ensures that the instance manager contains a registered instance with
Expand Down Expand Up @@ -641,7 +650,7 @@ impl InstanceManagerRunner {
instance_id: Uuid,
) -> Result<(), Error> {
// If the instance does not exist, we response immediately.
let Some((_, instance)) = self.instances.get(&instance_id) else {
let Some(instance) = self.get_instance(instance_id) else {
tx.send(Ok(InstanceUnregisterResponse { updated_runtime: None }))
.map_err(|_| Error::FailedSendChannelClosed)?;
return Ok(());
Expand All @@ -661,34 +670,28 @@ impl InstanceManagerRunner {
instance_id: Uuid,
target: InstanceStateRequested,
) -> Result<(), Error> {
let instance = {
let instance = self.instances.get(&instance_id);

if let Some((_, instance)) = instance {
instance
} else {
match target {
// If the instance isn't registered, then by definition it
// isn't running here. Allow requests to stop or destroy the
// instance to succeed to provide idempotency. This has to
// be handled here (that is, on the "instance not found"
// path) to handle the case where a stop request arrived,
// Propolis handled it, sled agent unregistered the
// instance, and only then did a second stop request
// arrive.
InstanceStateRequested::Stopped => {
tx.send(Ok(InstancePutStateResponse {
updated_runtime: None,
}))
let Some(instance) = self.get_instance(instance_id) else {
match target {
// If the instance isn't registered, then by definition it
// isn't running here. Allow requests to stop or destroy the
// instance to succeed to provide idempotency. This has to
// be handled here (that is, on the "instance not found"
// path) to handle the case where a stop request arrived,
// Propolis handled it, sled agent unregistered the
// instance, and only then did a second stop request
// arrive.
InstanceStateRequested::Stopped => {
tx.send(Ok(InstancePutStateResponse {
updated_runtime: None,
}))
.map_err(|_| Error::FailedSendChannelClosed)?;
}
_ => {
tx.send(Err(Error::NoSuchInstance(instance_id)))
.map_err(|_| Error::FailedSendChannelClosed)?;
}
_ => {
tx.send(Err(Error::NoSuchInstance(instance_id)))
.map_err(|_| Error::FailedSendChannelClosed)?;
}
}
return Ok(());
}
return Ok(());
};
instance.put_state(tx, target).await?;
Ok(())
Expand Down Expand Up @@ -756,12 +759,9 @@ impl InstanceManagerRunner {
instance_id: Uuid,
ip: &InstanceExternalIpBody,
) -> Result<(), Error> {
let instance = { self.instances.get(&instance_id).map(|(_id, v)| v) };

let Some(instance) = instance else {
let Some(instance) = self.get_instance(instance_id) else {
return Err(Error::NoSuchInstance(instance_id));
};

instance.add_external_ip(tx, ip).await?;
Ok(())
}
Expand All @@ -772,9 +772,7 @@ impl InstanceManagerRunner {
instance_id: Uuid,
ip: &InstanceExternalIpBody,
) -> Result<(), Error> {
let instance = { self.instances.get(&instance_id).map(|(_id, v)| v) };

let Some(instance) = instance else {
let Some(instance) = self.get_instance(instance_id) else {
return Err(Error::NoSuchInstance(instance_id));
};

Expand Down

0 comments on commit 447bee9

Please sign in to comment.