diff --git a/Cargo.lock b/Cargo.lock index 693a74d8a..54fe42502 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -861,6 +861,7 @@ dependencies = [ "http", "hyper", "hyper-staticfile", + "itertools 0.11.0", "libc", "mime_guess", "nix", diff --git a/common/src/lib.rs b/common/src/lib.rs index e278b0d4b..b7f6aef47 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -146,6 +146,15 @@ pub enum CrucibleError { #[error("Invalid downstairs replace {0}")] ReplaceRequestInvalid(String), + + #[error("missing context slot for block {0}")] + MissingContextSlot(u64), + + #[error("metadata deserialization failed: {0}")] + BadMetadata(String), + + #[error("context slot deserialization failed: {0}")] + BadContextSlot(String), } impl From for CrucibleError { @@ -386,7 +395,10 @@ impl From for dropshot::HttpError { | CrucibleError::SubvolumeSizeMismatch | CrucibleError::UpstairsAlreadyActive | CrucibleError::UpstairsDeactivating - | CrucibleError::UuidMismatch => { + | CrucibleError::UuidMismatch + | CrucibleError::MissingContextSlot(..) + | CrucibleError::BadMetadata(..) + | CrucibleError::BadContextSlot(..) => { dropshot::HttpError::for_internal_error(e.to_string()) } } diff --git a/common/src/region.rs b/common/src/region.rs index 1595d48cd..46c2f4371 100644 --- a/common/src/region.rs +++ b/common/src/region.rs @@ -274,7 +274,7 @@ impl RegionDefinition { } } -#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Clone, Debug, PartialEq)] pub struct RegionOptions { /** * The size of each block in bytes. Must be a power of 2, minimum 512. diff --git a/downstairs/Cargo.toml b/downstairs/Cargo.toml index ed9cdaa0f..2cc93866e 100644 --- a/downstairs/Cargo.toml +++ b/downstairs/Cargo.toml @@ -22,6 +22,7 @@ hex.workspace = true http.workspace = true hyper-staticfile.workspace = true hyper.workspace = true +itertools.workspace = true libc.workspace = true mime_guess.workspace = true nix.workspace = true @@ -69,3 +70,4 @@ version_check = "0.9.4" asm = ["usdt/asm"] default = [] zfs_snapshot = [] +integration-tests = [] # Enables creating SQLite volumes diff --git a/downstairs/src/extent.rs b/downstairs/src/extent.rs index a03379a7c..345751233 100644 --- a/downstairs/src/extent.rs +++ b/downstairs/src/extent.rs @@ -2,10 +2,10 @@ use std::convert::TryInto; use std::fmt; use std::fmt::Debug; -use std::fs::File; +use std::fs::{File, OpenOptions}; use tokio::sync::Mutex; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use nix::unistd::{sysconf, SysconfVar}; use serde::{Deserialize, Serialize}; @@ -43,7 +43,7 @@ pub(crate) trait ExtentInner: Send + Debug { ) -> Result<(), CrucibleError>; fn read( - &self, + &mut self, job_id: JobId, requests: &[&crucible_protocol::ReadRequest], responses: &mut Vec, @@ -60,14 +60,15 @@ pub(crate) trait ExtentInner: Send + Debug { #[cfg(test)] fn get_block_contexts( - &self, + &mut self, block: u64, count: u64, ) -> Result>, CrucibleError>; /// Sets the dirty flag and updates a block context /// - /// This should only be called from test functions + /// This should only be called from test functions, where we want to + /// manually modify block contexts and test associated behavior #[cfg(test)] fn set_dirty_and_block_context( &mut self, @@ -76,7 +77,7 @@ pub(crate) trait ExtentInner: Send + Debug { } /// BlockContext, with the addition of block index and on_disk_hash -#[derive(Clone)] +#[derive(Copy, Clone)] pub struct DownstairsBlockContext { pub block_context: BlockContext, @@ -120,8 +121,18 @@ pub struct ExtentMeta { /// 
Extent version for SQLite-backed metadata /// /// See [`extent_inner_sqlite::SqliteInner`] for the implementation. +/// +/// This is no longer used when creating new extents, but we support opening +/// existing SQLite-based extents because snapshot images are on read-only +/// volumes, so we can't migrate them. +#[cfg(any(test, feature = "integration-tests"))] pub const EXTENT_META_SQLITE: u32 = 1; +/// Extent version for raw-file-backed metadata +/// +/// See [`extent_inner_raw::RawInner`] for the implementation. +pub const EXTENT_META_RAW: u32 = 2; + impl ExtentMeta { pub fn new(ext_version: u32) -> ExtentMeta { ExtentMeta { @@ -264,13 +275,13 @@ impl Extent { /** * Open an existing extent file at the location requested. - * Read in the metadata from the first block of the file. */ pub fn open( dir: &Path, def: &RegionDefinition, number: u32, read_only: bool, + backend: Backend, log: &Logger, ) -> Result { /* @@ -285,6 +296,11 @@ impl Extent { // If the replace directory exists for this extent, then it means // a repair was interrupted before it could finish. We will continue // the repair before we open the extent. + // + // Note that `move_replacement_extent` must support migrating old + // (SQLite-based) extents and new (raw file) extents, because it's + // possible that an old extent replacement crashed right before we + // updated Crucible. let replace_dir = replace_dir(dir, number); if !read_only && Path::new(&replace_dir).exists() { info!( @@ -295,26 +311,128 @@ impl Extent { move_replacement_extent(dir, number as usize, log)?; } - // Pick the format for the downstairs files + // We will migrate every read-write extent with a SQLite file present. + // + // We use the presence of the .db file as a marker to whether the extent + // data file has been migrated. // - // Right now, this only supports SQLite-flavored Downstairs - let inner = { - let mut sqlite_path = path.clone(); - sqlite_path.set_extension("db"); - if sqlite_path.exists() { + // Remember, the goal is to turn 2-4 files (extent data, .db, .db-wal, + // db-shm) into a single file containing + // + // - The extent data (same as before, at the beginning of the file) + // - Metadata and encryption context slots (after extent data in the + // - same file) + // + // Here's the procedure: + // + // - If the .db file is present + // - Truncate it to the length of the extent data. This is a no-op + // normally, but lets us recover from a partial migration (e.g. + // if we wrote the raw context but crashed before deleting the + // .db file) + // - Open the extent using our existing SQLite extent code + // - Using standard extent APIs, find the metadata and encryption + // context for each block. Append this to the existing data file. + // - Close the (SQLite) extent + // - Open the extent data file in append mode, and append the new + // raw metadata + block contexts to the end of the file. + // - Close the extent data file and fsync it to disk + // - Delete the .db file + // - fsync the containing directory (to make the deletion durable; + // not sure if this is necessary) + // - At this point, the .db file is not present and the extent data + // file represents a raw extent that has been persisted to disk + // - If the .db-wal file is present, remove it + // - If the .db-shm file is present, remove it + // + // It's safe to fail at any point in this procedure; the next time + // `Extent::open` is called, we will restart the migration (if we + // haven't gotten to deleting the `.db` file). 
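As a concrete illustration of what the migration appends, here is a back-of-the-envelope size calculation. The geometry (100 blocks of 512 bytes) is hypothetical; the 48-byte context slots and 32-byte metadata region are BLOCK_CONTEXT_SLOT_SIZE_BYTES and BLOCK_META_SIZE_BYTES from the new extent_inner_raw.rs further down in this diff.

fn main() {
    let block_size: u64 = 512;
    let block_count: u64 = 100;
    // Block data stays at the front of the file, untouched by migration
    let data = block_size * block_count; // 51_200 bytes
    // Appended by the migration: two context slot arrays, the bit-packed
    // active-slot array, and the metadata region
    let context_slots = 2 * block_count * 48; // 9_600 bytes
    let active_bitmap = (block_count + 7) / 8; // 13 bytes (one bit per block)
    let metadata = 32;
    assert_eq!(data + context_slots + active_bitmap + metadata, 60_845);
}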
+ let mut has_sqlite = path.with_extension("db").exists(); + let force_sqlite_backend = match backend { + Backend::RawFile => false, + #[cfg(any(test, feature = "integration-tests"))] + Backend::SQLite => true, + }; + let should_migrate = has_sqlite && !read_only && !force_sqlite_backend; + if should_migrate { + info!(log, "Migrating extent {number}"); + // Truncate the file to the length of the extent data + { + let f = + OpenOptions::new().read(true).write(true).open(&path)?; + f.set_len(def.extent_size().value * def.block_size())?; + } + + // Compute supplemental data from the SQLite extent + let mut inner = extent_inner_sqlite::SqliteInner::open( + dir, def, number, read_only, log, + )?; + let ctxs = inner.export_contexts()?; + let dirty = inner.dirty()?; + let flush_number = inner.flush_number()?; + let gen_number = inner.gen_number()?; + drop(inner); + + // Reopen the file and import those changes + { + let mut f = + OpenOptions::new().read(true).write(true).open(&path)?; + extent_inner_raw::RawInner::import( + &mut f, + def, + ctxs, + dirty, + flush_number, + gen_number, + )?; + f.sync_all() + .with_context(|| format!("{path:?}: fsync failure"))?; + } + + // Remove the .db file, because our extent is now a raw extent and + // has been persisted to disk. + std::fs::remove_file(path.with_extension("db"))?; + + // Make that removal persistent by synching the parent directory + sync_path(extent_dir(dir, number), log)?; + + // We have now migrated from SQLite to raw file extents! + has_sqlite = false; + info!(log, "Done migrating extent {number}"); + } + + // Clean up spare SQLite files if this is a raw file extent. These + // deletions don't need to be durable, because we're not using their + // presence / absence for anything meaningful. + if !has_sqlite && !read_only { + for ext in ["db-shm", "db-wal"] { + let f = path.with_extension(ext); + if f.exists() { + std::fs::remove_file(f)?; + } + } + } + + // Pick the format for the downstairs files. In most cases, we will be + // using the raw extent format, but for older read-only snapshots that + // were constructed using the SQLite backend, we have to keep them + // as-is. + let inner: Box = { + if has_sqlite { + assert!(read_only || force_sqlite_backend); let inner = extent_inner_sqlite::SqliteInner::open( - &path, def, number, read_only, log, + dir, def, number, read_only, log, )?; Box::new(inner) } else { - bail!( - "db file {sqlite_path:?} for extent#{number} is not present" - ); + let inner = extent_inner_raw::RawInner::open( + dir, def, number, read_only, log, + )?; + Box::new(inner) } }; - // XXX: schema updates? - let extent = Extent { number, read_only, @@ -351,6 +469,7 @@ impl Extent { dir: &Path, def: &RegionDefinition, number: u32, + backend: Backend, ) -> Result { /* * Store extent data in files within a directory hierarchy so that @@ -368,7 +487,15 @@ impl Extent { } remove_copy_cleanup_dir(dir, number)?; - let inner = extent_inner_sqlite::SqliteInner::create(dir, def, number)?; + let inner: Box = match backend { + Backend::RawFile => { + Box::new(extent_inner_raw::RawInner::create(dir, def, number)?) 
+ } + #[cfg(any(test, feature = "integration-tests"))] + Backend::SQLite => Box::new( + extent_inner_sqlite::SqliteInner::create(dir, def, number)?, + ), + }; /* * Complete the construction of our new extent @@ -377,7 +504,7 @@ impl Extent { number, read_only: false, iov_max: Extent::get_iov_max()?, - inner: Mutex::new(Box::new(inner)), + inner: Mutex::new(inner), }) } @@ -399,7 +526,7 @@ impl Extent { (job_id.0, self.number, requests.len() as u64) }); - let inner = self.inner.lock().await; + let mut inner = self.inner.lock().await; inner.read(job_id, requests, responses, self.iov_max)?; @@ -525,25 +652,14 @@ pub(crate) fn move_replacement_extent>( } sync_path(&original_file, log)?; + // We distinguish between SQLite-backend and raw-file extents based on the + // presence of the `.db` file. We should never do live migration across + // different extent formats; in fact, we should never live-migrate + // SQLite-backed extents at all, but must still handle the case of + // unfinished migrations. new_file.set_extension("db"); original_file.set_extension("db"); - if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { - crucible_bail!( - IoError, - "copy {:?} to {:?} got: {:?}", - new_file, - original_file, - e - ); - } - sync_path(&original_file, log)?; - - // The .db-shm and .db-wal files may or may not exist. If they don't - // exist on the source side, then be sure to remove them locally to - // avoid database corruption from a mismatch between old and new. - new_file.set_extension("db-shm"); - original_file.set_extension("db-shm"); - if new_file.exists() { + if original_file.exists() { if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { crucible_bail!( IoError, @@ -554,35 +670,57 @@ pub(crate) fn move_replacement_extent>( ); } sync_path(&original_file, log)?; - } else if original_file.exists() { - info!( - log, - "Remove old file {:?} as there is no replacement", - original_file.clone() - ); - std::fs::remove_file(&original_file)?; - } - new_file.set_extension("db-wal"); - original_file.set_extension("db-wal"); - if new_file.exists() { - if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { - crucible_bail!( - IoError, - "copy {:?} to {:?} got: {:?}", - new_file, - original_file, - e + // The .db-shm and .db-wal files may or may not exist. If they don't + // exist on the source side, then be sure to remove them locally to + // avoid database corruption from a mismatch between old and new. 
+ new_file.set_extension("db-shm"); + original_file.set_extension("db-shm"); + if new_file.exists() { + if let Err(e) = + std::fs::copy(new_file.clone(), original_file.clone()) + { + crucible_bail!( + IoError, + "copy {:?} to {:?} got: {:?}", + new_file, + original_file, + e + ); + } + sync_path(&original_file, log)?; + } else if original_file.exists() { + info!( + log, + "Remove old file {:?} as there is no replacement", + original_file.clone() ); + std::fs::remove_file(&original_file)?; + } + + new_file.set_extension("db-wal"); + original_file.set_extension("db-wal"); + if new_file.exists() { + if let Err(e) = + std::fs::copy(new_file.clone(), original_file.clone()) + { + crucible_bail!( + IoError, + "copy {:?} to {:?} got: {:?}", + new_file, + original_file, + e + ); + } + sync_path(&original_file, log)?; + } else if original_file.exists() { + info!( + log, + "Remove old file {:?} as there is no replacement", + original_file.clone() + ); + std::fs::remove_file(&original_file)?; } - sync_path(&original_file, log)?; - } else if original_file.exists() { - info!( - log, - "Remove old file {:?} as there is no replacement", - original_file.clone() - ); - std::fs::remove_file(&original_file)?; } sync_path(&destination_dir, log)?; diff --git a/downstairs/src/extent_inner_raw.rs b/downstairs/src/extent_inner_raw.rs new file mode 100644 index 000000000..7f00089ad --- /dev/null +++ b/downstairs/src/extent_inner_raw.rs @@ -0,0 +1,2517 @@ +// Copyright 2023 Oxide Computer Company +use crate::{ + cdt, + extent::{ + check_input, extent_path, DownstairsBlockContext, ExtentInner, + EXTENT_META_RAW, + }, + integrity_hash, mkdir_for_file, + region::{BatchedPwritev, JobOrReconciliationId}, + Block, BlockContext, CrucibleError, JobId, ReadResponse, RegionDefinition, +}; + +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use slog::{error, Logger}; + +use std::collections::HashSet; +use std::fs::{File, OpenOptions}; +use std::io::{BufReader, IoSliceMut, Read}; +use std::os::fd::AsRawFd; +use std::path::Path; + +/// Equivalent to `DownstairsBlockContext`, but without one's own block number +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct OnDiskDownstairsBlockContext { + pub block_context: BlockContext, + pub on_disk_hash: u64, +} + +/// Equivalent to `ExtentMeta`, but ordered for efficient on-disk serialization +/// +/// In particular, the `dirty` byte is first, so it's easy to read at a known +/// offset within the file. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OnDiskMeta { + pub dirty: bool, + pub gen_number: u64, + pub flush_number: u64, + pub ext_version: u32, +} + +/// Size of backup data +/// +/// This must be large enough to fit an `Option` +/// serialized using `bincode`. +pub const BLOCK_CONTEXT_SLOT_SIZE_BYTES: u64 = 48; + +/// Size of metadata region +/// +/// This must be large enough to contain an `OnDiskMeta` serialized using +/// `bincode`. +pub const BLOCK_META_SIZE_BYTES: u64 = 32; + +/// Number of extra syscalls per read / write that triggers defragmentation +const DEFRAGMENT_THRESHOLD: u64 = 3; + +/// `RawInner` is a wrapper around a [`std::fs::File`] representing an extent +/// +/// The file is structured as follows: +/// - Block data, structured as `block_size` × `extent_size` +/// - Block contexts (for encryption). There are two arrays of context slots, +/// each containing `extent_size` elements (i.e. one slot for each block). 
+/// Each slot is [`BLOCK_CONTEXT_SLOT_SIZE_BYTES`] in size, so this section of +/// the file is `BLOCK_CONTEXT_SLOT_SIZE_BYTES * extent_size * 2` bytes in +/// total. The slots contain an `Option`, +/// serialized using `bincode`. +/// - Active context slots, stored as a bit-packed array (where 0 is +/// [`ContextSlot::A`] and 1 is [`ContextSlot::B`]). This array contains +/// `(extent_size + 7) / 8` bytes. It is only valid when the `dirty` bit is +/// cleared. This is an optimization that speeds up opening a clean extent +/// file; otherwise, we would have to rehash every block to find the active +/// context slot. +/// - [`BLOCK_META_SIZE_BYTES`], which contains an [`OnDiskMeta`] serialized +/// using `bincode`. The first byte of this range is `dirty`, serialized as a +/// `u8` (where `1` is dirty and `0` is clean). +/// +/// There are a few considerations that led to this particular ordering: +/// - Active context slots and metadata must be contiguous, because we want to +/// write them atomically when clearing the `dirty` flag +/// - The metadata contains an extent version (currently [`EXTENT_META_RAW`]). +/// We will eventually have multiple raw file formats, so it's convenient to +/// always place the metadata at the end; this lets us deserialize it without +/// knowing anything else about the file, then dispatch based on extent +/// version. +#[derive(Debug)] +pub struct RawInner { + file: File, + + /// Our extent number + extent_number: u32, + + /// Extent size, in blocks + extent_size: Block, + + /// Helper `struct` controlling layout within the file + layout: RawLayout, + + /// Is the `A` or `B` context slot active, on a per-block basis? + active_context: Vec, + + /// Local cache for the `dirty` value + /// + /// This allows us to only write the flag when the value changes + dirty: bool, + + /// Marks whether the given context slot is dirty + /// + /// A dirty context slot has not yet been saved to disk, and must be + /// synched before being overwritten. + /// + /// Context slots are stored as a 2-bit field, with bit 0 marking + /// `ContextSlot::A` and bit 1 marking `ContextSlot::B`. + context_slot_dirty: Vec, + + /// Total number of extra syscalls due to context slot fragmentation + extra_syscall_count: u64, + + /// Denominator corresponding to `extra_syscall_count` + extra_syscall_denominator: u64, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum ContextSlot { + A, + B, +} + +impl std::ops::Not for ContextSlot { + type Output = Self; + fn not(self) -> Self { + match self { + ContextSlot::A => ContextSlot::B, + ContextSlot::B => ContextSlot::A, + } + } +} + +impl ExtentInner for RawInner { + fn flush_number(&self) -> Result { + self.get_metadata().map(|v| v.flush_number) + } + + fn gen_number(&self) -> Result { + self.get_metadata().map(|v| v.gen_number) + } + + fn dirty(&self) -> Result { + Ok(self.dirty) + } + + fn write( + &mut self, + job_id: JobId, + writes: &[&crucible_protocol::Write], + only_write_unwritten: bool, + iov_max: usize, + ) -> Result<(), CrucibleError> { + // If the same block is written multiple times in a single write, then + // (1) that's weird, and (2) we need to handle it specially. To handle + // such cases, we split the `writes` slice into sub-slices of unique + // writes. 
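The duplicate-splitting described in the comment above (and implemented just below) is easiest to see on a toy input. A minimal standalone sketch of the same loop, operating on bare block offsets instead of crucible_protocol::Write values (the helper name is made up):

fn split_unique_runs(offsets: &[u64]) -> Vec<Vec<u64>> {
    use std::collections::HashSet;
    let mut out = Vec::new();
    let mut seen = HashSet::new();
    let mut start = 0;
    for i in 0..=offsets.len() {
        // Flush at the end of the list or when an offset repeats
        if i == offsets.len() || !seen.insert(offsets[i]) {
            out.push(offsets[start..i].to_vec());
            if i != offsets.len() {
                seen.clear();
                seen.insert(offsets[i]);
                start = i;
            }
        }
    }
    out
}

// split_unique_runs(&[0, 1, 2, 1, 3]) == [[0, 1, 2], [1, 3]], so each
// sub-slice handed to write_without_overlaps touches a block at most once.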
+ let mut seen = HashSet::new(); + let mut start = 0; + for i in 0..=writes.len() { + // If this value is a duplicate or we have reached the end of + // the list, then write everything up to this point and adjust + // our starting point and `seen` array + if i == writes.len() || !seen.insert(writes[i].offset.value) { + self.write_without_overlaps( + job_id, + &writes[start..i], + only_write_unwritten, + iov_max, + )?; + // Keep going, resetting the hashmap and start position + if i != writes.len() { + seen.clear(); + seen.insert(writes[i].offset.value); + start = i; + } + } + } + Ok(()) + } + + fn read( + &mut self, + job_id: JobId, + requests: &[&crucible_protocol::ReadRequest], + responses: &mut Vec, + iov_max: usize, + ) -> Result<(), CrucibleError> { + // This code batches up operations for contiguous regions of + // ReadRequests, so we can perform larger read syscalls queries. This + // significantly improves read throughput. + + // Keep track of the index of the first request in any contiguous run + // of requests. Of course, a "contiguous run" might just be a single + // request. + let mut req_run_start = 0; + let block_size = self.extent_size.block_size_in_bytes(); + while req_run_start < requests.len() { + let first_req = requests[req_run_start]; + + // Starting from the first request in the potential run, we scan + // forward until we find a request with a block that isn't + // contiguous with the request before it. Since we're counting + // pairs, and the number of pairs is one less than the number of + // requests, we need to add 1 to get our actual run length. + let mut n_contiguous_requests = 1; + + for request_window in requests[req_run_start..].windows(2) { + if (request_window[0].offset.value + 1 + == request_window[1].offset.value) + && ((n_contiguous_requests + 1) < iov_max) + { + n_contiguous_requests += 1; + } else { + break; + } + } + + // Create our responses and push them into the output. While we're + // at it, check for overflows. + let resp_run_start = responses.len(); + let mut iovecs = Vec::with_capacity(n_contiguous_requests); + for req in requests[req_run_start..][..n_contiguous_requests].iter() + { + let resp = ReadResponse::from_request(req, block_size as usize); + check_input(self.extent_size, req.offset, &resp.data)?; + responses.push(resp); + } + + // Create what amounts to an iovec for each response data buffer. + let mut expected_bytes = 0; + for resp in + &mut responses[resp_run_start..][..n_contiguous_requests] + { + expected_bytes += resp.data.len(); + iovecs.push(IoSliceMut::new(&mut resp.data[..])); + } + + // Finally we get to read the actual data. That's why we're here + cdt::extent__read__file__start!(|| { + (job_id.0, self.extent_number, n_contiguous_requests as u64) + }); + + // Perform the bulk read, then check against the expected number of + // bytes. We could do more robust error handling here (e.g. + // retrying in a loop), but for now, simply bailing out seems wise. 
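The contiguous-run scan earlier in this function boils down to counting adjacent offset pairs. A small sketch, with the iov_max cap applied by the real loop omitted for clarity:

fn leading_contiguous_run(offsets: &[u64]) -> usize {
    // windows(2) yields adjacent pairs, so the run length is pairs + 1
    1 + offsets
        .windows(2)
        .take_while(|w| w[0] + 1 == w[1])
        .count()
}

// leading_contiguous_run(&[7, 8, 9, 20, 21]) == 3: blocks 7..=9 are served by
// one preadv() call, and block 20 starts the next run.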
+ let num_bytes = nix::sys::uio::preadv( + self.file.as_raw_fd(), + &mut iovecs, + first_req.offset.value as i64 * block_size as i64, + ) + .map_err(|e| { + CrucibleError::IoError(format!( + "extent {}: read failed: {e}", + self.extent_number + )) + })?; + if num_bytes != expected_bytes { + return Err(CrucibleError::IoError(format!( + "extent {}: incomplete read \ + (expected {expected_bytes}, got {num_bytes})", + self.extent_number + ))); + } + + cdt::extent__read__file__done!(|| { + (job_id.0, self.extent_number, n_contiguous_requests as u64) + }); + + // Query the block metadata + cdt::extent__read__get__contexts__start!(|| { + (job_id.0, self.extent_number, n_contiguous_requests as u64) + }); + let block_contexts = self.get_block_contexts( + first_req.offset.value, + n_contiguous_requests as u64, + )?; + cdt::extent__read__get__contexts__done!(|| { + (job_id.0, self.extent_number, n_contiguous_requests as u64) + }); + + // Now it's time to put block contexts into the responses. + // We use into_iter here to move values out of enc_ctxts/hashes, + // avoiding a clone(). For code consistency, we use iters for the + // response and data chunks too. These iters will be the same length + // (equal to n_contiguous_requests) so zipping is fine + let resp_iter = + responses[resp_run_start..][..n_contiguous_requests].iter_mut(); + let ctx_iter = block_contexts.into_iter(); + + for (resp, r_ctx) in resp_iter.zip(ctx_iter) { + resp.block_contexts = + r_ctx.into_iter().map(|x| x.block_context).collect(); + } + + req_run_start += n_contiguous_requests; + } + + Ok(()) + } + + fn flush( + &mut self, + new_flush: u64, + new_gen: u64, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { + if !self.dirty()? { + /* + * If we have made no writes to this extent since the last flush, + * we do not need to update the extent on disk + */ + return Ok(()); + } + + cdt::extent__flush__start!(|| { + (job_id.get(), self.extent_number, 0) + }); + + // We put all of our metadata updates into a single write to make this + // operation atomic. + self.set_flush_number(new_flush, new_gen)?; + + // Now, we fsync to ensure data is flushed to disk. It's okay to crash + // before this point, because setting the flush number is atomic. + cdt::extent__flush__file__start!(|| { + (job_id.get(), self.extent_number, 0) + }); + if let Err(e) = self.file.sync_all() { + /* + * XXX Retry? Mark extent as broken? + */ + return Err(CrucibleError::IoError(format!( + "extent {}: fsync 1 failure: {e:?}", + self.extent_number, + ))); + } + self.context_slot_dirty.fill(0); + cdt::extent__flush__file__done!(|| { + (job_id.get(), self.extent_number, 0) + }); + + // Check for fragmentation in the context slots leading to worse + // performance, and defragment if that's the case. 
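A hypothetical worked example of that heuristic: 12 extra context-slot syscalls spread over 3 tracked operations averages 4 per operation, which exceeds DEFRAGMENT_THRESHOLD (3), so this flush would also defragment.

let (extra_syscall_count, extra_syscall_denominator) = (12u64, 3u64);
let extra_syscalls_per_rw = extra_syscall_count
    .checked_div(extra_syscall_denominator)
    .unwrap_or(0);
assert!(extra_syscalls_per_rw > DEFRAGMENT_THRESHOLD); // 4 > 3, defragment() runs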
+ let extra_syscalls_per_rw = self + .extra_syscall_count + .checked_div(self.extra_syscall_denominator) + .unwrap_or(0); + self.extra_syscall_count = 0; + self.extra_syscall_denominator = 0; + let r = if extra_syscalls_per_rw > DEFRAGMENT_THRESHOLD { + self.defragment() + } else { + Ok(()) + }; + + cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number, 0) }); + + r + } + + #[cfg(test)] + fn set_dirty_and_block_context( + &mut self, + block_context: &DownstairsBlockContext, + ) -> Result<(), CrucibleError> { + self.set_dirty()?; + self.set_block_contexts(&[*block_context])?; + self.active_context[block_context.block as usize] = + !self.active_context[block_context.block as usize]; + Ok(()) + } + + #[cfg(test)] + fn get_block_contexts( + &mut self, + block: u64, + count: u64, + ) -> Result>, CrucibleError> { + let out = RawInner::get_block_contexts(self, block, count)?; + Ok(out.into_iter().map(|v| v.into_iter().collect()).collect()) + } +} + +impl RawInner { + /// Imports context and metadata + /// + /// Returns a buffer that must be appended to raw block data to form the + /// full raw extent file. + pub fn import( + file: &mut File, + def: &RegionDefinition, + ctxs: Vec>, + dirty: bool, + flush_number: u64, + gen_number: u64, + ) -> Result<(), CrucibleError> { + let layout = RawLayout::new(def.extent_size()); + let block_count = layout.block_count() as usize; + assert_eq!(block_count, def.extent_size().value as usize); + assert_eq!(block_count, ctxs.len()); + + file.set_len(layout.file_size())?; + layout.write_context_slots_contiguous( + file, + 0, + ctxs.iter().map(Option::as_ref), + ContextSlot::A, + )?; + layout.write_context_slots_contiguous( + file, + 0, + std::iter::repeat(None).take(block_count), + ContextSlot::B, + )?; + + layout.write_active_context_and_metadata( + file, + vec![ContextSlot::A; block_count].as_slice(), + dirty, + flush_number, + gen_number, + )?; + + Ok(()) + } + + pub fn create( + dir: &Path, + def: &RegionDefinition, + extent_number: u32, + ) -> Result { + let path = extent_path(dir, extent_number); + let extent_size = def.extent_size(); + let layout = RawLayout::new(extent_size); + let size = layout.file_size(); + + mkdir_for_file(&path)?; + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path)?; + + // All 0s are fine for everything except extent version in the metadata + file.set_len(size)?; + let mut out = Self { + file, + dirty: false, + extent_size, + layout, + extent_number, + active_context: vec![ + ContextSlot::A; // both slots are empty, so this is fine + def.extent_size().value as usize + ], + context_slot_dirty: vec![0; def.extent_size().value as usize], + extra_syscall_count: 0, + extra_syscall_denominator: 0, + }; + // Setting the flush number also writes the extent version, since + // they're serialized together in the same block. 
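That shared region is the fixed-size OnDiskMeta block at the end of the file. A quick sanity sketch (using bincode::serialized_size from bincode 1.x, which this module already uses for serialization) showing that the version, flush number, generation number, and dirty byte all fit in the reserved 32 bytes:

let meta = OnDiskMeta {
    dirty: false,
    gen_number: 0,
    flush_number: 0,
    ext_version: EXTENT_META_RAW,
};
// bool + u64 + u64 + u32 = 21 bytes under bincode's default fixed-int encoding
let n = bincode::serialized_size(&meta).unwrap();
assert!(n <= BLOCK_META_SIZE_BYTES);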
+ out.set_flush_number(0, 0)?; + + // Sync the file to disk, to avoid any questions + if let Err(e) = out.file.sync_all() { + return Err(CrucibleError::IoError(format!( + "extent {}: fsync 1 failure during initial sync: {e}", + out.extent_number, + ))); + } + Ok(out) + } + + /// Constructs a new `Inner` object from files that already exist on disk + pub fn open( + dir: &Path, + def: &RegionDefinition, + extent_number: u32, + read_only: bool, + log: &Logger, + ) -> Result { + let path = extent_path(dir, extent_number); + let extent_size = def.extent_size(); + let layout = RawLayout::new(extent_size); + let size = layout.file_size(); + + /* + * Open the extent file and verify the size is as we expect. + */ + let file = + match OpenOptions::new().read(true).write(!read_only).open(&path) { + Err(e) => { + error!( + log, + "Open of {path:?} for extent#{extent_number} \ + returned: {e}", + ); + return Err(CrucibleError::IoError(format!( + "extent {extent_number}: open of {path:?} failed: {e}", + ))); + } + Ok(f) => { + let cur_size = f.metadata().unwrap().len(); + if size != cur_size { + return Err(CrucibleError::IoError(format!( + "extent {extent_number}: file size {cur_size:?} \ + does not match expected {size:?}", + ))); + } + f + } + }; + + // Just in case, let's be very sure that the file on disk is what it + // should be + if !read_only { + if let Err(e) = file.sync_all() { + return Err(CrucibleError::IoError(format!( + "extent {extent_number}: \ + fsync 1 failure during initial rehash: {e}", + ))); + } + } + + let layout = RawLayout::new(def.extent_size()); + let meta = layout.get_metadata(&file)?; + + // If the file is dirty, then we have to recompute which context slot is + // active for every block. This is slow, but can't be avoided; we + // closed the file without a flush so we can't be confident about the + // data that was on disk. + let active_context = if !meta.dirty { + // Easy case first: if it's **not** dirty, then just assign active + // slots based on the bitpacked active context buffer from the file. + layout.get_active_contexts(&file)? + } else { + // Otherwise, read block-size chunks and check hashes against + // both context slots, looking for a match. + let ctx_a = layout.read_context_slots_contiguous( + &file, + 0, + layout.block_count(), + ContextSlot::A, + )?; + let ctx_b = layout.read_context_slots_contiguous( + &file, + 0, + layout.block_count(), + ContextSlot::B, + )?; + + // Now that we've read the context slot arrays, read file data and + // figure out which context slot is active. + let mut file_buffered = BufReader::with_capacity(64 * 1024, &file); + let mut active_context = vec![]; + let mut buf = vec![0; extent_size.block_size_in_bytes() as usize]; + let mut last_seek_block = 0; + for (block, (context_a, context_b)) in + ctx_a.into_iter().zip(ctx_b).enumerate() + { + let slot = if context_a.is_none() && context_b.is_none() { + // Small optimization: if both context slots are empty, the + // block must also be empty (and we don't need to read + + // hash it, saving a little time) + // + // Note that if `context_a == context_b` but they are both + // `Some(..)`, we still want to read + hash the block to + // make sure that it matches. Otherwise, someone has managed + // to corrupt our extent file on disk, which is Bad News. + ContextSlot::A + } else { + // Otherwise, we have to compute hashes from the file. 
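The recovery rule used below (and again in recompute_slot_from_file) is: prefer a slot whose on-disk hash matches the block data, fall back to an empty slot, and report MissingContextSlot if neither exists. A compact sketch over plain Option<u64> hashes (the helper is illustrative only):

fn pick_slot(block_hash: u64, slots: [Option<u64>; 2]) -> Option<usize> {
    (0..2usize)
        .find(|&i| slots[i] == Some(block_hash)) // a matching slot wins
        .or_else(|| (0..2usize).find(|&i| slots[i].is_none())) // else any empty slot
}

assert_eq!(pick_slot(7, [Some(7), Some(3)]), Some(0)); // hash matches slot A
assert_eq!(pick_slot(7, [None, Some(3)]), Some(0));    // no match, A is empty
assert_eq!(pick_slot(7, [Some(1), Some(3)]), None);    // MissingContextSlot case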
+ if block != last_seek_block { + file_buffered.seek_relative( + (block - last_seek_block) as i64 + * extent_size.block_size_in_bytes() as i64, + )?; + } + file_buffered.read_exact(&mut buf)?; + last_seek_block = block + 1; // since we just read a block + let hash = integrity_hash(&[&buf]); + + let mut matching_slot = None; + let mut empty_slot = None; + + for slot in [ContextSlot::A, ContextSlot::B] { + let context = [context_a, context_b][slot as usize]; + if let Some(context) = context { + if context.on_disk_hash == hash { + matching_slot = Some(slot); + } + } else if empty_slot.is_none() { + empty_slot = Some(slot); + } + } + + matching_slot.or(empty_slot).ok_or( + CrucibleError::MissingContextSlot(block as u64), + )? + }; + active_context.push(slot); + } + active_context + }; + + Ok(Self { + file, + active_context, + dirty: meta.dirty, + extent_number, + extent_size: def.extent_size(), + layout: RawLayout::new(def.extent_size()), + context_slot_dirty: vec![0; def.extent_size().value as usize], + extra_syscall_count: 0, + extra_syscall_denominator: 0, + }) + } + + fn set_dirty(&mut self) -> Result<(), CrucibleError> { + if !self.dirty { + self.layout.set_dirty(&self.file)?; + self.dirty = true; + } + Ok(()) + } + + /// Updates `self.active_context[block]` based on data read from the file + /// + /// This returns an error if neither context slot matches the block data, + /// which should never happen (even during error conditions or after a + /// crash). + /// + /// We expect to call this function rarely, so it does not attempt to + /// minimize the number of syscalls it executes. + fn recompute_slot_from_file( + &mut self, + block: u64, + ) -> Result<(), CrucibleError> { + // Read the block data itself: + let block_size = self.extent_size.block_size_in_bytes(); + let mut buf = vec![0; block_size as usize]; + pread_all( + self.file.as_raw_fd(), + &mut buf, + (block_size as u64 * block) as i64, + ) + .map_err(|e| { + CrucibleError::IoError(format!( + "extent {}: reading block {block} data failed: {e}", + self.extent_number + )) + })?; + let hash = integrity_hash(&[&buf]); + + // Then, read the slot data and decide if either slot + // (1) is present and + // (2) has a matching hash + let mut matching_slot = None; + let mut empty_slot = None; + for slot in [ContextSlot::A, ContextSlot::B] { + // Read a single context slot, which is by definition contiguous + let mut context = self + .layout + .read_context_slots_contiguous(&self.file, block, 1, slot)?; + assert_eq!(context.len(), 1); + let context = context.pop().unwrap(); + + if let Some(context) = context { + if context.on_disk_hash == hash { + matching_slot = Some(slot); + } + } else if empty_slot.is_none() { + empty_slot = Some(slot); + } + } + let value = matching_slot + .or(empty_slot) + .ok_or(CrucibleError::MissingContextSlot(block))?; + self.active_context[block as usize] = value; + Ok(()) + } + + fn set_block_contexts( + &mut self, + block_contexts: &[DownstairsBlockContext], + ) -> Result<(), CrucibleError> { + // If any of these block contexts will be overwriting an unsynched + // context slot, then we insert a sync here. 
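The bookkeeping for that check is a two-bit field per block: bit 0 marks an unsynched write to ContextSlot::A, bit 1 the same for ContextSlot::B. A small sketch of the test performed below, assuming the ContextSlot enum defined above (values are illustrative):

// Slot B for this block was written but not yet fsync'd
let mut dirty_bits: u8 = 0;
dirty_bits |= 1 << (ContextSlot::B as usize);
assert_eq!(dirty_bits, 0b10);

// The next write to this block targets the inactive slot B, so
// set_block_contexts() must fsync before overwriting it
let needs_sync = (dirty_bits & (1 << (ContextSlot::B as usize))) != 0;
assert!(needs_sync);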
+ let needs_sync = block_contexts.iter().any(|block_context| { + let block = block_context.block as usize; + // We'll be writing to the inactive slot + let slot = !self.active_context[block]; + (self.context_slot_dirty[block] & (1 << slot as usize)) != 0 + }); + if needs_sync { + self.file.sync_all().map_err(|e| { + CrucibleError::IoError(format!( + "extent {}: fsync 1 failure: {e}", + self.extent_number, + )) + })?; + self.context_slot_dirty.fill(0); + } + // Mark the to-be-written slots as unsynched on disk + // + // It's harmless if we bail out before writing the actual context slot + // here, because all it will do is force a sync next time this is called + // (that sync is right above here!) + for block_context in block_contexts { + let block = block_context.block as usize; + let slot = !self.active_context[block]; + self.context_slot_dirty[block] |= 1 << (slot as usize); + } + + let mut start = 0; + let mut write_count = 0; + for i in 0..block_contexts.len() { + if i + 1 == block_contexts.len() + || block_contexts[i].block + 1 != block_contexts[i + 1].block + { + write_count += self.set_block_contexts_contiguous( + &block_contexts[start..=i], + )?; + start = i + 1; + } + } + cdt::extent__set__block__contexts__write__count!(|| ( + self.extent_number, + write_count, + )); + Ok(()) + } + + /// Efficiently sets block contexts in bulk + /// + /// Returns the number of writes, for profiling + /// + /// # Panics + /// `block_contexts` must represent a contiguous set of blocks + fn set_block_contexts_contiguous( + &mut self, + block_contexts: &[DownstairsBlockContext], + ) -> Result { + for (a, b) in block_contexts.iter().zip(block_contexts.iter().skip(1)) { + assert_eq!(a.block + 1, b.block, "blocks must be contiguous"); + } + + let mut writes = 0u64; + for (slot, group) in block_contexts + .iter() + .group_by(|block_context| { + // We'll be writing to the inactive slot + !self.active_context[block_context.block as usize] + }) + .into_iter() + { + let mut group = group.peekable(); + let start = group.peek().unwrap().block; + self.layout.write_context_slots_contiguous( + &self.file, + start, + group.map(Option::Some), + slot, + )?; + writes += 1; + } + if let Some(writes) = writes.checked_sub(1) { + self.extra_syscall_count += writes; + self.extra_syscall_denominator += 1; + } + + Ok(writes) + } + + fn get_metadata(&self) -> Result { + self.layout.get_metadata(&self.file) + } + + /// Update the flush number, generation number, and clear the dirty bit + fn set_flush_number( + &mut self, + new_flush: u64, + new_gen: u64, + ) -> Result<(), CrucibleError> { + self.layout.write_active_context_and_metadata( + &self.file, + &self.active_context, + false, // dirty + new_flush, + new_gen, + )?; + self.dirty = false; + Ok(()) + } + + /// Returns the valid block contexts (or `None`) for the given block range + fn get_block_contexts( + &mut self, + block: u64, + count: u64, + ) -> Result>, CrucibleError> { + let mut out = vec![]; + let mut reads = 0u64; + for (slot, group) in (block..block + count) + .group_by(|block| self.active_context[*block as usize]) + .into_iter() + { + let mut group = group.peekable(); + let start = *group.peek().unwrap(); + let count = group.count(); + out.extend(self.layout.read_context_slots_contiguous( + &self.file, + start, + count as u64, + slot, + )?); + reads += 1; + } + if let Some(reads) = reads.checked_sub(1) { + self.extra_syscall_count += reads; + self.extra_syscall_denominator += 1; + } + + Ok(out) + } + + fn write_inner( + &self, + writes: 
&[&crucible_protocol::Write], + writes_to_skip: &HashSet, + iov_max: usize, + ) -> Result<(), CrucibleError> { + // Now, batch writes into iovecs and use pwritev to write them all out. + let mut batched_pwritev = BatchedPwritev::new( + self.file.as_raw_fd(), + writes.len(), + self.extent_size.block_size_in_bytes().into(), + iov_max, + ); + + for write in writes { + if !writes_to_skip.contains(&write.offset.value) { + batched_pwritev.add_write(write)?; + } + } + + // Write any remaining data + batched_pwritev.perform_writes()?; + + Ok(()) + } + + /// Helper function to get a single block context + /// + /// This is inefficient and should only be used in unit tests + #[cfg(test)] + fn get_block_context( + &mut self, + block: u64, + ) -> Result, CrucibleError> { + let mut out = self.get_block_contexts(block, 1)?; + assert_eq!(out.len(), 1); + Ok(out.pop().unwrap()) + } + + /// Consolidates context slots into either the A or B array + /// + /// This must only be run directly after the file is synced to disk + fn defragment(&mut self) -> Result<(), CrucibleError> { + // At this point, the active context slots (on a per-block basis) + // may be scattered across the two arrays: + // + // block | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | + // context | A | A | | | | | A | A | A | ... | [A array] + // | | | B | B | B | B | | | | ... | [B array] + // + // This can be inefficient, because it means that a write would have to + // be split into updating multiple regions (instead of a single + // contiguous write). As such, if the context slots disagree, we + // "defragment" them: + // + // - Figure out whether A or B is more popular + // - Copy context data from the less-popular slot to the more-popular + // + // In the example above, `A` is more popular so we would perform the + // following copy: + // + // block | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | + // context | A | A | ^ | ^ | ^ | ^ | A | A | A | ... | [A array] + // | | | B | B | B | B | | | | ... | [B array] + // + // This is safe because it occurs immediately after a flush, so we are + // dead certain that the active context matches file contents. This + // means that we can safely overwrite the old (inactive) context. + // + // We track the number of A vs B slots, as well as the range covered by + // the slots. It's that range that we'll need to read + write, so we + // want to pick whichever slot does less work. + assert!(!self.dirty); // This can only be called after a flush! 
+ + #[derive(Copy, Clone)] + struct Counter { + count: usize, + min_block: u64, + max_block: u64, + } + let mut a_count = Counter { + count: 0, + min_block: u64::MAX, + max_block: 0, + }; + let mut b_count = a_count; + for (i, s) in self.active_context.iter().enumerate() { + let count = match s { + ContextSlot::A => &mut a_count, + ContextSlot::B => &mut b_count, + }; + count.count += 1; + count.min_block = count.min_block.min(i as u64); + count.max_block = count.max_block.max(i as u64); + } + if a_count.count == 0 || b_count.count == 0 { + return Ok(()); + } + + let (copy_from, counter) = if a_count.count < b_count.count { + (ContextSlot::A, a_count) + } else { + (ContextSlot::B, b_count) + }; + assert!(counter.count > 0); + assert!(counter.max_block >= counter.min_block); + let num_slots = counter.max_block + 1 - counter.min_block; + + // Read the source context slots from the file + let source_slots = self.layout.read_context_slots_contiguous( + &self.file, + counter.min_block, + num_slots, + copy_from, + )?; + let mut dest_slots = self.layout.read_context_slots_contiguous( + &self.file, + counter.min_block, + num_slots, + !copy_from, + )?; + + // Selectively overwrite dest with source context slots + for (i, block) in (counter.min_block..=counter.max_block).enumerate() { + let block = block as usize; + if self.active_context[block] == copy_from { + dest_slots[i] = source_slots[i]; + + // Mark this slot as unsynched, so that we don't overwrite + // it later on without a sync + self.context_slot_dirty[block] |= 1 << (!copy_from as usize); + } + } + let r = self.layout.write_context_slots_contiguous( + &self.file, + counter.min_block, + dest_slots.iter().map(|v| v.as_ref()), + !copy_from, + ); + + // If this write failed, then try recomputing every context slot + // within the unknown range + if r.is_err() { + for block in counter.min_block..=counter.max_block { + self.recompute_slot_from_file(block).unwrap(); + } + } else { + for block in counter.min_block..=counter.max_block { + self.active_context[block as usize] = !copy_from; + } + // At this point, the `dirty` bit is not set, but values in + // `self.active_context` disagree with the active context slot + // stored in the file. Normally, this is bad: if we crashed and + // reloaded the file from disk right at this moment, we'd end up + // with different values in `self.active_context`. In this case, + // though, it's okay: the values that have changed have the **same + // data** in both context slots, so it would still be a valid state + // for the file. + } + r.map(|_| ()) + } + + /// Implementation details for `ExtentInner::write` + /// + /// This function requires that `writes` not have any overlapping writes, + /// i.e. blocks that are written multiple times. We write the contexts + /// first, then block data; if a single block is written multiple times, + /// then we'd write multiple contexts, then multiple block data, and it + /// would be possible for them to get out of sync. 
+ fn write_without_overlaps( + &mut self, + job_id: JobId, + writes: &[&crucible_protocol::Write], + only_write_unwritten: bool, + iov_max: usize, + ) -> Result<(), CrucibleError> { + /* + * In order to be crash consistent, perform the following steps in + * order: + * + * 1) set the dirty bit + * 2) for each write: + * a) write out encryption context and hashes first + * b) write out extent data second + * + * If encryption context is written after the extent data, a crash or + * interruption before extent data is written would potentially leave + * data on the disk that cannot be decrypted. + * + * If hash is written after extent data, same thing - a crash or + * interruption would leave data on disk that would fail the + * integrity hash check. + * + * Note that writing extent data here does not assume that it is + * durably on disk - the only guarantee of that is returning + * ok from fsync. The data is only potentially on disk and + * this depends on operating system implementation. + * + * To minimize the performance hit of sending many transactions to the + * filesystem, as much as possible is written at the same time. This + * means multiple loops are required. The steps now look like: + * + * 1) set the dirty bit + * 2) gather and write all encryption contexts + hashes + * 3) write all extent data + * + * If "only_write_unwritten" is true, then we only issue a write for + * a block if that block has not been written to yet. Note + * that we can have a write that is "sparse" if the range of + * blocks it contains has a mix of written an unwritten + * blocks. + * + * We define a block being written to or not has if that block has + * `Some(...)` with a matching checksum serialized into a context slot + * or not. So it is required that a written block has a checksum. + */ + + // If `only_write_written`, we need to skip writing to blocks that + // already contain data. We'll first query the metadata to see which + // blocks have hashes + let mut writes_to_skip = HashSet::new(); + if only_write_unwritten { + cdt::extent__write__get__hashes__start!(|| { + (job_id.0, self.extent_number, writes.len() as u64) + }); + let mut write_run_start = 0; + while write_run_start < writes.len() { + let first_write = writes[write_run_start]; + + // Starting from the first write in the potential run, we scan + // forward until we find a write with a block that isn't + // contiguous with the request before it. Since we're counting + // pairs, and the number of pairs is one less than the number of + // writes, we need to add 1 to get our actual run length. + let n_contiguous_writes = writes[write_run_start..] + .windows(2) + .take_while(|wr_pair| { + wr_pair[0].offset.value + 1 == wr_pair[1].offset.value + }) + .count() + + 1; + + // Query hashes for the write range. + let block_contexts = self.get_block_contexts( + first_write.offset.value, + n_contiguous_writes as u64, + )?; + + for (i, block_contexts) in block_contexts.iter().enumerate() { + if block_contexts.is_some() { + let _ = writes_to_skip + .insert(i as u64 + first_write.offset.value); + } + } + + write_run_start += n_contiguous_writes; + } + cdt::extent__write__get__hashes__done!(|| { + (job_id.0, self.extent_number, writes.len() as u64) + }); + + if writes_to_skip.len() == writes.len() { + // Nothing to do + return Ok(()); + } + } + + self.set_dirty()?; + + // Write all the context data to the raw file + // + // TODO right now we're including the integrity_hash() time in the + // measured time. Is it small enough to be ignored? 
+ cdt::extent__write__raw__context__insert__start!(|| { + (job_id.0, self.extent_number, writes.len() as u64) + }); + + // Compute block contexts, then write them to disk + let block_ctx: Vec<_> = writes + .iter() + .filter(|write| !writes_to_skip.contains(&write.offset.value)) + .map(|write| { + // TODO it would be nice if we could profile what % of time we're + // spending on hashes locally vs writing to disk + let on_disk_hash = integrity_hash(&[&write.data[..]]); + + DownstairsBlockContext { + block_context: write.block_context, + block: write.offset.value, + on_disk_hash, + } + }) + .collect(); + + self.set_block_contexts(&block_ctx)?; + + cdt::extent__write__raw__context__insert__done!(|| { + (job_id.0, self.extent_number, writes.len() as u64) + }); + + // PERFORMANCE TODO: + // + // Something worth considering for small writes is that, based on + // my memory of conversations we had with propolis folks about what + // OSes expect out of an NVMe driver, I believe our contract with the + // upstairs doesn't require us to have the writes inside the file + // until after a flush() returns. If that is indeed true, we could + // buffer a certain amount of writes, only actually writing that + // buffer when either a flush is issued or the buffer exceeds some + // set size (based on our memory constraints). This would have + // benefits on any workload that frequently writes to the same block + // between flushes, would have benefits for small contiguous writes + // issued over multiple write commands by letting us batch them into + // a larger write, and (speculation) may benefit non-contiguous writes + // by cutting down the number of metadata writes. But, it introduces + // complexity. The time spent implementing that would probably better be + // spent switching to aio or something like that. + cdt::extent__write__file__start!(|| { + (job_id.0, self.extent_number, writes.len() as u64) + }); + + let r = self.write_inner(writes, &writes_to_skip, iov_max); + + if r.is_err() { + for write in writes.iter() { + let block = write.offset.value; + if !writes_to_skip.contains(&block) { + // Try to recompute the context slot from the file. If this + // fails, then we _really_ can't recover, so bail out + // unceremoniously. + self.recompute_slot_from_file(block).unwrap(); + } + } + } else { + // Now that writes have gone through, update active context slots + for write in writes.iter() { + let block = write.offset.value; + if !writes_to_skip.contains(&block) { + // We always write to the inactive slot, so just swap it + self.active_context[block as usize] = + !self.active_context[block as usize]; + } + } + } + + cdt::extent__write__file__done!(|| { + (job_id.0, self.extent_number, writes.len() as u64) + }); + + Ok(()) + } +} + +/// Data structure that implements the on-disk layout of a raw extent file +struct RawLayout { + extent_size: Block, +} + +impl std::fmt::Debug for RawLayout { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RawLayout") + .field("extent_size", &self.extent_size) + .finish() + } +} + +impl RawLayout { + fn new(extent_size: Block) -> Self { + RawLayout { extent_size } + } + + /// Sets the dirty flag in the file true + /// + /// This unconditionally writes to the file; to avoid extra syscalls, it + /// would be wise to cache this at a higher level and only write if it has + /// changed. 
+ fn set_dirty(&self, file: &File) -> Result<(), CrucibleError> { + let offset = self.metadata_offset(); + pwrite_all(file.as_raw_fd(), &[1u8], offset as i64).map_err(|e| { + CrucibleError::IoError(format!("writing dirty byte failed: {e}",)) + })?; + Ok(()) + } + + /// Returns the total size of the raw data file + /// + /// This includes block data, context slots, active slot array, and metadata + fn file_size(&self) -> u64 { + let block_count = self.block_count(); + self.block_size().checked_mul(block_count).unwrap() + + BLOCK_META_SIZE_BYTES + + (block_count + 7) / 8 + + BLOCK_CONTEXT_SLOT_SIZE_BYTES * block_count * 2 + } + + /// Returns the beginning of supplementary data in the file + fn supplementary_data_offset(&self) -> u64 { + self.block_count() * self.block_size() + } + + /// Returns the byte offset of the given context slot in the file + /// + /// Contexts slots are located after block data in the extent file. There + /// are two context slots arrays, each of which contains one context slot + /// per block. We use a ping-pong strategy to ensure that one of them is + /// always valid (i.e. matching the data in the file). + fn context_slot_offset(&self, block: u64, slot: ContextSlot) -> u64 { + self.supplementary_data_offset() + + (self.block_count() * slot as u64 + block) + * BLOCK_CONTEXT_SLOT_SIZE_BYTES + } + + /// Number of blocks in the extent file + fn block_count(&self) -> u64 { + self.extent_size.value + } + + /// Returns the byte offset of the `active_context` bitpacked array + fn active_context_offset(&self) -> u64 { + self.supplementary_data_offset() + + self.block_count() * 2 * BLOCK_CONTEXT_SLOT_SIZE_BYTES + } + + fn active_context_size(&self) -> u64 { + (self.block_count() + 7) / 8 + } + + fn metadata_offset(&self) -> u64 { + self.active_context_offset() + self.active_context_size() + } + + /// Number of bytes in each block + fn block_size(&self) -> u64 { + self.extent_size.block_size_in_bytes() as u64 + } + + fn get_metadata(&self, file: &File) -> Result { + let mut buf = [0u8; BLOCK_META_SIZE_BYTES as usize]; + let offset = self.metadata_offset(); + pread_all(file.as_raw_fd(), &mut buf, offset as i64).map_err(|e| { + CrucibleError::IoError(format!("reading metadata failed: {e}")) + })?; + let out: OnDiskMeta = bincode::deserialize(&buf) + .map_err(|e| CrucibleError::BadMetadata(e.to_string()))?; + Ok(out) + } + + fn write_context_slots_contiguous<'a, I>( + &self, + file: &File, + block_start: u64, + iter: I, + slot: ContextSlot, + ) -> Result<(), CrucibleError> + where + I: Iterator>, + { + let mut buf = vec![]; + + for block_context in iter { + let n = buf.len(); + buf.extend([0u8; BLOCK_CONTEXT_SLOT_SIZE_BYTES as usize]); + let d = block_context.map(|b| OnDiskDownstairsBlockContext { + block_context: b.block_context, + on_disk_hash: b.on_disk_hash, + }); + bincode::serialize_into(&mut buf[n..], &d).unwrap(); + } + let offset = self.context_slot_offset(block_start, slot); + pwrite_all(file.as_raw_fd(), &buf, offset as i64).map_err(|e| { + CrucibleError::IoError(format!("writing context slots failed: {e}")) + })?; + Ok(()) + } + + fn read_context_slots_contiguous( + &self, + file: &File, + block_start: u64, + block_count: u64, + slot: ContextSlot, + ) -> Result>, CrucibleError> { + let mut buf = + vec![0u8; (BLOCK_CONTEXT_SLOT_SIZE_BYTES * block_count) as usize]; + + let offset = self.context_slot_offset(block_start, slot); + pread_all(file.as_raw_fd(), &mut buf, offset as i64).map_err(|e| { + CrucibleError::IoError(format!("reading context slots failed: {e}")) + 
})?; + + let mut out = vec![]; + for (i, chunk) in buf + .chunks_exact(BLOCK_CONTEXT_SLOT_SIZE_BYTES as usize) + .enumerate() + { + let ctx: Option = + bincode::deserialize(chunk).map_err(|e| { + CrucibleError::BadContextSlot(e.to_string()) + })?; + out.push(ctx.map(|c| DownstairsBlockContext { + block: block_start + i as u64, + block_context: c.block_context, + on_disk_hash: c.on_disk_hash, + })); + } + Ok(out) + } + + /// Write out the active context array and metadata section of the file + /// + /// This is done in a single write, so it should be atomic. + /// + /// # Panics + /// `active_context.len()` must match `self.block_count()`, and the function + /// will panic otherwise. + fn write_active_context_and_metadata( + &self, + file: &File, + active_context: &[ContextSlot], + dirty: bool, + flush_number: u64, + gen_number: u64, + ) -> Result<(), CrucibleError> { + assert_eq!(active_context.len(), self.block_count() as usize); + + // Serialize bitpacked active slot values + let mut buf = vec![]; + for c in active_context.chunks(8) { + let mut v = 0; + for (i, slot) in c.iter().enumerate() { + v |= (*slot as u8) << i; + } + buf.push(v); + } + + let d = OnDiskMeta { + dirty, + flush_number, + gen_number, + ext_version: EXTENT_META_RAW, + }; + let mut meta = [0u8; BLOCK_META_SIZE_BYTES as usize]; + bincode::serialize_into(meta.as_mut_slice(), &d).unwrap(); + buf.extend(meta); + + let offset = self.active_context_offset(); + + pwrite_all(file.as_raw_fd(), &buf, offset as i64).map_err(|e| { + CrucibleError::IoError(format!("writing metadata failed: {e}")) + })?; + + Ok(()) + } + + /// Decodes the active contexts from the given file + /// + /// The file descriptor offset is not changed by this function + fn get_active_contexts( + &self, + file: &File, + ) -> Result, CrucibleError> { + let mut buf = vec![0u8; self.active_context_size() as usize]; + let offset = self.active_context_offset(); + pread_all(file.as_raw_fd(), &mut buf, offset as i64).map_err(|e| { + CrucibleError::IoError(format!( + "could not read active contexts: {e}" + )) + })?; + + let mut active_context = vec![]; + for bit in buf + .iter() + .flat_map(|b| (0..8).map(move |i| b & (1 << i) == 0)) + .take(self.block_count() as usize) + { + // Unpack bits from each byte + active_context.push(if bit { + ContextSlot::A + } else { + ContextSlot::B + }); + } + assert_eq!(active_context.len(), self.block_count() as usize); + Ok(active_context) + } +} + +/// Call `pread` repeatedly to read an entire buffer +/// +/// Quoth the standard, +/// +/// > The value returned may be less than nbyte if the number of bytes left in +/// > the file is less than nbyte, if the read() request was interrupted by a +/// > signal, or if the file is a pipe or FIFO or special file and has fewer +/// > than nbyte bytes immediately available for reading. For example, a read() +/// > from a file associated with a terminal may return one typed line of data. +/// +/// We don't have to worry about most of these conditions, but it may be +/// possible for Crucible to be interrupted by a signal, so let's play it safe. 
+fn pread_all( + fd: std::os::fd::RawFd, + mut buf: &mut [u8], + mut offset: i64, +) -> Result<(), nix::errno::Errno> { + while !buf.is_empty() { + let n = nix::sys::uio::pread(fd, buf, offset)?; + offset += n as i64; + buf = &mut buf[n..]; + } + Ok(()) +} + +/// Call `pwrite` repeatedly to write an entire buffer +/// +/// See details for why this is necessary in [`pread_all`] +fn pwrite_all( + fd: std::os::fd::RawFd, + mut buf: &[u8], + mut offset: i64, +) -> Result<(), nix::errno::Errno> { + while !buf.is_empty() { + let n = nix::sys::uio::pwrite(fd, buf, offset)?; + offset += n as i64; + buf = &buf[n..]; + } + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use anyhow::Result; + use bytes::{Bytes, BytesMut}; + use crucible_protocol::EncryptionContext; + use crucible_protocol::ReadRequest; + use rand::Rng; + use tempfile::tempdir; + + const IOV_MAX_TEST: usize = 1000; + + fn new_region_definition() -> RegionDefinition { + let opt = crate::region::test::new_region_options(); + RegionDefinition::from_options(&opt).unwrap() + } + + #[tokio::test] + async fn encryption_context() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Encryption context for blocks 0 and 1 should start blank + + assert!(inner.get_block_context(0)?.is_none()); + assert!(inner.get_block_context(1)?.is_none()); + + // Set and verify block 0's context + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: Some(EncryptionContext { + nonce: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + tag: [ + 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, + 19, + ], + }), + hash: 123, + }, + block: 0, + on_disk_hash: 456, + })?; + + let ctx = inner.get_block_context(0)?.unwrap(); + assert_eq!( + ctx.block_context.encryption_context.unwrap().nonce, + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] + ); + assert_eq!( + ctx.block_context.encryption_context.unwrap().tag, + [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] + ); + assert_eq!(ctx.block_context.hash, 123); + assert_eq!(ctx.on_disk_hash, 456); + + // Block 1 should still be blank + assert!(inner.get_block_context(1)?.is_none()); + + // Set and verify a new context for block 0 + let blob1 = rand::thread_rng().gen::<[u8; 12]>(); + let blob2 = rand::thread_rng().gen::<[u8; 16]>(); + + // Set and verify block 0's context + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: Some(EncryptionContext { + nonce: blob1, + tag: blob2, + }), + hash: 1024, + }, + block: 0, + on_disk_hash: 65536, + })?; + + let ctx = inner.get_block_context(0)?.unwrap(); + + // Second context was appended + assert_eq!(ctx.block_context.encryption_context.unwrap().nonce, blob1); + assert_eq!(ctx.block_context.encryption_context.unwrap().tag, blob2); + assert_eq!(ctx.block_context.hash, 1024); + assert_eq!(ctx.on_disk_hash, 65536); + + Ok(()) + } + + #[tokio::test] + async fn multiple_context() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Encryption context for blocks 0 and 1 should start blank + + assert!(inner.get_block_context(0)?.is_none()); + assert!(inner.get_block_context(1)?.is_none()); + + // Set block 0's and 1's context and dirty flag + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: Some(EncryptionContext { + nonce: [1, 
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + tag: [ + 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, + 19, + ], + }), + hash: 123, + }, + block: 0, + on_disk_hash: 456, + })?; + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: Some(EncryptionContext { + nonce: [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], + tag: [8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], + }), + hash: 9999, + }, + block: 1, + on_disk_hash: 1234567890, + })?; + + // Verify block 0's context + let ctx = inner.get_block_context(0)?.unwrap(); + assert_eq!( + ctx.block_context.encryption_context.unwrap().nonce, + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] + ); + assert_eq!( + ctx.block_context.encryption_context.unwrap().tag, + [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] + ); + assert_eq!(ctx.block_context.hash, 123); + assert_eq!(ctx.on_disk_hash, 456); + + // Verify block 1's context + let ctx = inner.get_block_context(1)?.unwrap(); + + assert_eq!( + ctx.block_context.encryption_context.unwrap().nonce, + [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + ); + assert_eq!( + ctx.block_context.encryption_context.unwrap().tag, + [8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] + ); + assert_eq!(ctx.block_context.hash, 9999); + assert_eq!(ctx.on_disk_hash, 1234567890); + + // Return both block 0's and block 1's context, and verify + + let ctxs = inner.get_block_contexts(0, 2)?; + let ctx = ctxs[0].as_ref().unwrap(); + assert_eq!( + ctx.block_context.encryption_context.unwrap().nonce, + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] + ); + assert_eq!( + ctx.block_context.encryption_context.unwrap().tag, + [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] + ); + assert_eq!(ctx.block_context.hash, 123); + assert_eq!(ctx.on_disk_hash, 456); + + let ctx = ctxs[1].as_ref().unwrap(); + assert_eq!( + ctx.block_context.encryption_context.unwrap().nonce, + [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + ); + assert_eq!( + ctx.block_context.encryption_context.unwrap().tag, + [8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] + ); + assert_eq!(ctx.block_context.hash, 9999); + assert_eq!(ctx.on_disk_hash, 1234567890); + + // Append a whole bunch of block context rows + for i in 0..10 { + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: Some(EncryptionContext { + nonce: rand::thread_rng().gen::<[u8; 12]>(), + tag: rand::thread_rng().gen::<[u8; 16]>(), + }), + hash: rand::thread_rng().gen::(), + }, + block: 0, + on_disk_hash: i, + })?; + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: Some(EncryptionContext { + nonce: rand::thread_rng().gen::<[u8; 12]>(), + tag: rand::thread_rng().gen::<[u8; 16]>(), + }), + hash: rand::thread_rng().gen::(), + }, + block: 1, + on_disk_hash: i, + })?; + } + + let ctxs = inner.get_block_contexts(0, 2)?; + + assert!(ctxs[0].is_some()); + assert_eq!(ctxs[0].as_ref().unwrap().on_disk_hash, 9); + + assert!(ctxs[1].is_some()); + assert_eq!(ctxs[1].as_ref().unwrap().on_disk_hash, 9); + + Ok(()) + } + + #[test] + fn test_write_unwritten_without_flush() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write a block, but don't flush. 
+ let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data, + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + + // The context should be in place, though we haven't flushed yet + + // Therefore, we expect that write_unwritten to the first block won't + // do anything. + { + let data = Bytes::from(vec![0x66; 512]); + let hash = integrity_hash(&[&data[..]]); + let block_context = BlockContext { + encryption_context: None, + hash, + }; + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data: data.clone(), + block_context, + }; + inner.write(JobId(20), &[&write], true, IOV_MAX_TEST)?; + + let mut resp = Vec::new(); + let read = ReadRequest { + eid: 0, + offset: Block::new_512(0), + }; + inner.read(JobId(21), &[&read], &mut resp, IOV_MAX_TEST)?; + + // We should not get back our data, because block 0 was written. + assert_ne!( + resp, + vec![ReadResponse { + eid: 0, + offset: Block::new_512(0), + data: BytesMut::from(data.as_ref()), + block_contexts: vec![block_context] + }] + ); + } + + // But, writing to the second block still should! + { + let data = Bytes::from(vec![0x66; 512]); + let hash = integrity_hash(&[&data[..]]); + let block_context = BlockContext { + encryption_context: None, + hash, + }; + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(1), + data: data.clone(), + block_context, + }; + inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; + + let mut resp = Vec::new(); + let read = ReadRequest { + eid: 0, + offset: Block::new_512(1), + }; + inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + + // We should get back our data! Block 1 was never written. + assert_eq!( + resp, + vec![ReadResponse { + eid: 0, + offset: Block::new_512(1), + data: BytesMut::from(data.as_ref()), + block_contexts: vec![block_context] + }] + ); + } + + Ok(()) + } + + #[test] + fn test_auto_sync() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write a block, but don't flush. + let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data, + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + + // Write block 1, so that we can notice when a sync happens. The write + // should go to slot B. 
+ let write1 = crucible_protocol::Write { + offset: Block::new_512(1), + ..write.clone() + }; + inner.write(JobId(10), &[&write1], false, IOV_MAX_TEST)?; + assert_eq!(inner.context_slot_dirty[0], 0b00); + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[1], 0b10); + assert_eq!(inner.active_context[1], ContextSlot::B); + + // The context should be written to block 0, slot B + inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.context_slot_dirty[0], 0b10); + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged + assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged + + // The context should be written to block 0, slot A + inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.context_slot_dirty[0], 0b11); + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged + assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged + + // The context should be written to slot B, forcing a sync + inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.context_slot_dirty[0], 0b10); + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[1], 0b00); + assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged + + Ok(()) + } + + #[test] + fn test_auto_sync_flush() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write a block, but don't flush. + let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data, + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + // The context should be written to slot B + inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[0], 0b10); + + // Flush! This will mark all slots as synched + inner.flush(12, 12, JobId(11).into())?; + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[0], 0b00); + + // The context should be written to slot A + inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[0], 0b01); + + // The context should be written to slot B + inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[0], 0b11); + + // The context should be written to slot A, forcing a sync + inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[0], 0b01); + + Ok(()) + } + + #[test] + fn test_auto_sync_flush_2() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write a block, but don't flush. 
+ let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data, + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + // The context should be written to slot B + inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[0], 0b10); + + // The context should be written to slot A + inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[0], 0b11); + + // Flush! This will mark all slots as synched + inner.flush(12, 12, JobId(11).into())?; + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[0], 0b00); + + // The context should be written to slot B + inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[0], 0b10); + + // The context should be written to slot A + inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::A); + assert_eq!(inner.context_slot_dirty[0], 0b11); + + // The context should be written to slot B, forcing a sync + inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + assert_eq!(inner.active_context[0], ContextSlot::B); + assert_eq!(inner.context_slot_dirty[0], 0b10); + + Ok(()) + } + + /// If a write successfully put a context into a context slot, but it never + /// actually got the data onto the disk, that block should revert back to + /// being "unwritten". After all, the data never was truly written. + /// + /// This test is very similar to test_region_open_removes_partial_writes. + #[test] + fn test_reopen_marks_blocks_unwritten_if_data_never_hit_disk() -> Result<()> + { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Partial write, the data never hits disk, but there's a context + // in the DB and the dirty flag is set. + inner.set_dirty_and_block_context(&DownstairsBlockContext { + block_context: BlockContext { + encryption_context: None, + hash: 1024, + }, + block: 0, + on_disk_hash: 65536, + })?; + drop(inner); + + // Reopen, which should note that the hash doesn't match on-disk values + // and decide that block 0 is unwritten. + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Writing to block 0 should succeed with only_write_unwritten + { + let data = Bytes::from(vec![0x66; 512]); + let hash = integrity_hash(&[&data[..]]); + let block_context = BlockContext { + encryption_context: None, + hash, + }; + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data: data.clone(), + block_context, + }; + inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; + + let mut resp = Vec::new(); + let read = ReadRequest { + eid: 0, + offset: Block::new_512(0), + }; + inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + + // We should get back our data! Block 1 was never written. 
+ assert_eq!( + resp, + vec![ReadResponse { + eid: 0, + offset: Block::new_512(0), + data: BytesMut::from(data.as_ref()), + block_contexts: vec![block_context] + }] + ); + } + + Ok(()) + } + + #[test] + fn test_defragment_full() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write every other block, so that the active context slot alternates + let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + for i in 0..5 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i * 2), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + } + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + assert_eq!(inner.extra_syscall_count, 0); + assert_eq!(inner.extra_syscall_denominator, 5); + inner.flush(10, 10, JobId(10).into())?; + assert!(inner.context_slot_dirty.iter().all(|v| *v == 0)); + + // This should not have changed active context slots! + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + + // Now, do one big write, which will be forced to bounce between the + // context slots. + let mut writes = vec![]; + let data = Bytes::from(vec![0x33; 512]); + let hash = integrity_hash(&[&data[..]]); + for i in 0..10 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + writes.push(write); + } + // This write has toggled every single context slot + let writes_ref: Vec<&_> = writes.iter().collect(); + inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 { + ContextSlot::A + } else { + ContextSlot::B + }, + "invalid context slot at {i}", + ); + } + assert!(inner.extra_syscall_count > 0); + assert_eq!(inner.extra_syscall_denominator, 1); + + // Do a flush! Because we had a bunch of extra syscalls, this should + // trigger defragmentation; after the flush, every context slot should + // be in array A. 
+ inner.flush(11, 11, JobId(11).into())?; + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + ContextSlot::A, + "invalid context slot at {i}", + ); + } + + Ok(()) + } + + #[test] + fn test_defragment_into_b() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write every other block, so that the active context slot alternates + let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + + for i in 0..3 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i * 2), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + } + // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 + // --|---|---|---|---|---|---|---|---|--- + // B | A | B | A | B | A | A | A | A | A + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 4 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + assert_eq!(inner.extra_syscall_count, 0); + assert_eq!(inner.extra_syscall_denominator, 3); + inner.flush(10, 10, JobId(10).into())?; + + // This should not have changed active context slots! + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 4 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + + // Now, do one big write, which will be forced to bounce between the + // context slots. + let mut writes = vec![]; + let data = Bytes::from(vec![0x33; 512]); + let hash = integrity_hash(&[&data[..]]); + for i in 0..10 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + writes.push(write); + } + // This write should toggled every single context slot: + // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 + // --|---|---|---|---|---|---|---|---|--- + // A | B | A | B | A | B | B | B | B | B + let writes_ref: Vec<&_> = writes.iter().collect(); + inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 4 { + ContextSlot::A + } else { + ContextSlot::B + }, + "invalid context slot at {i}", + ); + } + assert!(inner.extra_syscall_count > 0); + assert_eq!(inner.extra_syscall_denominator, 1); + + // Do a flush! 
Because we had a bunch of extra syscalls, this should + // trigger defragmentation; after the flush, every context slot should + // be in array B (which minimizes copies) + inner.flush(11, 11, JobId(11).into())?; + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + ContextSlot::B, + "invalid context slot at {i}", + ); + } + + Ok(()) + } + + #[test] + fn test_defragment_into_a() -> Result<()> { + // Identical to `test_defragment_a`, except that we force the + // defragmentation to copy into the A slots + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write every other block, so that the active context slot alternates + let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + + for i in 0..3 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i * 2), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + } + // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 + // --|---|---|---|---|---|---|---|---|--- + // B | A | B | A | B | A | A | A | A | A + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 4 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + assert_eq!(inner.extra_syscall_count, 0); + assert_eq!(inner.extra_syscall_denominator, 3); + inner.flush(10, 10, JobId(10).into())?; + + // This should not have changed active context slots! + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 4 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + + // Now, do two big writes, which will be forced to bounce between the + // context slots. + let mut writes = vec![]; + let data = Bytes::from(vec![0x33; 512]); + let hash = integrity_hash(&[&data[..]]); + for i in 0..10 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + writes.push(write); + } + let writes_ref: Vec<&_> = writes.iter().collect(); + inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + // This write should toggled every single context slot: + // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 + // --|---|---|---|---|---|---|---|---|--- + // B | A | B | A | B | A | A | A | A | A + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 4 { + ContextSlot::B + } else { + ContextSlot::A + }, + "invalid context slot at {i}", + ); + } + assert!(inner.extra_syscall_count > 0); + assert_eq!(inner.extra_syscall_denominator, 2); + + // Do a flush! 
Because we had a bunch of extra syscalls, this should + // trigger defragmentation; after the flush, every context slot should + // be in array A (which minimizes copies) + inner.flush(11, 11, JobId(11).into())?; + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + ContextSlot::A, + "invalid context slot at {i}", + ); + } + + Ok(()) + } + + #[test] + fn test_defragment_not_enough() -> Result<()> { + // Identical to `test_defragment_a`, except that we force the + // defragmentation to copy into the A slots + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write every other block, so that the active context slot alternates + let data = Bytes::from(vec![0x55; 512]); + let hash = integrity_hash(&[&data[..]]); + + for i in 0..2 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i * 2), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + } + // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 + // --|---|---|---|---|---|---|---|---|--- + // B | A | B | A | A | A | A | A | A | A + + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 2 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + assert_eq!(inner.extra_syscall_count, 0); + assert_eq!(inner.extra_syscall_denominator, 2); + inner.flush(10, 10, JobId(10).into())?; + + // This should not have changed active context slots! + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 2 { + ContextSlot::B + } else { + ContextSlot::A + } + ); + } + + // Now, do two big writes, which will be forced to bounce between the + // context slots. + let mut writes = vec![]; + let data = Bytes::from(vec![0x33; 512]); + let hash = integrity_hash(&[&data[..]]); + for i in 0..10 { + let write = crucible_protocol::Write { + eid: 0, + offset: Block::new_512(i), + data: data.clone(), + block_context: BlockContext { + encryption_context: None, + hash, + }, + }; + writes.push(write); + } + let writes_ref: Vec<&_> = writes.iter().collect(); + inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + // This write should toggled every single context slot: + // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 + // --|---|---|---|---|---|---|---|---|--- + // B | A | B | A | A | A | A | A | A | A + for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 2 { + ContextSlot::B + } else { + ContextSlot::A + }, + "invalid context slot at {i}", + ); + } + assert!(inner.extra_syscall_count > 0); + assert_eq!(inner.extra_syscall_denominator, 2); + + // This write didn't add enough extra syscalls to trigger + // defragmentation. + inner.flush(11, 11, JobId(11).into())?; + + // These should be the same! 
+ for i in 0..10 { + assert_eq!( + inner.active_context[i], + if i % 2 == 0 && i <= 2 { + ContextSlot::B + } else { + ContextSlot::A + }, + "invalid context slot at {i}", + ); + } + + Ok(()) + } + + #[test] + fn test_serialized_sizes() { + let c = OnDiskDownstairsBlockContext { + block_context: BlockContext { + hash: u64::MAX, + encryption_context: Some(EncryptionContext { + nonce: [0xFF; 12], + tag: [0xFF; 16], + }), + }, + on_disk_hash: u64::MAX, + }; + let mut ctx_buf = [0u8; BLOCK_CONTEXT_SLOT_SIZE_BYTES as usize]; + bincode::serialize_into(ctx_buf.as_mut_slice(), &Some(c)).unwrap(); + + let m = OnDiskMeta { + dirty: true, + gen_number: u64::MAX, + flush_number: u64::MAX, + ext_version: u32::MAX, + }; + let mut meta_buf = [0u8; BLOCK_META_SIZE_BYTES as usize]; + bincode::serialize_into(meta_buf.as_mut_slice(), &Some(m)).unwrap(); + } + + /// Test that multiple writes to the same location work + #[test] + fn test_multiple_writes_to_same_location_raw() -> Result<()> { + let dir = tempdir()?; + let mut inner = + RawInner::create(dir.as_ref(), &new_region_definition(), 0) + .unwrap(); + + // Write the same block four times in the same write command. + + let writes: Vec<_> = (0..4) + .map(|i| { + let data = Bytes::from(vec![i as u8; 512]); + let hash = integrity_hash(&[&data[..]]); + + crucible_protocol::Write { + eid: 0, + offset: Block::new_512(0), + data, + block_context: BlockContext { + encryption_context: None, + hash, + }, + } + }) + .collect(); + + let write_refs: Vec<_> = writes.iter().collect(); + + assert_eq!(inner.context_slot_dirty[0], 0b00); + inner.write(JobId(30), &write_refs, false, IOV_MAX_TEST)?; + + // The write should be split into four separate calls to + // `write_without_overlaps`, triggering one bonus fsync. + assert_eq!(inner.context_slot_dirty[0], 0b11); + + // Block 0 should be 0x03 repeated. 
+ let mut resp = Vec::new(); + let read = ReadRequest { + eid: 0, + offset: Block::new_512(0), + }; + + inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + + let data = Bytes::from(vec![0x03; 512]); + let hash = integrity_hash(&[&data[..]]); + let block_context = BlockContext { + encryption_context: None, + hash, + }; + + assert_eq!( + resp, + vec![ReadResponse { + eid: 0, + offset: Block::new_512(0), + data: BytesMut::from(data.as_ref()), + // Only the most recent block context should be returned + block_contexts: vec![block_context], + }] + ); + + Ok(()) + } +} diff --git a/downstairs/src/extent_inner_sqlite.rs b/downstairs/src/extent_inner_sqlite.rs index 24a9144a7..34056c7b6 100644 --- a/downstairs/src/extent_inner_sqlite.rs +++ b/downstairs/src/extent_inner_sqlite.rs @@ -1,11 +1,8 @@ // Copyright 2023 Oxide Computer Company use crate::{ cdt, crucible_bail, - extent::{ - check_input, extent_path, DownstairsBlockContext, ExtentInner, - ExtentMeta, EXTENT_META_SQLITE, - }, - integrity_hash, mkdir_for_file, + extent::{check_input, extent_path, DownstairsBlockContext, ExtentInner}, + integrity_hash, region::{BatchedPwritev, JobOrReconciliationId}, Block, BlockContext, CrucibleError, JobId, ReadResponse, RegionDefinition, }; @@ -160,7 +157,7 @@ impl ExtentInner for SqliteInner { } fn read( - &self, + &mut self, job_id: JobId, requests: &[&crucible_protocol::ReadRequest], responses: &mut Vec, @@ -394,7 +391,7 @@ impl ExtentInner for SqliteInner { let on_disk_hash = integrity_hash(&[&write.data[..]]); self.set_block_context(&DownstairsBlockContext { - block_context: write.block_context.clone(), + block_context: write.block_context, block: write.offset.value, on_disk_hash, })?; @@ -489,7 +486,7 @@ impl ExtentInner for SqliteInner { /// parent Vec contains all contexts for a single block. #[cfg(test)] fn get_block_contexts( - &self, + &mut self, block: u64, count: u64, ) -> Result>, CrucibleError> { @@ -508,6 +505,41 @@ impl ExtentInner for SqliteInner { } impl SqliteInner { + /// Exports context slots for every block in the file + /// + /// Unlike `get_block_contexts`, this function ensures that only a single + /// context per block is present (or `None`, if the block is unwritten). + pub fn export_contexts( + &mut self, + ) -> Result>> { + // Check whether we need to rehash. This is theoretically represented + // by the `dirty` bit, but we're being _somewhat_ paranoid and manually + // forcing a rehash if any blocks have multiple contexts stored. + let ctxs = self.get_block_contexts(0, self.extent_size.value)?; + let needs_rehash = ctxs.iter().any(|c| c.len() > 1); + + let mut ctxs = if needs_rehash { + // Clean up stale hashes. After this is done, each block should + // have either 0 or 1 contexts. + self.fully_rehash_and_clean_all_stale_contexts(true)?; + self.get_block_contexts(0, self.extent_size.value)? + } else { + ctxs + }; + + let mut out = vec![]; + for c in ctxs.iter_mut() { + let ctx = match c.len() { + 0 => None, + 1 => c.pop(), + i => panic!("invalid context count: {i}"), + }; + out.push(ctx); + } + + Ok(out) + } + fn get_block_contexts( &self, block: u64, @@ -558,11 +590,19 @@ impl SqliteInner { Ok(results) } + // We should never create a new SQLite-backed extent in production code, + // because we should be using raw extents everywhere. However, we'll create + // them during tests to check that our automatic migration system works. 
+ #[cfg(any(test, feature = "integration-tests"))] pub fn create( dir: &Path, def: &RegionDefinition, extent_number: u32, ) -> Result { + use crate::{ + extent::{ExtentMeta, EXTENT_META_SQLITE}, + mkdir_for_file, + }; let mut path = extent_path(dir, extent_number); let bcount = def.extent_size().value; let size = def.block_size().checked_mul(bcount).unwrap(); @@ -709,12 +749,13 @@ impl SqliteInner { } pub fn open( - path: &Path, + dir: &Path, def: &RegionDefinition, extent_number: u32, read_only: bool, log: &Logger, ) -> Result { + let mut path = extent_path(dir, extent_number); let bcount = def.extent_size().value; let size = def.block_size().checked_mul(bcount).unwrap(); @@ -722,7 +763,7 @@ impl SqliteInner { * Open the extent file and verify the size is as we expect. */ let file = - match OpenOptions::new().read(true).write(!read_only).open(path) { + match OpenOptions::new().read(true).write(!read_only).open(&path) { Err(e) => { error!( log, @@ -749,7 +790,6 @@ impl SqliteInner { /* * Open a connection to the metadata db */ - let mut path = path.to_path_buf(); path.set_extension("db"); let metadb = match open_sqlite_connection(&path) { Err(e) => { @@ -778,7 +818,9 @@ impl SqliteInner { }; // Clean out any irrelevant block contexts, which may be present // if downstairs crashed between a write() and a flush(). - out.fully_rehash_and_clean_all_stale_contexts(false)?; + if !read_only { + out.fully_rehash_and_clean_all_stale_contexts(false)?; + } Ok(out) } @@ -1496,7 +1538,7 @@ mod test { eid: 0, offset: Block::new_512(0), data: data.clone(), - block_context: block_context.clone(), + block_context, }; inner.write(JobId(20), &[&write], true, IOV_MAX_TEST)?; @@ -1531,7 +1573,7 @@ mod test { eid: 0, offset: Block::new_512(1), data: data.clone(), - block_context: block_context.clone(), + block_context, }; inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; @@ -1598,7 +1640,7 @@ mod test { eid: 0, offset: Block::new_512(0), data: data.clone(), - block_context: block_context.clone(), + block_context, }; inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 257d190c9..2cd421568 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -37,6 +37,7 @@ pub mod region; pub mod repair; mod stats; +mod extent_inner_raw; mod extent_inner_sqlite; use region::Region; @@ -445,6 +446,18 @@ pub mod cdt { n_blocks: u64, ) { } + fn extent__write__raw__context__insert__start( + job_id: u64, + extent_id: u32, + extent_size: u64, + ) { + } + fn extent__write__raw__context__insert__done( + _job_id: u64, + _extent_id: u32, + extent_size: u64, + ) { + } fn extent__read__start(job_id: u64, extent_id: u32, n_blocks: u64) {} fn extent__read__done(job_id: u64, extent_id: u32, n_blocks: u64) {} fn extent__read__get__contexts__start( @@ -464,6 +477,11 @@ pub mod cdt { fn extent__context__truncate__start(n_deletions: u64) {} fn extent__context__truncate__done() {} + fn extent__set__block__contexts__write__count( + extent_id: u32, + n_blocks: u64, + ) { + } fn submit__el__close__done(_: u64) {} fn submit__el__flush__close__done(_: u64) {} fn submit__el__repair__done(_: u64) {} @@ -3012,6 +3030,18 @@ enum WrappedStream { Https(tokio_rustls::server::TlsStream), } +/// On-disk backend for downstairs storage +/// +/// Normally, we only allow the most recent backend. However, for integration +/// tests, it can be useful to create volumes using older backends. 
+#[derive(Copy, Clone, Debug, PartialEq)] +pub enum Backend { + RawFile, + + #[cfg(any(test, feature = "integration-tests"))] + SQLite, +} + pub async fn create_region( block_size: u64, data: PathBuf, @@ -3020,41 +3050,96 @@ pub async fn create_region( uuid: Uuid, encrypted: bool, log: Logger, +) -> Result { + create_region_with_backend( + data, + Block { + value: extent_size, + shift: block_size.trailing_zeros(), + }, + extent_count, + uuid, + encrypted, + Backend::RawFile, + log, + ) + .await +} + +pub async fn create_region_with_backend( + data: PathBuf, + extent_size: Block, + extent_count: u64, + uuid: Uuid, + encrypted: bool, + backend: Backend, + log: Logger, ) -> Result { /* * Create the region options, then the region. */ let mut region_options: crucible_common::RegionOptions = Default::default(); - region_options.set_block_size(block_size); - region_options - .set_extent_size(Block::new(extent_size, block_size.trailing_zeros())); + region_options.set_block_size(extent_size.block_size_in_bytes().into()); + region_options.set_extent_size(extent_size); region_options.set_uuid(uuid); region_options.set_encrypted(encrypted); - let mut region = Region::create(data, region_options, log).await?; + let mut region = + Region::create_with_backend(data, region_options, backend, log).await?; region.extend(extent_count as u32).await?; Ok(region) } +pub async fn build_downstairs_for_region( + data: &Path, + lossy: bool, + read_errors: bool, + write_errors: bool, + flush_errors: bool, + read_only: bool, + log_request: Option, +) -> Result>> { + build_downstairs_for_region_with_backend( + data, + lossy, + read_errors, + write_errors, + flush_errors, + read_only, + Backend::RawFile, + log_request, + ) + .await +} + // Build the downstairs struct given a region directory and some additional // needed information. If a logger is passed in, we will use that, otherwise // a logger will be created. -pub async fn build_downstairs_for_region( +#[allow(clippy::too_many_arguments)] +pub async fn build_downstairs_for_region_with_backend( data: &Path, lossy: bool, read_errors: bool, write_errors: bool, flush_errors: bool, read_only: bool, + backend: Backend, log_request: Option, ) -> Result>> { let log = match log_request { Some(log) => log, None => build_logger(), }; - let region = - Region::open(data, Default::default(), true, read_only, &log).await?; + let region = Region::open_with_backend( + data, + Default::default(), + true, + read_only, + backend, + &log, + ) + .await?; info!(log, "UUID: {:?}", region.def().uuid()); info!( diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index ed0460fad..ecd65f1eb 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -27,26 +27,13 @@ use crate::extent::{ }; /** - * Validate a list of sorted repair files. - * There are either two or four files we expect to find, any more or less - * and we have a bad list. No duplicates. + * Validate the repair file. + * + * There is only one repair file: the raw file itself (which also contains + * structured context and metadata at the end). */ pub fn validate_repair_files(eid: usize, files: &[String]) -> bool { - let eid = eid as u32; - - let some = vec![ - extent_file_name(eid, ExtentType::Data), - extent_file_name(eid, ExtentType::Db), - ]; - - let mut all = some.clone(); - all.extend(vec![ - extent_file_name(eid, ExtentType::DbShm), - extent_file_name(eid, ExtentType::DbWal), - ]); - - // Either we have some or all. 
- files == some || files == all + files == [extent_file_name(eid as u32, ExtentType::Data)] } /// Wrapper type for either a job or reconciliation ID @@ -99,6 +86,12 @@ pub struct Region { read_only: bool, log: Logger, + + /// Select the backend to use when creating and opening extents + /// + /// The SQLite backend is only allowed in tests; all new extents must be raw + /// files + backend: Backend, } impl Region { @@ -194,6 +187,15 @@ impl Region { dir: P, options: RegionOptions, log: Logger, + ) -> Result { + Self::create_with_backend(dir, options, Backend::RawFile, log).await + } + + pub async fn create_with_backend>( + dir: P, + options: RegionOptions, + backend: Backend, + log: Logger, ) -> Result { options.validate()?; @@ -214,9 +216,6 @@ impl Region { write_json(&cp, &def, false)?; info!(log, "Created new region file {:?}", cp); - /* - * Open every extent that presently exists. - */ let mut region = Region { dir: dir.as_ref().to_path_buf(), def, @@ -224,10 +223,9 @@ impl Region { dirty_extents: HashSet::new(), read_only: false, log, + backend, }; - region.open_extents(true).await?; - Ok(region) } @@ -240,6 +238,25 @@ impl Region { verbose: bool, read_only: bool, log: &Logger, + ) -> Result { + Self::open_with_backend( + dir, + options, + verbose, + read_only, + Backend::RawFile, + log, + ) + .await + } + + pub async fn open_with_backend>( + dir: P, + options: RegionOptions, + verbose: bool, + read_only: bool, + backend: Backend, + log: &Logger, ) -> Result { options.validate()?; @@ -299,6 +316,7 @@ impl Region { dirty_extents: HashSet::new(), read_only, log: log.clone(), + backend, }; region.open_extents(false).await?; @@ -338,13 +356,14 @@ impl Region { for eid in eid_range { let extent = if create { - Extent::create(&self.dir, &self.def, eid)? + Extent::create(&self.dir, &self.def, eid, self.backend)? } else { let extent = Extent::open( &self.dir, &self.def, eid, self.read_only, + self.backend, &self.log, )?; @@ -413,6 +432,7 @@ impl Region { &self.def, eid as u32, self.read_only, + Backend::RawFile, &self.log, )?; @@ -559,9 +579,7 @@ impl Region { ); // The repair file list should always contain the extent data - // file itself, and the .db file (metadata) for that extent. - // Missing these means the repair will not succeed. - // Optionally, there could be both .db-shm and .db-wal. + // file itself, and nothing else. if !validate_repair_files(eid, &repair_files) { crucible_bail!( RepairFilesInvalid, @@ -588,58 +606,6 @@ impl Region { }; save_stream_to_file(extent_copy, repair_stream.into_inner()).await?; - // The .db file is also required to exist for any valid extent. - let extent_db = Self::create_copy_file( - copy_dir.clone(), - eid, - Some(ExtentType::Db), - )?; - let repair_stream = match repair_server - .get_extent_file(eid as u32, FileType::Db) - .await - { - Ok(rs) => rs, - Err(e) => { - crucible_bail!( - RepairRequestError, - "Failed to get extent {} db file: {:?}", - eid, - e, - ); - } - }; - save_stream_to_file(extent_db, repair_stream.into_inner()).await?; - - // These next two are optional. 
- for opt_file in &[ExtentType::DbShm, ExtentType::DbWal] { - let filename = extent_file_name(eid as u32, opt_file.clone()); - - if repair_files.contains(&filename) { - let extent_shm = Self::create_copy_file( - copy_dir.clone(), - eid, - Some(opt_file.clone()), - )?; - let repair_stream = match repair_server - .get_extent_file(eid as u32, opt_file.to_file_type()) - .await - { - Ok(rs) => rs, - Err(e) => { - crucible_bail!( - RepairRequestError, - "Failed to get extent {} {} file: {:?}", - eid, - opt_file, - e, - ); - } - }; - save_stream_to_file(extent_shm, repair_stream.into_inner()) - .await?; - } - } - // After we have all files: move the repair dir. info!( self.log, @@ -1108,6 +1074,7 @@ struct BatchedPwritevState<'a> { byte_offset: u64, iovecs: Vec>, next_block_in_run: u64, + expected_bytes: usize, } pub(crate) struct BatchedPwritev<'a> { @@ -1171,6 +1138,7 @@ impl<'a> BatchedPwritev<'a> { assert_eq!(block, state.next_block_in_run); state.iovecs.push(IoSlice::new(&write.data)); state.next_block_in_run += 1; + state.expected_bytes += write.data.len(); } else { // start fresh self.state = Some(BatchedPwritevState { @@ -1180,6 +1148,7 @@ impl<'a> BatchedPwritev<'a> { iovecs.push(IoSlice::new(&write.data)); iovecs }, + expected_bytes: write.data.len(), next_block_in_run: block + 1, }); } @@ -1192,12 +1161,18 @@ impl<'a> BatchedPwritev<'a> { if let Some(state) = &mut self.state { assert!(!state.iovecs.is_empty()); - nix::sys::uio::pwritev( + let n = nix::sys::uio::pwritev( self.fd, &state.iovecs[..], state.byte_offset as i64, ) .map_err(|e| CrucibleError::IoError(e.to_string()))?; + if n != state.expected_bytes { + return Err(CrucibleError::IoError(format!( + "pwritev incomplete (expected {}, got {n} bytes)", + state.expected_bytes + ))); + } self.state = None; } @@ -1529,6 +1504,64 @@ pub(crate) mod test { // Make copy directory for this extent let cp = Region::create_copy_dir(&dir, 1)?; + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, ExtentType::Data); + let source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // Now we have a replace directory, we verify that special + // action is taken when we (re)open the extent. + + // Reopen extent 1 + region.reopen_extent(1).await?; + + let _ext_one = region.get_opened_extent(1).await; + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + + // The reopen should have replayed the repair, renamed, then + // deleted this directory. + let cd = completed_dir(&dir, 1); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[tokio::test] + async fn reopen_extent_cleanup_replay_sqlite() -> Result<()> { + // Verify on extent open that a replacement directory will + // have it's contents replace an extents existing data and + // metadata files. 
+ // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create_with_backend( + &dir, + new_region_options(), + Backend::SQLite, + csl(), + ) + .await?; + region.extend(3).await?; + + // Close extent 1 + region.close_extent(1).await.unwrap(); + assert!(matches!( + *region.extents[1].lock().await, + ExtentState::Closed + )); + + // Make copy directory for this extent + let cp = Region::create_copy_dir(&dir, 1)?; + // We are simulating the copy of files from the "source" repair // extent by copying the files from extent zero into the copy // directory. @@ -1575,6 +1608,62 @@ pub(crate) mod test { #[tokio::test] async fn reopen_extent_cleanup_replay_short() -> Result<()> { + // test move_replacement_extent(), create a copy dir, populate it + // and let the reopen do the work. This time we make sure our + // copy dir only has the extent data file. + + // Create the region, make three extents + let dir = tempdir()?; + let mut region = + Region::create(&dir, new_region_options(), csl()).await?; + region.extend(3).await?; + + // Close extent 1 + region.close_extent(1).await.unwrap(); + assert!(matches!( + *region.extents[1].lock().await, + ExtentState::Closed + )); + + // Make copy directory for this extent + let cp = Region::create_copy_dir(&dir, 1)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, ExtentType::Data); + let source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + println!("cp {:?} to {:?}", source_path, dest_path); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // Now we have a replace directory populated and our files to + // delete are ready. We verify that special action is taken + // when we (re)open the extent. + + // Reopen extent 1 + region.reopen_extent(1).await?; + + let _ext_one = region.get_opened_extent(1).await; + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + + // The reopen should have replayed the repair, renamed, then + // deleted this directory. + let cd = completed_dir(&dir, 1); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[tokio::test] + async fn reopen_extent_cleanup_replay_short_sqlite() -> Result<()> { // test move_replacement_extent(), create a copy dir, populate it // and let the reopen do the work. This time we make sure our // copy dir only has extent data and .db files, and not .db-shm @@ -1582,8 +1671,13 @@ pub(crate) mod test { // extent after the reopen has cleaned them up. // Create the region, make three extents let dir = tempdir()?; - let mut region = - Region::create(&dir, new_region_options(), csl()).await?; + let mut region = Region::create_with_backend( + &dir, + new_region_options(), + Backend::SQLite, + csl(), + ) + .await?; region.extend(3).await?; // Close extent 1 @@ -1656,7 +1750,7 @@ pub(crate) mod test { } #[tokio::test] - async fn reopen_extent_no_replay_readonly() -> Result<()> { + async fn reopen_extent_no_replay_readonly_sqlite() -> Result<()> { // Verify on a read-only region a replacement directory will // be ignored. 
This is required by the dump command, as it would // be tragic if the command to inspect a region changed that @@ -1664,8 +1758,13 @@ pub(crate) mod test { // Create the region, make three extents let dir = tempdir()?; - let mut region = - Region::create(&dir, new_region_options(), csl()).await?; + let mut region = Region::create_with_backend( + &dir, + new_region_options(), + Backend::SQLite, + csl(), + ) + .await?; region.extend(3).await?; // Make copy directory for this extent @@ -1704,6 +1803,251 @@ pub(crate) mod test { Ok(()) } + #[tokio::test] + async fn reopen_extent_no_replay_readonly() -> Result<()> { + // Verify on a read-only region a replacement directory will + // be ignored. This is required by the dump command, as it would + // be tragic if the command to inspect a region changed that + // region's contents in the act of inspecting. + + // Create the region, make three extents + let dir = tempdir()?; + let mut region = + Region::create(&dir, new_region_options(), csl()).await?; + region.extend(3).await?; + + // Make copy directory for this extent + let _ext_one = region.get_opened_extent(1).await; + let cp = Region::create_copy_dir(&dir, 1)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, ExtentType::Data); + let source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp, rd.clone())?; + + drop(region); + + // Open up the region read_only now. + let region = + Region::open(&dir, new_region_options(), false, true, &csl()) + .await?; + + // Verify extent 1 has opened again. 
+ let _ext_one = region.get_opened_extent(1).await; + + // Make sure repair directory is still present + assert!(Path::new(&rd).exists()); + + Ok(()) + } + + #[tokio::test] + async fn reopen_extent_partial_migration() -> Result<()> { + let log = csl(); + let dir = tempdir()?; + let mut region = Region::create_with_backend( + &dir, + new_region_options(), + Backend::SQLite, + csl(), + ) + .await?; + region.extend(3).await?; + let ddef = region.def(); + + region + .region_write( + &[ + crucible_protocol::Write { + eid: 1, + offset: Block::new_512(0), + data: Bytes::from(vec![1u8; 512]), + block_context: BlockContext { + encryption_context: None, + hash: 8717892996238908351, // hash for all 1s + }, + }, + crucible_protocol::Write { + eid: 2, + offset: Block::new_512(0), + data: Bytes::from(vec![2u8; 512]), + block_context: BlockContext { + encryption_context: None, + hash: 2192425179333611943, // hash for all 2s + }, + }, + ], + JobId(0), + false, + ) + .await?; + drop(region); + + // Manually calculate the migration from extent 1 + let extent_file = extent_path(&dir, 1); + let mut inner = extent_inner_sqlite::SqliteInner::open( + dir.path(), + &ddef, + 1, + false, + &log, + )?; + use crate::extent::ExtentInner; + let ctxs = inner.export_contexts()?; + let dirty = inner.dirty()?; + let flush_number = inner.flush_number()?; + let gen_number = inner.gen_number()?; + drop(inner); + + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(&extent_file)?; + println!("about to call `import` on {:?}", extent_file); + extent_inner_raw::RawInner::import( + &mut file, + &ddef, + ctxs, + dirty, + flush_number, + gen_number, + )?; + println!("dooone"); + // At this point, we have manually written the file, but have not + // deleted the `.db` on disk. As such, migration should restart when + // the extent is reopened. + + let region = + Region::open(&dir, new_region_options(), true, false, &log).await?; + let out = region + .region_read( + &[ + ReadRequest { + eid: 1, + offset: Block::new_512(0), + }, + ReadRequest { + eid: 2, + offset: Block::new_512(0), + }, + ], + JobId(0), + ) + .await?; + assert_eq!(out[0].data.as_ref(), [1; 512]); + assert_eq!(out[1].data.as_ref(), [2; 512]); + + Ok(()) + } + + #[tokio::test] + async fn reopen_extent_partial_migration_corrupt() -> Result<()> { + let log = csl(); + let dir = tempdir()?; + let mut region = Region::create_with_backend( + &dir, + new_region_options(), + Backend::SQLite, + csl(), + ) + .await?; + region.extend(3).await?; + let ddef = region.def(); + + // Make some writes, which we'll check after migration + region + .region_write( + &[ + crucible_protocol::Write { + eid: 1, + offset: Block::new_512(0), + data: Bytes::from(vec![1u8; 512]), + block_context: BlockContext { + encryption_context: None, + hash: 8717892996238908351, // hash for all 1s + }, + }, + crucible_protocol::Write { + eid: 2, + offset: Block::new_512(0), + data: Bytes::from(vec![2u8; 512]), + block_context: BlockContext { + encryption_context: None, + hash: 2192425179333611943, // hash for all 2s + }, + }, + ], + JobId(0), + false, + ) + .await?; + drop(region); + + // Manually calculate the migration from extent 1, but deliberately mess + // with the context values (simulating a migration that didn't manage to + // write everything to disk). 
+ let extent_file = extent_path(&dir, 1); + let mut inner = extent_inner_sqlite::SqliteInner::open( + dir.path(), + &ddef, + 1, + false, + &log, + )?; + use crate::extent::ExtentInner; + let ctxs = inner.export_contexts()?.into_iter().map(|_| None).collect(); + let dirty = inner.dirty()?; + let flush_number = inner.flush_number()?; + let gen_number = inner.gen_number()?; + drop(inner); + + // Stage the corrupted migration + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(&extent_file)?; + extent_inner_raw::RawInner::import( + &mut file, + &ddef, + ctxs, + dirty, + flush_number, + gen_number, + )?; + // At this point, we have manually written the file, but have not + // deleted the `.db` on disk. As such, migration should restart when + // the extent is reopened, and we should recover from corruption. + + let region = + Region::open(&dir, new_region_options(), true, false, &log).await?; + let out = region + .region_read( + &[ + ReadRequest { + eid: 1, + offset: Block::new_512(0), + }, + ReadRequest { + eid: 2, + offset: Block::new_512(0), + }, + ], + JobId(0), + ) + .await?; + assert_eq!(out[0].data.as_ref(), [1; 512]); + assert_eq!(out[1].data.as_ref(), [2; 512]); + + Ok(()) + } + #[test] fn validate_repair_files_empty() { // No repair files is a failure @@ -1713,21 +2057,8 @@ pub(crate) mod test { #[test] fn validate_repair_files_good() { // This is an expected list of files for an extent - let good_files: Vec = vec![ - "001".to_string(), - "001.db".to_string(), - "001.db-shm".to_string(), - "001.db-wal".to_string(), - ]; - - assert!(validate_repair_files(1, &good_files)); - } + let good_files: Vec = vec!["001".to_string()]; - #[test] - fn validate_repair_files_also_good() { - // This is also an expected list of files for an extent - let good_files: Vec = - vec!["001".to_string(), "001.db".to_string()]; assert!(validate_repair_files(1, &good_files)); } @@ -1739,64 +2070,20 @@ pub(crate) mod test { assert!(!validate_repair_files(2, &good_files)); } - #[test] - fn validate_repair_files_duplicate_pair() { - // duplicate file names for extent 2 - let good_files: Vec = vec![ - "002".to_string(), - "002".to_string(), - "002.db".to_string(), - "002.db".to_string(), - ]; - assert!(!validate_repair_files(2, &good_files)); - } - - #[test] - fn validate_repair_files_quad_duplicate() { - // This is an expected list of files for an extent - let good_files: Vec = vec![ - "001".to_string(), - "001.db".to_string(), - "001.db-shm".to_string(), - "001.db-shm".to_string(), - ]; - assert!(!validate_repair_files(1, &good_files)); - } - #[test] fn validate_repair_files_offbyon() { // Incorrect file names for extent 2 - let good_files: Vec = vec![ - "001".to_string(), - "001.db".to_string(), - "001.db-shm".to_string(), - "001.db-wal".to_string(), - ]; + let good_files: Vec = vec!["001".to_string()]; assert!(!validate_repair_files(2, &good_files)); } #[test] - fn validate_repair_files_too_good() { - // Duplicate file in list - let good_files: Vec = vec![ - "001".to_string(), - "001".to_string(), - "001.db".to_string(), - "001.db-shm".to_string(), - "001.db-wal".to_string(), - ]; - assert!(!validate_repair_files(1, &good_files)); - } + fn validate_repair_files_old() { + // Old extent files + let good_files: Vec = + vec!["001".to_string(), "001.db".to_string()]; - #[test] - fn validate_repair_files_not_good_enough() { - // We require 2 or 4 files, not 3 - let good_files: Vec = vec![ - "001".to_string(), - "001.db".to_string(), - "001.db-wal".to_string(), - ]; 
assert!(!validate_repair_files(1, &good_files)); } @@ -2022,11 +2309,13 @@ pub(crate) mod test { let mut read_from_files: Vec = Vec::with_capacity(total_size); + let extent_data_size = + (ddef.extent_size().value * ddef.block_size()) as usize; for i in 0..ddef.extent_count() { let path = extent_path(&dir, i); - let mut data = std::fs::read(path).expect("Unable to read file"); + let data = std::fs::read(path).expect("Unable to read file"); - read_from_files.append(&mut data); + read_from_files.extend(&data[..extent_data_size]); } assert_eq!(buffer.len(), read_from_files.len()); @@ -2059,6 +2348,126 @@ pub(crate) mod test { Ok(()) } + #[tokio::test] + async fn test_big_write_migrate() -> Result<()> { + let log = csl(); + let dir = tempdir()?; + let mut region = Region::create_with_backend( + &dir, + new_region_options(), + Backend::SQLite, + csl(), + ) + .await?; + region.extend(3).await?; + + let ddef = region.def(); + let total_size: usize = ddef.total_size() as usize; + let num_blocks: usize = + ddef.extent_size().value as usize * ddef.extent_count() as usize; + + // use region_write to fill region + + let mut rng = rand::thread_rng(); + let mut buffer: Vec = vec![0; total_size]; + rng.fill_bytes(&mut buffer); + + let mut writes: Vec = + Vec::with_capacity(num_blocks); + + for i in 0..num_blocks { + let eid: u64 = i as u64 / ddef.extent_size().value; + let offset: Block = + Block::new_512((i as u64) % ddef.extent_size().value); + + let data = BytesMut::from(&buffer[(i * 512)..((i + 1) * 512)]); + let data = data.freeze(); + let hash = integrity_hash(&[&data[..]]); + + writes.push(crucible_protocol::Write { + eid, + offset, + data, + block_context: BlockContext { + encryption_context: None, + hash, + }, + }); + } + + region.region_write(&writes, JobId(0), false).await?; + for i in 0..3 { + region.region_flush_extent(i, 10, 15, JobId(21)).await?; + } + let meta = region.get_opened_extent(0).await.get_meta_info().await; + assert_eq!(meta.gen_number, 10); + assert_eq!(meta.flush_number, 15); + drop(region); + + // Open the region as read-only, which doesn't trigger a migration + let region = + Region::open(&dir, new_region_options(), true, true, &log).await?; + let meta = region.get_opened_extent(0).await.get_meta_info().await; + assert_eq!(meta.gen_number, 10); + assert_eq!(meta.flush_number, 15); + + // Assert that the .db files still exist + for i in 0..3 { + assert!(extent_dir(&dir, i) + .join(extent_file_name(i, ExtentType::Db)) + .exists()); + } + + // read all using region_read + let mut requests: Vec = + Vec::with_capacity(num_blocks); + + for i in 0..num_blocks { + let eid: u64 = i as u64 / ddef.extent_size().value; + let offset: Block = + Block::new_512((i as u64) % ddef.extent_size().value); + + requests.push(crucible_protocol::ReadRequest { eid, offset }); + } + + let read_from_region: Vec = region + .region_read(&requests, JobId(0)) + .await? 
+ .into_iter() + .flat_map(|i| i.data.to_vec()) + .collect(); + + assert_eq!(buffer.len(), read_from_region.len()); + assert_eq!(buffer, read_from_region); + drop(region); + + // Open the region as read-write, which **does** trigger a migration + let region = + Region::open(&dir, new_region_options(), true, false, &log).await?; + let meta = region.get_opened_extent(0).await.get_meta_info().await; + assert_eq!(meta.gen_number, 10); + assert_eq!(meta.flush_number, 15); + + // Assert that the .db files have been deleted during the migration + for i in 0..3 { + assert!(!extent_dir(&dir, i) + .join(extent_file_name(i, ExtentType::Db)) + .exists()); + } + let read_from_region: Vec = region + .region_read(&requests, JobId(0)) + .await? + .into_iter() + .flat_map(|i| i.data.to_vec()) + .collect(); + + assert_eq!(buffer.len(), read_from_region.len()); + assert_eq!(buffer, read_from_region); + drop(region); + + Ok(()) + } + #[tokio::test] async fn test_region_open_removes_partial_writes() -> Result<()> { // Opening a dirty extent should fully rehash the extent to remove any @@ -2102,7 +2511,7 @@ pub(crate) mod test { // Verify no block context rows exist { let ext = region.get_opened_extent(0).await; - let inner = ext.lock().await; + let mut inner = ext.lock().await; assert!(inner.get_block_contexts(0, 1)?[0].is_empty()); } @@ -2449,11 +2858,13 @@ pub(crate) mod test { // read data into File, compare what was written to buffer let mut read_from_files: Vec = Vec::with_capacity(total_size); + let extent_data_size = + (ddef.extent_size().value * ddef.block_size()) as usize; for i in 0..ddef.extent_count() { let path = extent_path(&dir, i); - let mut data = std::fs::read(path).expect("Unable to read file"); + let data = std::fs::read(path).expect("Unable to read file"); - read_from_files.append(&mut data); + read_from_files.extend(&data[..extent_data_size]); } assert_eq!(buffer, read_from_files); @@ -2567,11 +2978,13 @@ pub(crate) mod test { // read data into File, compare what was written to buffer let mut read_from_files: Vec = Vec::with_capacity(total_size); + let extent_data_size = + (ddef.extent_size().value * ddef.block_size()) as usize; for i in 0..ddef.extent_count() { let path = extent_path(&dir, i); - let mut data = std::fs::read(path).expect("Unable to read file"); + let data = std::fs::read(path).expect("Unable to read file"); - read_from_files.append(&mut data); + read_from_files.extend(&data[..extent_data_size]); } assert_eq!(buffer, read_from_files); @@ -2686,11 +3099,13 @@ pub(crate) mod test { // read data into File, compare what was written to buffer let mut read_from_files: Vec = Vec::with_capacity(total_size); + let extent_data_size = + (ddef.extent_size().value * ddef.block_size()) as usize; for i in 0..ddef.extent_count() { let path = extent_path(&dir, i); - let mut data = std::fs::read(path).expect("Unable to read file"); + let data = std::fs::read(path).expect("Unable to read file"); - read_from_files.append(&mut data); + read_from_files.extend(&data[..extent_data_size]); } assert_eq!(buffer, read_from_files); @@ -3304,7 +3719,7 @@ pub(crate) mod test { let last_writes = writes.last().unwrap(); let ext = region.get_opened_extent(0).await; - let inner = ext.lock().await; + let mut inner = ext.lock().await; for (i, range) in ranges.iter().enumerate() { // Get the contexts for the range @@ -3386,7 +3801,7 @@ pub(crate) mod test { let last_writes = writes.last().unwrap(); let ext = region.get_opened_extent(0).await; - let inner = ext.lock().await; + let mut inner = ext.lock().await; 
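// With the raw backend, an extent file holds more than the guest-visible
// blocks (context slots and metadata follow the data), which is why the
// on-disk comparisons above only look at the first extent_data_size bytes of
// each file. A sketch of that repeated pattern as a helper; the function is
// hypothetical and not part of the patch, but it only uses the
// RegionDefinition accessors already used in these tests.
fn read_extent_block_data(
    path: &std::path::Path,
    ddef: &RegionDefinition,
) -> std::io::Result<Vec<u8>> {
    // Only the leading data region is byte-for-byte comparable with what was
    // written through region_write; everything after it is backend metadata.
    let extent_data_size =
        (ddef.extent_size().value * ddef.block_size()) as usize;
    let data = std::fs::read(path)?;
    Ok(data[..extent_data_size].to_vec())
}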
// Get the contexts for the range let ctxts = inner.get_block_contexts(0, EXTENT_SIZE)?; diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index 4b47c8953..3130fff33 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -226,12 +226,7 @@ fn extent_file_list( eid: u32, ) -> Result, HttpError> { let mut files = Vec::new(); - let possible_files = vec![ - (extent_file_name(eid, ExtentType::Data), true), - (extent_file_name(eid, ExtentType::Db), true), - (extent_file_name(eid, ExtentType::DbShm), false), - (extent_file_name(eid, ExtentType::DbWal), false), - ]; + let possible_files = vec![(extent_file_name(eid, ExtentType::Data), true)]; for (file, required) in possible_files.into_iter() { let mut fullname = extent_dir.clone(); @@ -282,7 +277,7 @@ mod test { let ed = extent_dir(&dir, 1); let mut ex_files = extent_file_list(ed, 1).unwrap(); ex_files.sort(); - let expected = vec!["001", "001.db", "001.db-shm", "001.db-wal"]; + let expected = vec!["001"]; println!("files: {:?}", ex_files); assert_eq!(ex_files, expected); @@ -290,42 +285,11 @@ mod test { } #[tokio::test] - async fn extent_expected_files_short() -> Result<()> { + async fn extent_expected_files_with_close() -> Result<()> { // Verify that the list of files returned for an extent matches // what we expect. In this case we expect the extent data file and - // the .db file, but not the .db-shm or .db-wal database files. - let dir = tempdir()?; - let mut region = - Region::create(&dir, new_region_options(), csl()).await?; - region.extend(3).await?; - - // Determine the directory and name for expected extent files. - let extent_dir = extent_dir(&dir, 1); - - // Delete db-wal and db-shm - let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(1, ExtentType::Data)); - rm_file.set_extension("db-wal"); - std::fs::remove_file(&rm_file).unwrap(); - rm_file.set_extension("db-shm"); - std::fs::remove_file(rm_file).unwrap(); - - let mut ex_files = extent_file_list(extent_dir, 1).unwrap(); - ex_files.sort(); - let expected = vec!["001", "001.db"]; - println!("files: {:?}", ex_files); - assert_eq!(ex_files, expected); - - Ok(()) - } - - #[tokio::test] - async fn extent_expected_files_short_with_close() -> Result<()> { - // Verify that the list of files returned for an extent matches - // what we expect. In this case we expect the extent data file and - // the .db file, but not the .db-shm or .db-wal database files. - // We close the extent here first, and on illumos that behaves - // a little different than elsewhere. + // nothing else. We close the extent here first, and on illumos that + // behaves a little different than elsewhere. let dir = tempdir()?; let mut region = Region::create(&dir, new_region_options(), csl()).await?; @@ -336,18 +300,9 @@ mod test { // Determine the directory and name for expected extent files. let extent_dir = extent_dir(&dir, 1); - // Delete db-wal and db-shm. On illumos the close of the extent - // may remove these for us, so we ignore errors on the removal. 
- let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(1, ExtentType::Data)); - rm_file.set_extension("db-wal"); - let _ = std::fs::remove_file(&rm_file); - rm_file.set_extension("db-shm"); - let _ = std::fs::remove_file(rm_file); - let mut ex_files = extent_file_list(extent_dir, 1).unwrap(); ex_files.sort(); - let expected = vec!["001", "001.db"]; + let expected = vec!["001"]; println!("files: {:?}", ex_files); assert_eq!(ex_files, expected); @@ -356,29 +311,6 @@ mod test { #[tokio::test] async fn extent_expected_files_fail() -> Result<()> { - // Verify that we get an error if the expected extent.db file - // is missing. - let dir = tempdir()?; - let mut region = - Region::create(&dir, new_region_options(), csl()).await?; - region.extend(3).await?; - - // Determine the directory and name for expected extent files. - let extent_dir = extent_dir(&dir, 2); - - // Delete db - let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(2, ExtentType::Data)); - rm_file.set_extension("db"); - std::fs::remove_file(&rm_file).unwrap(); - - assert!(extent_file_list(extent_dir, 2).is_err()); - - Ok(()) - } - - #[tokio::test] - async fn extent_expected_files_fail_two() -> Result<()> { // Verify that we get an error if the expected extent file // is missing. let dir = tempdir()?; @@ -389,7 +321,7 @@ mod test { // Determine the directory and name for expected extent files. let extent_dir = extent_dir(&dir, 1); - // Delete db + // Delete the extent file let mut rm_file = extent_dir.clone(); rm_file.push(extent_file_name(1, ExtentType::Data)); std::fs::remove_file(&rm_file).unwrap(); diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 68b8e405c..113173f12 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -16,7 +16,7 @@ bytes.workspace = true crucible-client-types.workspace = true # importantly, don't use features = ["zfs_snapshot"] here, this will cause # cleanup issues! 
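// After the repair.rs change above, the only file an extent ever offers for
// repair is its data file. A sketch of the reduced listing behavior, keeping
// the "missing required file is an error" rule of the loop it summarizes;
// the helper name and the error construction are illustrative, not the
// patch's actual code.
fn raw_extent_file_list(
    extent_dir: std::path::PathBuf,
    eid: u32,
) -> Result<Vec<String>, HttpError> {
    let name = extent_file_name(eid, ExtentType::Data);
    if !extent_dir.join(&name).exists() {
        return Err(HttpError::for_bad_request(
            None,
            format!("extent {} is missing its data file", eid),
        ));
    }
    Ok(vec![name])
}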
-crucible-downstairs.workspace = true +crucible-downstairs = { workspace = true, features = ["integration-tests"] } crucible-pantry-client.workspace = true crucible-pantry.workspace = true crucible.workspace = true diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index aae0fea73..4c90d3760 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -44,30 +44,35 @@ mod test { blocks_per_extent: u64, extent_count: u32, problematic: bool, + backend: Backend, ) -> Result { let tempdir = tempfile::Builder::new() .prefix(&"downstairs-") .rand_bytes(8) .tempdir()?; - let _region = create_region( - 512, /* block_size */ + let _region = create_region_with_backend( tempdir.path().to_path_buf(), - blocks_per_extent, + Block { + value: blocks_per_extent, + shift: 9, + }, extent_count.into(), Uuid::new_v4(), encrypted, + backend, csl(), ) .await?; - let downstairs = build_downstairs_for_region( + let downstairs = build_downstairs_for_region_with_backend( tempdir.path(), problematic, /* lossy */ problematic, /* read errors */ problematic, /* write errors */ problematic, /* flush errors */ read_only, + backend, Some(csl()), ) .await?; @@ -118,6 +123,33 @@ mod test { Ok(()) } + pub async fn reboot_read_write(&mut self) -> Result<()> { + self.downstairs = build_downstairs_for_region( + self.tempdir.path(), + false, /* lossy */ + false, /* read errors */ + false, /* write errors */ + false, /* flush errors */ + false, + Some(csl()), + ) + .await?; + + let _join_handle = start_downstairs( + self.downstairs.clone(), + self.address, + None, /* oximeter */ + 0, /* any port */ + 0, /* any rport */ + None, /* cert_pem */ + None, /* key_pem */ + None, /* root_cert_pem */ + ) + .await?; + + Ok(()) + } + pub async fn address(&self) -> SocketAddr { // If start_downstairs returned Ok, then address will be populated self.downstairs.lock().await.address.unwrap() @@ -145,6 +177,24 @@ mod test { blocks_per_extent, extent_count, false, + Backend::RawFile, + ) + .await + } + + /// Spin off three SQLite downstairs, with a 5120b region + pub async fn small_sqlite( + read_only: bool, + ) -> Result { + // 5 * 2 * 512 = 5120b + let blocks_per_extent = 5; + let extent_count = 2; + TestDownstairsSet::new_with_flag( + read_only, + blocks_per_extent, + extent_count, + false, + Backend::SQLite, ) .await } @@ -159,6 +209,7 @@ mod test { blocks_per_extent, extent_count, false, + Backend::RawFile, ) .await } @@ -173,6 +224,7 @@ mod test { blocks_per_extent, extent_count, true, // problematic + Backend::RawFile, ) .await } @@ -183,6 +235,7 @@ mod test { blocks_per_extent: u64, extent_count: u32, problematic: bool, + backend: Backend, ) -> Result { let downstairs1 = TestDownstairs::new( "127.0.0.1".parse()?, @@ -191,6 +244,7 @@ mod test { blocks_per_extent, extent_count, problematic, + backend, ) .await?; let downstairs2 = TestDownstairs::new( @@ -200,6 +254,7 @@ mod test { blocks_per_extent, extent_count, problematic, + backend, ) .await?; let downstairs3 = TestDownstairs::new( @@ -209,6 +264,7 @@ mod test { blocks_per_extent, extent_count, problematic, + backend, ) .await?; @@ -273,6 +329,21 @@ mod test { Ok(()) } + pub async fn reboot_read_write(&mut self) -> Result<()> { + assert!(!self.crucible_opts.read_only); + self.downstairs1.reboot_read_write().await?; + self.downstairs2.reboot_read_write().await?; + self.downstairs3.reboot_read_write().await?; + + self.crucible_opts.target = vec![ + self.downstairs1.address().await, + self.downstairs2.address().await, + 
self.downstairs3.address().await, + ]; + + Ok(()) + } + pub async fn new_downstairs(&self) -> Result { TestDownstairs::new( "127.0.0.1".parse()?, @@ -281,6 +352,7 @@ mod test { self.blocks_per_extent, self.extent_count, false, + Backend::RawFile, ) .await } @@ -2237,6 +2309,281 @@ mod test { Ok(()) } + #[tokio::test] + async fn integration_test_sqlite_backed_vol() -> Result<()> { + // Test using an old SQLite backend as a read-only parent. + + const BLOCK_SIZE: usize = 512; + + // boot three downstairs, write some data to them, then change to + // read-only. + let mut test_downstairs_set = + TestDownstairsSet::small_sqlite(false).await?; + // This must be a SQLite extent! + assert!(test_downstairs_set + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 1, + None, + ) + .await?; + + volume.activate().await?; + + let random_buffer = { + let mut random_buffer = + vec![0u8; volume.total_size().await? as usize]; + rand::thread_rng().fill(&mut random_buffer[..]); + random_buffer + }; + + volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(random_buffer.clone()), + ) + .await?; + + volume.deactivate().await?; + + drop(volume); + + test_downstairs_set.reboot_read_only().await?; + // This must still be a SQLite backend! + assert!(test_downstairs_set + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + + // Validate that this now accepts reads and flushes, but rejects writes + { + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set + .blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 2, + None, + ) + .await?; + + volume.activate().await?; + + let buffer = Buffer::new(volume.total_size().await? as usize); + volume + .read( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + buffer.clone(), + ) + .await?; + + assert_eq!(*buffer.as_vec().await, random_buffer); + + assert!(volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(vec![0u8; BLOCK_SIZE]), + ) + .await + .is_err()); + + volume.flush(None).await?; + } + + // create a new volume, layering a new set of downstairs on top of the + // read-only one we just (re)booted + let top_layer_tds = TestDownstairsSet::small(false).await?; + let top_layer_opts = top_layer_tds.opts(); + let bottom_layer_opts = test_downstairs_set.opts(); + // The new volume is **not** using the SQLite backend! 
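// These integration tests use the presence of the first extent's .db file as
// the marker for "still SQLite-backed" versus "raw file". A sketch of that
// check as a helper; the helper itself is hypothetical, but the
// 00/000/000.db path is the one the surrounding assertions probe.
fn first_extent_is_sqlite(tempdir: &tempfile::TempDir) -> bool {
    tempdir.path().join("00/000/000.db").exists()
}
// e.g. assert!(first_extent_is_sqlite(&test_downstairs_set.downstairs1.tempdir));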
+ assert!(!top_layer_tds + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + + let vcr: VolumeConstructionRequest = + VolumeConstructionRequest::Volume { + id: Uuid::new_v4(), + block_size: BLOCK_SIZE as u64, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: top_layer_tds.blocks_per_extent(), + extent_count: top_layer_tds.extent_count(), + opts: top_layer_opts, + gen: 3, + }], + read_only_parent: Some(Box::new( + VolumeConstructionRequest::Volume { + id: Uuid::new_v4(), + block_size: BLOCK_SIZE as u64, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set + .blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + opts: bottom_layer_opts, + gen: 3, + }], + read_only_parent: None, + }, + )), + }; + + let volume = Volume::construct(vcr, None, csl()).await?; + volume.activate().await?; + + // Validate that source blocks originally come from the read-only parent + { + let buffer = Buffer::new(volume.total_size().await? as usize); + volume + .read( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + buffer.clone(), + ) + .await?; + + assert_eq!(*buffer.as_vec().await, random_buffer); + } + + // Validate a flush works + volume.flush(None).await?; + + // Write one block of 0x00 in, validate with a read + volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(vec![0u8; BLOCK_SIZE]), + ) + .await?; + + { + let buffer = Buffer::new(volume.total_size().await? as usize); + volume + .read( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + buffer.clone(), + ) + .await?; + + let buffer_vec = buffer.as_vec().await; + + assert_eq!(buffer_vec[..BLOCK_SIZE], vec![0u8; BLOCK_SIZE]); + assert_eq!(buffer_vec[BLOCK_SIZE..], random_buffer[BLOCK_SIZE..]); + } + + // Validate a flush still works + volume.flush(None).await?; + + Ok(()) + } + + #[tokio::test] + async fn integration_test_sqlite_migration() -> Result<()> { + const BLOCK_SIZE: usize = 512; + + // boot three downstairs, write some data to them, then reopen as + // read-write (which will automatically migrate the extent) + let mut test_downstairs_set = + TestDownstairsSet::small_sqlite(false).await?; + // This must be a SQLite extent! + assert!(test_downstairs_set + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 1, + None, + ) + .await?; + + volume.activate().await?; + + let random_buffer = { + let mut random_buffer = + vec![0u8; volume.total_size().await? 
as usize]; + rand::thread_rng().fill(&mut random_buffer[..]); + random_buffer + }; + + volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(random_buffer.clone()), + ) + .await?; + + volume.deactivate().await?; + + drop(volume); + + test_downstairs_set.reboot_read_write().await?; + // This should now be migrated, and the DB file should be deleted + assert!(!test_downstairs_set + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 2, + None, + ) + .await?; + + volume.activate().await?; + // Validate that source blocks are the same + let buffer = Buffer::new(volume.total_size().await? as usize); + volume + .read(Block::new(0, BLOCK_SIZE.trailing_zeros()), buffer.clone()) + .await?; + + assert_eq!(*buffer.as_vec().await, random_buffer); + + Ok(()) + } + #[tokio::test] async fn integration_test_volume_replace_downstairs() -> Result<()> { // Replace a downstairs with a new one diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 7d0223ac9..9c559b810 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -157,7 +157,7 @@ impl ReadResponse { } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BlockContext { /// If this is a non-encrypted write, then the integrity hasher has the /// data as an input: @@ -183,7 +183,7 @@ pub struct BlockContext { } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct EncryptionContext { pub nonce: [u8; 12], pub tag: [u8; 16], @@ -241,6 +241,12 @@ pub struct SnapshotDetails { #[repr(u32)] #[derive(IntoPrimitive)] pub enum MessageVersion { + /// Switched to raw file extents + /// + /// The message format remains the same, but live repair between SQLite and + /// raw file extents is not possible. + V5 = 5, + /// Added ErrorReport V4 = 4, @@ -255,7 +261,7 @@ pub enum MessageVersion { } impl MessageVersion { pub const fn current() -> Self { - Self::V4 + Self::V5 } } @@ -264,7 +270,7 @@ impl MessageVersion { * This, along with the MessageVersion enum above should be updated whenever * changes are made to the Message enum below. */ -pub const CRUCIBLE_MESSAGE_VERSION: u32 = 4; +pub const CRUCIBLE_MESSAGE_VERSION: u32 = 5; /* * If you add or change the Message enum, you must also increment the
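// Both the MessageVersion enum and the standalone CRUCIBLE_MESSAGE_VERSION
// constant carry the protocol version, so a bump like V4 -> V5 has to touch
// both. A small consistency check of the kind that guards this; whether such
// a test already exists in the crate is an assumption.
#[test]
fn message_version_matches_constant() {
    assert_eq!(MessageVersion::current() as u32, CRUCIBLE_MESSAGE_VERSION);
}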