Skip to content

Commit

Permalink
Add more logging if client disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
smklein committed Feb 2, 2024
1 parent 447bee9 commit 0b060f6
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sled-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ slog-async.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
smf.workspace = true
strum.workspace = true
tar.workspace = true
thiserror.workspace = true
tofino.workspace = true
Expand Down
56 changes: 42 additions & 14 deletions sled-agent/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ pub enum Error {
#[error("Failed to send request to Instance: Channel closed")]
FailedSendChannelClosed,

#[error(
"Failed to send request from Instance Runner: Client Channel closed"
)]
FailedSendClientClosed,

#[error("Instance dropped our request")]
RequestDropped(#[from] oneshot::error::RecvError),
}
Expand Down Expand Up @@ -199,6 +204,7 @@ struct PropolisSetup {
}

// Requests that can be made of instances
#[derive(strum::Display)]
enum InstanceRequest {
RequestZoneBundle {
tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
Expand Down Expand Up @@ -372,7 +378,9 @@ impl InstanceRunner {
// InstanceMonitorRunner has stopped running for
// some reason. We'd presumably handle that on the
// next iteration of the loop.
let _ = tx.send(reaction);
if let Err(_) = tx.send(reaction) {
warn!(self.log, "InstanceRunner failed to send to InstanceMonitorRunner");
}
},
// NOTE: This case shouldn't really happen, as we keep a copy
// of the sender alive in "self.tx_monitor".
Expand All @@ -386,52 +394,72 @@ impl InstanceRunner {
// Handle external requests to act upon the instance.
request = self.rx.recv() => {
use InstanceRequest::*;
match request {
let request_variant = request.as_ref().map(|r| r.to_string());
let result = match request {
Some(RequestZoneBundle { tx }) => {
let _ = tx.send(self.request_zone_bundle().await);
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(CurrentState{ tx }) => {
let _ = tx.send(self.current_state().await);
tx.send(self.current_state().await)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutState{ state, tx }) => {
let _ = tx.send(self.put_state(state).await
tx.send(self.put_state(state).await
.map(|r| InstancePutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()));
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutMigrationIds{ old_runtime, migration_ids, tx }) => {
let _ = tx.send(
tx.send(
self.put_migration_ids(
&old_runtime,
&migration_ids
).await.map_err(|e| e.into())
);
)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(Terminate { tx }) => {
let _ = tx.send(Ok(InstanceUnregisterResponse {
tx.send(Ok(InstanceUnregisterResponse {
updated_runtime: Some(self.terminate().await)
}));
}))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
let _ = tx.send(
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
);
)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(AddExternalIp { ip, tx }) => {
let _ = tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()));
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(DeleteExternalIp { ip, tx }) => {
let _ = tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()));
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
None => {
warn!(self.log, "Instance request channel closed; shutting down");
self.terminate().await;
break;
},
};

if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant.unwrap(),
"err" => ?err,

);
}
}

}
}
self.publish_state_to_nexus().await;
Expand Down
66 changes: 41 additions & 25 deletions sled-agent/src/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ pub enum Error {
ZoneBundle(#[from] BundleError),

#[error("Failed to send request to Instance Manager: Channel closed")]
FailedSendChannelClosed,
FailedSendInstanceManagerClosed,

#[error(
"Failed to send request from Instance Manager: Client Channel closed"
)]
FailedSendClientClosed,

#[error("Instance Manager dropped our request")]
RequestDropped(#[from] oneshot::error::RecvError),
Expand Down Expand Up @@ -244,7 +249,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}

Expand All @@ -260,7 +265,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}

Expand All @@ -278,7 +283,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}

Expand All @@ -298,7 +303,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}

Expand All @@ -318,7 +323,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}

Expand Down Expand Up @@ -353,7 +358,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}

Expand All @@ -371,7 +376,7 @@ impl InstanceManager {
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendInstanceManagerClosed)?;
rx.await?
}
}
Expand All @@ -382,6 +387,7 @@ impl InstanceManager {
// the runner task.
//
// By convention, responses are sent on the "tx" oneshot.
#[derive(strum::Display)]
enum InstanceManagerRequest {
EnsureRegistered {
instance_id: Uuid,
Expand Down Expand Up @@ -494,7 +500,8 @@ impl InstanceManagerRunner {
}
},
request = self.rx.recv() => {
match request {
let request_variant = request.as_ref().map(|r| r.to_string());
let result = match request {
Some(EnsureRegistered {
instance_id,
propolis_id,
Expand All @@ -504,33 +511,42 @@ impl InstanceManagerRunner {
propolis_addr,
tx,
}) => {
let _ = tx.send(self.ensure_registered(instance_id, propolis_id, hardware, instance_runtime, vmm_runtime, propolis_addr).await);
tx.send(self.ensure_registered(instance_id, propolis_id, hardware, instance_runtime, vmm_runtime, propolis_addr).await).map_err(|_| Error::FailedSendClientClosed)
},
Some(EnsureUnregistered { instance_id, tx }) => {
let _ = self.ensure_unregistered(tx, instance_id).await;
self.ensure_unregistered(tx, instance_id).await
},
Some(EnsureState { instance_id, target, tx }) => {
let _ = self.ensure_state(tx, instance_id, target).await;
self.ensure_state(tx, instance_id, target).await
},
Some(PutMigrationIds{ instance_id, old_runtime, migration_ids, tx }) => {
let _ = self.put_migration_ids(tx, instance_id, &old_runtime, &migration_ids).await;
Some(PutMigrationIds { instance_id, old_runtime, migration_ids, tx }) => {
self.put_migration_ids(tx, instance_id, &old_runtime, &migration_ids).await
},
Some(InstanceIssueDiskSnapshot{ instance_id, disk_id, snapshot_id, tx }) => {
let _ = self.instance_issue_disk_snapshot_request(tx, instance_id, disk_id, snapshot_id).await;
Some(InstanceIssueDiskSnapshot { instance_id, disk_id, snapshot_id, tx }) => {
self.instance_issue_disk_snapshot_request(tx, instance_id, disk_id, snapshot_id).await
},
Some(CreateZoneBundle{ name, tx }) => {
let _ = self.create_zone_bundle(tx, &name).await.map_err(Error::from);
Some(CreateZoneBundle { name, tx }) => {
self.create_zone_bundle(tx, &name).await.map_err(Error::from)
},
Some(InstanceAddExternalIp{ instance_id, ip, tx }) => {
let _ = self.add_external_ip(tx, instance_id, &ip).await;
Some(InstanceAddExternalIp { instance_id, ip, tx }) => {
self.add_external_ip(tx, instance_id, &ip).await
},
Some(InstanceDeleteExternalIp{ instance_id, ip, tx }) => {
let _ = self.delete_external_ip(tx, instance_id, &ip).await;
Some(InstanceDeleteExternalIp { instance_id, ip, tx }) => {
self.delete_external_ip(tx, instance_id, &ip).await
},
None => {
warn!(self.log, "InstanceManager's request channel closed; shutting down");
break;
},
};

if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant.unwrap(),
"err" => ?err
);
}
}
}
Expand Down Expand Up @@ -652,7 +668,7 @@ impl InstanceManagerRunner {
// If the instance does not exist, we respond immediately.
let Some(instance) = self.get_instance(instance_id) else {
tx.send(Ok(InstanceUnregisterResponse { updated_runtime: None }))
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendClientClosed)?;
return Ok(());
};

Expand Down Expand Up @@ -684,11 +700,11 @@ impl InstanceManagerRunner {
tx.send(Ok(InstancePutStateResponse {
updated_runtime: None,
}))
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendClientClosed)?;
}
_ => {
tx.send(Err(Error::NoSuchInstance(instance_id)))
.map_err(|_| Error::FailedSendChannelClosed)?;
.map_err(|_| Error::FailedSendClientClosed)?;
}
}
return Ok(());
Expand Down

0 comments on commit 0b060f6

Please sign in to comment.