From b6753879deaa450f1e5d999786e95c8cbc9ecf69 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 30 Nov 2023 12:03:15 -0500 Subject: [PATCH 01/13] Move BlockSizeBufWriter to installinator-common --- Cargo.lock | 1 + installinator-common/Cargo.toml | 1 + .../src/block_size_writer.rs | 29 ++++++++++++++----- installinator-common/src/lib.rs | 2 ++ installinator/src/lib.rs | 1 - installinator/src/write.rs | 11 +++---- 6 files changed, 29 insertions(+), 16 deletions(-) rename {installinator => installinator-common}/src/block_size_writer.rs (85%) diff --git a/Cargo.lock b/Cargo.lock index 981dd99082..eca3c458ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3267,6 +3267,7 @@ dependencies = [ "serde_json", "serde_with", "thiserror", + "tokio", "update-engine", ] diff --git a/installinator-common/Cargo.toml b/installinator-common/Cargo.toml index 4381de74eb..e5feefa068 100644 --- a/installinator-common/Cargo.toml +++ b/installinator-common/Cargo.toml @@ -13,5 +13,6 @@ serde.workspace = true serde_json.workspace = true serde_with.workspace = true thiserror.workspace = true +tokio.workspace = true update-engine.workspace = true omicron-workspace-hack.workspace = true diff --git a/installinator/src/block_size_writer.rs b/installinator-common/src/block_size_writer.rs similarity index 85% rename from installinator/src/block_size_writer.rs rename to installinator-common/src/block_size_writer.rs index 3f41a4ee99..488e7338db 100644 --- a/installinator/src/block_size_writer.rs +++ b/installinator-common/src/block_size_writer.rs @@ -11,31 +11,37 @@ use tokio::io::AsyncWrite; /// `BlockSizeBufWriter` is analogous to a tokio's `BufWriter`, except it /// guarantees that writes made to the underlying writer are always -/// _exactly_ the requested block size, with two exceptions: explicitly -/// calling (1) `flush()` or (2) `shutdown()` will write any -/// buffered-but-not-yet-written data to the underlying buffer regardless of -/// its length. +/// _exactly_ the requested block size, with three exceptions: +/// +/// 1. Calling `flush()` will write any currently-buffered data to the +/// underlying writer, regardless of its length. +/// 2. Similarily, calling `shutdown()` will flush any currently-buffered data +/// to the underlying writer. +/// 3. When `BlockSizeBufWriter` attempts to write a block-length amount of data +/// to the underlying writer, if that writer only accepts a portion of that +/// data, `BlockSizeBufWriter` will continue attempting to write the +/// remainder of the block. /// /// When `BlockSizeBufWriter` is dropped, any buffered data it's holding /// will be discarded. It is critical to manually call /// `BlockSizeBufWriter:flush()` or `BlockSizeBufWriter::shutdown()` prior /// to dropping to avoid data loss. 
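A minimal usage sketch of the guarantee described above, assuming the post-move `installinator_common::BlockSizeBufWriter` path; a `Vec<u8>` (which implements tokio's `AsyncWrite`) stands in for a real block device, and 512 is an arbitrary example block size:

    use installinator_common::BlockSizeBufWriter;
    use tokio::io::AsyncWriteExt;

    async fn demo() -> std::io::Result<()> {
        let mut w = BlockSizeBufWriter::with_block_size(512, Vec::<u8>::new());
        w.write_all(&[0xab; 1024]).await?; // an exact multiple of the block size
        w.write_all(&[0xcd; 100]).await?;  // a partial block stays buffered...
        w.flush().await?;                  // ...until an explicit flush (or shutdown) pushes it out
        assert_eq!(w.into_inner().len(), 1024 + 100);
        Ok(())
    }
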
-pub(crate) struct BlockSizeBufWriter { +pub struct BlockSizeBufWriter { inner: W, buf: Vec, block_size: usize, } impl BlockSizeBufWriter { - pub(crate) fn with_block_size(block_size: usize, inner: W) -> Self { + pub fn with_block_size(block_size: usize, inner: W) -> Self { Self { inner, buf: Vec::with_capacity(block_size), block_size } } - pub(crate) fn into_inner(self) -> W { + pub fn into_inner(self) -> W { self.inner } - pub(crate) fn block_size(&self) -> usize { + pub fn block_size(&self) -> usize { self.block_size } @@ -46,6 +52,13 @@ impl BlockSizeBufWriter { fn flush_buf(&mut self, cx: &mut Context<'_>) -> Poll> { let mut written = 0; let mut ret = Ok(()); + + // We expect this loop to execute exactly one time: we try to write the + // entirety of `self.buf` to `self.inner`, and presumably it is a type + // that expects to receive a block of data at once, so we'll immediately + // jump to `written == self.buf.len()`. If it returns `Ok(n)` for some + // `n < self.buf.len()`, we'll loop and try to write the rest of the + // data in less-than-block-sized chunks. while written < self.buf.len() { match ready!( Pin::new(&mut self.inner).poll_write(cx, &self.buf[written..]) diff --git a/installinator-common/src/lib.rs b/installinator-common/src/lib.rs index b77385840f..1f6f60388f 100644 --- a/installinator-common/src/lib.rs +++ b/installinator-common/src/lib.rs @@ -4,6 +4,8 @@ //! Common types shared by the installinator client and server. +mod block_size_writer; mod progress; +pub use block_size_writer::*; pub use progress::*; diff --git a/installinator/src/lib.rs b/installinator/src/lib.rs index c7de189576..3b1d768a7d 100644 --- a/installinator/src/lib.rs +++ b/installinator/src/lib.rs @@ -4,7 +4,6 @@ mod artifact; mod async_temp_file; -mod block_size_writer; mod bootstrap; mod dispatch; mod errors; diff --git a/installinator/src/write.rs b/installinator/src/write.rs index 22dd2adbf6..64d412be93 100644 --- a/installinator/src/write.rs +++ b/installinator/src/write.rs @@ -20,9 +20,9 @@ use illumos_utils::{ zpool::{Zpool, ZpoolName}, }; use installinator_common::{ - ControlPlaneZonesSpec, ControlPlaneZonesStepId, M2Slot, StepContext, - StepProgress, StepResult, StepSuccess, UpdateEngine, WriteComponent, - WriteError, WriteOutput, WriteSpec, WriteStepId, + BlockSizeBufWriter, ControlPlaneZonesSpec, ControlPlaneZonesStepId, M2Slot, + StepContext, StepProgress, StepResult, StepSuccess, UpdateEngine, + WriteComponent, WriteError, WriteOutput, WriteSpec, WriteStepId, }; use omicron_common::update::{ArtifactHash, ArtifactHashId}; use sha2::{Digest, Sha256}; @@ -36,10 +36,7 @@ use update_engine::{ errors::NestedEngineError, events::ProgressUnits, StepSpec, }; -use crate::{ - async_temp_file::AsyncNamedTempFile, block_size_writer::BlockSizeBufWriter, - hardware::Hardware, -}; +use crate::{async_temp_file::AsyncNamedTempFile, hardware::Hardware}; #[derive(Clone, Debug)] struct ArtifactDestination { From daeac69f6b6ae8defc18a107361c4eca6240b679 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 30 Nov 2023 15:09:53 -0500 Subject: [PATCH 02/13] extract RawDiskWriter from installinator --- Cargo.lock | 1 + installinator-common/Cargo.toml | 1 + installinator-common/src/lib.rs | 2 + installinator-common/src/raw_disk_writer.rs | 123 ++++++++++++++++++++ installinator/src/write.rs | 58 +++------ 5 files changed, 140 insertions(+), 45 deletions(-) create mode 100644 installinator-common/src/raw_disk_writer.rs diff --git a/Cargo.lock b/Cargo.lock index eca3c458ca..f6238fa51f 100644 --- a/Cargo.lock 
+++ b/Cargo.lock @@ -3261,6 +3261,7 @@ dependencies = [ "anyhow", "camino", "illumos-utils", + "libc", "omicron-workspace-hack", "schemars", "serde", diff --git a/installinator-common/Cargo.toml b/installinator-common/Cargo.toml index e5feefa068..ffec81e03d 100644 --- a/installinator-common/Cargo.toml +++ b/installinator-common/Cargo.toml @@ -8,6 +8,7 @@ license = "MPL-2.0" anyhow.workspace = true camino.workspace = true illumos-utils.workspace = true +libc.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/installinator-common/src/lib.rs b/installinator-common/src/lib.rs index 1f6f60388f..4771de7b27 100644 --- a/installinator-common/src/lib.rs +++ b/installinator-common/src/lib.rs @@ -6,6 +6,8 @@ mod block_size_writer; mod progress; +mod raw_disk_writer; pub use block_size_writer::*; pub use progress::*; +pub use raw_disk_writer::*; diff --git a/installinator-common/src/raw_disk_writer.rs b/installinator-common/src/raw_disk_writer.rs new file mode 100644 index 0000000000..549b4b5c75 --- /dev/null +++ b/installinator-common/src/raw_disk_writer.rs @@ -0,0 +1,123 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Async writer for raw disks on illumos (e.g., host OS phase 2 images written +//! to M.2 drives). + +use crate::BlockSizeBufWriter; +use illumos_utils::dkio; +use illumos_utils::dkio::MediaInfoExtended; +use tokio::io::AsyncWriteExt; +use std::io; +use std::os::fd::AsRawFd; +use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tokio::fs::File; +use tokio::io::AsyncWrite; + +/// Writer for illumos raw disks. +/// +/// Construct an instance via [`RawDiskWriter::open()`], write to it just like +/// any other async writer (it will handle passing writes down to the device in +/// chunks of length [`RawDiskWriter::block_size()`]), and then call +/// [`RawDiskWriter::finalize()`]. It is **critical** to call `finalize()`; +/// failure to do so will likely lead to data loss. +/// +/// `RawDiskWriter` attempts to be as conservative as it can about ensuring data +/// is written: +/// +/// * The device is opened with `O_SYNC` +/// * In `finalize()`, the file is `fsync`'d after any remaining data is flushed +/// * In `finalize()`, the disk write cache is flushed (if supported by the +/// target device) +/// +/// Writing an amount of data that is not a multiple of the device's +/// `block_size()` will likely result in a failure when writing / flushing the +/// final not-correctly-sized chunk. +/// +/// This type is illumos-specific due to using dkio for two things: +/// +/// 1. Determining the logical block size of the device +/// 2. Flushing the disk write cache +pub struct RawDiskWriter { + inner: BlockSizeBufWriter, +} + +impl RawDiskWriter { + /// Open the disk device at `path` for writing, and attempt to determine its + /// logical block size via [`MediaInfoExtended`]. + pub async fn open(path: &Path) -> io::Result { + let f = tokio::fs::OpenOptions::new() + .create(false) + .write(true) + .truncate(false) + .custom_flags(libc::O_SYNC) + .open(path) + .await?; + + let media_info = MediaInfoExtended::from_fd(f.as_raw_fd())?; + + let inner = BlockSizeBufWriter::with_block_size( + media_info.logical_block_size as usize, + f, + ); + + Ok(Self { inner }) + } + + /// The logical block size of the underlying device. 
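A minimal sketch of the intended call pattern for `RawDiskWriter`, assuming the caller already knows the raw devfs path of the target disk and holds an image whose length is a whole number of device blocks:

    use installinator_common::RawDiskWriter;
    use std::path::Path;
    use tokio::io::AsyncWriteExt;

    async fn write_image(path: &Path, image: &[u8]) -> std::io::Result<()> {
        let mut writer = RawDiskWriter::open(path).await?;
        // Writes must total a multiple of the device's logical block size.
        assert_eq!(image.len() % writer.block_size(), 0);
        writer.write_all(image).await?;
        // Critical: flushes buffered data, fsyncs, and flushes the disk's write
        // cache. Dropping the writer without this risks losing the tail of the image.
        writer.finalize().await
    }
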
+ pub fn block_size(&self) -> usize { + self.inner.block_size() + } + + /// Flush any remaining data and attempt to ensure synchronization with the + /// device. + pub async fn finalize(mut self) -> io::Result<()> { + // Flush any remaining data in our buffer + self.inner.flush().await?; + + // `fsync` the file... + let f = self.inner.into_inner(); + f.sync_all().await?; + + // ...and also attempt to flush the disk write cache + tokio::task::spawn_blocking(move || { + match dkio::flush_write_cache(f.as_raw_fd()) { + Ok(()) => Ok(()), + // Some drives don't support `flush_write_cache`; we don't want + // to fail in this case. + Err(err) if err.raw_os_error() == Some(libc::ENOTSUP) => Ok(()), + Err(err) => Err(err), + } + }) + .await + .unwrap() + } +} + +impl AsyncWrite for RawDiskWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} diff --git a/installinator/src/write.rs b/installinator/src/write.rs index 64d412be93..380595b4cd 100644 --- a/installinator/src/write.rs +++ b/installinator/src/write.rs @@ -6,7 +6,6 @@ use std::{ collections::{btree_map::Entry, BTreeMap, BTreeSet}, fmt, io::{self, Read}, - os::fd::AsRawFd, time::Duration, }; @@ -15,12 +14,9 @@ use async_trait::async_trait; use buf_list::BufList; use bytes::Buf; use camino::{Utf8Path, Utf8PathBuf}; -use illumos_utils::{ - dkio::{self, MediaInfoExtended}, - zpool::{Zpool, ZpoolName}, -}; +use illumos_utils::zpool::{Zpool, ZpoolName}; use installinator_common::{ - BlockSizeBufWriter, ControlPlaneZonesSpec, ControlPlaneZonesStepId, M2Slot, + ControlPlaneZonesSpec, ControlPlaneZonesStepId, M2Slot, RawDiskWriter, StepContext, StepProgress, StepResult, StepSuccess, UpdateEngine, WriteComponent, WriteError, WriteOutput, WriteSpec, WriteStepId, }; @@ -751,28 +747,13 @@ impl WriteTransportWriter for AsyncNamedTempFile { } #[async_trait] -impl WriteTransportWriter for BlockSizeBufWriter { +impl WriteTransportWriter for RawDiskWriter { fn block_size(&self) -> Option { - Some(BlockSizeBufWriter::block_size(self)) + Some(RawDiskWriter::block_size(self)) } async fn finalize(self) -> io::Result<()> { - let f = self.into_inner(); - f.sync_all().await?; - - // We only create `BlockSizeBufWriter` for the raw block device storing - // the OS ramdisk. After `fsync`'ing, also flush the write cache. - tokio::task::spawn_blocking(move || { - match dkio::flush_write_cache(f.as_raw_fd()) { - Ok(()) => Ok(()), - // Some drives don't support `flush_write_cache`; we don't want - // to fail in this case. 
- Err(err) if err.raw_os_error() == Some(libc::ENOTSUP) => Ok(()), - Err(err) => Err(err), - } - }) - .await - .unwrap() + RawDiskWriter::finalize(self).await } } @@ -807,7 +788,7 @@ struct BlockDeviceTransport; #[async_trait] impl WriteTransport for BlockDeviceTransport { - type W = BlockSizeBufWriter; + type W = RawDiskWriter; async fn make_writer( &mut self, @@ -816,12 +797,7 @@ impl WriteTransport for BlockDeviceTransport { destination: &Utf8Path, total_bytes: u64, ) -> Result { - let f = tokio::fs::OpenOptions::new() - .create(false) - .write(true) - .truncate(false) - .custom_flags(libc::O_SYNC) - .open(destination) + let writer = RawDiskWriter::open(destination.as_std_path()) .await .map_err(|error| WriteError::WriteError { component, @@ -831,18 +807,7 @@ impl WriteTransport for BlockDeviceTransport { error, })?; - let media_info = - MediaInfoExtended::from_fd(f.as_raw_fd()).map_err(|error| { - WriteError::WriteError { - component, - slot, - written_bytes: 0, - total_bytes, - error, - } - })?; - - let block_size = u64::from(media_info.logical_block_size); + let block_size = writer.block_size() as u64; // When writing to a block device, we must write a multiple of the block // size. We can assume the image we're given should be @@ -855,12 +820,15 @@ impl WriteTransport for BlockDeviceTransport { total_bytes, error: io::Error::new( io::ErrorKind::InvalidData, - format!("file size ({total_bytes}) is not a multiple of target device block size ({block_size})") + format!( + "file size ({total_bytes}) is not a multiple of \ + target device block size ({block_size})" + ), ), }); } - Ok(BlockSizeBufWriter::with_block_size(block_size as usize, f)) + Ok(writer) } } From 0a7a278eea43454b8c5569e55179d266d07fecc1 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 30 Nov 2023 15:58:15 -0500 Subject: [PATCH 03/13] [sled-agent] add preliminary "write boot disk OS" http endpoints --- Cargo.lock | 5 + common/src/lib.rs | 2 + common/src/update.rs | 4 +- installinator-common/src/raw_disk_writer.rs | 2 +- openapi/sled-agent.json | 205 +++++++ sled-agent/Cargo.toml | 5 + sled-agent/src/boot_disk_os_writer.rs | 563 ++++++++++++++++++++ sled-agent/src/http_entrypoints.rs | 77 ++- sled-agent/src/lib.rs | 1 + sled-agent/src/sled_agent.rs | 9 + 10 files changed, 870 insertions(+), 3 deletions(-) create mode 100644 sled-agent/src/boot_disk_os_writer.rs diff --git a/Cargo.lock b/Cargo.lock index f6238fa51f..525f43a154 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4816,6 +4816,7 @@ dependencies = [ "crucible-agent-client", "ddm-admin-client", "derive_more", + "display-error-chain", "dns-server", "dns-service-client", "dpd-client", @@ -4825,10 +4826,12 @@ dependencies = [ "futures", "gateway-client", "glob", + "hex", "http", "hyper", "hyper-staticfile", "illumos-utils", + "installinator-common", "internal-dns", "ipnetwork", "itertools 0.12.0", @@ -4856,6 +4859,7 @@ dependencies = [ "schemars", "semver 1.0.20", "serde", + "serde_human_bytes", "serde_json", "serial_test", "sha3", @@ -4874,6 +4878,7 @@ dependencies = [ "thiserror", "tofino", "tokio", + "tokio-stream", "toml 0.8.8", "usdt", "uuid", diff --git a/common/src/lib.rs b/common/src/lib.rs index 1d2ed0afdb..0d63de90fb 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -31,6 +31,8 @@ pub mod postgres_config; pub mod update; pub mod vlan; +pub use update::hex_schema; + #[macro_export] macro_rules! 
generate_logging_api { ($path:literal) => { diff --git a/common/src/update.rs b/common/src/update.rs index 81256eb526..28d5ae50a6 100644 --- a/common/src/update.rs +++ b/common/src/update.rs @@ -296,7 +296,9 @@ impl FromStr for ArtifactHash { } } -fn hex_schema(gen: &mut SchemaGenerator) -> Schema { +/// Produce an OpenAPI schema describing a hex array of a specific length (e.g., +/// a hash digest). +pub fn hex_schema(gen: &mut SchemaGenerator) -> Schema { let mut schema: SchemaObject = ::json_schema(gen).into(); schema.format = Some(format!("hex string ({N} bytes)")); schema.into() diff --git a/installinator-common/src/raw_disk_writer.rs b/installinator-common/src/raw_disk_writer.rs index 549b4b5c75..af72cd239c 100644 --- a/installinator-common/src/raw_disk_writer.rs +++ b/installinator-common/src/raw_disk_writer.rs @@ -8,7 +8,6 @@ use crate::BlockSizeBufWriter; use illumos_utils::dkio; use illumos_utils::dkio::MediaInfoExtended; -use tokio::io::AsyncWriteExt; use std::io; use std::os::fd::AsRawFd; use std::path::Path; @@ -17,6 +16,7 @@ use std::task::Context; use std::task::Poll; use tokio::fs::File; use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; /// Writer for illumos raw disks. /// diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 5e217b27a4..466a67dfee 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -10,6 +10,96 @@ "version": "0.0.1" }, "paths": { + "/boot-disk/{boot_disk}/os/write": { + "post": { + "summary": "Write a new host OS image to the specified boot disk", + "operationId": "host_os_write_start", + "parameters": [ + { + "in": "path", + "name": "boot_disk", + "required": true, + "schema": { + "$ref": "#/components/schemas/M2Slot" + } + }, + { + "in": "query", + "name": "sha3_256_digest", + "required": true, + "schema": { + "type": "string", + "format": "hex string (32 bytes)" + } + }, + { + "in": "query", + "name": "update_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "requestBody": { + "content": { + "application/octet-stream": { + "schema": { + "type": "string", + "format": "binary" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, + "/boot-disk/{boot_disk}/os/write/status": { + "get": { + "summary": "Get the status of writing a new host OS", + "operationId": "host_os_write_status", + "parameters": [ + { + "in": "path", + "name": "boot_disk", + "required": true, + "schema": { + "$ref": "#/components/schemas/M2Slot" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BootDiskOsWriteStatus" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/cockroachdb": { "post": { "summary": "Initializes a CockroachDB cluster", @@ -2135,6 +2225,113 @@ "range" ] }, + "BootDiskOsWriteProgress": { + "oneOf": [ + { + "type": "object", + "properties": { + "bytes_received": { + "type": "integer", + "format": "uint", + "minimum": 0 + }, + "state": { + "type": "string", + "enum": [ + "receiving_uploaded_image" + ] + } + }, + "required": [ + "bytes_received", + "state" + ] + } + ] + }, + "BootDiskOsWriteStatus": { + "oneOf": [ + { + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": [ + 
"no_update_running" + ] + } + }, + "required": [ + "status" + ] + }, + { + "type": "object", + "properties": { + "progress": { + "$ref": "#/components/schemas/BootDiskOsWriteProgress" + }, + "status": { + "type": "string", + "enum": [ + "in_progress" + ] + }, + "update_id": { + "type": "string", + "format": "uuid" + } + }, + "required": [ + "progress", + "status", + "update_id" + ] + }, + { + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": [ + "complete" + ] + }, + "update_id": { + "type": "string", + "format": "uuid" + } + }, + "required": [ + "status", + "update_id" + ] + }, + { + "type": "object", + "properties": { + "message": { + "type": "string" + }, + "status": { + "type": "string", + "enum": [ + "failed" + ] + }, + "update_id": { + "type": "string", + "format": "uuid" + } + }, + "required": [ + "message", + "status", + "update_id" + ] + } + ] + }, "BundleUtilization": { "description": "The portion of a debug dataset used for zone bundles.", "type": "object", @@ -6480,6 +6677,14 @@ "description": "Zpool names are of the format ox{i,p}_. They are either Internal or External, and should be unique", "type": "string", "pattern": "^ox[ip]_[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" + }, + "M2Slot": { + "description": "An M.2 slot that was written.", + "type": "string", + "enum": [ + "A", + "B" + ] } }, "responses": { diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index 61e61709e1..3b6fd7c162 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -25,14 +25,17 @@ derive_more.workspace = true dns-server.workspace = true dns-service-client.workspace = true dpd-client.workspace = true +display-error-chain.workspace = true dropshot.workspace = true flate2.workspace = true futures.workspace = true glob.workspace = true +hex.workspace = true http.workspace = true hyper-staticfile.workspace = true gateway-client.workspace = true illumos-utils.workspace = true +installinator-common.workspace = true internal-dns.workspace = true ipnetwork.workspace = true itertools.workspace = true @@ -53,6 +56,7 @@ reqwest = { workspace = true, features = ["rustls-tls", "stream"] } schemars = { workspace = true, features = [ "chrono", "uuid1" ] } semver.workspace = true serde.workspace = true +serde_human_bytes.workspace = true serde_json = {workspace = true, features = ["raw_value"]} sha3.workspace = true sled-agent-client.workspace = true @@ -93,6 +97,7 @@ subprocess.workspace = true slog-async.workspace = true slog-term.workspace = true tempfile.workspace = true +tokio-stream.workspace = true illumos-utils = { workspace = true, features = ["testing", "tmp_keypath"] } sled-storage = { workspace = true, features = ["testing"] } diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs new file mode 100644 index 0000000000..6fb2230dcd --- /dev/null +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -0,0 +1,563 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! 
TODO FIXME + +use crate::http_entrypoints::BootDiskOsWriteProgress; +use crate::http_entrypoints::BootDiskOsWriteStatus; +use bytes::Bytes; +use display_error_chain::DisplayErrorChain; +use dropshot::HttpError; +use futures::Stream; +use futures::TryStreamExt; +use installinator_common::M2Slot; +use sha3::Digest; +use sha3::Sha3_256; +use slog::Logger; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::io; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::fs::File; +use tokio::io::AsyncSeekExt; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; +use tokio::sync::oneshot::error::TryRecvError; +use tokio::sync::watch; +use uuid::Uuid; + +impl BootDiskOsWriteStatus { + fn from_result( + update_id: Uuid, + result: &Result<(), Arc>, + ) -> Self { + match result { + Ok(()) => Self::Complete { update_id }, + Err(err) => Self::Failed { + update_id, + message: DisplayErrorChain::new(err).to_string(), + }, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum BootDiskOsWriteError { + // This variant should be impossible in production, as we build with + // panic=abort, but may be constructed in tests (e.g., during tokio runtime + // shutdown). + #[error("internal error (task panic)")] + TaskPanic, + #[error("another update is still running ({0})")] + AnotherUpdateRunning(Uuid), + #[error("failed to create temporary file")] + FailedCreatingTempfile(io::Error), + #[error("failed writing to temporary file")] + FailedWritingTempfile(io::Error), + #[error("failed downloading image from HTTP client")] + FailedDownloadingImage(HttpError), + #[error("hash mismatch in image from HTTP client: expected {expected} but got {got}")] + UploadedImageHashMismatch { expected: String, got: String }, +} + +impl From<&BootDiskOsWriteError> for HttpError { + fn from(error: &BootDiskOsWriteError) -> Self { + let message = DisplayErrorChain::new(error).to_string(); + match error { + BootDiskOsWriteError::AnotherUpdateRunning(_) + | BootDiskOsWriteError::FailedDownloadingImage(_) + | BootDiskOsWriteError::UploadedImageHashMismatch { .. } => { + HttpError::for_bad_request(None, message) + } + BootDiskOsWriteError::TaskPanic + | BootDiskOsWriteError::FailedCreatingTempfile(_) + | BootDiskOsWriteError::FailedWritingTempfile(_) => HttpError { + status_code: http::StatusCode::SERVICE_UNAVAILABLE, + error_code: None, + external_message: message.clone(), + internal_message: message, + }, + } + } +} + +#[derive(Debug)] +pub(crate) struct BootDiskOsWriter { + // Note: We use a std Mutex here to avoid cancellation issues with tokio + // Mutex. We never need to keep the lock held longer than necessary to copy + // or replace the current writer state. + states: Mutex>, + log: Logger, +} + +impl BootDiskOsWriter { + pub(crate) fn new(log: &Logger) -> Self { + Self { + states: Mutex::default(), + log: log.new(slog::o!("component" => "BootDiskOsWriter")), + } + } + + pub(crate) async fn start_update( + &self, + boot_disk: M2Slot, + update_id: Uuid, + sha3_256_digest: [u8; 32], + image_upload: S, + ) -> Result<(), Arc> + where + S: Stream> + Send + 'static, + { + // Construct a closure that will spawn a task to drive this update, but + // don't actually start it yet: we only allow an update to start if + // there's not currently an update running targetting the same slot, so + // we'll spawn this after checking that below. 
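A sketch of the caller-visible contract this guard produces, assuming two back-to-back calls for the same slot while the first update is still in flight (`writer`, `digest`, and the upload streams are stand-ins):

    // First request: accepted; returns once the upload has been received,
    // while the write itself continues on a background task.
    writer.start_update(M2Slot::A, update_id_1, digest, upload_1).await?;

    // Second request for the same slot while that task is still running:
    // fails fast with `BootDiskOsWriteError::AnotherUpdateRunning(update_id_1)`
    // rather than queueing behind it.
    let err = writer
        .start_update(M2Slot::A, update_id_2, digest, upload_2)
        .await
        .unwrap_err();
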
+ let spawn_update_task = || { + let (uploaded_image_tx, uploaded_image_rx) = oneshot::channel(); + let (progress_tx, progress_rx) = watch::channel( + BootDiskOsWriteProgress::ReceivingUploadedImage { + bytes_received: 0, + }, + ); + let (complete_tx, complete_rx) = oneshot::channel(); + let task = BootDiskOsWriteTask { + log: self + .log + .new(slog::o!("update_id" => update_id.to_string())), + sha3_256_digest, + progress_tx, + complete_tx, + }; + tokio::spawn(task.run(image_upload, uploaded_image_tx)); + ( + uploaded_image_rx, + TaskRunningState { update_id, progress_rx, complete_rx }, + ) + }; + + // Either call `spawn_update_task` and get back the handle to + // `uploaded_image_rx`, or return an error (if another update for this + // boot disk is still running). + let uploaded_image_rx = { + let mut states = self.states.lock().unwrap(); + match states.entry(boot_disk) { + Entry::Vacant(slot) => { + let (uploaded_image_rx, running) = spawn_update_task(); + slot.insert(WriterState::TaskRunning(running)); + uploaded_image_rx + } + Entry::Occupied(mut slot) => match slot.get_mut() { + WriterState::TaskRunning(running) => { + // Check whether the task is _actually_ still running, + // or whether it's done and just waiting for us to + // realize it. + match running.complete_rx.try_recv() { + Ok(_prev_result) => { + // A previous write is done, but we're + // immedately starting a new one, so discard the + // previous result. + let (uploaded_image_rx, running) = + spawn_update_task(); + slot.insert(WriterState::TaskRunning(running)); + uploaded_image_rx + } + Err(TryRecvError::Empty) => { + return Err(Arc::new( + BootDiskOsWriteError::AnotherUpdateRunning( + running.update_id, + ), + )); + } + Err(TryRecvError::Closed) => { + return Err(Arc::new( + BootDiskOsWriteError::TaskPanic, + )); + } + } + } + WriterState::Complete(_) => { + let (uploaded_image_rx, running) = spawn_update_task(); + slot.insert(WriterState::TaskRunning(running)); + uploaded_image_rx + } + }, + } + }; + + // We've now spawned a task to drive the update, and we want to wait for + // it to finish copying from `image_upload`. + uploaded_image_rx.await.map_err(|_| BootDiskOsWriteError::TaskPanic)? 
+ } + + pub(crate) fn status(&self, boot_disk: M2Slot) -> BootDiskOsWriteStatus { + let mut states = self.states.lock().unwrap(); + let mut slot = match states.entry(boot_disk) { + Entry::Vacant(_) => return BootDiskOsWriteStatus::NoUpdateRunning, + Entry::Occupied(slot) => slot, + }; + + match slot.get_mut() { + WriterState::TaskRunning(running) => { + match running.complete_rx.try_recv() { + Ok(result) => { + let update_id = running.update_id; + let status = BootDiskOsWriteStatus::from_result( + update_id, &result, + ); + slot.insert(WriterState::Complete(TaskCompleteState { + update_id, + result, + })); + status + } + Err(TryRecvError::Empty) => { + let progress = *running.progress_rx.borrow_and_update(); + BootDiskOsWriteStatus::InProgress { + update_id: running.update_id, + progress, + } + } + Err(TryRecvError::Closed) => { + let update_id = running.update_id; + let result = + Err(Arc::new(BootDiskOsWriteError::TaskPanic)); + let status = BootDiskOsWriteStatus::from_result( + update_id, &result, + ); + slot.insert(WriterState::Complete(TaskCompleteState { + update_id, + result, + })); + status + } + } + } + WriterState::Complete(complete) => { + BootDiskOsWriteStatus::from_result( + complete.update_id, + &complete.result, + ) + } + } + } +} + +#[derive(Debug)] +enum WriterState { + /// A task is running to write a new image to a boot disk. + TaskRunning(TaskRunningState), + /// The result of the most recent write. + Complete(TaskCompleteState), +} + +#[derive(Debug)] +struct TaskRunningState { + update_id: Uuid, + progress_rx: watch::Receiver, + complete_rx: oneshot::Receiver>>, +} + +#[derive(Debug)] +struct TaskCompleteState { + update_id: Uuid, + result: Result<(), Arc>, +} + +#[derive(Debug)] +struct BootDiskOsWriteTask { + log: Logger, + sha3_256_digest: [u8; 32], + progress_tx: watch::Sender, + complete_tx: oneshot::Sender>>, +} + +impl BootDiskOsWriteTask { + async fn run( + self, + image_upload: S, + uploaded_image_tx: oneshot::Sender< + Result<(), Arc>, + >, + ) where + S: Stream> + Send + 'static, + { + let result = self.run_impl(image_upload, uploaded_image_tx).await; + + // It's possible (albeit unlikely) our caller has discarded the receive + // half of this channel; ignore any send error. + _ = self.complete_tx.send(result); + } + + async fn run_impl( + &self, + image_upload: S, + uploaded_image_tx: oneshot::Sender< + Result<(), Arc>, + >, + ) -> Result<(), Arc> + where + S: Stream> + Send + 'static, + { + // Copy from `image_upload` into a tempfile, then report the result on + // `uploaded_image_tx`. Our dropshot server will not respond to the + // client that requested this update until we finish this step and send + // a response on `uploaded_image_tx`, as `image_upload` is the + // `StreamingBody` attached to their request. + // + // If this step fails, we will send the error to the client who sent the + // request _and_ a copy of the same error in our current update state. 
+ let image_tempfile = match self + .download_body_to_tempfile(image_upload) + .await + .map_err(Arc::new) + { + Ok(tempfile) => { + _ = uploaded_image_tx.send(Ok(())); + tempfile + } + Err(err) => { + _ = uploaded_image_tx.send(Err(Arc::clone(&err))); + return Err(err); + } + }; + + warn!( + self.log, + "update implementation incomplete - \ + abandoning after copying image to a local tempfile" + ); + _ = image_tempfile; + + Ok(()) + } + + async fn download_body_to_tempfile( + &self, + image_upload: S, + ) -> Result + where + S: Stream> + Send + 'static, + { + let tempfile = camino_tempfile::tempfile() + .map_err(BootDiskOsWriteError::FailedCreatingTempfile)?; + + let mut tempfile = + tokio::io::BufWriter::new(tokio::fs::File::from_std(tempfile)); + + let mut image_upload = std::pin::pin!(image_upload.into_stream()); + let mut hasher = Sha3_256::default(); + let mut bytes_received = 0; + + // Stream the uploaded image into our tempfile. + while let Some(bytes) = image_upload + .try_next() + .await + .map_err(BootDiskOsWriteError::FailedDownloadingImage)? + { + hasher.update(&bytes); + tempfile + .write_all(&bytes) + .await + .map_err(BootDiskOsWriteError::FailedWritingTempfile)?; + bytes_received += bytes.len(); + self.progress_tx.send_modify(|progress| { + *progress = BootDiskOsWriteProgress::ReceivingUploadedImage { + bytes_received, + } + }); + } + + // Rewind the tempfile. + let mut tempfile = tempfile.into_inner(); + tempfile + .seek(io::SeekFrom::Start(0)) + .await + .map_err(BootDiskOsWriteError::FailedWritingTempfile)?; + + // Ensure the data the client sent us matches the hash they also sent + // us. A failure here means either the client lied or something has gone + // horribly wrong. + let hash: [u8; 32] = hasher.finalize().into(); + let expected_hash_str = hex::encode(&self.sha3_256_digest); + if hash == self.sha3_256_digest { + info!( + self.log, "received uploaded image"; + "bytes_received" => bytes_received, + "hash" => expected_hash_str, + ); + + Ok(tempfile) + } else { + let computed_hash_str = hex::encode(&hash); + error!( + self.log, "received uploaded image: incorrect hash"; + "bytes_received" => bytes_received, + "computed_hash" => &computed_hash_str, + "expected_hash" => &expected_hash_str, + ); + + Err(BootDiskOsWriteError::UploadedImageHashMismatch { + expected: expected_hash_str, + got: computed_hash_str, + }) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use omicron_test_utils::dev::test_setup_log; + use std::mem; + use std::time::Duration; + use tokio::sync::mpsc; + use tokio_stream::wrappers::UnboundedReceiverStream; + + // TODO DOCUMENT AND BUMP TO 30 + const TEST_TIMEOUT: Duration = Duration::from_secs(10); + + fn expect_in_progress( + status: BootDiskOsWriteStatus, + ) -> BootDiskOsWriteProgress { + let BootDiskOsWriteStatus::InProgress { progress, .. } = status else { + panic!("expected Status::InProgress; got {status:?}"); + }; + progress + } + + #[tokio::test] + async fn boot_disk_os_writer_delivers_upload_progress_and_rejects_bad_hashes( + ) { + let logctx = + test_setup_log("boot_disk_os_writer_delivers_upload_progress_and_rejects_bad_hashes"); + + let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); + let boot_disk = M2Slot::A; + + // We'll give the writer an intentionally-wrong sha3 digest and confirm + // it rejects the upload based on this. + let claimed_sha3_digest = [0; 32]; + + // Construct an in-memory stream around an mpsc channel as our client + // upload. 
+ let (upload_tx, upload_rx) = mpsc::unbounded_channel(); + + // Spawn the `start_update` onto a background task; this won't end until + // we close (or send an error on) `upload_tx`. + let start_update_task = { + let writer = Arc::clone(&writer); + tokio::spawn(async move { + writer + .start_update( + boot_disk, + Uuid::new_v4(), + claimed_sha3_digest, + UnboundedReceiverStream::new(upload_rx), + ) + .await + }) + }; + + // As we stream data in, we'll compute the actual hash to check against + // the error we expect to see. + let mut actual_data_hasher = Sha3_256::new(); + + // Run the rest of the test under a timeout to catch any incorrect + // assumptions that result in a hang. + tokio::time::timeout(TEST_TIMEOUT, async move { + // We're racing `writer`'s spawning of the actual update task; spin + // until we transition from "no update" to "receiving uploaded + // image". + loop { + match writer.status(boot_disk) { + BootDiskOsWriteStatus::NoUpdateRunning => { + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::InProgress { progress, .. } => { + println!("got {progress:?}"); + assert_eq!( + progress, + BootDiskOsWriteProgress::ReceivingUploadedImage { + bytes_received: 0 + } + ); + break; + } + status @ (BootDiskOsWriteStatus::Complete { .. } + | BootDiskOsWriteStatus::Failed { .. }) => { + panic!("unexpected status {status:?}") + } + } + } + + let mut prev_bytes_received = 0; + + // Send a few chunks of data. After each, we're racing with `writer` + // which has to copy that data to a temp file before the status will + // change, so loop until we see what we expect. Our TEST_TIMEOUT + // ensures we don't stay here forever if something goes wrong. + for i in 1..=10 { + let data_len = i * 100; + let chunk = vec![0; data_len]; + actual_data_hasher.update(&chunk); + upload_tx.send(Ok(Bytes::from(chunk))).unwrap(); + + loop { + let progress = expect_in_progress(writer.status(boot_disk)); + + // If we lost the race, the status is still what it was + // previously; sleep briefly and check again. + if progress + == (BootDiskOsWriteProgress::ReceivingUploadedImage { + bytes_received: prev_bytes_received, + }) + { + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + + // It's not the old status; it should be exactly the new + // status. If it is, update our count and break out of this + // inner loop. + assert_eq!( + progress, + BootDiskOsWriteProgress::ReceivingUploadedImage { + bytes_received: prev_bytes_received + data_len + } + ); + prev_bytes_received += data_len; + println!("chunk {i}: got {progress:?}"); + break; + } + } + + // Close the channel; `writer` should recognize the upload is + // complete, then realize there's a hash mismatch and fail the + // request. 
+ mem::drop(upload_tx); + + let start_update_result = start_update_task.await.unwrap(); + let error = start_update_result.unwrap_err(); + match &*error { + BootDiskOsWriteError::UploadedImageHashMismatch { + expected, + got, + } => { + assert_eq!( + *got, + hex::encode(actual_data_hasher.finalize()) + ); + assert_eq!(*expected, hex::encode(claimed_sha3_digest)); + } + _ => panic!("unexpected error {error:?}"), + } + }) + .await + .unwrap(); + + logctx.cleanup_successful(); + } +} diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index 9c3a079dac..62a4e8186c 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -21,11 +21,13 @@ use camino::Utf8PathBuf; use dropshot::{ endpoint, ApiDescription, FreeformBody, HttpError, HttpResponseCreated, HttpResponseDeleted, HttpResponseHeaders, HttpResponseOk, - HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody, + HttpResponseUpdatedNoContent, Path, Query, RequestContext, StreamingBody, + TypedBody, }; use illumos_utils::opte::params::{ DeleteVirtualNetworkInterfaceHost, SetVirtualNetworkInterfaceHost, }; +use installinator_common::M2Slot; use omicron_common::api::external::Error; use omicron_common::api::internal::nexus::{ DiskRuntimeState, SledInstanceState, UpdateArtifactId, @@ -75,6 +77,8 @@ pub fn api() -> SledApiDescription { api.register(write_network_bootstore_config)?; api.register(add_sled_to_initialized_rack)?; api.register(metrics_collect)?; + api.register(host_os_write_start)?; + api.register(host_os_write_status)?; Ok(()) } @@ -752,3 +756,74 @@ async fn metrics_collect( let producer_id = path_params.into_inner().producer_id; collect(&sa.metrics_registry(), producer_id).await } + +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct BootDiskPathParams { + pub boot_disk: M2Slot, +} + +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct BootDiskWriteStartQueryParams { + pub update_id: Uuid, + // TODO do we already have sha2-256 hashes of the OS images, and if so + // should we use that instead? Another option is to use the external API + // `Digest` type, although it predates `serde_human_bytes` so just stores + // the hash as a `String`. 
+ #[serde(with = "serde_human_bytes::hex_array")] + #[schemars(schema_with = "omicron_common::hex_schema::<32>")] + pub sha3_256_digest: [u8; 32], +} + +/// Write a new host OS image to the specified boot disk +#[endpoint { + method = POST, + path = "/boot-disk/{boot_disk}/os/write", +}] +async fn host_os_write_start( + request_context: RequestContext, + path_params: Path, + query_params: Query, + body: StreamingBody, +) -> Result { + let sa = request_context.context(); + let boot_disk = path_params.into_inner().boot_disk; + let BootDiskWriteStartQueryParams { update_id, sha3_256_digest } = + query_params.into_inner(); + sa.boot_disk_os_writer() + .start_update(boot_disk, update_id, sha3_256_digest, body.into_stream()) + .await + .map_err(|err| HttpError::from(&*err))?; + Ok(HttpResponseUpdatedNoContent()) +} + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Deserialize, JsonSchema, Serialize, +)] +#[serde(tag = "state", rename_all = "snake_case")] +pub enum BootDiskOsWriteProgress { + ReceivingUploadedImage { bytes_received: usize }, +} + +#[derive(Debug, Clone, Deserialize, JsonSchema, Serialize)] +#[serde(tag = "status", rename_all = "snake_case")] +pub enum BootDiskOsWriteStatus { + NoUpdateRunning, + InProgress { update_id: Uuid, progress: BootDiskOsWriteProgress }, + Complete { update_id: Uuid }, + Failed { update_id: Uuid, message: String }, +} + +/// Get the status of writing a new host OS +#[endpoint { + method = GET, + path = "/boot-disk/{boot_disk}/os/write/status", +}] +async fn host_os_write_status( + request_context: RequestContext, + path_params: Path, +) -> Result, HttpError> { + let sa = request_context.context(); + let boot_disk = path_params.into_inner().boot_disk; + let status = sa.boot_disk_os_writer().status(boot_disk); + Ok(HttpResponseOk(status)) +} diff --git a/sled-agent/src/lib.rs b/sled-agent/src/lib.rs index d77ec7a3c0..527b483ee8 100644 --- a/sled-agent/src/lib.rs +++ b/sled-agent/src/lib.rs @@ -18,6 +18,7 @@ pub mod common; // Modules for the non-simulated sled agent. mod backing_fs; +mod boot_disk_os_writer; pub mod bootstrap; pub mod config; pub(crate) mod dump_setup; diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs index 90e9706198..501d4587e2 100644 --- a/sled-agent/src/sled_agent.rs +++ b/sled-agent/src/sled_agent.rs @@ -4,6 +4,7 @@ //! Sled agent implementation +use crate::boot_disk_os_writer::BootDiskOsWriter; use crate::bootstrap::config::BOOTSTRAP_AGENT_RACK_INIT_PORT; use crate::bootstrap::early_networking::{ EarlyNetworkConfig, EarlyNetworkSetupError, @@ -264,6 +265,9 @@ struct SledAgentInner { // Object handling production of metrics for oximeter. metrics_manager: MetricsManager, + + // Handle to the traffic manager for writing OS updates to our boot disks. 
+ boot_disk_os_writer: BootDiskOsWriter, } impl SledAgentInner { @@ -542,6 +546,7 @@ impl SledAgent { zone_bundler: long_running_task_handles.zone_bundler.clone(), bootstore: long_running_task_handles.bootstore.clone(), metrics_manager, + boot_disk_os_writer: BootDiskOsWriter::new(&parent_log), }), log: log.clone(), }; @@ -1040,6 +1045,10 @@ impl SledAgent { pub fn metrics_registry(&self) -> &ProducerRegistry { self.inner.metrics_manager.registry() } + + pub(crate) fn boot_disk_os_writer(&self) -> &BootDiskOsWriter { + &self.inner.boot_disk_os_writer + } } async fn register_metric_producer_with_nexus( From e5fb5bc77073d66c2c4fff5a1cb7e1aab8fbbbf4 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Tue, 5 Dec 2023 18:19:55 -0500 Subject: [PATCH 04/13] initial impl of writing the uploaded image to the real disk --- Cargo.lock | 1 + sled-agent/Cargo.toml | 1 + sled-agent/src/boot_disk_os_writer.rs | 441 ++++++++++++++++++++++++-- sled-agent/src/http_entrypoints.rs | 61 +++- sled-agent/src/sled_agent.rs | 4 + 5 files changed, 486 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 525f43a154..fcffb36978 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4879,6 +4879,7 @@ dependencies = [ "tofino", "tokio", "tokio-stream", + "tokio-util", "toml 0.8.8", "usdt", "uuid", diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index 3b6fd7c162..7607d57b95 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -98,6 +98,7 @@ slog-async.workspace = true slog-term.workspace = true tempfile.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true illumos-utils = { workspace = true, features = ["testing", "tmp_keypath"] } sled-storage = { workspace = true, features = ["testing"] } diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs index 6fb2230dcd..e720824976 100644 --- a/sled-agent/src/boot_disk_os_writer.rs +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -6,23 +6,30 @@ use crate::http_entrypoints::BootDiskOsWriteProgress; use crate::http_entrypoints::BootDiskOsWriteStatus; +use async_trait::async_trait; use bytes::Bytes; +use camino::Utf8PathBuf; use display_error_chain::DisplayErrorChain; use dropshot::HttpError; use futures::Stream; use futures::TryStreamExt; use installinator_common::M2Slot; +use installinator_common::RawDiskWriter; use sha3::Digest; use sha3::Sha3_256; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::io; +use std::path::Path; use std::sync::Arc; use std::sync::Mutex; use tokio::fs::File; +use tokio::io::AsyncReadExt; use tokio::io::AsyncSeekExt; +use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; +use tokio::io::BufReader; use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::watch; @@ -53,13 +60,32 @@ pub(crate) enum BootDiskOsWriteError { #[error("another update is still running ({0})")] AnotherUpdateRunning(Uuid), #[error("failed to create temporary file")] - FailedCreatingTempfile(io::Error), + FailedCreatingTempfile(#[source] io::Error), #[error("failed writing to temporary file")] - FailedWritingTempfile(io::Error), + FailedWritingTempfile(#[source] io::Error), #[error("failed downloading image from HTTP client")] - FailedDownloadingImage(HttpError), + FailedDownloadingImage(#[source] HttpError), #[error("hash mismatch in image from HTTP client: expected {expected} but got {got}")] UploadedImageHashMismatch { expected: String, got: String }, + #[error("failed to open disk for writing {path}")] + 
FailedOpenDisk { + #[source] + error: io::Error, + path: Utf8PathBuf, + }, + #[error("image size ({image_size}) is not a multiple of disk block size ({disk_block_size})")] + ImageSizeNotMultipleOfBlockSize { + image_size: usize, + disk_block_size: usize, + }, + #[error("failed reading from temporary file")] + FailedReadingTempfile(#[source] io::Error), + #[error("failed writing to disk {path}")] + FailedWritingDisk { + #[source] + error: io::Error, + path: Utf8PathBuf, + }, } impl From<&BootDiskOsWriteError> for HttpError { @@ -68,12 +94,16 @@ impl From<&BootDiskOsWriteError> for HttpError { match error { BootDiskOsWriteError::AnotherUpdateRunning(_) | BootDiskOsWriteError::FailedDownloadingImage(_) - | BootDiskOsWriteError::UploadedImageHashMismatch { .. } => { - HttpError::for_bad_request(None, message) - } + | BootDiskOsWriteError::UploadedImageHashMismatch { .. } + | BootDiskOsWriteError::ImageSizeNotMultipleOfBlockSize { + .. + } => HttpError::for_bad_request(None, message), BootDiskOsWriteError::TaskPanic | BootDiskOsWriteError::FailedCreatingTempfile(_) - | BootDiskOsWriteError::FailedWritingTempfile(_) => HttpError { + | BootDiskOsWriteError::FailedWritingTempfile(_) + | BootDiskOsWriteError::FailedReadingTempfile(_) + | BootDiskOsWriteError::FailedOpenDisk { .. } + | BootDiskOsWriteError::FailedWritingDisk { .. } => HttpError { status_code: http::StatusCode::SERVICE_UNAVAILABLE, error_code: None, external_message: message.clone(), @@ -103,12 +133,37 @@ impl BootDiskOsWriter { pub(crate) async fn start_update( &self, boot_disk: M2Slot, + disk_devfs_path: Utf8PathBuf, update_id: Uuid, sha3_256_digest: [u8; 32], image_upload: S, ) -> Result<(), Arc> where S: Stream> + Send + 'static, + { + self.start_update_impl( + boot_disk, + disk_devfs_path, + update_id, + sha3_256_digest, + image_upload, + InjectRawDiskWriter, + ) + .await + } + + async fn start_update_impl( + &self, + boot_disk: M2Slot, + disk_devfs_path: Utf8PathBuf, + update_id: Uuid, + sha3_256_digest: [u8; 32], + image_upload: S, + disk_writer: Writer, + ) -> Result<(), Arc> + where + S: Stream> + Send + 'static, + Writer: InjectDiskWriter + Send + Sync + 'static, { // Construct a closure that will spawn a task to drive this update, but // don't actually start it yet: we only allow an update to start if @@ -126,9 +181,11 @@ impl BootDiskOsWriter { log: self .log .new(slog::o!("update_id" => update_id.to_string())), + disk_devfs_path, sha3_256_digest, progress_tx, complete_tx, + disk_writer, }; tokio::spawn(task.run(image_upload, uploaded_image_tx)); ( @@ -266,14 +323,16 @@ struct TaskCompleteState { } #[derive(Debug)] -struct BootDiskOsWriteTask { +struct BootDiskOsWriteTask { log: Logger, sha3_256_digest: [u8; 32], + disk_devfs_path: Utf8PathBuf, progress_tx: watch::Sender, complete_tx: oneshot::Sender>>, + disk_writer: W, } -impl BootDiskOsWriteTask { +impl BootDiskOsWriteTask { async fn run( self, image_upload: S, @@ -308,7 +367,7 @@ impl BootDiskOsWriteTask { // // If this step fails, we will send the error to the client who sent the // request _and_ a copy of the same error in our current update state. 
- let image_tempfile = match self + let (image_tempfile, image_size) = match self .download_body_to_tempfile(image_upload) .await .map_err(Arc::new) @@ -323,12 +382,14 @@ impl BootDiskOsWriteTask { } }; - warn!( - self.log, - "update implementation incomplete - \ - abandoning after copying image to a local tempfile" - ); - _ = image_tempfile; + let disk_block_size = self + .copy_tempfile_to_disk(image_tempfile, image_size) + .await + .map_err(Arc::new)?; + + self.validate_written_image(image_size, disk_block_size) + .await + .map_err(Arc::new)?; Ok(()) } @@ -336,7 +397,7 @@ impl BootDiskOsWriteTask { async fn download_body_to_tempfile( &self, image_upload: S, - ) -> Result + ) -> Result<(File, usize), BootDiskOsWriteError> where S: Stream> + Send + 'static, { @@ -369,10 +430,16 @@ impl BootDiskOsWriteTask { }); } + // Flush any remaining buffered data. + tempfile + .flush() + .await + .map_err(BootDiskOsWriteError::FailedWritingTempfile)?; + // Rewind the tempfile. let mut tempfile = tempfile.into_inner(); tempfile - .seek(io::SeekFrom::Start(0)) + .rewind() .await .map_err(BootDiskOsWriteError::FailedWritingTempfile)?; @@ -388,7 +455,7 @@ impl BootDiskOsWriteTask { "hash" => expected_hash_str, ); - Ok(tempfile) + Ok((tempfile, bytes_received)) } else { let computed_hash_str = hex::encode(&hash); error!( @@ -404,19 +471,222 @@ impl BootDiskOsWriteTask { }) } } + + /// Copy from `image_tempfile` to the disk device at `self.disk_devfs_path`. + /// Returns the block size of that disk. + async fn copy_tempfile_to_disk( + &self, + image_tempfile: File, + image_size: usize, + ) -> Result { + let disk_writer = self + .disk_writer + .open(self.disk_devfs_path.as_std_path()) + .await + .map_err(|error| BootDiskOsWriteError::FailedOpenDisk { + error, + path: self.disk_devfs_path.clone(), + })?; + tokio::pin!(disk_writer); + + let disk_block_size = disk_writer.block_size(); + + if image_size % disk_block_size != 0 { + return Err( + BootDiskOsWriteError::ImageSizeNotMultipleOfBlockSize { + image_size, + disk_block_size, + }, + ); + } + let num_blocks = image_size / disk_block_size; + + let mut buf = vec![0; disk_block_size]; + let mut image_tempfile = BufReader::new(image_tempfile); + + for block in 0..num_blocks { + image_tempfile + .read_exact(&mut buf) + .await + .map_err(BootDiskOsWriteError::FailedReadingTempfile)?; + + disk_writer.write_all(&buf).await.map_err(|error| { + BootDiskOsWriteError::FailedWritingDisk { + error, + path: self.disk_devfs_path.clone(), + } + })?; + + self.progress_tx.send_modify(|progress| { + *progress = BootDiskOsWriteProgress::WritingImageToDisk { + bytes_written: (block + 1) * buf.len(), + } + }); + } + + Ok(disk_block_size) + } + + async fn validate_written_image( + &self, + image_size: usize, + disk_block_size: usize, + ) -> Result<(), BootDiskOsWriteError> { + // TODO + Ok(()) + } +} + +// Utility traits to allow injecting an in-memory "disk" for unit tests. 
+#[async_trait] +trait DiskWriter: AsyncWrite + Send + Sized { + fn block_size(&self) -> usize; + async fn finalize(self) -> io::Result<()>; +} +#[async_trait] +trait InjectDiskWriter { + type Writer: DiskWriter; + async fn open(&self, path: &Path) -> io::Result; +} + +#[async_trait] +impl DiskWriter for RawDiskWriter { + fn block_size(&self) -> usize { + RawDiskWriter::block_size(self) + } + + async fn finalize(self) -> io::Result<()> { + RawDiskWriter::finalize(self).await + } +} + +struct InjectRawDiskWriter; + +#[async_trait] +impl InjectDiskWriter for InjectRawDiskWriter { + type Writer = RawDiskWriter; + + async fn open(&self, path: &Path) -> io::Result { + RawDiskWriter::open(path).await + } } #[cfg(test)] mod tests { use super::*; + use futures::future; + use futures::stream; + use installinator_common::BlockSizeBufWriter; use omicron_test_utils::dev::test_setup_log; + use rand::RngCore; use std::mem; + use std::path::PathBuf; + use std::pin::Pin; + use std::task::ready; + use std::task::Context; + use std::task::Poll; use std::time::Duration; use tokio::sync::mpsc; + use tokio::sync::Semaphore; use tokio_stream::wrappers::UnboundedReceiverStream; + use tokio_util::sync::PollSemaphore; // TODO DOCUMENT AND BUMP TO 30 - const TEST_TIMEOUT: Duration = Duration::from_secs(10); + const TEST_TIMEOUT: Duration = Duration::from_secs(5); + + struct InMemoryDiskContents { + path: PathBuf, + data: Vec, + } + + struct InjectInMemoryDiskWriter { + semaphore: Arc, + finalized_writes: Arc>>, + } + + impl InjectInMemoryDiskWriter { + const BLOCK_SIZE: usize = 16; + + fn new(semaphore: Semaphore) -> Self { + Self { + semaphore: Arc::new(semaphore), + finalized_writes: Arc::default(), + } + } + } + + #[async_trait] + impl InjectDiskWriter for InjectInMemoryDiskWriter { + type Writer = InMemoryDiskWriter; + + async fn open(&self, path: &Path) -> io::Result { + Ok(InMemoryDiskWriter { + opened_path: path.into(), + data: BlockSizeBufWriter::with_block_size( + Self::BLOCK_SIZE, + Vec::new(), + ), + semaphore: PollSemaphore::new(Arc::clone(&self.semaphore)), + finalized_writes: Arc::clone(&self.finalized_writes), + }) + } + } + + struct InMemoryDiskWriter { + opened_path: PathBuf, + data: BlockSizeBufWriter>, + semaphore: PollSemaphore, + finalized_writes: Arc>>, + } + + #[async_trait] + impl DiskWriter for InMemoryDiskWriter { + fn block_size(&self) -> usize { + self.data.block_size() + } + + async fn finalize(mut self) -> io::Result<()> { + self.data.flush().await?; + + let mut finalized = self.finalized_writes.lock().unwrap(); + finalized.push(InMemoryDiskContents { + path: self.opened_path, + data: self.data.into_inner(), + }); + + Ok(()) + } + } + + impl AsyncWrite for InMemoryDiskWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let permit = match ready!(self.semaphore.poll_acquire(cx)) { + Some(permit) => permit, + None => panic!("test semaphore closed"), + }; + let result = Pin::new(&mut self.data).poll_write(cx, buf); + permit.forget(); + result + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.data).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.data).poll_shutdown(cx) + } + } fn expect_in_progress( status: BootDiskOsWriteStatus, @@ -452,6 +722,7 @@ mod tests { writer .start_update( boot_disk, + "/does-not-matter".into(), Uuid::new_v4(), claimed_sha3_digest, 
UnboundedReceiverStream::new(upload_rx), @@ -477,7 +748,6 @@ mod tests { continue; } BootDiskOsWriteStatus::InProgress { progress, .. } => { - println!("got {progress:?}"); assert_eq!( progress, BootDiskOsWriteProgress::ReceivingUploadedImage { @@ -560,4 +830,133 @@ mod tests { logctx.cleanup_successful(); } + + #[tokio::test] + async fn boot_disk_os_writer_writes_data_to_disk() { + let logctx = test_setup_log("boot_disk_os_writer_writes_data_to_disk"); + + // generate a small, random "OS image" consisting of 10 "blocks" + let num_data_blocks = 10; + let data_len = num_data_blocks * InjectInMemoryDiskWriter::BLOCK_SIZE; + let mut data = vec![0; data_len]; + rand::thread_rng().fill_bytes(&mut data); + let data_hash = Sha3_256::digest(&data); + + // generate a disk writer with a 0-permit semaphore; we'll inject + // permits in the main loop below to force single-stepping through + // writing the data + let inject_disk_writer = + InjectInMemoryDiskWriter::new(Semaphore::new(0)); + let shared_semaphore = Arc::clone(&inject_disk_writer.semaphore); + + let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); + let boot_disk = M2Slot::A; + let disk_devfs_path = "/unit-test/disk"; + + writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + Uuid::new_v4(), + data_hash.into(), + stream::once(future::ready(Ok(Bytes::from(data.clone())))), + inject_disk_writer, + ) + .await + .unwrap(); + + // Run the rest of the test under a timeout to catch any incorrect + // assumptions that result in a hang. + tokio::time::timeout(TEST_TIMEOUT, async move { + // Wait until `writer` has copied our data into a temp file + loop { + let progress = expect_in_progress(writer.status(boot_disk)); + match progress { + BootDiskOsWriteProgress::ReceivingUploadedImage { + bytes_received, + } => { + if bytes_received == data.len() { + break; + } else { + println!( + "got status with {} bytes received", + bytes_received + ); + } + } + _ => panic!("unexpected progress {progress:?}"), + } + } + + for i in 0..num_data_blocks { + // Add one permit to our shared semaphore, allowing one block of + // data to be written to the "disk". + shared_semaphore.add_permits(1); + + // Wait until we see the status we expect + loop { + let status = writer.status(boot_disk); + if i + 1 < num_data_blocks { + // not the last block - we should see progress that + // matches the amount of data being copied + let progress = expect_in_progress(status); + match progress { + BootDiskOsWriteProgress::WritingImageToDisk { + bytes_written, + } if (i + 1) + * InjectInMemoryDiskWriter::BLOCK_SIZE + == bytes_written => + { + println!("saw expected progress for block {i}"); + break; + } + _ => { + // This is not an error: we could still be in + // `ReceivingUploadedImage` or the previous + // block's `WritingImageToDisk` + println!( + "saw irrelevant progress {progress:?}" + ); + tokio::time::sleep(Duration::from_millis(50)) + .await; + continue; + } + } + } else { + // On the last block, we may see an "in progress" with + // all data written, or we may skip straight to + // "complete". Either is fine and signals all data has + // been written. + match status { + BootDiskOsWriteStatus::Complete { .. } => break, + BootDiskOsWriteStatus::InProgress { + progress: BootDiskOsWriteProgress::WritingImageToDisk { + bytes_written, + }, + .. + } if bytes_written == data_len => break, + BootDiskOsWriteStatus::InProgress { + progress, .. 
+ } => { + println!( + "saw irrelevant progress {progress:?}" + ); + tokio::time::sleep(Duration::from_millis(50)) + .await; + continue; + } + BootDiskOsWriteStatus::NoUpdateRunning + | BootDiskOsWriteStatus::Failed {..} => { + panic!("unexpected status {status:?}"); + } + } + } + } + } + }) + .await + .unwrap(); + + logctx.cleanup_successful(); + } } diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index 62a4e8186c..6220c9fb82 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -18,6 +18,7 @@ use crate::sled_agent::Error as SledAgentError; use crate::zone_bundle; use bootstore::schemes::v0::NetworkConfig; use camino::Utf8PathBuf; +use display_error_chain::DisplayErrorChain; use dropshot::{ endpoint, ApiDescription, FreeformBody, HttpError, HttpResponseCreated, HttpResponseDeleted, HttpResponseHeaders, HttpResponseOk, @@ -38,6 +39,7 @@ use oximeter_producer::collect; use oximeter_producer::ProducerIdPathParams; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use sled_hardware::DiskVariant; use std::collections::BTreeMap; use uuid::Uuid; @@ -787,10 +789,66 @@ async fn host_os_write_start( ) -> Result { let sa = request_context.context(); let boot_disk = path_params.into_inner().boot_disk; + + // Find our corresponding disk. + let maybe_disk_path = + sa.storage().get_latest_resources().await.disks().values().find_map( + |(disk, _pool)| { + // Synthetic disks panic if asked for their `slot()`, so filter + // them out first; additionally, filter out any non-M2 disks. + if disk.is_synthetic() || disk.variant() != DiskVariant::M2 { + return None; + } + + // Convert this M2 disk's slot to an M2Slot, and skip any that + // don't match the requested boot_disk. + let Ok(slot) = M2Slot::try_from(disk.slot()) else { + return None; + }; + if slot != boot_disk { + return None; + } + + let raw_devs_path = true; + Some(disk.boot_image_devfs_path(raw_devs_path)) + }, + ); + + let disk_path = match maybe_disk_path { + Some(Ok(path)) => path, + Some(Err(err)) => { + let message = format!( + "failed to find devfs path for {boot_disk:?}: {}", + DisplayErrorChain::new(&err) + ); + return Err(HttpError { + status_code: http::StatusCode::SERVICE_UNAVAILABLE, + error_code: None, + external_message: message.clone(), + internal_message: message, + }); + } + None => { + let message = format!("no disk found for slot {boot_disk:?}",); + return Err(HttpError { + status_code: http::StatusCode::SERVICE_UNAVAILABLE, + error_code: None, + external_message: message.clone(), + internal_message: message, + }); + } + }; + let BootDiskWriteStartQueryParams { update_id, sha3_256_digest } = query_params.into_inner(); sa.boot_disk_os_writer() - .start_update(boot_disk, update_id, sha3_256_digest, body.into_stream()) + .start_update( + boot_disk, + disk_path, + update_id, + sha3_256_digest, + body.into_stream(), + ) .await .map_err(|err| HttpError::from(&*err))?; Ok(HttpResponseUpdatedNoContent()) @@ -802,6 +860,7 @@ async fn host_os_write_start( #[serde(tag = "state", rename_all = "snake_case")] pub enum BootDiskOsWriteProgress { ReceivingUploadedImage { bytes_received: usize }, + WritingImageToDisk { bytes_written: usize }, } #[derive(Debug, Clone, Deserialize, JsonSchema, Serialize)] diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs index 501d4587e2..ce570ed228 100644 --- a/sled-agent/src/sled_agent.rs +++ b/sled-agent/src/sled_agent.rs @@ -1046,6 +1046,10 @@ impl SledAgent { self.inner.metrics_manager.registry() } + 
pub(crate) fn storage(&self) -> &StorageHandle { + &self.inner.storage + } + pub(crate) fn boot_disk_os_writer(&self) -> &BootDiskOsWriter { &self.inner.boot_disk_os_writer } From 24acde4d17e3938c7c70196713b58f72f9102757 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Wed, 6 Dec 2023 13:17:41 -0500 Subject: [PATCH 05/13] reread disk to validate data written --- dev-tools/omdb/tests/successes.out | 2 +- openapi/sled-agent.json | 205 ------------- sled-agent/src/boot_disk_os_writer.rs | 405 +++++++++++++++++++------- sled-agent/src/http_entrypoints.rs | 1 + 4 files changed, 308 insertions(+), 305 deletions(-) diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 65520ab59c..ef5330a288 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -333,7 +333,7 @@ task: "nat_v4_garbage_collector" currently executing: no last completed activation: iter 2, triggered by an explicit signal started at (s ago) and ran for ms -warning: unknown background task: "nat_v4_garbage_collector" (don't know how to interpret details: Null) + last completion reported error: failed to read generation of dpd: Error Response: status: 404 Not Found; headers: {"content-type": "application/json", "x-request-id": "REDACTED_UUID_REDACTED_UUID_REDACTED", "content-length": "84", "date": "Wed, 06 Dec 2023 18:36:02 GMT"}; value: Error { error_code: None, message: "Not Found", request_id: "REDACTED_UUID_REDACTED_UUID_REDACTED" } task: "external_endpoints" configured period: every 1m diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 466a67dfee..5e217b27a4 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -10,96 +10,6 @@ "version": "0.0.1" }, "paths": { - "/boot-disk/{boot_disk}/os/write": { - "post": { - "summary": "Write a new host OS image to the specified boot disk", - "operationId": "host_os_write_start", - "parameters": [ - { - "in": "path", - "name": "boot_disk", - "required": true, - "schema": { - "$ref": "#/components/schemas/M2Slot" - } - }, - { - "in": "query", - "name": "sha3_256_digest", - "required": true, - "schema": { - "type": "string", - "format": "hex string (32 bytes)" - } - }, - { - "in": "query", - "name": "update_id", - "required": true, - "schema": { - "type": "string", - "format": "uuid" - } - } - ], - "requestBody": { - "content": { - "application/octet-stream": { - "schema": { - "type": "string", - "format": "binary" - } - } - }, - "required": true - }, - "responses": { - "204": { - "description": "resource updated" - }, - "4XX": { - "$ref": "#/components/responses/Error" - }, - "5XX": { - "$ref": "#/components/responses/Error" - } - } - } - }, - "/boot-disk/{boot_disk}/os/write/status": { - "get": { - "summary": "Get the status of writing a new host OS", - "operationId": "host_os_write_status", - "parameters": [ - { - "in": "path", - "name": "boot_disk", - "required": true, - "schema": { - "$ref": "#/components/schemas/M2Slot" - } - } - ], - "responses": { - "200": { - "description": "successful operation", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/BootDiskOsWriteStatus" - } - } - } - }, - "4XX": { - "$ref": "#/components/responses/Error" - }, - "5XX": { - "$ref": "#/components/responses/Error" - } - } - } - }, "/cockroachdb": { "post": { "summary": "Initializes a CockroachDB cluster", @@ -2225,113 +2135,6 @@ "range" ] }, - "BootDiskOsWriteProgress": { - "oneOf": [ - { - "type": "object", - "properties": { - "bytes_received": { - "type": "integer", 
- "format": "uint", - "minimum": 0 - }, - "state": { - "type": "string", - "enum": [ - "receiving_uploaded_image" - ] - } - }, - "required": [ - "bytes_received", - "state" - ] - } - ] - }, - "BootDiskOsWriteStatus": { - "oneOf": [ - { - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": [ - "no_update_running" - ] - } - }, - "required": [ - "status" - ] - }, - { - "type": "object", - "properties": { - "progress": { - "$ref": "#/components/schemas/BootDiskOsWriteProgress" - }, - "status": { - "type": "string", - "enum": [ - "in_progress" - ] - }, - "update_id": { - "type": "string", - "format": "uuid" - } - }, - "required": [ - "progress", - "status", - "update_id" - ] - }, - { - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": [ - "complete" - ] - }, - "update_id": { - "type": "string", - "format": "uuid" - } - }, - "required": [ - "status", - "update_id" - ] - }, - { - "type": "object", - "properties": { - "message": { - "type": "string" - }, - "status": { - "type": "string", - "enum": [ - "failed" - ] - }, - "update_id": { - "type": "string", - "format": "uuid" - } - }, - "required": [ - "message", - "status", - "update_id" - ] - } - ] - }, "BundleUtilization": { "description": "The portion of a debug dataset used for zone bundles.", "type": "object", @@ -6677,14 +6480,6 @@ "description": "Zpool names are of the format ox{i,p}_. They are either Internal or External, and should be unique", "type": "string", "pattern": "^ox[ip]_[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" - }, - "M2Slot": { - "description": "An M.2 slot that was written.", - "type": "string", - "enum": [ - "A", - "B" - ] } }, "responses": { diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs index e720824976..3bed021ecc 100644 --- a/sled-agent/src/boot_disk_os_writer.rs +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -21,6 +21,7 @@ use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::io; +use std::io::Read; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; @@ -68,7 +69,7 @@ pub(crate) enum BootDiskOsWriteError { #[error("hash mismatch in image from HTTP client: expected {expected} but got {got}")] UploadedImageHashMismatch { expected: String, got: String }, #[error("failed to open disk for writing {path}")] - FailedOpenDisk { + FailedOpenDiskForWrite { #[source] error: io::Error, path: Utf8PathBuf, @@ -86,6 +87,24 @@ pub(crate) enum BootDiskOsWriteError { error: io::Error, path: Utf8PathBuf, }, + #[error("failed to open disk for reading {path}")] + FailedOpenDiskForRead { + #[source] + error: io::Error, + path: Utf8PathBuf, + }, + #[error("failed reading from disk {path}")] + FailedReadingDisk { + #[source] + error: io::Error, + path: Utf8PathBuf, + }, + #[error("hash mismatch after writing disk {path}: expected {expected} but got {got}")] + WrittenImageHashMismatch { + path: Utf8PathBuf, + expected: String, + got: String, + }, } impl From<&BootDiskOsWriteError> for HttpError { @@ -102,13 +121,18 @@ impl From<&BootDiskOsWriteError> for HttpError { | BootDiskOsWriteError::FailedCreatingTempfile(_) | BootDiskOsWriteError::FailedWritingTempfile(_) | BootDiskOsWriteError::FailedReadingTempfile(_) - | BootDiskOsWriteError::FailedOpenDisk { .. } - | BootDiskOsWriteError::FailedWritingDisk { .. 
} => HttpError { - status_code: http::StatusCode::SERVICE_UNAVAILABLE, - error_code: None, - external_message: message.clone(), - internal_message: message, - }, + | BootDiskOsWriteError::FailedOpenDiskForWrite { .. } + | BootDiskOsWriteError::FailedOpenDiskForRead { .. } + | BootDiskOsWriteError::FailedWritingDisk { .. } + | BootDiskOsWriteError::FailedReadingDisk { .. } + | BootDiskOsWriteError::WrittenImageHashMismatch { .. } => { + HttpError { + status_code: http::StatusCode::SERVICE_UNAVAILABLE, + error_code: None, + external_message: message.clone(), + internal_message: message, + } + } } } } @@ -147,7 +171,7 @@ impl BootDiskOsWriter { update_id, sha3_256_digest, image_upload, - InjectRawDiskWriter, + RealDiskInterface {}, ) .await } @@ -163,7 +187,7 @@ impl BootDiskOsWriter { ) -> Result<(), Arc> where S: Stream> + Send + 'static, - Writer: InjectDiskWriter + Send + Sync + 'static, + Writer: DiskInterface + Send + Sync + 'static, { // Construct a closure that will spawn a task to drive this update, but // don't actually start it yet: we only allow an update to start if @@ -184,10 +208,13 @@ impl BootDiskOsWriter { disk_devfs_path, sha3_256_digest, progress_tx, - complete_tx, - disk_writer, + disk_interface: disk_writer, }; - tokio::spawn(task.run(image_upload, uploaded_image_tx)); + tokio::spawn(task.run( + image_upload, + uploaded_image_tx, + complete_tx, + )); ( uploaded_image_rx, TaskRunningState { update_id, progress_rx, complete_rx }, @@ -328,17 +355,17 @@ struct BootDiskOsWriteTask { sha3_256_digest: [u8; 32], disk_devfs_path: Utf8PathBuf, progress_tx: watch::Sender, - complete_tx: oneshot::Sender>>, - disk_writer: W, + disk_interface: W, } -impl BootDiskOsWriteTask { +impl BootDiskOsWriteTask { async fn run( self, image_upload: S, uploaded_image_tx: oneshot::Sender< Result<(), Arc>, >, + complete_tx: oneshot::Sender>>, ) where S: Stream> + Send + 'static, { @@ -346,11 +373,11 @@ impl BootDiskOsWriteTask { // It's possible (albeit unlikely) our caller has discarded the receive // half of this channel; ignore any send error. - _ = self.complete_tx.send(result); + _ = complete_tx.send(result); } async fn run_impl( - &self, + self, image_upload: S, uploaded_image_tx: oneshot::Sender< Result<(), Arc>, @@ -446,9 +473,9 @@ impl BootDiskOsWriteTask { // Ensure the data the client sent us matches the hash they also sent // us. A failure here means either the client lied or something has gone // horribly wrong. 
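The hunk below only adjusts how the finalized Sha3-256 digest is compared against the caller-supplied hash. As a rough sketch of that check (the helper name here is illustrative, not part of the patch), the comparison amounts to:

    use sha3::{Digest, Sha3_256};

    // Convert the finalized digest into a plain [u8; 32] and compare arrays;
    // this mirrors the `.into()` conversion already used in the hunk below.
    fn uploaded_hash_matches(hasher: Sha3_256, expected: [u8; 32]) -> bool {
        let got: [u8; 32] = hasher.finalize().into();
        got == expected
    }
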
- let hash: [u8; 32] = hasher.finalize().into(); + let hash = hasher.finalize(); let expected_hash_str = hex::encode(&self.sha3_256_digest); - if hash == self.sha3_256_digest { + if hash == self.sha3_256_digest.into() { info!( self.log, "received uploaded image"; "bytes_received" => bytes_received, @@ -479,15 +506,14 @@ impl BootDiskOsWriteTask { image_tempfile: File, image_size: usize, ) -> Result { - let disk_writer = self - .disk_writer - .open(self.disk_devfs_path.as_std_path()) + let mut disk_writer = self + .disk_interface + .open_writer(self.disk_devfs_path.as_std_path()) .await - .map_err(|error| BootDiskOsWriteError::FailedOpenDisk { + .map_err(|error| BootDiskOsWriteError::FailedOpenDiskForWrite { error, path: self.disk_devfs_path.clone(), })?; - tokio::pin!(disk_writer); let disk_block_size = disk_writer.block_size(); @@ -524,29 +550,106 @@ impl BootDiskOsWriteTask { }); } + disk_writer.finalize().await.map_err(|error| { + BootDiskOsWriteError::FailedWritingDisk { + error, + path: self.disk_devfs_path.clone(), + } + })?; + + info!( + self.log, "copied OS image to disk"; + "path" => %self.disk_devfs_path, + "bytes_written" => image_size, + ); + Ok(disk_block_size) } async fn validate_written_image( - &self, + self, image_size: usize, disk_block_size: usize, ) -> Result<(), BootDiskOsWriteError> { - // TODO - Ok(()) + // We're reading the OS image back from disk and hashing it; this can + // all be synchronous inside a spawn_blocking. + tokio::task::spawn_blocking(move || { + let mut f = self + .disk_interface + .open_reader(self.disk_devfs_path.as_std_path()) + .map_err(|error| { + BootDiskOsWriteError::FailedOpenDiskForRead { + error, + path: self.disk_devfs_path.clone(), + } + })?; + + let mut buf = vec![0; disk_block_size]; + let mut hasher = Sha3_256::default(); + let mut bytes_read = 0; + + while bytes_read < image_size { + // We already confirmed while writing the image that the image + // size is an exact multiple of the disk block size, so we can + // always read a full `buf` here. + f.read_exact(&mut buf).map_err(|error| { + BootDiskOsWriteError::FailedReadingDisk { + error, + path: self.disk_devfs_path.clone(), + } + })?; + + hasher.update(&buf); + bytes_read += buf.len(); + self.progress_tx.send_modify(|progress| { + *progress = + BootDiskOsWriteProgress::ValidatingWrittenImage { + bytes_read, + }; + }); + } + + let expected_hash_str = hex::encode(&self.sha3_256_digest); + let hash = hasher.finalize(); + if hash == self.sha3_256_digest.into() { + info!( + self.log, "validated OS image written to disk"; + "path" => %self.disk_devfs_path, + "hash" => expected_hash_str, + ); + Ok(()) + } else { + let computed_hash_str = hex::encode(&hash); + error!( + self.log, "failed to validate written OS image"; + "bytes_hashed" => image_size, + "computed_hash" => &computed_hash_str, + "expected_hash" => &expected_hash_str, + ); + Err(BootDiskOsWriteError::WrittenImageHashMismatch { + path: self.disk_devfs_path, + expected: expected_hash_str, + got: computed_hash_str, + }) + } + }) + .await + .expect("blocking task panicked") } } // Utility traits to allow injecting an in-memory "disk" for unit tests. 
#[async_trait] -trait DiskWriter: AsyncWrite + Send + Sized { +trait DiskWriter: AsyncWrite + Send + Sized + Unpin { fn block_size(&self) -> usize; async fn finalize(self) -> io::Result<()>; } #[async_trait] -trait InjectDiskWriter { +trait DiskInterface: Send + Sync + 'static { type Writer: DiskWriter; - async fn open(&self, path: &Path) -> io::Result; + type Reader: io::Read + Send; + async fn open_writer(&self, path: &Path) -> io::Result; + fn open_reader(&self, path: &Path) -> io::Result; } #[async_trait] @@ -560,15 +663,20 @@ impl DiskWriter for RawDiskWriter { } } -struct InjectRawDiskWriter; +struct RealDiskInterface {} #[async_trait] -impl InjectDiskWriter for InjectRawDiskWriter { +impl DiskInterface for RealDiskInterface { type Writer = RawDiskWriter; + type Reader = std::fs::File; - async fn open(&self, path: &Path) -> io::Result { + async fn open_writer(&self, path: &Path) -> io::Result { RawDiskWriter::open(path).await } + + fn open_reader(&self, path: &Path) -> io::Result { + std::fs::File::open(path) + } } #[cfg(test)] @@ -591,20 +699,27 @@ mod tests { use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::PollSemaphore; - // TODO DOCUMENT AND BUMP TO 30 - const TEST_TIMEOUT: Duration = Duration::from_secs(5); + // Most of the tests below end up looping while calling + // `BootDiskOsWriter::status()` waiting for a specific status message to + // arrive. If we get that wrong (or the code under test is wrong!), that + // could end up looping forever, so we run all the relevant bits of the + // tests under a tokio timeout. We expect all the tests to complete very + // quickly in general (< 1 second), so we'll pick something + // outrageously-long-enough that if we hit it, we're almost certainly + // dealing with a hung test. 
+ const TEST_TIMEOUT: Duration = Duration::from_secs(30); struct InMemoryDiskContents { path: PathBuf, data: Vec, } - struct InjectInMemoryDiskWriter { + struct InMemoryDiskInterface { semaphore: Arc, finalized_writes: Arc>>, } - impl InjectInMemoryDiskWriter { + impl InMemoryDiskInterface { const BLOCK_SIZE: usize = 16; fn new(semaphore: Semaphore) -> Self { @@ -616,10 +731,11 @@ mod tests { } #[async_trait] - impl InjectDiskWriter for InjectInMemoryDiskWriter { + impl DiskInterface for InMemoryDiskInterface { type Writer = InMemoryDiskWriter; + type Reader = io::Cursor>; - async fn open(&self, path: &Path) -> io::Result { + async fn open_writer(&self, path: &Path) -> io::Result { Ok(InMemoryDiskWriter { opened_path: path.into(), data: BlockSizeBufWriter::with_block_size( @@ -630,6 +746,19 @@ mod tests { finalized_writes: Arc::clone(&self.finalized_writes), }) } + + fn open_reader(&self, path: &Path) -> io::Result { + let written_files = self.finalized_writes.lock().unwrap(); + for contents in written_files.iter() { + if contents.path == path { + return Ok(io::Cursor::new(contents.data.clone())); + } + } + Err(io::Error::new( + io::ErrorKind::Other, + format!("no written file for {}", path.display()), + )) + } } struct InMemoryDiskWriter { @@ -837,7 +966,7 @@ mod tests { // generate a small, random "OS image" consisting of 10 "blocks" let num_data_blocks = 10; - let data_len = num_data_blocks * InjectInMemoryDiskWriter::BLOCK_SIZE; + let data_len = num_data_blocks * InMemoryDiskInterface::BLOCK_SIZE; let mut data = vec![0; data_len]; rand::thread_rng().fill_bytes(&mut data); let data_hash = Sha3_256::digest(&data); @@ -845,9 +974,9 @@ mod tests { // generate a disk writer with a 0-permit semaphore; we'll inject // permits in the main loop below to force single-stepping through // writing the data - let inject_disk_writer = - InjectInMemoryDiskWriter::new(Semaphore::new(0)); - let shared_semaphore = Arc::clone(&inject_disk_writer.semaphore); + let inject_disk_interface = + InMemoryDiskInterface::new(Semaphore::new(0)); + let shared_semaphore = Arc::clone(&inject_disk_interface.semaphore); let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); let boot_disk = M2Slot::A; @@ -860,7 +989,7 @@ mod tests { Uuid::new_v4(), data_hash.into(), stream::once(future::ready(Ok(Bytes::from(data.clone())))), - inject_disk_writer, + inject_disk_interface, ) .await .unwrap(); @@ -893,70 +1022,148 @@ mod tests { // data to be written to the "disk". shared_semaphore.add_permits(1); - // Wait until we see the status we expect + // Did we just release the write of the final block? If so, + // break; we'll wait for completion below. + if i + 1 == num_data_blocks { + break; + } + + // Wait until we see the status we expect for a not-yet-last + // block (i.e., that the disk is still being written). 
loop { - let status = writer.status(boot_disk); - if i + 1 < num_data_blocks { - // not the last block - we should see progress that - // matches the amount of data being copied - let progress = expect_in_progress(status); - match progress { - BootDiskOsWriteProgress::WritingImageToDisk { - bytes_written, - } if (i + 1) - * InjectInMemoryDiskWriter::BLOCK_SIZE - == bytes_written => - { - println!("saw expected progress for block {i}"); - break; - } - _ => { - // This is not an error: we could still be in - // `ReceivingUploadedImage` or the previous - // block's `WritingImageToDisk` - println!( - "saw irrelevant progress {progress:?}" - ); - tokio::time::sleep(Duration::from_millis(50)) - .await; - continue; - } + let progress = expect_in_progress(writer.status(boot_disk)); + match progress { + BootDiskOsWriteProgress::WritingImageToDisk { + bytes_written, + } if (i + 1) * InMemoryDiskInterface::BLOCK_SIZE + == bytes_written => + { + println!("saw expected progress for block {i}"); + break; } - } else { - // On the last block, we may see an "in progress" with - // all data written, or we may skip straight to - // "complete". Either is fine and signals all data has - // been written. - match status { - BootDiskOsWriteStatus::Complete { .. } => break, - BootDiskOsWriteStatus::InProgress { - progress: BootDiskOsWriteProgress::WritingImageToDisk { - bytes_written, - }, - .. - } if bytes_written == data_len => break, - BootDiskOsWriteStatus::InProgress { - progress, .. - } => { - println!( - "saw irrelevant progress {progress:?}" - ); - tokio::time::sleep(Duration::from_millis(50)) - .await; - continue; - } - BootDiskOsWriteStatus::NoUpdateRunning - | BootDiskOsWriteStatus::Failed {..} => { - panic!("unexpected status {status:?}"); - } + _ => { + // This is not an error: we could still be in + // `ReceivingUploadedImage` or the previous + // block's `WritingImageToDisk` + println!("saw irrelevant progress {progress:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; } } } } + + // The last block is being or has been written, and after that the + // writer will reread it to validate the hash. We won't bother + // repeating the same machinery to check each step of that process; + // we'll just wait for the eventual successful completion. + loop { + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::Complete { .. } => break, + BootDiskOsWriteStatus::InProgress { .. } => { + println!("saw irrelevant progress {status:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::NoUpdateRunning + | BootDiskOsWriteStatus::Failed { .. 
} => { + panic!("unexpected status {status:?}") + } + } + } }) .await .unwrap(); logctx.cleanup_successful(); } + + #[tokio::test] + async fn boot_disk_os_writer_fails_if_reading_from_disk_doesnt_match() { + let logctx = test_setup_log( + "boot_disk_os_writer_fails_if_reading_from_disk_doesnt_match", + ); + + // generate a small, random "OS image" consisting of 10 "blocks" + let num_data_blocks = 10; + let data_len = num_data_blocks * InMemoryDiskInterface::BLOCK_SIZE; + let mut data = vec![0; data_len]; + rand::thread_rng().fill_bytes(&mut data); + let original_data_hash = Sha3_256::digest(&data); + + // generate a disk writer with (effectively) unlimited semaphore + // permits, since we don't need to throttle the "disk writing" + let inject_disk_interface = + InMemoryDiskInterface::new(Semaphore::new(Semaphore::MAX_PERMITS)); + + let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); + let boot_disk = M2Slot::A; + let disk_devfs_path = "/unit-test/disk"; + + // copy the data and corrupt it, then stage this in + // `inject_disk_interface` so that it returns this corrupted data when + // "reading" the disk + let mut bad_data = data.clone(); + bad_data[0] ^= 1; // bit flip + let bad_data_hash = Sha3_256::digest(&bad_data); + inject_disk_interface.finalized_writes.lock().unwrap().push( + InMemoryDiskContents { + path: disk_devfs_path.into(), + data: bad_data, + }, + ); + + writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + Uuid::new_v4(), + original_data_hash.into(), + stream::once(future::ready(Ok(Bytes::from(data.clone())))), + inject_disk_interface, + ) + .await + .unwrap(); + + // We expect the update to eventually fail; wait for it to do so. + let failure_message = tokio::time::timeout(TEST_TIMEOUT, async move { + loop { + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::Failed { message, .. } => { + return message; + } + BootDiskOsWriteStatus::InProgress { .. } => { + println!("saw irrelevant status {status:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::Complete { .. } + | BootDiskOsWriteStatus::NoUpdateRunning => { + panic!("unexpected status {status:?}"); + } + } + } + }) + .await + .unwrap(); + + // Confirm that the update fails for the reason we expect: when + // re-reading what had been written to disk, it got our corrupt data + // (which hashes to `bad_data_hash`) instead of the expected + // `original_data_hash`. 
+ let expected_error = BootDiskOsWriteError::WrittenImageHashMismatch { + path: disk_devfs_path.into(), + expected: hex::encode(&original_data_hash), + got: hex::encode(&bad_data_hash), + }; + + assert_eq!( + failure_message, + DisplayErrorChain::new(&expected_error).to_string() + ); + + logctx.cleanup_successful(); + } } diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index 6220c9fb82..714bf269d8 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -861,6 +861,7 @@ async fn host_os_write_start( pub enum BootDiskOsWriteProgress { ReceivingUploadedImage { bytes_received: usize }, WritingImageToDisk { bytes_written: usize }, + ValidatingWrittenImage { bytes_read: usize }, } #[derive(Debug, Clone, Deserialize, JsonSchema, Serialize)] From 10d1fea36803004916ea0daef11d89355dd062ba Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Wed, 6 Dec 2023 16:25:04 -0500 Subject: [PATCH 06/13] additional tests --- sled-agent/src/boot_disk_os_writer.rs | 241 +++++++++++++++++++++++++- 1 file changed, 237 insertions(+), 4 deletions(-) diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs index 3bed021ecc..0b7c3c798e 100644 --- a/sled-agent/src/boot_disk_os_writer.rs +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -709,11 +709,13 @@ mod tests { // dealing with a hung test. const TEST_TIMEOUT: Duration = Duration::from_secs(30); + #[derive(Debug, Clone, PartialEq, Eq)] struct InMemoryDiskContents { path: PathBuf, data: Vec, } + #[derive(Debug, Clone)] struct InMemoryDiskInterface { semaphore: Arc, finalized_writes: Arc>>, @@ -938,6 +940,11 @@ mod tests { // request. mem::drop(upload_tx); + // We expect to see an upload hash mismatch error with these hex + // strings. + let expected_hash = hex::encode(claimed_sha3_digest); + let got_hash = hex::encode(actual_data_hasher.finalize()); + let start_update_result = start_update_task.await.unwrap(); let error = start_update_result.unwrap_err(); match &*error { @@ -945,13 +952,31 @@ mod tests { expected, got, } => { + assert_eq!(*got, got_hash); + assert_eq!(*expected, expected_hash); + } + _ => panic!("unexpected error {error:?}"), + } + + // The same error should be present in the current update status. + let expected_error = + BootDiskOsWriteError::UploadedImageHashMismatch { + expected: expected_hash.clone(), + got: got_hash.clone(), + }; + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::Failed { message, .. } => { assert_eq!( - *got, - hex::encode(actual_data_hasher.finalize()) + message, + DisplayErrorChain::new(&expected_error).to_string() ); - assert_eq!(*expected, hex::encode(claimed_sha3_digest)); } - _ => panic!("unexpected error {error:?}"), + BootDiskOsWriteStatus::NoUpdateRunning + | BootDiskOsWriteStatus::InProgress { .. } + | BootDiskOsWriteStatus::Complete { .. 
} => { + panic!("unexpected status {status:?}") + } } }) .await @@ -1166,4 +1191,212 @@ mod tests { logctx.cleanup_successful(); } + + #[tokio::test] + async fn boot_disk_os_writer_can_update_both_slots_simultaneously() { + let logctx = test_setup_log( + "boot_disk_os_writer_can_update_both_slots_simultaneously", + ); + + // generate two small, random "OS image"s consisting of 10 "blocks" each + let num_data_blocks = 10; + let data_len = num_data_blocks * InMemoryDiskInterface::BLOCK_SIZE; + let mut data_a = vec![0; data_len]; + let mut data_b = vec![0; data_len]; + rand::thread_rng().fill_bytes(&mut data_a); + rand::thread_rng().fill_bytes(&mut data_b); + let data_hash_a = Sha3_256::digest(&data_a); + let data_hash_b = Sha3_256::digest(&data_b); + + // generate a disk writer with no semaphore permits so the updates block + // until we get a chance to start both of them + let inject_disk_interface = + InMemoryDiskInterface::new(Semaphore::new(0)); + let shared_semaphore = Arc::clone(&inject_disk_interface.semaphore); + + let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); + let disk_devfs_path_a = "/unit-test/disk/a"; + let disk_devfs_path_b = "/unit-test/disk/b"; + + let update_id_a = Uuid::new_v4(); + let update_id_b = Uuid::new_v4(); + + writer + .start_update_impl( + M2Slot::A, + disk_devfs_path_a.into(), + update_id_a, + data_hash_a.into(), + stream::once(future::ready(Ok(Bytes::from(data_a.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap(); + + writer + .start_update_impl( + M2Slot::B, + disk_devfs_path_b.into(), + update_id_b, + data_hash_b.into(), + stream::once(future::ready(Ok(Bytes::from(data_b.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap(); + + // Both updates have successfully started; unblock the "disks". + shared_semaphore.add_permits(Semaphore::MAX_PERMITS); + + // Wait for both updates to complete successfully. + for boot_disk in [M2Slot::A, M2Slot::B] { + tokio::time::timeout(TEST_TIMEOUT, async { + loop { + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::InProgress { .. } => { + println!("saw irrelevant status {status:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::Complete { update_id } => { + match boot_disk { + M2Slot::A => assert_eq!(update_id, update_id_a), + M2Slot::B => assert_eq!(update_id, update_id_b), + } + break; + } + BootDiskOsWriteStatus::Failed { .. } + | BootDiskOsWriteStatus::NoUpdateRunning => { + panic!("unexpected status {status:?}"); + } + } + } + }) + .await + .unwrap(); + } + + // Ensure each "disk" saw the expected contents. 
+ let expected_disks = [ + InMemoryDiskContents { + path: disk_devfs_path_a.into(), + data: data_a, + }, + InMemoryDiskContents { + path: disk_devfs_path_b.into(), + data: data_b, + }, + ]; + let written_disks = + inject_disk_interface.finalized_writes.lock().unwrap(); + assert_eq!(written_disks.len(), expected_disks.len()); + for expected in expected_disks { + assert!( + written_disks.contains(&expected), + "written disks missing expected contents for {}", + expected.path.display(), + ); + } + + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn boot_disk_os_writer_rejects_new_updates_while_old_running() { + let logctx = test_setup_log( + "boot_disk_os_writer_rejects_new_updates_while_old_running", + ); + + // generate two small, random "OS image"s consisting of 10 "blocks" each + let num_data_blocks = 10; + let data_len = num_data_blocks * InMemoryDiskInterface::BLOCK_SIZE; + let mut data_a = vec![0; data_len]; + let mut data_b = vec![0; data_len]; + rand::thread_rng().fill_bytes(&mut data_a); + rand::thread_rng().fill_bytes(&mut data_b); + let data_hash_a = Sha3_256::digest(&data_a); + let data_hash_b = Sha3_256::digest(&data_b); + + // generate a disk writer with no semaphore permits so the updates block + // until we get a chance to (try to) start both of them + let inject_disk_interface = + InMemoryDiskInterface::new(Semaphore::new(0)); + let shared_semaphore = Arc::clone(&inject_disk_interface.semaphore); + + let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); + let disk_devfs_path = "/unit-test/disk"; + let boot_disk = M2Slot::A; + + let update_id_a = Uuid::new_v4(); + let update_id_b = Uuid::new_v4(); + + writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + update_id_a, + data_hash_a.into(), + stream::once(future::ready(Ok(Bytes::from(data_a.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap(); + + let error = writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + update_id_b, + data_hash_b.into(), + stream::once(future::ready(Ok(Bytes::from(data_b.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap_err(); + match &*error { + BootDiskOsWriteError::AnotherUpdateRunning(running_id) => { + assert_eq!(*running_id, update_id_a); + } + _ => panic!("unexpected error {error}"), + } + + // Both update attempts started; unblock the "disk". + shared_semaphore.add_permits(Semaphore::MAX_PERMITS); + + // Wait for the first update to complete successfully. + tokio::time::timeout(TEST_TIMEOUT, async { + loop { + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::InProgress { .. } => { + println!("saw irrelevant status {status:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::Complete { update_id } => { + assert_eq!(update_id, update_id_a); + break; + } + BootDiskOsWriteStatus::Failed { .. } + | BootDiskOsWriteStatus::NoUpdateRunning => { + panic!("unexpected status {status:?}"); + } + } + } + }) + .await + .unwrap(); + + // Ensure we wrote the contents of the first update. 
+ let expected_disks = [InMemoryDiskContents { + path: disk_devfs_path.into(), + data: data_a, + }]; + let written_disks = + inject_disk_interface.finalized_writes.lock().unwrap(); + assert_eq!(*written_disks, expected_disks); + + logctx.cleanup_successful(); + } } From 8b85684f7781216bec2d2f751eaeb778aad5c2ae Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Wed, 6 Dec 2023 16:27:16 -0500 Subject: [PATCH 07/13] more doc comments --- sled-agent/src/boot_disk_os_writer.rs | 47 ++++++++++++++++++++++----- sled-agent/src/http_entrypoints.rs | 11 ++++++- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs index 0b7c3c798e..dfd99964cc 100644 --- a/sled-agent/src/boot_disk_os_writer.rs +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -2,7 +2,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! TODO FIXME +//! This module provides `BootDiskOsWriter`, via which sled-agent can write new +//! OS images to its boot disks. use crate::http_entrypoints::BootDiskOsWriteProgress; use crate::http_entrypoints::BootDiskOsWriteStatus; @@ -154,6 +155,28 @@ impl BootDiskOsWriter { } } + /// Attempt to start a new update to the given disk (identified by both its + /// slot and the path to its devfs device). + /// + /// This method will return after the `image_upload` stream has been saved + /// to a local temporary file, but before the update has completed. Callers + /// must poll `status()` to discover when the running update completes (or + /// fails). + /// + /// # Errors + /// + /// This method will return an error and not start an update if any of the + /// following are true: + /// + /// * A previously-started update of this same `boot_disk` is still running + /// * The `image_upload` stream returns an error + /// * The hash of the data provided by `image_upload` does not match + /// `sha3_256_digest` + /// * Any of a variety of I/O errors occurs while copying from + /// `image_upload` to a temporary file + /// + /// In all but the first case, the error returned will also be saved and + /// returned when `status()` is called (until another update is started). pub(crate) async fn start_update( &self, boot_disk: M2Slot, @@ -261,6 +284,11 @@ impl BootDiskOsWriter { } } } + // TODO-correctness Should we separate require callers to + // explicitly clear out the results of a completed update + // before starting a new update? (Answer could be different + // depending on whether the most recent update succeeded or + // failed.) WriterState::Complete(_) => { let (uploaded_image_rx, running) = spawn_update_task(); slot.insert(WriterState::TaskRunning(running)); @@ -275,15 +303,18 @@ impl BootDiskOsWriter { uploaded_image_rx.await.map_err(|_| BootDiskOsWriteError::TaskPanic)? } + /// Get the status of any update running that targets `boot_disk`. pub(crate) fn status(&self, boot_disk: M2Slot) -> BootDiskOsWriteStatus { let mut states = self.states.lock().unwrap(); let mut slot = match states.entry(boot_disk) { - Entry::Vacant(_) => return BootDiskOsWriteStatus::NoUpdateRunning, + Entry::Vacant(_) => return BootDiskOsWriteStatus::NoUpdateStarted, Entry::Occupied(slot) => slot, }; match slot.get_mut() { WriterState::TaskRunning(running) => { + // Is the task actually still running? Check and see if it's + // sent us a result that we just haven't noticed yet. 
match running.complete_rx.try_recv() { Ok(result) => { let update_id = running.update_id; @@ -874,7 +905,7 @@ mod tests { // image". loop { match writer.status(boot_disk) { - BootDiskOsWriteStatus::NoUpdateRunning => { + BootDiskOsWriteStatus::NoUpdateStarted => { tokio::time::sleep(Duration::from_millis(50)).await; continue; } @@ -972,7 +1003,7 @@ mod tests { DisplayErrorChain::new(&expected_error).to_string() ); } - BootDiskOsWriteStatus::NoUpdateRunning + BootDiskOsWriteStatus::NoUpdateStarted | BootDiskOsWriteStatus::InProgress { .. } | BootDiskOsWriteStatus::Complete { .. } => { panic!("unexpected status {status:?}") @@ -1091,7 +1122,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(50)).await; continue; } - BootDiskOsWriteStatus::NoUpdateRunning + BootDiskOsWriteStatus::NoUpdateStarted | BootDiskOsWriteStatus::Failed { .. } => { panic!("unexpected status {status:?}") } @@ -1165,7 +1196,7 @@ mod tests { continue; } BootDiskOsWriteStatus::Complete { .. } - | BootDiskOsWriteStatus::NoUpdateRunning => { + | BootDiskOsWriteStatus::NoUpdateStarted => { panic!("unexpected status {status:?}"); } } @@ -1267,7 +1298,7 @@ mod tests { break; } BootDiskOsWriteStatus::Failed { .. } - | BootDiskOsWriteStatus::NoUpdateRunning => { + | BootDiskOsWriteStatus::NoUpdateStarted => { panic!("unexpected status {status:?}"); } } @@ -1379,7 +1410,7 @@ mod tests { break; } BootDiskOsWriteStatus::Failed { .. } - | BootDiskOsWriteStatus::NoUpdateRunning => { + | BootDiskOsWriteStatus::NoUpdateStarted => { panic!("unexpected status {status:?}"); } } diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index 714bf269d8..e1112d4ed6 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -854,22 +854,31 @@ async fn host_os_write_start( Ok(HttpResponseUpdatedNoContent()) } +/// Current progress of an OS image being written to disk. #[derive( Debug, Clone, Copy, PartialEq, Eq, Deserialize, JsonSchema, Serialize, )] #[serde(tag = "state", rename_all = "snake_case")] pub enum BootDiskOsWriteProgress { + /// The image is still being uploaded. ReceivingUploadedImage { bytes_received: usize }, + /// The image is being written to disk. WritingImageToDisk { bytes_written: usize }, + /// The image is being read back from disk for validation. ValidatingWrittenImage { bytes_read: usize }, } +/// Status of an update to a boot disk OS. #[derive(Debug, Clone, Deserialize, JsonSchema, Serialize)] #[serde(tag = "status", rename_all = "snake_case")] pub enum BootDiskOsWriteStatus { - NoUpdateRunning, + /// No update has been started for this disk since this server started. + NoUpdateStarted, + /// An update is currently running. InProgress { update_id: Uuid, progress: BootDiskOsWriteProgress }, + /// The most recent update completed successfully. Complete { update_id: Uuid }, + /// The most recent update failed. 
Failed { update_id: Uuid, message: String }, } From af2a645d2a2c1dd77dcd0cfaadce9e6620a2f320 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Wed, 6 Dec 2023 16:50:22 -0500 Subject: [PATCH 08/13] update sled-agent openapi spec --- openapi/sled-agent.json | 254 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 254 insertions(+) diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 5e217b27a4..71109ad551 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -10,6 +10,96 @@ "version": "0.0.1" }, "paths": { + "/boot-disk/{boot_disk}/os/write": { + "post": { + "summary": "Write a new host OS image to the specified boot disk", + "operationId": "host_os_write_start", + "parameters": [ + { + "in": "path", + "name": "boot_disk", + "required": true, + "schema": { + "$ref": "#/components/schemas/M2Slot" + } + }, + { + "in": "query", + "name": "sha3_256_digest", + "required": true, + "schema": { + "type": "string", + "format": "hex string (32 bytes)" + } + }, + { + "in": "query", + "name": "update_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "requestBody": { + "content": { + "application/octet-stream": { + "schema": { + "type": "string", + "format": "binary" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, + "/boot-disk/{boot_disk}/os/write/status": { + "get": { + "summary": "Get the status of writing a new host OS", + "operationId": "host_os_write_status", + "parameters": [ + { + "in": "path", + "name": "boot_disk", + "required": true, + "schema": { + "$ref": "#/components/schemas/M2Slot" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BootDiskOsWriteStatus" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/cockroachdb": { "post": { "summary": "Initializes a CockroachDB cluster", @@ -2135,6 +2225,162 @@ "range" ] }, + "BootDiskOsWriteProgress": { + "description": "Current progress of an OS image being written to disk.", + "oneOf": [ + { + "description": "The image is still being uploaded.", + "type": "object", + "properties": { + "bytes_received": { + "type": "integer", + "format": "uint", + "minimum": 0 + }, + "state": { + "type": "string", + "enum": [ + "receiving_uploaded_image" + ] + } + }, + "required": [ + "bytes_received", + "state" + ] + }, + { + "description": "The image is being written to disk.", + "type": "object", + "properties": { + "bytes_written": { + "type": "integer", + "format": "uint", + "minimum": 0 + }, + "state": { + "type": "string", + "enum": [ + "writing_image_to_disk" + ] + } + }, + "required": [ + "bytes_written", + "state" + ] + }, + { + "description": "The image is being read back from disk for validation.", + "type": "object", + "properties": { + "bytes_read": { + "type": "integer", + "format": "uint", + "minimum": 0 + }, + "state": { + "type": "string", + "enum": [ + "validating_written_image" + ] + } + }, + "required": [ + "bytes_read", + "state" + ] + } + ] + }, + "BootDiskOsWriteStatus": { + "description": "Status of an update to a boot disk OS.", + "oneOf": [ + { + "description": "No update has been started for this disk since this server started.", + "type": "object", + "properties": { + 
"status": { + "type": "string", + "enum": [ + "no_update_started" + ] + } + }, + "required": [ + "status" + ] + }, + { + "description": "An update is currently running.", + "type": "object", + "properties": { + "progress": { + "$ref": "#/components/schemas/BootDiskOsWriteProgress" + }, + "status": { + "type": "string", + "enum": [ + "in_progress" + ] + }, + "update_id": { + "type": "string", + "format": "uuid" + } + }, + "required": [ + "progress", + "status", + "update_id" + ] + }, + { + "description": "The most recent update completed successfully.", + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": [ + "complete" + ] + }, + "update_id": { + "type": "string", + "format": "uuid" + } + }, + "required": [ + "status", + "update_id" + ] + }, + { + "description": "The most recent update failed.", + "type": "object", + "properties": { + "message": { + "type": "string" + }, + "status": { + "type": "string", + "enum": [ + "failed" + ] + }, + "update_id": { + "type": "string", + "format": "uuid" + } + }, + "required": [ + "message", + "status", + "update_id" + ] + } + ] + }, "BundleUtilization": { "description": "The portion of a debug dataset used for zone bundles.", "type": "object", @@ -6480,6 +6726,14 @@ "description": "Zpool names are of the format ox{i,p}_. They are either Internal or External, and should be unique", "type": "string", "pattern": "^ox[ip]_[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" + }, + "M2Slot": { + "description": "An M.2 slot that was written.", + "type": "string", + "enum": [ + "A", + "B" + ] } }, "responses": { From 8aba5583671d462a004594c655fc5b805f552db3 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 7 Dec 2023 09:35:40 -0500 Subject: [PATCH 09/13] fix installinator unit tests --- Cargo.lock | 2 ++ installinator-common/Cargo.toml | 4 ++++ installinator-common/src/block_size_writer.rs | 15 ++++++++++++++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index fcffb36978..79c6bf93bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3263,10 +3263,12 @@ dependencies = [ "illumos-utils", "libc", "omicron-workspace-hack", + "proptest", "schemars", "serde", "serde_json", "serde_with", + "test-strategy", "thiserror", "tokio", "update-engine", diff --git a/installinator-common/Cargo.toml b/installinator-common/Cargo.toml index ffec81e03d..dd8540c6f8 100644 --- a/installinator-common/Cargo.toml +++ b/installinator-common/Cargo.toml @@ -17,3 +17,7 @@ thiserror.workspace = true tokio.workspace = true update-engine.workspace = true omicron-workspace-hack.workspace = true + +[dev-dependencies] +proptest.workspace = true +test-strategy.workspace = true diff --git a/installinator-common/src/block_size_writer.rs b/installinator-common/src/block_size_writer.rs index 488e7338db..1548594b41 100644 --- a/installinator-common/src/block_size_writer.rs +++ b/installinator-common/src/block_size_writer.rs @@ -141,8 +141,8 @@ impl AsyncWrite for BlockSizeBufWriter { #[cfg(test)] mod tests { use super::*; - use crate::test_helpers::with_test_runtime; use anyhow::Result; + use std::future::Future; use test_strategy::proptest; use tokio::io::AsyncWriteExt; @@ -180,6 +180,19 @@ mod tests { } } + fn with_test_runtime(f: F) -> T + where + F: FnOnce() -> Fut, + Fut: Future, + { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .expect("tokio Runtime built successfully"); + runtime.block_on(f()) + } + #[proptest] fn proptest_block_writer( 
chunks: Vec>, From ed58889f75151a8a3d6d57272f5267f26a5a6798 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 7 Dec 2023 09:36:27 -0500 Subject: [PATCH 10/13] restore unrelated change --- dev-tools/omdb/tests/successes.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index ef5330a288..65520ab59c 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -333,7 +333,7 @@ task: "nat_v4_garbage_collector" currently executing: no last completed activation: iter 2, triggered by an explicit signal started at (s ago) and ran for ms - last completion reported error: failed to read generation of dpd: Error Response: status: 404 Not Found; headers: {"content-type": "application/json", "x-request-id": "REDACTED_UUID_REDACTED_UUID_REDACTED", "content-length": "84", "date": "Wed, 06 Dec 2023 18:36:02 GMT"}; value: Error { error_code: None, message: "Not Found", request_id: "REDACTED_UUID_REDACTED_UUID_REDACTED" } +warning: unknown background task: "nat_v4_garbage_collector" (don't know how to interpret details: Null) task: "external_endpoints" configured period: every 1m From af908e3f2511bd0d88df58f7b969954ad6cf5306 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 7 Dec 2023 09:50:07 -0500 Subject: [PATCH 11/13] review feedback --- installinator-common/src/raw_disk_writer.rs | 2 +- sled-agent/src/boot_disk_os_writer.rs | 24 +++++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/installinator-common/src/raw_disk_writer.rs b/installinator-common/src/raw_disk_writer.rs index af72cd239c..35d3862e67 100644 --- a/installinator-common/src/raw_disk_writer.rs +++ b/installinator-common/src/raw_disk_writer.rs @@ -94,7 +94,7 @@ impl RawDiskWriter { } }) .await - .unwrap() + .expect("task panicked") } } diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs index dfd99964cc..1f75ee5742 100644 --- a/sled-agent/src/boot_disk_os_writer.rs +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -138,6 +138,13 @@ impl From<&BootDiskOsWriteError> for HttpError { } } +// Note to future maintainers: `installinator` uses the `update_engine` crate to +// drive its process (which includes writing the boot disk). We could also use +// `update_engine` inside `BootDiskOsWriter`; instead, we've hand-rolled a state +// machine with manual progress reporting. The current implementation is +// _probably_ simple enough that this was a reasonable choice, but if it becomes +// more complex (or if additional work needs to be done that `update_engine` +// would make easier), consider switching it over. #[derive(Debug)] pub(crate) struct BootDiskOsWriter { // Note: We use a std Mutex here to avoid cancellation issues with tokio @@ -424,7 +431,8 @@ impl BootDiskOsWriteTask { // `StreamingBody` attached to their request. // // If this step fails, we will send the error to the client who sent the - // request _and_ a copy of the same error in our current update state. + // request _and_ store a copy of the same error in our current update + // state. 
let (image_tempfile, image_size) = match self .download_body_to_tempfile(image_upload) .await @@ -465,7 +473,7 @@ impl BootDiskOsWriteTask { let mut tempfile = tokio::io::BufWriter::new(tokio::fs::File::from_std(tempfile)); - let mut image_upload = std::pin::pin!(image_upload.into_stream()); + let mut image_upload = std::pin::pin!(image_upload); let mut hasher = Sha3_256::default(); let mut bytes_received = 0; @@ -719,7 +727,6 @@ mod tests { use omicron_test_utils::dev::test_setup_log; use rand::RngCore; use std::mem; - use std::path::PathBuf; use std::pin::Pin; use std::task::ready; use std::task::Context; @@ -742,7 +749,7 @@ mod tests { #[derive(Debug, Clone, PartialEq, Eq)] struct InMemoryDiskContents { - path: PathBuf, + path: Utf8PathBuf, data: Vec, } @@ -770,7 +777,10 @@ mod tests { async fn open_writer(&self, path: &Path) -> io::Result { Ok(InMemoryDiskWriter { - opened_path: path.into(), + opened_path: path + .to_owned() + .try_into() + .expect("non-utf8 test path"), data: BlockSizeBufWriter::with_block_size( Self::BLOCK_SIZE, Vec::new(), @@ -795,7 +805,7 @@ mod tests { } struct InMemoryDiskWriter { - opened_path: PathBuf, + opened_path: Utf8PathBuf, data: BlockSizeBufWriter>, semaphore: PollSemaphore, finalized_writes: Arc>>, @@ -1326,7 +1336,7 @@ mod tests { assert!( written_disks.contains(&expected), "written disks missing expected contents for {}", - expected.path.display(), + expected.path, ); } From 865d281db9c9d191852f264e8553e92f79b24bce Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 7 Dec 2023 17:22:00 -0500 Subject: [PATCH 12/13] move sled-agent dropshot config to toml file; bump request max size to allow for host OS images --- sled-agent/src/config.rs | 6 ++++++ sled-agent/src/server.rs | 7 ++++--- smf/sled-agent/gimlet-standalone/config.toml | 5 +++++ smf/sled-agent/gimlet/config.toml | 5 +++++ smf/sled-agent/non-gimlet/config.toml | 5 +++++ 5 files changed, 25 insertions(+), 3 deletions(-) diff --git a/sled-agent/src/config.rs b/sled-agent/src/config.rs index a596cf83db..058f343e2a 100644 --- a/sled-agent/src/config.rs +++ b/sled-agent/src/config.rs @@ -6,6 +6,7 @@ use crate::updates::ConfigUpdates; use camino::{Utf8Path, Utf8PathBuf}; +use dropshot::ConfigDropshot; use dropshot::ConfigLogging; use illumos_utils::dladm::Dladm; use illumos_utils::dladm::FindPhysicalLinkError; @@ -44,6 +45,11 @@ pub struct SoftPortConfig { #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] pub struct Config { + /// Configuration for the sled agent dropshot server + /// + /// If the `bind_address` is set, it will be ignored. The remaining fields + /// will be respected. + pub dropshot: ConfigDropshot, /// Configuration for the sled agent debug log pub log: ConfigLogging, /// The sled's mode of operation (auto detect or force gimlet/scrimlet). 
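The `bind_address` from the TOML is ignored because the server overwrites it with the address derived from the sled's underlay network, taking the remaining fields from `config.dropshot` via struct update syntax, as the server.rs hunk below shows. A minimal sketch of that merge, assuming a `sled_address: SocketAddrV6` is in scope:

    let dropshot_config = dropshot::ConfigDropshot {
        // Always bind to the sled's computed underlay address, regardless of
        // what the config file contains.
        bind_address: SocketAddr::V6(sled_address),
        // Honor the remaining file-provided settings (e.g.
        // request_body_max_bytes).
        ..config.dropshot
    };
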
diff --git a/sled-agent/src/server.rs b/sled-agent/src/server.rs index 903c8dabaa..b93ad0721c 100644 --- a/sled-agent/src/server.rs +++ b/sled-agent/src/server.rs @@ -70,9 +70,10 @@ impl Server { .await .map_err(|e| e.to_string())?; - let mut dropshot_config = dropshot::ConfigDropshot::default(); - dropshot_config.request_body_max_bytes = 1024 * 1024; - dropshot_config.bind_address = SocketAddr::V6(sled_address); + let dropshot_config = dropshot::ConfigDropshot { + bind_address: SocketAddr::V6(sled_address), + ..config.dropshot + }; let dropshot_log = log.new(o!("component" => "dropshot (SledAgent)")); let http_server = dropshot::HttpServerStarter::new( &dropshot_config, diff --git a/smf/sled-agent/gimlet-standalone/config.toml b/smf/sled-agent/gimlet-standalone/config.toml index e714504311..4d06895453 100644 --- a/smf/sled-agent/gimlet-standalone/config.toml +++ b/smf/sled-agent/gimlet-standalone/config.toml @@ -41,6 +41,11 @@ swap_device_size_gb = 256 data_links = ["net0", "net1"] +[dropshot] +# Host OS images are just over 800 MiB currently; set this to 2 GiB to give some +# breathing room. +request_body_max_bytes = 2_147_483_648 + [log] level = "info" mode = "file" diff --git a/smf/sled-agent/gimlet/config.toml b/smf/sled-agent/gimlet/config.toml index 442e76b393..666d55f359 100644 --- a/smf/sled-agent/gimlet/config.toml +++ b/smf/sled-agent/gimlet/config.toml @@ -37,6 +37,11 @@ swap_device_size_gb = 256 data_links = ["cxgbe0", "cxgbe1"] +[dropshot] +# Host OS images are just over 800 MiB currently; set this to 2 GiB to give some +# breathing room. +request_body_max_bytes = 2_147_483_648 + [log] level = "info" mode = "file" diff --git a/smf/sled-agent/non-gimlet/config.toml b/smf/sled-agent/non-gimlet/config.toml index 176f4002a5..432652c50b 100644 --- a/smf/sled-agent/non-gimlet/config.toml +++ b/smf/sled-agent/non-gimlet/config.toml @@ -76,6 +76,11 @@ switch_zone_maghemite_links = ["tfportrear0_0"] data_links = ["net0", "net1"] +[dropshot] +# Host OS images are just over 800 MiB currently; set this to 2 GiB to give some +# breathing room. 
+request_body_max_bytes = 2_147_483_648 + [log] level = "info" mode = "file" From 8a65c206c03d2229270e4fc2a736f9373bcabc81 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Fri, 8 Dec 2023 10:17:14 -0500 Subject: [PATCH 13/13] Require explicitly clearing old update status before starting a new one --- openapi/sled-agent.json | 40 +++- sled-agent/src/boot_disk_os_writer.rs | 306 ++++++++++++++++++++++---- sled-agent/src/http_entrypoints.rs | 32 ++- 3 files changed, 333 insertions(+), 45 deletions(-) diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 71109ad551..b5206b03a6 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -69,7 +69,7 @@ "/boot-disk/{boot_disk}/os/write/status": { "get": { "summary": "Get the status of writing a new host OS", - "operationId": "host_os_write_status", + "operationId": "host_os_write_status_get", "parameters": [ { "in": "path", @@ -100,6 +100,42 @@ } } }, + "/boot-disk/{boot_disk}/os/write/status/{update_id}": { + "delete": { + "summary": "Clear the status of a completed write of a new host OS", + "operationId": "host_os_write_status_delete", + "parameters": [ + { + "in": "path", + "name": "boot_disk", + "required": true, + "schema": { + "$ref": "#/components/schemas/M2Slot" + } + }, + { + "in": "path", + "name": "update_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/cockroachdb": { "post": { "summary": "Initializes a CockroachDB cluster", @@ -2297,7 +2333,7 @@ "description": "Status of an update to a boot disk OS.", "oneOf": [ { - "description": "No update has been started for this disk since this server started.", + "description": "No update has been started for this disk, or any previously-started update has completed and had its status cleared.", "type": "object", "properties": { "status": { diff --git a/sled-agent/src/boot_disk_os_writer.rs b/sled-agent/src/boot_disk_os_writer.rs index 1f75ee5742..a0798ed174 100644 --- a/sled-agent/src/boot_disk_os_writer.rs +++ b/sled-agent/src/boot_disk_os_writer.rs @@ -59,8 +59,10 @@ pub(crate) enum BootDiskOsWriteError { // shutdown). #[error("internal error (task panic)")] TaskPanic, - #[error("another update is still running ({0})")] - AnotherUpdateRunning(Uuid), + #[error("an update is still running ({0})")] + UpdateRunning(Uuid), + #[error("a previous update completed ({0}); clear its status before starting a new update")] + CannotStartWithoutClearingPreviousStatus(Uuid), #[error("failed to create temporary file")] FailedCreatingTempfile(#[source] io::Error), #[error("failed writing to temporary file")] @@ -106,18 +108,26 @@ pub(crate) enum BootDiskOsWriteError { expected: String, got: String, }, + #[error("unexpected update ID {0}: cannot clear status")] + WrongUpdateIdClearingStatus(Uuid), } impl From<&BootDiskOsWriteError> for HttpError { fn from(error: &BootDiskOsWriteError) -> Self { let message = DisplayErrorChain::new(error).to_string(); match error { - BootDiskOsWriteError::AnotherUpdateRunning(_) + BootDiskOsWriteError::UpdateRunning(_) + | BootDiskOsWriteError::CannotStartWithoutClearingPreviousStatus( + _, + ) | BootDiskOsWriteError::FailedDownloadingImage(_) | BootDiskOsWriteError::UploadedImageHashMismatch { .. } | BootDiskOsWriteError::ImageSizeNotMultipleOfBlockSize { .. 
-            } => HttpError::for_bad_request(None, message),
+            }
+            | BootDiskOsWriteError::WrongUpdateIdClearingStatus(_) => {
+                HttpError::for_bad_request(None, message)
+            }
             BootDiskOsWriteError::TaskPanic
             | BootDiskOsWriteError::FailedCreatingTempfile(_)
             | BootDiskOsWriteError::FailedWritingTempfile(_)
@@ -176,6 +186,7 @@ impl BootDiskOsWriter {
     /// following are true:
     ///
     /// * A previously-started update of this same `boot_disk` is still running
+    /// * A previously-completed update has not had its status cleared
     /// * The `image_upload` stream returns an error
     /// * The hash of the data provided by `image_upload` does not match
     ///   `sha3_256_digest`
@@ -264,42 +275,25 @@ impl BootDiskOsWriter {
             }
             Entry::Occupied(mut slot) => match slot.get_mut() {
                 WriterState::TaskRunning(running) => {
-                    // Check whether the task is _actually_ still running,
-                    // or whether it's done and just waiting for us to
-                    // realize it.
-                    match running.complete_rx.try_recv() {
-                        Ok(_prev_result) => {
-                            // A previous write is done, but we're
-                            // immedately starting a new one, so discard the
-                            // previous result.
-                            let (uploaded_image_rx, running) =
-                                spawn_update_task();
-                            slot.insert(WriterState::TaskRunning(running));
-                            uploaded_image_rx
-                        }
-                        Err(TryRecvError::Empty) => {
-                            return Err(Arc::new(
-                                BootDiskOsWriteError::AnotherUpdateRunning(
-                                    running.update_id,
-                                ),
-                            ));
-                        }
-                        Err(TryRecvError::Closed) => {
-                            return Err(Arc::new(
-                                BootDiskOsWriteError::TaskPanic,
-                            ));
-                        }
-                    }
+                    // It's possible this task is actually complete and a
+                    // result is sitting in the `running.complete_rx`
+                    // oneshot, but for the purposes of starting a new
+                    // update it doesn't matter either way: we'll refuse to
+                    // start. Return the `UpdateRunning` error; the caller
+                    // will have to check `status()`, which will trigger a
+                    // "see if it's actually done after all" check.
+                    return Err(Arc::new(
+                        BootDiskOsWriteError::UpdateRunning(
+                            running.update_id,
+                        ),
+                    ));
                 }
-                // TODO-correctness Should we separate require callers to
-                // explicitly clear out the results of a completed update
-                // before starting a new update? (Answer could be different
-                // depending on whether the most recent update succeeded or
-                // failed.)
-                WriterState::Complete(_) => {
-                    let (uploaded_image_rx, running) = spawn_update_task();
-                    slot.insert(WriterState::TaskRunning(running));
-                    uploaded_image_rx
+                WriterState::Complete(complete) => {
+                    return Err(Arc::new(
+                        BootDiskOsWriteError::CannotStartWithoutClearingPreviousStatus(
+                            complete.update_id,
+                    )));
                 }
             },
         }
@@ -310,6 +304,80 @@ impl BootDiskOsWriter {
         uploaded_image_rx.await.map_err(|_| BootDiskOsWriteError::TaskPanic)?
     }

+    /// Clear the status of a finished or failed update with the given ID
+    /// targeting `boot_disk`.
+    ///
+    /// If no update has ever been started for this `boot_disk`, returns
+    /// `Ok(())`.
+    ///
+    /// # Errors
+    ///
+    /// Fails if an update to `boot_disk` is currently running; only terminal
+    /// statuses can be cleared. Fails if the most recent terminal status
+    /// targeting `boot_disk` had a different update ID.
+    pub(crate) fn clear_terminal_status(
+        &self,
+        boot_disk: M2Slot,
+        update_id: Uuid,
+    ) -> Result<(), BootDiskOsWriteError> {
+        let mut states = self.states.lock().unwrap();
+        let mut slot = match states.entry(boot_disk) {
+            // No status; nothing to clear.
+ Entry::Vacant(_slot) => return Ok(()), + Entry::Occupied(slot) => slot, + }; + + match slot.get_mut() { + WriterState::Complete(complete) => { + if complete.update_id == update_id { + slot.remove(); + Ok(()) + } else { + Err(BootDiskOsWriteError::WrongUpdateIdClearingStatus( + complete.update_id, + )) + } + } + WriterState::TaskRunning(running) => { + // Check whether the task is _actually_ still running, + // or whether it's done and just waiting for us to + // realize it. + match running.complete_rx.try_recv() { + Ok(result) => { + if running.update_id == update_id { + // This is a little surprising but legal: we've been + // asked to clear the terminal status of this + // update_id, even though we just realized it + // finished. + slot.remove(); + Ok(()) + } else { + let running_update_id = running.update_id; + // A different update just finished; store the + // result we got from the oneshot and don't remove + // the status. + slot.insert(WriterState::Complete( + TaskCompleteState { + update_id: running_update_id, + result, + }, + )); + Err(BootDiskOsWriteError::WrongUpdateIdClearingStatus( + running_update_id + )) + } + } + Err(TryRecvError::Empty) => Err( + BootDiskOsWriteError::UpdateRunning(running.update_id), + ), + Err(TryRecvError::Closed) => { + Err(BootDiskOsWriteError::TaskPanic) + } + } + } + } + } + /// Get the status of any update running that targets `boot_disk`. pub(crate) fn status(&self, boot_disk: M2Slot) -> BootDiskOsWriteStatus { let mut states = self.states.lock().unwrap(); @@ -1396,7 +1464,7 @@ mod tests { .await .unwrap_err(); match &*error { - BootDiskOsWriteError::AnotherUpdateRunning(running_id) => { + BootDiskOsWriteError::UpdateRunning(running_id) => { assert_eq!(*running_id, update_id_a); } _ => panic!("unexpected error {error}"), @@ -1440,4 +1508,162 @@ mod tests { logctx.cleanup_successful(); } + + #[tokio::test] + async fn boot_disk_os_writer_rejects_new_updates_while_old_completed() { + let logctx = test_setup_log( + "boot_disk_os_writer_rejects_new_updates_while_old_completed", + ); + + // generate two small, random "OS image"s consisting of 10 "blocks" each + let num_data_blocks = 10; + let data_len = num_data_blocks * InMemoryDiskInterface::BLOCK_SIZE; + let mut data_a = vec![0; data_len]; + let mut data_b = vec![0; data_len]; + rand::thread_rng().fill_bytes(&mut data_a); + rand::thread_rng().fill_bytes(&mut data_b); + let data_hash_a = Sha3_256::digest(&data_a); + let data_hash_b = Sha3_256::digest(&data_b); + + // generate a disk writer with effectively infinite semaphore permits + let inject_disk_interface = + InMemoryDiskInterface::new(Semaphore::new(Semaphore::MAX_PERMITS)); + + let writer = Arc::new(BootDiskOsWriter::new(&logctx.log)); + let disk_devfs_path = "/unit-test/disk"; + let boot_disk = M2Slot::A; + + let update_id_a = Uuid::new_v4(); + let update_id_b = Uuid::new_v4(); + + writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + update_id_a, + data_hash_a.into(), + stream::once(future::ready(Ok(Bytes::from(data_a.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap(); + + // Wait for the first update to complete successfully. + tokio::time::timeout(TEST_TIMEOUT, async { + loop { + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::InProgress { update_id, .. 
} => { + assert_eq!(update_id, update_id_a); + println!("saw irrelevant status {status:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::Complete { update_id } => { + assert_eq!(update_id, update_id_a); + break; + } + BootDiskOsWriteStatus::Failed { .. } + | BootDiskOsWriteStatus::NoUpdateStarted => { + panic!("unexpected status {status:?}"); + } + } + } + }) + .await + .unwrap(); + + // Ensure we wrote the contents of the first update. + let expected_disks = [InMemoryDiskContents { + path: disk_devfs_path.into(), + data: data_a, + }]; + { + let mut written_disks = + inject_disk_interface.finalized_writes.lock().unwrap(); + assert_eq!(*written_disks, expected_disks); + written_disks.clear(); + } + + // Check that we get the expected error when attempting to start another + // update to this same disk. + let expected_error = + BootDiskOsWriteError::CannotStartWithoutClearingPreviousStatus( + update_id_a, + ); + let error = writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + update_id_b, + data_hash_b.into(), + stream::once(future::ready(Ok(Bytes::from(data_b.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap_err(); + assert_eq!(error.to_string(), expected_error.to_string()); + + // We should not be able to clear the status with an incorrect update + // ID. + let expected_error = + BootDiskOsWriteError::WrongUpdateIdClearingStatus(update_id_a); + let error = + writer.clear_terminal_status(boot_disk, update_id_b).unwrap_err(); + assert_eq!(error.to_string(), expected_error.to_string()); + + // We should be able to clear the status with the correct update ID, and + // then start the new one. + writer.clear_terminal_status(boot_disk, update_id_a).unwrap(); + writer + .start_update_impl( + boot_disk, + disk_devfs_path.into(), + update_id_b, + data_hash_b.into(), + stream::once(future::ready(Ok(Bytes::from(data_b.clone())))), + inject_disk_interface.clone(), + ) + .await + .unwrap(); + + // Wait for the second update to complete successfully. + tokio::time::timeout(TEST_TIMEOUT, async { + loop { + let status = writer.status(boot_disk); + match status { + BootDiskOsWriteStatus::InProgress { update_id, .. } => { + assert_eq!(update_id, update_id_b); + println!("saw irrelevant status {status:?}"); + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + BootDiskOsWriteStatus::Complete { update_id } => { + assert_eq!(update_id, update_id_b); + break; + } + BootDiskOsWriteStatus::Failed { .. } + | BootDiskOsWriteStatus::NoUpdateStarted => { + panic!("unexpected status {status:?}"); + } + } + } + }) + .await + .unwrap(); + + // Ensure we wrote the contents of the second update. 
+ let expected_disks = [InMemoryDiskContents { + path: disk_devfs_path.into(), + data: data_b, + }]; + { + let mut written_disks = + inject_disk_interface.finalized_writes.lock().unwrap(); + assert_eq!(*written_disks, expected_disks); + written_disks.clear(); + } + + logctx.cleanup_successful(); + } } diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index e1112d4ed6..e2922c55d4 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -80,7 +80,8 @@ pub fn api() -> SledApiDescription { api.register(add_sled_to_initialized_rack)?; api.register(metrics_collect)?; api.register(host_os_write_start)?; - api.register(host_os_write_status)?; + api.register(host_os_write_status_get)?; + api.register(host_os_write_status_delete)?; Ok(()) } @@ -764,6 +765,12 @@ pub struct BootDiskPathParams { pub boot_disk: M2Slot, } +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct BootDiskUpdatePathParams { + pub boot_disk: M2Slot, + pub update_id: Uuid, +} + #[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] pub struct BootDiskWriteStartQueryParams { pub update_id: Uuid, @@ -872,7 +879,8 @@ pub enum BootDiskOsWriteProgress { #[derive(Debug, Clone, Deserialize, JsonSchema, Serialize)] #[serde(tag = "status", rename_all = "snake_case")] pub enum BootDiskOsWriteStatus { - /// No update has been started for this disk since this server started. + /// No update has been started for this disk, or any previously-started + /// update has completed and had its status cleared. NoUpdateStarted, /// An update is currently running. InProgress { update_id: Uuid, progress: BootDiskOsWriteProgress }, @@ -887,7 +895,7 @@ pub enum BootDiskOsWriteStatus { method = GET, path = "/boot-disk/{boot_disk}/os/write/status", }] -async fn host_os_write_status( +async fn host_os_write_status_get( request_context: RequestContext, path_params: Path, ) -> Result, HttpError> { @@ -896,3 +904,21 @@ async fn host_os_write_status( let status = sa.boot_disk_os_writer().status(boot_disk); Ok(HttpResponseOk(status)) } + +/// Clear the status of a completed write of a new host OS +#[endpoint { + method = DELETE, + path = "/boot-disk/{boot_disk}/os/write/status/{update_id}", +}] +async fn host_os_write_status_delete( + request_context: RequestContext, + path_params: Path, +) -> Result { + let sa = request_context.context(); + let BootDiskUpdatePathParams { boot_disk, update_id } = + path_params.into_inner(); + sa.boot_disk_os_writer() + .clear_terminal_status(boot_disk, update_id) + .map_err(|err| HttpError::from(&err))?; + Ok(HttpResponseUpdatedNoContent()) +}
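With this change, a caller must clear the terminal status of the previous write before starting the next write to the same disk; requiring the matching update ID on the DELETE keeps one caller from silently discarding the terminal status of another caller's update. A minimal sketch of that flow over the new HTTP endpoints follows. The endpoint paths come from this patch; the base URL, the "A" path encoding of the boot-disk slot, and the use of reqwest (with its json feature), serde_json, and uuid are assumptions for illustration only, and real callers would presumably go through the generated sled-agent client instead.

// Illustrative sketch only; not part of this patch.
use uuid::Uuid;

async fn clear_previous_write_status(
    client: &reqwest::Client,
    base_url: &str,  // assumed, e.g. "http://[fd00::1]:12345"
    update_id: Uuid, // ID of the finished or failed update to clear
) -> reqwest::Result<()> {
    // 1. Fetch the current status of boot disk A; a terminal state is either
    //    "complete" or "failed" per BootDiskOsWriteStatus.
    let status: serde_json::Value = client
        .get(format!("{base_url}/boot-disk/A/os/write/status"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("boot disk A status: {status}");

    // 2. Clear the terminal status. Until this succeeds, a new write to the
    //    same disk is rejected with CannotStartWithoutClearingPreviousStatus.
    client
        .delete(format!("{base_url}/boot-disk/A/os/write/status/{update_id}"))
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}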