
[sled-agent] add preliminary "write boot disk OS" http endpoints #4633

Merged: 13 commits merged on Dec 8, 2023
10 changes: 10 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions common/src/lib.rs
@@ -31,6 +31,8 @@ pub mod postgres_config;
pub mod update;
pub mod vlan;

pub use update::hex_schema;

#[macro_export]
macro_rules! generate_logging_api {
($path:literal) => {
4 changes: 3 additions & 1 deletion common/src/update.rs
@@ -296,7 +296,9 @@ impl FromStr for ArtifactHash {
}
}

fn hex_schema<const N: usize>(gen: &mut SchemaGenerator) -> Schema {
/// Produce an OpenAPI schema describing a hex array of a specific length (e.g.,
/// a hash digest).
pub fn hex_schema<const N: usize>(gen: &mut SchemaGenerator) -> Schema {
let mut schema: SchemaObject = <String>::json_schema(gen).into();
schema.format = Some(format!("hex string ({N} bytes)"));
schema.into()
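With `hex_schema` now public and re-exported from the crate root, other crates can point schemars at it directly. A minimal sketch (the wrapper type is hypothetical, and it assumes the `common` crate is imported as `omicron_common`):

use omicron_common::hex_schema;
use schemars::JsonSchema;

// Hypothetical 32-byte digest wrapper: `hex_schema::<32>` makes the generated
// OpenAPI schema describe the field as a "hex string (32 bytes)" instead of a
// 32-element integer array.
#[derive(JsonSchema)]
pub struct Sha256Digest(#[schemars(schema_with = "hex_schema::<32>")] pub [u8; 32]);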
6 changes: 6 additions & 0 deletions installinator-common/Cargo.toml
@@ -8,10 +8,16 @@ license = "MPL-2.0"
anyhow.workspace = true
camino.workspace = true
illumos-utils.workspace = true
libc.workspace = true
schemars.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
thiserror.workspace = true
tokio.workspace = true
update-engine.workspace = true
omicron-workspace-hack.workspace = true

[dev-dependencies]
proptest.workspace = true
test-strategy.workspace = true
installinator/src/block_size_writer.rs → installinator-common/src/block_size_writer.rs
@@ -11,31 +11,37 @@ use tokio::io::AsyncWrite;

/// `BlockSizeBufWriter` is analogous to tokio's `BufWriter`, except it
/// guarantees that writes made to the underlying writer are always
/// _exactly_ the requested block size, with two exceptions: explicitly
/// calling (1) `flush()` or (2) `shutdown()` will write any
/// buffered-but-not-yet-written data to the underlying buffer regardless of
/// its length.
/// _exactly_ the requested block size, with three exceptions:
///
/// 1. Calling `flush()` will write any currently-buffered data to the
/// underlying writer, regardless of its length.
/// 2. Similarly, calling `shutdown()` will flush any currently-buffered data
/// to the underlying writer.
/// 3. When `BlockSizeBufWriter` attempts to write a block-length amount of data
/// to the underlying writer, if that writer only accepts a portion of that
/// data, `BlockSizeBufWriter` will continue attempting to write the
/// remainder of the block.
///
/// When `BlockSizeBufWriter` is dropped, any buffered data it's holding
/// will be discarded. It is critical to manually call
/// `BlockSizeBufWriter::flush()` or `BlockSizeBufWriter::shutdown()` prior
/// to dropping to avoid data loss.
pub(crate) struct BlockSizeBufWriter<W> {
pub struct BlockSizeBufWriter<W> {
inner: W,
buf: Vec<u8>,
block_size: usize,
}

impl<W: AsyncWrite + Unpin> BlockSizeBufWriter<W> {
pub(crate) fn with_block_size(block_size: usize, inner: W) -> Self {
pub fn with_block_size(block_size: usize, inner: W) -> Self {
Self { inner, buf: Vec::with_capacity(block_size), block_size }
}

pub(crate) fn into_inner(self) -> W {
pub fn into_inner(self) -> W {
self.inner
}

pub(crate) fn block_size(&self) -> usize {
pub fn block_size(&self) -> usize {
self.block_size
}

@@ -46,6 +52,13 @@ impl<W: AsyncWrite + Unpin> BlockSizeBufWriter<W> {
fn flush_buf(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut written = 0;
let mut ret = Ok(());

// We expect this loop to execute exactly one time: we try to write the
// entirety of `self.buf` to `self.inner`, and presumably it is a type
// that expects to receive a block of data at once, so we'll immediately
// jump to `written == self.buf.len()`. If it returns `Ok(n)` for some
// `n < self.buf.len()`, we'll loop and try to write the rest of the
// data in less-than-block-sized chunks.
while written < self.buf.len() {
match ready!(
Pin::new(&mut self.inner).poll_write(cx, &self.buf[written..])
@@ -128,8 +141,8 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for BlockSizeBufWriter<W> {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::with_test_runtime;
use anyhow::Result;
use std::future::Future;
use test_strategy::proptest;
use tokio::io::AsyncWriteExt;

@@ -167,6 +180,19 @@ mod tests {
}
}

fn with_test_runtime<F, Fut, T>(f: F) -> T
where
F: FnOnce() -> Fut,
Fut: Future<Output = T>,
{
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_time()
.start_paused(true)
.build()
.expect("tokio Runtime built successfully");
runtime.block_on(f())
}

#[proptest]
fn proptest_block_writer(
chunks: Vec<Vec<u8>>,
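Since `BlockSizeBufWriter` is now public, callers outside the crate can wrap any `AsyncWrite` in it. A minimal usage sketch, assuming the crate is imported as `installinator_common` and using a `Vec<u8>` (via tokio's `AsyncWrite` impl for it) as a stand-in for a real block device:

use installinator_common::BlockSizeBufWriter;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Buffer writes into 512-byte blocks before they reach the inner writer.
    let mut writer = BlockSizeBufWriter::with_block_size(512, Vec::new());
    writer.write_all(&[0xab; 2000]).await?;
    // 2000 bytes fills three 512-byte blocks; the trailing 464 bytes stay
    // buffered until this explicit flush (exception 1 above).
    writer.flush().await?;
    assert_eq!(writer.into_inner().len(), 2000);
    Ok(())
}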
4 changes: 4 additions & 0 deletions installinator-common/src/lib.rs
@@ -4,6 +4,10 @@

//! Common types shared by the installinator client and server.

mod block_size_writer;
mod progress;
mod raw_disk_writer;

pub use block_size_writer::*;
pub use progress::*;
pub use raw_disk_writer::*;
123 changes: 123 additions & 0 deletions installinator-common/src/raw_disk_writer.rs
@@ -0,0 +1,123 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Async writer for raw disks on illumos (e.g., host OS phase 2 images written
//! to M.2 drives).

use crate::BlockSizeBufWriter;
use illumos_utils::dkio;
use illumos_utils::dkio::MediaInfoExtended;
use std::io;
use std::os::fd::AsRawFd;
use std::path::Path;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::fs::File;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;

/// Writer for illumos raw disks.
///
/// Construct an instance via [`RawDiskWriter::open()`], write to it just like
/// any other async writer (it will handle passing writes down to the device in
/// chunks of length [`RawDiskWriter::block_size()`]), and then call
/// [`RawDiskWriter::finalize()`]. It is **critical** to call `finalize()`;
/// failure to do so will likely lead to data loss.
///
/// `RawDiskWriter` attempts to be as conservative as it can about ensuring data
/// is written:
///
/// * The device is opened with `O_SYNC`
/// * In `finalize()`, the file is `fsync`'d after any remaining data is flushed
/// * In `finalize()`, the disk write cache is flushed (if supported by the
/// target device)
///
/// Writing an amount of data that is not a multiple of the device's
/// `block_size()` will likely result in a failure when writing / flushing the
/// final not-correctly-sized chunk.
///
/// This type is illumos-specific due to using dkio for two things:
///
/// 1. Determining the logical block size of the device
/// 2. Flushing the disk write cache
pub struct RawDiskWriter {
inner: BlockSizeBufWriter<File>,
}

impl RawDiskWriter {
/// Open the disk device at `path` for writing, and attempt to determine its
/// logical block size via [`MediaInfoExtended`].
pub async fn open(path: &Path) -> io::Result<Self> {
let f = tokio::fs::OpenOptions::new()
.create(false)
.write(true)
.truncate(false)
.custom_flags(libc::O_SYNC)
.open(path)
.await?;

let media_info = MediaInfoExtended::from_fd(f.as_raw_fd())?;

let inner = BlockSizeBufWriter::with_block_size(
media_info.logical_block_size as usize,
f,
);

Ok(Self { inner })
}

/// The logical block size of the underlying device.
pub fn block_size(&self) -> usize {
self.inner.block_size()
}

/// Flush any remaining data and attempt to ensure synchronization with the
/// device.
pub async fn finalize(mut self) -> io::Result<()> {
// Flush any remaining data in our buffer
self.inner.flush().await?;

// `fsync` the file...
let f = self.inner.into_inner();
f.sync_all().await?;

// ...and also attempt to flush the disk write cache
tokio::task::spawn_blocking(move || {
match dkio::flush_write_cache(f.as_raw_fd()) {
Ok(()) => Ok(()),
// Some drives don't support `flush_write_cache`; we don't want
// to fail in this case.
Err(err) if err.raw_os_error() == Some(libc::ENOTSUP) => Ok(()),
Err(err) => Err(err),
}
})
.await
.expect("task panicked")
}
}

impl AsyncWrite for RawDiskWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
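Putting the pieces together, a hedged sketch of how a caller might drive `RawDiskWriter` end to end (the device path and `image` buffer are hypothetical, and the caller is assumed to have padded the image to a multiple of `block_size()`):

use installinator_common::RawDiskWriter;
use std::io;
use std::path::Path;
use tokio::io::AsyncWriteExt;

async fn write_phase2_image(image: &[u8]) -> io::Result<()> {
    // Hypothetical raw device path for an M.2 boot-image slice.
    let mut writer = RawDiskWriter::open(Path::new("/dev/rdsk/c5t1d0s0")).await?;
    // Writes are chunked into block_size()-length writes internally.
    assert_eq!(image.len() % writer.block_size(), 0);
    writer.write_all(image).await?;
    // Flush buffered data, fsync the file, and flush the disk write cache.
    writer.finalize().await
}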
1 change: 0 additions & 1 deletion installinator/src/lib.rs
@@ -4,7 +4,6 @@

mod artifact;
mod async_temp_file;
mod block_size_writer;
mod bootstrap;
mod dispatch;
mod errors;