
Commit

More review cleanup
andrewjstone committed Nov 10, 2023
1 parent 35e252b commit 11f1b36
Showing 1 changed file with 71 additions and 42 deletions.
113 changes: 71 additions & 42 deletions sled-storage/src/manager.rs
@@ -57,6 +57,31 @@ pub enum StorageManagerState {
Normal,
}

enum AddDiskResult {
DiskInserted,
DiskAlreadyInserted,
DiskQueued,
}

impl AddDiskResult {
fn disk_inserted(&self) -> bool {
match self {
AddDiskResult::DiskInserted => true,
_ => false,
}
}
}

impl From<bool> for AddDiskResult {
fn from(value: bool) -> Self {
if value {
AddDiskResult::DiskInserted
} else {
AddDiskResult::DiskAlreadyInserted
}
}
}

#[derive(Debug)]
struct NewFilesystemRequest {
dataset_id: Uuid,
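The new AddDiskResult enum above replaces the old Ok(true)/Ok(false) convention with three explicit outcomes, and the From<bool> impl lets a bool-returning insert path convert with .map(Into::into). Below is a minimal, self-contained sketch of that bridging, using a plain HashMap as a stand-in for StorageResources; the insert_disk helper and the String error type are illustrative assumptions, not the crate's API.

use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum AddDiskResult {
    DiskInserted,
    DiskAlreadyInserted,
    DiskQueued,
}

impl From<bool> for AddDiskResult {
    fn from(inserted: bool) -> Self {
        if inserted {
            AddDiskResult::DiskInserted
        } else {
            AddDiskResult::DiskAlreadyInserted
        }
    }
}

// Stand-in for StorageResources::insert_disk: reports whether the entry was new.
fn insert_disk(
    resources: &mut HashMap<String, u64>,
    id: &str,
    size: u64,
) -> Result<bool, String> {
    Ok(resources.insert(id.to_string(), size).is_none())
}

fn main() {
    let mut resources = HashMap::new();
    // `.map(Into::into)` is the same conversion the manager applies after insert_disk.
    let first: Result<AddDiskResult, String> =
        insert_disk(&mut resources, "u2-0", 1 << 40).map(Into::into);
    let second: Result<AddDiskResult, String> =
        insert_disk(&mut resources, "u2-0", 1 << 40).map(Into::into);
    assert_eq!(first.unwrap(), AddDiskResult::DiskInserted);
    assert_eq!(second.unwrap(), AddDiskResult::DiskAlreadyInserted);
}

The disk_inserted helper could equivalently be written as matches!(self, AddDiskResult::DiskInserted); the explicit match in the diff is simply the longhand form.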
@@ -281,7 +306,7 @@ impl StorageManager {
queued_u2_drives: HashSet::new(),
key_requester,
resource_updates: update_tx,
last_logged_capacity: 0,
last_logged_capacity: QUEUE_SIZE,
},
StorageHandle { tx, resource_updates: update_rx },
)
@@ -349,7 +374,7 @@ impl StorageManager {
info!(self.log, "Received {:?}", req);
let should_send_updates = match req {
StorageRequest::AddDisk(raw_disk) => {
self.add_disk(raw_disk).await?
self.add_disk(raw_disk).await?.disk_inserted()
}
StorageRequest::RemoveDisk(raw_disk) => self.remove_disk(raw_disk),
StorageRequest::DisksChanged(raw_disks) => {
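Each request in this loop now reduces to a single should_send_updates flag (for AddDisk, via disk_inserted()), so watchers are only notified when the resource view actually changed. A small sketch of that gating pattern with a tokio watch channel follows; ResourcesSnapshot is a placeholder type, not the crate's real resources struct.

use tokio::sync::watch;

#[derive(Clone, Default)]
struct ResourcesSnapshot {
    n_disks: usize,
}

fn publish_if_changed(
    tx: &watch::Sender<ResourcesSnapshot>,
    snapshot: ResourcesSnapshot,
    should_send_updates: bool,
) {
    // Only send when something actually changed; a spurious send would wake
    // every watcher and trigger needless downstream work.
    if should_send_updates {
        let _ = tx.send(snapshot);
    }
}

fn main() {
    let (tx, rx) = watch::channel(ResourcesSnapshot::default());
    publish_if_changed(&tx, ResourcesSnapshot { n_disks: 1 }, true);
    assert!(rx.has_changed().unwrap());
}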
@@ -394,72 +419,73 @@ impl StorageManager {
//
// Return true if updates should be sent to watchers, false otherwise
async fn add_queued_disks(&mut self) -> bool {
info!(self.log, "Attempting to add queued disks");
info!(
self.log,
"Attempting to add queued disks";
"num_disks" => %self.queued_u2_drives.len()
);
self.state = StorageManagerState::Normal;

let mut send_updates = false;

// Disks that should be requeued.
let mut saved = HashSet::new();
let queued = std::mem::take(&mut self.queued_u2_drives);
let mut iter = queued.into_iter();
while let Some(disk) = iter.next() {
let queued = self.queued_u2_drives.clone();
let mut to_dequeue = HashSet::new();
for disk in queued.iter() {
if self.state == StorageManagerState::QueueingDisks {
// We hit a transient error in a prior iteration.
saved.insert(disk);
break;
} else {
// Try to add the disk. If there was a transient error the disk will
// have been requeued. If there was a permanent error, it will have been
// dropped. If there is another unexpected error, we will handle it and
// requeue ourselves.
match self.add_u2_disk(disk.clone()).await {
Err(err) => {
warn!(
self.log,
"Potentially transient error: {err}: requeuing disk";
"disk_id" => ?disk.identity()
);
saved.insert(disk);
Err(_) => {
// This is an unrecoverable error, so we don't queue the
// disk again.
to_dequeue.insert(disk);
}
Ok(AddDiskResult::DiskInserted) => {
send_updates = true;
to_dequeue.insert(disk);
}
Ok(true) => send_updates = true,
Ok(false) => (),
Ok(AddDiskResult::DiskAlreadyInserted) => {
to_dequeue.insert(disk);
}
Ok(AddDiskResult::DiskQueued) => (),
}
}
}
// Merge any requeued disks from transient errors with saved disks here
self.queued_u2_drives.extend(saved);
// Dequeue any inserted disks
self.queued_u2_drives.retain(|k| !to_dequeue.contains(k));
send_updates
}
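The loop above walks a clone of queued_u2_drives, records which disks were actually handled in to_dequeue, and then shrinks the real queue with retain, instead of draining it with std::mem::take and merging leftovers back in. A simplified, self-contained sketch of that dequeue pattern follows; the Outcome type and the try_add callback are illustrative stand-ins for add_u2_disk.

use std::collections::HashSet;

#[derive(Clone, Copy)]
enum Outcome {
    Inserted,
    AlreadyInserted,
    Queued,
}

fn add_queued(
    queue: &mut HashSet<String>,
    mut try_add: impl FnMut(&str) -> Outcome,
) -> bool {
    let mut send_updates = false;
    let mut to_dequeue = HashSet::new();
    // Walk a snapshot so the real queue only ever shrinks by what was handled.
    for disk in queue.clone() {
        match try_add(disk.as_str()) {
            Outcome::Inserted => {
                send_updates = true;
                to_dequeue.insert(disk);
            }
            Outcome::AlreadyInserted => {
                to_dequeue.insert(disk);
            }
            // Still queued (e.g. the key manager is not ready yet): leave it in place.
            Outcome::Queued => (),
        }
    }
    // Drop only what was actually handled; everything else stays queued.
    queue.retain(|d| !to_dequeue.contains(d));
    send_updates
}

Compared with the old take-and-requeue approach, nothing has to be merged back afterwards: a disk that could not be handled simply never lands in to_dequeue.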

// Add a disk to `StorageResources` if it is new,
// updated, or its pool has been updated as determined by
// [`$crate::resources::StorageResources::insert_disk`] and we decide not to
// queue the disk for later addition. If the disk was inserted to resources
// return `Ok(true)`.
//
// In case the disk is queued, it wasn't inserted into `StorageResources`
// for another reason, or we have already consumed and logged an error
// return `Ok(false).
//
// In all other cases return an Error.
async fn add_disk(&mut self, raw_disk: RawDisk) -> Result<bool, Error> {
// queue the disk for later addition.
async fn add_disk(
&mut self,
raw_disk: RawDisk,
) -> Result<AddDiskResult, Error> {
match raw_disk.variant() {
DiskVariant::U2 => self.add_u2_disk(raw_disk).await,
DiskVariant::M2 => self.add_m2_disk(raw_disk).await,
}
}

// Add a U.2 disk to [`StorageResources`] or queue it to be added later
async fn add_u2_disk(&mut self, raw_disk: RawDisk) -> Result<bool, Error> {
async fn add_u2_disk(
&mut self,
raw_disk: RawDisk,
) -> Result<AddDiskResult, Error> {
if self.state != StorageManagerState::Normal {
self.queued_u2_drives.insert(raw_disk);
return Ok(false);
return Ok(AddDiskResult::DiskQueued);
}

match Disk::new(&self.log, raw_disk.clone(), Some(&self.key_requester))
.await
{
Ok(disk) => self.resources.insert_disk(disk),
Ok(disk) => self.resources.insert_disk(disk).map(Into::into),
Err(err @ DiskError::Dataset(DatasetError::KeyManager(_))) => {
warn!(
self.log,
@@ -468,15 +494,15 @@ impl StorageManager {
);
self.queued_u2_drives.insert(raw_disk);
self.state = StorageManagerState::QueueingDisks;
Ok(false)
Ok(AddDiskResult::DiskQueued)
}
Err(err) => {
error!(
self.log,
"Persistent error: {err}: not queueing disk";
"disk_id" => ?raw_disk.identity()
);
Ok(false)
Err(err.into())
}
}
}
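With this change add_u2_disk has three distinct outcomes: a disk is queued when the manager is not yet Normal or when the key manager is unavailable (a transient condition), a successful setup reports whether the disk was newly inserted, and any other error is propagated to the caller rather than swallowed as Ok(false). A hedged sketch of that control flow follows; SetupError and the setup callback are placeholders, not the real Disk/DiskError API.

#[derive(Debug)]
enum SetupError {
    KeyManagerUnavailable,
    Permanent(String),
}

#[derive(Debug)]
enum AddDiskResult {
    DiskInserted,
    DiskAlreadyInserted,
    DiskQueued,
}

fn add_u2(
    queue: &mut Vec<String>,
    ready: bool,
    disk: String,
    setup: impl Fn(&str) -> Result<bool, SetupError>,
) -> Result<AddDiskResult, SetupError> {
    if !ready {
        // Not in the Normal state yet: defer the disk without reporting an error.
        queue.push(disk);
        return Ok(AddDiskResult::DiskQueued);
    }
    match setup(disk.as_str()) {
        Ok(true) => Ok(AddDiskResult::DiskInserted),
        Ok(false) => Ok(AddDiskResult::DiskAlreadyInserted),
        Err(SetupError::KeyManagerUnavailable) => {
            // Transient failure: queue the disk so a later retry can pick it up.
            queue.push(disk);
            Ok(AddDiskResult::DiskQueued)
        }
        // Permanent failures now surface to the caller instead of returning Ok(false).
        Err(err) => Err(err),
    }
}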
@@ -486,11 +512,14 @@ impl StorageManager {
//
// We never queue M.2 drives, as they don't rely on [`KeyManager`] based
// encryption
async fn add_m2_disk(&mut self, raw_disk: RawDisk) -> Result<bool, Error> {
async fn add_m2_disk(
&mut self,
raw_disk: RawDisk,
) -> Result<AddDiskResult, Error> {
let disk =
Disk::new(&self.log, raw_disk.clone(), Some(&self.key_requester))
.await?;
self.resources.insert_disk(disk)
self.resources.insert_disk(disk).map(Into::into)
}

// Delete a real disk and return `true` if the disk was actually removed
@@ -500,7 +529,7 @@ impl StorageManager {
self.resources.remove_disk(raw_disk.identity())
}

// Find all disks to remove that are not in raw_disks and remove them Then
// Find all disks to remove that are not in raw_disks and remove them. Then
// take the remaining disks and try to add them all. `StorageResources` will
// inform us if anything changed, and if so we return true, otherwise we
// return false.
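The comment above describes a reconcile step: drop every tracked disk that is not named in raw_disks, then try to add everything that is, and report whether the resource view changed at all. Below is a simplified sketch of that set-difference reconcile over a plain map, deliberately independent of the real StorageResources and RawDisk types.

use std::collections::HashMap;

/// Keep only the disks named in `desired`, then (re)add each desired disk.
/// Returns true if the tracked view actually changed.
fn ensure_exactly_these(
    tracked: &mut HashMap<String, u64>,
    desired: &HashMap<String, u64>,
) -> bool {
    let mut changed = false;

    // Remove anything tracked that no longer appears in the desired set.
    let stale: Vec<String> = tracked
        .keys()
        .filter(|id| !desired.contains_key(*id))
        .cloned()
        .collect();
    for id in stale {
        tracked.remove(&id);
        changed = true;
    }

    // Add (or update) everything that remains; only a real change flips the flag.
    for (id, size) in desired {
        if tracked.get(id) != Some(size) {
            tracked.insert(id.clone(), *size);
            changed = true;
        }
    }
    changed
}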
@@ -540,8 +569,8 @@ impl StorageManager {
for raw_disk in raw_disks {
let disk_id = raw_disk.identity().clone();
match self.add_disk(raw_disk).await {
Ok(true) => should_update = true,
Ok(false) => (),
Ok(AddDiskResult::DiskInserted) => should_update = true,
Ok(_) => (),
Err(err) => {
warn!(
self.log,
