From 542aaeea0a0277035f782221f4167fdd96e33663 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Mon, 11 Nov 2024 16:35:51 -0500 Subject: [PATCH] Use semaphores for global backpressure, instead of delays --- cmon/src/main.rs | 8 - tools/dtrace/single_up_info.d | 3 +- tools/dtrace/sled_upstairs_info.d | 3 +- tools/dtrace/upstairs_info.d | 5 +- upstairs/src/backpressure.rs | 375 ------------------------- upstairs/src/client.rs | 56 ++-- upstairs/src/deferred.rs | 11 +- upstairs/src/downstairs.rs | 163 +++++------ upstairs/src/dummy_downstairs_tests.rs | 3 +- upstairs/src/guest.rs | 137 +++++---- upstairs/src/io_limits.rs | 158 +++++++++++ upstairs/src/lib.rs | 45 ++- upstairs/src/upstairs.rs | 264 ++++++++++++----- 13 files changed, 551 insertions(+), 680 deletions(-) delete mode 100644 upstairs/src/backpressure.rs create mode 100644 upstairs/src/io_limits.rs diff --git a/cmon/src/main.rs b/cmon/src/main.rs index 4f3fe1952..1f3595808 100644 --- a/cmon/src/main.rs +++ b/cmon/src/main.rs @@ -41,7 +41,6 @@ enum DtraceDisplay { Replaced, ExtentLiveRepair, ExtentLimit, - Backpressure, NextJobId, JobDelta, DsDelay, @@ -61,7 +60,6 @@ impl fmt::Display for DtraceDisplay { DtraceDisplay::Replaced => write!(f, "replaced"), DtraceDisplay::ExtentLiveRepair => write!(f, "extent_live_repair"), DtraceDisplay::ExtentLimit => write!(f, "extent_under_repair"), - DtraceDisplay::Backpressure => write!(f, "backpressure"), DtraceDisplay::NextJobId => write!(f, "next_job_id"), DtraceDisplay::JobDelta => write!(f, "job_delta"), DtraceDisplay::DsDelay => write!(f, "ds_delay"), @@ -229,9 +227,6 @@ fn print_dtrace_header(dd: &[DtraceDisplay]) { DtraceDisplay::ExtentLimit => { print!(" {:>4}", "EXTL"); } - DtraceDisplay::Backpressure => { - print!(" {:>5}", "BAKPR"); - } DtraceDisplay::NextJobId => { print!(" {:>7}", "NEXTJOB"); } @@ -348,9 +343,6 @@ fn print_dtrace_row(d_out: Arg, dd: &[DtraceDisplay], last_job_id: &mut u64) { DtraceDisplay::ExtentLimit => { print!(" {:4}", d_out.ds_extent_limit); } - DtraceDisplay::Backpressure => { - print!(" {:>5}", d_out.up_backpressure); - } DtraceDisplay::NextJobId => { print!(" {:>7}", d_out.next_job_id); } diff --git a/tools/dtrace/single_up_info.d b/tools/dtrace/single_up_info.d index 0eb1f4e1b..476edb8eb 100755 --- a/tools/dtrace/single_up_info.d +++ b/tools/dtrace/single_up_info.d @@ -41,7 +41,7 @@ crucible_upstairs*:::up-status * I'm not very happy about this, but if we don't print it all on one * line, then multiple sessions will clobber each others output. */ - printf("%8s %17s %17s %17s %5s %5s %9s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n", + printf("%8s %17s %17s %17s %5s %5s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n", substr(session_id, 0, 8), @@ -62,7 +62,6 @@ crucible_upstairs*:::up-status * Job ID delta and backpressure */ json(copyinstr(arg1), "ok.next_job_id"), - json(copyinstr(arg1), "ok.up_backpressure"), json(copyinstr(arg1), "ok.write_bytes_out"), /* diff --git a/tools/dtrace/sled_upstairs_info.d b/tools/dtrace/sled_upstairs_info.d index 277b8757e..e1ec9b256 100755 --- a/tools/dtrace/sled_upstairs_info.d +++ b/tools/dtrace/sled_upstairs_info.d @@ -46,7 +46,7 @@ crucible_upstairs*:::up-status * we don't print it all on one line, then multiple sessions will * clobber each others output. */ - printf("%5d %8s %17s %17s %17s %5s %5s %9s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n", + printf("%5d %8s %17s %17s %17s %5s %5s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n", pid, substr(session_id, 0, 8), @@ -62,7 +62,6 @@ crucible_upstairs*:::up-status /* Job ID and backpressure */ json(copyinstr(arg1), "ok.next_job_id"), - json(copyinstr(arg1), "ok.up_backpressure"), json(copyinstr(arg1), "ok.write_bytes_out"), /* In progress jobs on the work list for each downstairs */ diff --git a/tools/dtrace/upstairs_info.d b/tools/dtrace/upstairs_info.d index 64294fc2c..5cb4fc178 100755 --- a/tools/dtrace/upstairs_info.d +++ b/tools/dtrace/upstairs_info.d @@ -21,7 +21,7 @@ tick-1s { printf("%6s ", "PID"); printf("%17s %17s %17s", "DS STATE 0", "DS STATE 1", "DS STATE 2"); - printf(" %5s %5s %9s %5s", "UPW", "DSW", "JOBID", "BAKPR"); + printf(" %5s %5s %9s", "UPW", "DSW", "JOBID"); printf(" %10s", "WRITE_BO"); printf(" %5s %5s %5s", "IP0", "IP1", "IP2"); printf(" %5s %5s %5s", "D0", "D1", "D2"); @@ -49,10 +49,9 @@ crucible_upstairs*:::up-status printf(" %5s", json(copyinstr(arg1), "ok.ds_count")); /* - * Job ID and backpressure + * Job ID and outstanding bytes */ printf(" %9s", json(copyinstr(arg1), "ok.next_job_id")); - printf(" %5s", json(copyinstr(arg1), "ok.up_backpressure")); printf(" %10s", json(copyinstr(arg1), "ok.write_bytes_out")); /* diff --git a/upstairs/src/backpressure.rs b/upstairs/src/backpressure.rs deleted file mode 100644 index feaf4a33f..000000000 --- a/upstairs/src/backpressure.rs +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright 2024 Oxide Computer Company - -use crate::{IOop, IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS}; -use std::{ - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::Duration, -}; - -/// Helper struct to contain a set of backpressure counters -#[derive(Debug)] -pub struct BackpressureCounters(Arc); - -/// Inner data structure for individual backpressure counters -#[derive(Debug)] -struct BackpressureCountersInner { - /// Number of bytes from `Write` and `WriteUnwritten` operations - /// - /// This value is used for global backpressure, to avoid buffering too many - /// writes (which otherwise return immediately, and are not persistent until - /// a flush) - write_bytes: AtomicU64, - - /// Number of jobs in the queue - /// - /// This value is also used for global backpressure - // XXX should we only count write jobs here? Or should we also count read - // bytes for global backpressure? Much to ponder... - jobs: AtomicU64, - - /// Number of bytes from `Write`, `WriteUnwritten`, and `Read` operations - /// - /// This value is used for local backpressure, to keep the 3x Downstairs - /// roughly in sync. Otherwise, the fastest Downstairs will answer all read - /// requests, and the others can get arbitrarily far behind. - io_bytes: AtomicU64, -} - -/// Guard to automatically decrement backpressure bytes when dropped -#[derive(Debug)] -pub struct BackpressureGuard { - counter: Arc, - write_bytes: u64, - io_bytes: u64, - // There's also an implicit "1 job" here -} - -impl Drop for BackpressureGuard { - fn drop(&mut self) { - self.counter - .write_bytes - .fetch_sub(self.write_bytes, Ordering::Relaxed); - self.counter - .io_bytes - .fetch_sub(self.io_bytes, Ordering::Relaxed); - self.counter.jobs.fetch_sub(1, Ordering::Relaxed); - } -} - -impl BackpressureGuard { - #[cfg(test)] - pub fn dummy() -> Self { - let counter = Arc::new(BackpressureCountersInner { - write_bytes: 0.into(), - io_bytes: 0.into(), - jobs: 1.into(), - }); - Self { - counter, - write_bytes: 0, - io_bytes: 0, - } - } -} - -impl BackpressureCounters { - pub fn new() -> Self { - Self(Arc::new(BackpressureCountersInner { - write_bytes: AtomicU64::new(0), - io_bytes: AtomicU64::new(0), - jobs: AtomicU64::new(0), - })) - } - - pub fn get_write_bytes(&self) -> u64 { - self.0.write_bytes.load(Ordering::Relaxed) - } - - pub fn get_io_bytes(&self) -> u64 { - self.0.io_bytes.load(Ordering::Relaxed) - } - - pub fn get_jobs(&self) -> u64 { - self.0.jobs.load(Ordering::Relaxed) - } - - /// Stores write / IO bytes (and 1 job) for a pending write - #[must_use] - pub fn early_write_increment(&mut self, bytes: u64) -> BackpressureGuard { - self.0.write_bytes.fetch_add(bytes, Ordering::Relaxed); - self.0.io_bytes.fetch_add(bytes, Ordering::Relaxed); - self.0.jobs.fetch_add(1, Ordering::Relaxed); - BackpressureGuard { - counter: self.0.clone(), - write_bytes: bytes, - io_bytes: bytes, - // implicit 1 job - } - } - - /// Stores write / IO bytes (and 1 job) in the backpressure counters - #[must_use] - pub fn increment(&mut self, io: &IOop) -> BackpressureGuard { - let write_bytes = io.write_bytes(); - let io_bytes = io.job_bytes(); - self.0.write_bytes.fetch_add(write_bytes, Ordering::Relaxed); - self.0.io_bytes.fetch_add(io_bytes, Ordering::Relaxed); - self.0.jobs.fetch_add(1, Ordering::Relaxed); - BackpressureGuard { - counter: self.0.clone(), - write_bytes, - io_bytes, - // implicit 1 job - } - } -} - -/// Configuration for host-side backpressure -/// -/// Backpressure adds an artificial delay to host write messages (which are -/// otherwise acked immediately, before actually being complete). The delay is -/// varied based on two metrics: -/// -/// - number of write bytes outstanding -/// - queue length (in jobs) -/// -/// We compute backpressure delay based on both metrics, then pick the larger of -/// the two delays. -#[derive(Copy, Clone, Debug)] -pub struct BackpressureConfig { - pub bytes: BackpressureChannelConfig, - pub queue: BackpressureChannelConfig, -} - -impl Default for BackpressureConfig { - fn default() -> BackpressureConfig { - // `max_value` values below must be higher than `IO_OUTSTANDING_MAX_*`; - // otherwise, replaying jobs to a previously-Offline Downstairs could - // immediately kick us into the saturated regime, which would be - // unfortunate. - BackpressureConfig { - // Byte-based backpressure - bytes: BackpressureChannelConfig { - start_value: 50 * 1024u64.pow(2), // 50 MiB - max_value: IO_OUTSTANDING_MAX_BYTES * 2, - delay_scale: Duration::from_millis(100), - delay_max: Duration::from_millis(30_000), - }, - - // Queue-based backpressure - queue: BackpressureChannelConfig { - start_value: 500, - max_value: IO_OUTSTANDING_MAX_JOBS as u64 * 2, - delay_scale: Duration::from_millis(5), - delay_max: Duration::from_millis(30_000), - }, - } - } -} - -#[derive(Copy, Clone, Debug)] -pub struct BackpressureChannelConfig { - /// When should backpressure start - pub start_value: u64, - /// Value at which backpressure is saturated - pub max_value: u64, - - /// Characteristic scale of backpressure - /// - /// This scale sets the backpressure delay halfway between `start`_value and - /// `max_value` - pub delay_scale: Duration, - - /// Maximum delay (returned at `max_value`) - pub delay_max: Duration, -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum BackpressureAmount { - Duration(Duration), - Saturated, -} - -impl std::cmp::Ord for BackpressureAmount { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - match (self, other) { - (BackpressureAmount::Saturated, BackpressureAmount::Saturated) => { - std::cmp::Ordering::Equal - } - (BackpressureAmount::Saturated, _) => std::cmp::Ordering::Greater, - (_, BackpressureAmount::Saturated) => std::cmp::Ordering::Less, - ( - BackpressureAmount::Duration(a), - BackpressureAmount::Duration(b), - ) => a.cmp(b), - } - } -} - -impl std::cmp::PartialOrd for BackpressureAmount { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl BackpressureAmount { - /// Converts to a delay in microseconds - /// - /// The saturated case is marked by `u64::MAX` - pub fn as_micros(&self) -> u64 { - match self { - BackpressureAmount::Duration(t) => t.as_micros() as u64, - BackpressureAmount::Saturated => u64::MAX, - } - } -} - -/// Helper `struct` to store a shared backpressure amount -/// -/// Under the hood, this stores a value in microseconds in an `AtomicU64`, so it -/// can be read and written atomically. `BackpressureAmount::Saturated` is -/// indicated by `u64::MAX`. -#[derive(Clone, Debug)] -pub struct SharedBackpressureAmount(Arc); - -impl SharedBackpressureAmount { - pub fn new() -> Self { - Self(Arc::new(AtomicU64::new(0))) - } - - pub fn store(&self, b: BackpressureAmount) { - let v = match b { - BackpressureAmount::Duration(d) => d.as_micros() as u64, - BackpressureAmount::Saturated => u64::MAX, - }; - self.0.store(v, Ordering::Relaxed); - } - - pub fn load(&self) -> BackpressureAmount { - match self.0.load(Ordering::Relaxed) { - u64::MAX => BackpressureAmount::Saturated, - delay_us => { - BackpressureAmount::Duration(Duration::from_micros(delay_us)) - } - } - } -} - -impl BackpressureChannelConfig { - fn get_backpressure(&self, value: u64) -> BackpressureAmount { - // Return a special value if we're saturated - if value >= self.max_value { - return BackpressureAmount::Saturated; - } - - // This ratio starts at 0 (at start_value) and hits 1 when backpressure - // should be maxed out. - let frac = value.saturating_sub(self.start_value) as f64 - / (self.max_value - self.start_value) as f64; - - let v = if frac < 0.5 { - frac * 2.0 - } else { - 1.0 / (2.0 * (1.0 - frac)) - }; - - BackpressureAmount::Duration( - self.delay_scale.mul_f64(v.powi(2)).min(self.delay_max), - ) - } -} - -impl BackpressureConfig { - pub fn get_backpressure( - &self, - bytes: u64, - jobs: u64, - ) -> BackpressureAmount { - let bp_bytes = self.bytes.get_backpressure(bytes); - let bp_queue = self.queue.get_backpressure(jobs); - bp_bytes.max(bp_queue) - } -} - -#[cfg(test)] -mod test { - use super::*; - - /// Confirm that replaying the max number of jobs / bytes will not saturate - #[test] - fn check_outstanding_backpressure() { - let cfg = BackpressureConfig::default(); - let BackpressureAmount::Duration(_) = - cfg.get_backpressure(IO_OUTSTANDING_MAX_BYTES, 0) - else { - panic!("backpressure saturated") - }; - - let BackpressureAmount::Duration(_) = - cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64) - else { - panic!("backpressure saturated") - }; - } - - #[test] - fn check_max_backpressure() { - let cfg = BackpressureConfig::default(); - let BackpressureAmount::Duration(timeout) = cfg - .get_backpressure(IO_OUTSTANDING_MAX_BYTES * 2 - 1024u64.pow(2), 0) - else { - panic!("backpressure saturated") - }; - println!( - "max byte-based delay: {}", - humantime::format_duration(timeout) - ); - assert!( - timeout >= Duration::from_secs(30), - "max byte-based backpressure delay is too low; - expected >= 30 secs, got {}", - humantime::format_duration(timeout) - ); - - let BackpressureAmount::Duration(timeout) = - cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2 - 1) - else { - panic!("backpressure saturated") - }; - println!( - "max job-based delay: {}", - humantime::format_duration(timeout) - ); - assert!( - timeout >= Duration::from_secs(30), - "max job-based backpressure delay is too low; - expected >= 30 secs, got {}", - humantime::format_duration(timeout) - ); - } - - #[test] - fn check_saturated_backpressure() { - let cfg = BackpressureConfig::default(); - assert!(matches!( - cfg.get_backpressure(IO_OUTSTANDING_MAX_BYTES * 2 + 1, 0), - BackpressureAmount::Saturated - )); - assert!(matches!( - cfg.get_backpressure(IO_OUTSTANDING_MAX_BYTES * 2, 0), - BackpressureAmount::Saturated - )); - - assert!(matches!( - cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2 + 1), - BackpressureAmount::Saturated - )); - assert!(matches!( - cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2), - BackpressureAmount::Saturated - )); - } -} diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index b5dde9d61..59406787f 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -1,9 +1,9 @@ // Copyright 2023 Oxide Computer Company use crate::{ - backpressure::BackpressureCounters, cdt, integrity_hash, - live_repair::ExtentInfo, upstairs::UpstairsConfig, upstairs::UpstairsState, - ClientIOStateCount, ClientId, CrucibleDecoder, CrucibleError, DownstairsIO, - DsState, EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse, + cdt, integrity_hash, io_limits::ClientIOLimits, live_repair::ExtentInfo, + upstairs::UpstairsConfig, upstairs::UpstairsState, ClientIOStateCount, + ClientId, CrucibleDecoder, CrucibleError, DownstairsIO, DsState, + EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse, ReconcileIO, ReconcileIOState, RegionDefinitionStatus, RegionMetadata, }; use crucible_common::{x509::TLSContext, ExtentId, VerboseTimeout}; @@ -122,11 +122,8 @@ pub(crate) struct DownstairsClient { /// Number of bytes associated with each IO state io_state_byte_count: ClientIOStateCount, - /// Jobs, write bytes, and total IO bytes in this client's queue - /// - /// These values are used for both global and local (per-client) - /// backpressure. - pub(crate) backpressure_counters: BackpressureCounters, + /// Absolute IO limits for this client + io_limits: ClientIOLimits, /// UUID for this downstairs region /// @@ -200,6 +197,7 @@ impl DownstairsClient { client_id: ClientId, cfg: Arc, target_addr: Option, + io_limits: ClientIOLimits, log: Logger, tls_context: Option>, ) -> Self { @@ -216,6 +214,7 @@ impl DownstairsClient { &log, ), client_id, + io_limits, region_uuid: None, needs_replay: false, negotiation_state: NegotiationState::Start, @@ -232,7 +231,6 @@ impl DownstairsClient { repair_info: None, io_state_job_count: ClientIOStateCount::default(), io_state_byte_count: ClientIOStateCount::default(), - backpressure_counters: BackpressureCounters::new(), connection_id: ConnectionId(0), client_delay_us, } @@ -256,6 +254,10 @@ impl DownstairsClient { cfg, client_task: Self::new_dummy_task(false), client_id: ClientId::new(0), + io_limits: ClientIOLimits::new( + crate::IO_OUTSTANDING_MAX_JOBS * 3 / 2, + crate::IO_OUTSTANDING_MAX_BYTES as usize * 3 / 2, + ), region_uuid: None, needs_replay: false, negotiation_state: NegotiationState::Start, @@ -272,7 +274,6 @@ impl DownstairsClient { repair_info: None, io_state_job_count: ClientIOStateCount::default(), io_state_byte_count: ClientIOStateCount::default(), - backpressure_counters: BackpressureCounters::new(), connection_id: ConnectionId(0), client_delay_us, } @@ -359,19 +360,24 @@ impl DownstairsClient { // Update our bytes-in-flight counter if was_running && !is_running { // Because the job is no longer running, it shouldn't count for - // backpressure. Remove the backpressure guard for this client, - // which decrements backpressure counters on drop. - job.backpressure_guard.take(&self.client_id); - } else if is_running - && !was_running - && !job.backpressure_guard.contains(&self.client_id) - { - // This should only happen if a job is replayed, but that still - // counts! - job.backpressure_guard.insert( - self.client_id, - self.backpressure_counters.increment(&job.work), - ); + // backpressure or IO limits. Remove the backpressure guard for + // this client, which decrements backpressure counters on drop. + job.io_limits.take(&self.client_id); + } else if is_running && !was_running { + match self.io_limits.try_claim(job.work.job_bytes() as u32) { + Ok(g) => { + job.io_limits.insert(self.client_id, g); + } + Err(e) => { + // We can't handle the case of "running out of permits + // during replay", because waiting for a permit would + // deadlock the worker task. Log the error and continue. + warn!( + self.log, + "could not claim IO permits when replaying job: {e:?}" + ) + } + } } old_state @@ -2245,7 +2251,7 @@ impl DownstairsClient { } pub(crate) fn total_bytes_outstanding(&self) -> usize { - self.backpressure_counters.get_io_bytes() as usize + self.io_state_byte_count.in_progress as usize } /// Returns a unique ID for the current connection, or `None` diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs index cbe43932e..301575a64 100644 --- a/upstairs/src/deferred.rs +++ b/upstairs/src/deferred.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use crate::{ - backpressure::BackpressureGuard, client::ConnectionId, - upstairs::UpstairsConfig, BlockContext, BlockOp, ClientData, ClientId, - ImpactedBlocks, Message, RawWrite, + client::ConnectionId, io_limits::IOLimitGuard, upstairs::UpstairsConfig, + BlockContext, BlockOp, ClientId, ImpactedBlocks, Message, RawWrite, }; use bytes::BytesMut; use crucible_common::{integrity_hash, CrucibleError, RegionDefinition}; @@ -114,7 +113,7 @@ pub(crate) struct DeferredWrite { pub data: BytesMut, pub is_write_unwritten: bool, pub cfg: Arc, - pub guard: ClientData, + pub io_guard: IOLimitGuard, } /// Result of a deferred `BlockOp` @@ -135,7 +134,7 @@ pub(crate) struct EncryptedWrite { pub data: RawWrite, pub impacted_blocks: ImpactedBlocks, pub is_write_unwritten: bool, - pub guard: ClientData, + pub io_guard: IOLimitGuard, } impl DeferredWrite { @@ -183,7 +182,7 @@ impl DeferredWrite { data, impacted_blocks: self.impacted_blocks, is_write_unwritten: self.is_write_unwritten, - guard: self.guard, + io_guard: self.io_guard, } } } diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 16b97774c..cc0a94e71 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -7,10 +7,10 @@ use std::{ }; use crate::{ - backpressure::BackpressureGuard, cdt, client::{ClientAction, ClientStopReason, DownstairsClient, EnqueueResult}, guest::GuestBlockRes, + io_limits::{IOLimitGuard, IOLimits}, live_repair::ExtentInfo, stats::DownstairsStatOuter, upstairs::{UpstairsConfig, UpstairsState}, @@ -270,6 +270,7 @@ impl Downstairs { ds_target: ClientMap, tls_context: Option>, stats: DownstairsStatOuter, + io_limits: &IOLimits, log: Logger, ) -> Self { let mut clients = [None, None, None]; @@ -278,6 +279,7 @@ impl Downstairs { i, cfg.clone(), ds_target.get(&i).copied(), + io_limits.client_limits(i), log.new(o!("client" => i.get().to_string())), tls_context.clone(), )); @@ -349,7 +351,12 @@ impl Downstairs { }); let stats = DownstairsStatOuter::default(); - let mut ds = Self::new(cfg, ClientMap::new(), None, stats, log); + // Build a set of fake IO limits that we won't hit + let io_limits = IOLimits::new(u32::MAX as usize, u32::MAX as usize); + + let mut ds = + Self::new(cfg, ClientMap::new(), None, stats, &io_limits, log); + // Create a fake repair address so this field is populated. for cid in ClientId::iter() { ds.clients[cid].repair_addr = @@ -1115,7 +1122,7 @@ impl Downstairs { // reassignment is handled below. if finished || (repair.aborting_repair && !have_reserved_jobs) { // We're done, submit a final flush! - let flush_id = self.submit_flush(None, None); + let flush_id = self.submit_flush(None, None, None); info!(self.log, "LiveRepair final flush submitted"); cdt::up__to__ds__flush__start!(|| (flush_id.0)); @@ -1184,7 +1191,7 @@ impl Downstairs { let nio = IOop::ExtentLiveNoOp { dependencies: deps }; cdt::gw__noop__start!(|| (noop_id.0)); - self.enqueue(noop_id, nio, None, ClientMap::new()) + self.enqueue(noop_id, nio, None, None) } #[allow(clippy::too_many_arguments)] @@ -1200,7 +1207,7 @@ impl Downstairs { cdt::gw__repair__start!(|| (repair_id.0, eid.0)); - self.enqueue(repair_id, repair_io, None, ClientMap::new()) + self.enqueue(repair_id, repair_io, None, None) } fn repair_or_noop( @@ -1389,7 +1396,7 @@ impl Downstairs { cdt::gw__reopen__start!(|| (reopen_id.0, eid.0)); - self.enqueue(reopen_id, reopen_io, None, ClientMap::new()) + self.enqueue(reopen_id, reopen_io, None, None) } #[cfg(test)] @@ -1417,7 +1424,7 @@ impl Downstairs { block_size: 512, }; - self.enqueue(ds_id, aread, None, ClientMap::new()); + self.enqueue(ds_id, aread, None, None); ds_id } @@ -1448,7 +1455,7 @@ impl Downstairs { iblocks, request, is_write_unwritten, - ClientData::from_fn(|_| BackpressureGuard::dummy()), + IOLimitGuard::dummy(), ) } @@ -1457,7 +1464,7 @@ impl Downstairs { blocks: ImpactedBlocks, write: RawWrite, is_write_unwritten: bool, - bp_guard: ClientData, + io_guard: IOLimitGuard, ) -> JobId { let ds_id = self.next_id(); if is_write_unwritten { @@ -1500,7 +1507,7 @@ impl Downstairs { ds_id, awrite, Some(GuestBlockRes::Acked), // writes are always acked - bp_guard.into(), + Some(io_guard), ); ds_id } @@ -1530,7 +1537,7 @@ impl Downstairs { let close_io = self.create_close_io(eid, deps, repair.to_vec()); cdt::gw__close__start!(|| (close_id.0, eid.0)); - self.enqueue(close_id, close_io, None, ClientMap::new()) + self.enqueue(close_id, close_io, None, None) } /// Get the repair IDs and dependencies for this extent. @@ -1874,6 +1881,7 @@ impl Downstairs { &mut self, snapshot_details: Option, res: Option, + io_guard: Option, ) -> JobId { let next_id = self.next_id(); cdt::gw__flush__start!(|| (next_id.0)); @@ -1903,12 +1911,7 @@ impl Downstairs { }; self.pending_barrier += 1; - self.enqueue( - next_id, - flush, - res.map(GuestBlockRes::Other), - ClientMap::new(), - ); + self.enqueue(next_id, flush, res.map(GuestBlockRes::Other), io_guard); next_id } @@ -1964,12 +1967,7 @@ impl Downstairs { debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}"); self.pending_barrier += 1; - self.enqueue( - next_id, - IOop::Barrier { dependencies }, - None, - ClientMap::new(), - ); + self.enqueue(next_id, IOop::Barrier { dependencies }, None, None); next_id } @@ -2048,6 +2046,7 @@ impl Downstairs { blocks: ImpactedBlocks, data: Buffer, res: BlockRes, + io_guard: IOLimitGuard, ) -> JobId { // If there is a live-repair in progress that intersects with this read, // then reserve job IDs for those jobs. We must do this before @@ -2077,7 +2076,7 @@ impl Downstairs { }; let res = GuestBlockRes::Read(data, res); - self.enqueue(ds_id, aread, Some(res), ClientMap::new()); + self.enqueue(ds_id, aread, Some(res), Some(io_guard)); ds_id } @@ -2087,7 +2086,7 @@ impl Downstairs { blocks: ImpactedBlocks, write: RawWrite, is_write_unwritten: bool, - backpressure_guard: ClientData, + io_guard: IOLimitGuard, ) -> JobId { // If there is a live-repair in progress that intersects with this read, // then reserve job IDs for those jobs. @@ -2097,7 +2096,7 @@ impl Downstairs { blocks, write, is_write_unwritten, - backpressure_guard, + io_guard, ) } @@ -2150,27 +2149,27 @@ impl Downstairs { ds_id: JobId, io: IOop, res: Option, - mut bp_guard: ClientMap, + io_limits: Option, ) { let mut skipped = 0; let last_repair_extent = self.last_repair_extent(); + let mut io_limits = io_limits + .map(ClientMap::from) + .unwrap_or_else(ClientMap::new); + // Send the job to each client! let state = ClientData::from_fn(|cid| { let client = &mut self.clients[cid]; let r = client.enqueue(ds_id, &io, last_repair_extent); match r { - EnqueueResult::Send | EnqueueResult::Hold => { - // Update the per-client backpressure guard - if !bp_guard.contains(&cid) { - let g = client.backpressure_counters.increment(&io); - bp_guard.insert(cid, g); - } - if matches!(r, EnqueueResult::Send) { - self.send(ds_id, io.clone(), cid); - } + EnqueueResult::Send => self.send(ds_id, io.clone(), cid), + EnqueueResult::Hold => (), + EnqueueResult::Skip => { + // Take and drop the ClientIOLimitGuard, freeing up tokens + let _ = io_limits.take(&cid); + skipped += 1; } - EnqueueResult::Skip => skipped += 1, } r.state() }); @@ -2194,7 +2193,7 @@ impl Downstairs { res, replay: false, data: None, - backpressure_guard: bp_guard, + io_limits, }, ); if acked { @@ -2731,10 +2730,10 @@ impl Downstairs { // **not** when they are retired. We'll do a sanity check here // and print a warning if that's not the case. for c in ClientId::iter() { - if job.backpressure_guard.contains(&c) { + if job.io_limits.contains(&c) { warn!( self.log, - "job {ds_id} had pending backpressure bytes \ + "job {ds_id} had pending io limits \ for client {c}" ); // Backpressure is decremented on drop @@ -3362,19 +3361,11 @@ impl Downstairs { } pub(crate) fn write_bytes_outstanding(&self) -> u64 { + // XXX this overlaps with io_state_byte_count self.clients .iter() .filter(|c| matches!(c.state(), DsState::Active)) - .map(|c| c.backpressure_counters.get_write_bytes()) - .max() - .unwrap_or(0) - } - - pub(crate) fn jobs_outstanding(&self) -> u64 { - self.clients - .iter() - .filter(|c| matches!(c.state(), DsState::Active)) - .map(|c| c.backpressure_counters.get_jobs()) + .map(|c| c.io_state_byte_count().in_progress) .max() .unwrap_or(0) } @@ -3502,7 +3493,7 @@ impl Downstairs { data, }, is_write_unwritten, - ClientData::from_fn(|_| BackpressureGuard::dummy()), + IOLimitGuard::dummy(), ) } @@ -3533,7 +3524,7 @@ impl Downstairs { let ddef = self.ddef.as_ref().unwrap(); let block_count = blocks.blocks(ddef).len(); let buf = Buffer::new(block_count, ddef.block_size() as usize); - self.submit_read(blocks, buf, BlockRes::dummy()) + self.submit_read(blocks, buf, BlockRes::dummy(), IOLimitGuard::dummy()) } /// Create a test downstairs which has all clients Active @@ -4243,19 +4234,6 @@ impl Downstairs { }); } - /// Assign the given number of write bytes to the backpressure counters - #[must_use] - pub(crate) fn early_write_backpressure( - &mut self, - bytes: u64, - ) -> ClientData { - ClientData::from_fn(|i| { - self.clients[i] - .backpressure_counters - .early_write_increment(bytes) - }) - } - pub(crate) fn set_ddef(&mut self, ddef: RegionDefinition) { self.ddef = Some(ddef); } @@ -4264,7 +4242,7 @@ impl Downstairs { pub(crate) fn has_live_jobs(&self) -> bool { self.clients .iter() - .any(|c| c.backpressure_counters.get_jobs() > 0) + .any(|c| c.io_state_job_count().in_progress > 0) } /// Returns the per-client state for the given job @@ -4388,7 +4366,7 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(!ds.process_ds_completion( next_id, @@ -4435,6 +4413,7 @@ pub(crate) mod test { snapshot_name: String::from("snap"), }), None, + None, ); assert!(!ds.process_ds_completion( @@ -4480,7 +4459,7 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(!ds.process_ds_completion( next_id, @@ -4526,7 +4505,7 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(!ds.process_ds_completion( next_id, @@ -4922,7 +4901,7 @@ pub(crate) mod test { assert!(ds.retired_ids.is_empty()); // Create the flush and send it to the downstairs - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); // Simulate completing the flush to downstairs 0 and 1 assert!(!ds.process_ds_completion( @@ -5138,7 +5117,7 @@ pub(crate) mod test { // A flush is required to move work to completed // Create the flush then send it to all downstairs. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // Complete the Flush at each downstairs. assert!(!ds.process_ds_completion( @@ -5204,7 +5183,7 @@ pub(crate) mod test { // A flush is required to move work to completed // Create the flush then send it to all downstairs. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // Send and complete the Flush at each downstairs. for cid in ClientId::iter() { @@ -5281,7 +5260,7 @@ pub(crate) mod test { assert!(ds.retired_ids.is_empty()); // Create the flush IO - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // Complete the flush on all three downstairs. assert!(!ds.process_ds_completion( @@ -5376,7 +5355,7 @@ pub(crate) mod test { assert!(ds.retired_ids.is_empty()); // Create and send the flush. - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); // Complete the flush on those downstairs. assert!(!ds.process_ds_completion( @@ -6598,7 +6577,7 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); let response = || Ok(Default::default()); ds.process_ds_completion( @@ -6644,7 +6623,7 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(ds.process_ds_completion( next_id, ClientId::new(0), @@ -6678,7 +6657,7 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // DS 1 has a failure, and this won't return true as we don't // have enough success yet to ACK to the guest. @@ -6866,7 +6845,7 @@ pub(crate) mod test { // Perform the flush. let next_id = { - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // As this DS is failed, it should have been skipped assert_eq!( @@ -7093,7 +7072,7 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); // Enqueue the flush. - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!( ds.job_states(flush_id), @@ -7535,7 +7514,7 @@ pub(crate) mod test { let read_one = ds.create_and_enqueue_generic_read_eob(); // Finally, add a flush - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); let job = ds.ds_active.get(&write_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); @@ -7661,7 +7640,7 @@ pub(crate) mod test { let read_one = ds.create_and_enqueue_generic_read_eob(); // Finally, add a flush - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); let job = ds.ds_active.get(&write_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); @@ -7845,7 +7824,7 @@ pub(crate) mod test { } // Create a flush. - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); let job = ds.ds_active.get(&flush_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); @@ -7898,7 +7877,7 @@ pub(crate) mod test { let write_one = ds.create_and_enqueue_generic_write_eob(false); // Create a flush - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); // Verify all jobs can be acked (or should have been fast-acked) let write_job = ds.ds_active.get(&write_one).unwrap(); assert!(write_job.acked); @@ -7957,12 +7936,12 @@ pub(crate) mod test { // Create a read, write, flush let read_one = ds.create_and_enqueue_generic_read_eob(); let write_one = ds.create_and_enqueue_generic_write_eob(false); - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); // Create more IOs. let _read_two = ds.create_and_enqueue_generic_read_eob(); let _write_two = ds.create_and_enqueue_generic_write_eob(false); - let _flush_two = ds.submit_flush(None, None); + let _flush_two = ds.submit_flush(None, None, None); // Six jobs have been skipped. for cid in ClientId::iter() { @@ -8897,7 +8876,7 @@ pub(crate) mod test { let read_id = ds.submit_read_block(eid, BlockOffset(0)); let write_id = ds.submit_test_write_block(eid, BlockOffset(1), false); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); let repair_ids = create_and_enqueue_repair_ops(&mut ds, eid); @@ -9014,7 +8993,7 @@ pub(crate) mod test { let eid = ExtentId(1); let repair_ids = create_and_enqueue_repair_ops(&mut ds, eid); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 5); @@ -9094,9 +9073,9 @@ pub(crate) mod test { let mut ds = Downstairs::repair_test_one_repair(); - let flush_id1 = ds.submit_flush(None, None); + let flush_id1 = ds.submit_flush(None, None, None); let repair_ids = create_and_enqueue_repair_ops(&mut ds, ExtentId(1)); - let flush_id2 = ds.submit_flush(None, None); + let flush_id2 = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 6); @@ -9129,7 +9108,7 @@ pub(crate) mod test { let repair_ids1 = create_and_enqueue_repair_ops(&mut ds, ExtentId(0)); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); let repair_ids2 = create_and_enqueue_repair_ops(&mut ds, ExtentId(1)); @@ -9622,7 +9601,7 @@ pub(crate) mod test { let read_id = ds.submit_read_block(ExtentId(0), BlockOffset(1)); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 3); @@ -9809,7 +9788,7 @@ pub(crate) mod test { }, }); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 1); assert!(ds.get_deps(flush_id).is_empty()); diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index 69bda4a19..fb7254fbd 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -567,8 +567,7 @@ impl TestHarness { // require triggering a timeout let (g, mut io) = Guest::new(Some(log.clone())); if opts.disable_backpressure { - io.disable_queue_backpressure(); - io.disable_byte_backpressure(); + io.disable_backpressure(); } let guest = Arc::new(g); diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index 077f4c676..da14210fa 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -2,13 +2,10 @@ use std::{ net::SocketAddr, sync::atomic::{AtomicU64, Ordering}, - time::Duration, }; use crate::{ - backpressure::{ - BackpressureAmount, BackpressureConfig, SharedBackpressureAmount, - }, + io_limits::{IOLimitView, IOLimits}, BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, ReadBlockContext, ReplaceResult, UpstairsAction, }; @@ -18,8 +15,8 @@ use crucible_protocol::SnapshotDetails; use async_trait::async_trait; use bytes::BytesMut; -use slog::{error, info, warn, Logger}; -use tokio::sync::{mpsc, Mutex}; +use slog::{info, warn, Logger}; +use tokio::sync::mpsc; use tracing::{instrument, span, Level}; use uuid::Uuid; @@ -110,19 +107,8 @@ pub struct Guest { /// it can be read from a `&self` reference. block_size: AtomicU64, - /// Backpressure is implemented as a delay on host write operations - /// - /// It is stored in an `Arc` so that the `GuestIoHandle` can update it from - /// the IO task. - backpressure: SharedBackpressureAmount, - - /// Lock held during backpressure delay - /// - /// Without this lock, multiple tasks could submit jobs to the upstairs and - /// wait in parallel, which defeats the purpose of backpressure (since you - /// could send arbitrarily many jobs at high speed by sending them from - /// different tasks). - backpressure_lock: Mutex<()>, + /// View into global IO limits + io_limits: IOLimitView, /// Logger for the guest log: Logger, @@ -150,21 +136,31 @@ impl Guest { // time spent waiting for the queue versus time spent in Upstairs code). let (req_tx, req_rx) = mpsc::channel(500); - let backpressure = SharedBackpressureAmount::new(); + // We have to set limits above `IO_OUTSTANDING_MAX_JOBS/BYTES`: + // an `Offline` downstairs must hit that threshold to transition to + // `Faulted`, so we can't be IO-limited before that point. + let io_limits = IOLimits::new( + crate::IO_OUTSTANDING_MAX_JOBS * 3 / 2, + crate::IO_OUTSTANDING_MAX_BYTES as usize * 3 / 2, + ); + let io_limits_view = io_limits.view(); + let io = GuestIoHandle { req_rx, - backpressure: backpressure.clone(), - backpressure_config: BackpressureConfig::default(), + io_limits, + + #[cfg(test)] + disable_backpressure: false, + log: log.clone(), }; let guest = Guest { req_tx, block_size: AtomicU64::new(0), + io_limits: io_limits_view, - backpressure, - backpressure_lock: Mutex::new(()), log, }; (guest, io) @@ -198,28 +194,6 @@ impl Guest { rx.wait().await } - /// Sleeps for a backpressure-dependent amount, holding the lock - /// - /// If backpressure is saturated, logs and returns an error. - async fn backpressure_sleep(&self) -> Result<(), CrucibleError> { - let bp = self.backpressure.load(); - match bp { - BackpressureAmount::Saturated => { - let err = "write queue is saturated"; - error!(self.log, "{err}"); - Err(CrucibleError::IoError(err.to_owned())) - } - BackpressureAmount::Duration(d) => { - if d > Duration::ZERO { - let _guard = self.backpressure_lock.lock().await; - tokio::time::sleep(d).await; - drop(_guard); - } - Ok(()) - } - } - } - #[cfg(test)] pub async fn downstairs_state( &self, @@ -353,11 +327,20 @@ impl BlockIO for Guest { assert_eq!(chunk.len() as u64 % bs, 0); let offset_change = chunk.len() as u64 / bs; + let io_guard = + self.io_limits.claim(chunk.len() as u32).await.map_err( + |e| { + CrucibleError::IoError(format!( + "could not get IO guard for Read: {e:?}" + )) + }, + )?; let (rx, done) = BlockOpWaiter::pair(); let rio = BlockOp::Read { offset, data: chunk, done, + io_guard, }; // Our return value always includes the buffer, so we can splice it @@ -405,9 +388,8 @@ impl BlockIO for Guest { } // We split writes into chunks to bound the maximum (typical) latency of - // any single `BlockOp::Write`. Otherwise, the host could send writes - // which are large enough that our maximum backpressure delay wouldn't - // compensate for them. + // any single `BlockOp::Write`. This makes the system's performance + // characteristics easier to think about. const MDTS: usize = 1024 * 1024; // 1 MiB while !data.is_empty() { @@ -415,13 +397,19 @@ impl BlockIO for Guest { assert_eq!(buf.len() as u64 % bs, 0); let offset_change = buf.len() as u64 / bs; - self.backpressure_sleep().await?; + let io_guard = + self.io_limits.claim(buf.len() as u32).await.map_err(|e| { + CrucibleError::IoError(format!( + "could not get IO guard for Write: {e:?}" + )) + })?; let reply = self .send_and_wait(|done| BlockOp::Write { offset, data: buf, done, + io_guard, }) .await; reply?; @@ -442,11 +430,17 @@ impl BlockIO for Guest { return Ok(()); } - self.backpressure_sleep().await?; + let io_guard = + self.io_limits.claim(data.len() as u32).await.map_err(|e| { + CrucibleError::IoError(format!( + "could not get IO guard for WriteUnwritten: {e:?}" + )) + })?; self.send_and_wait(|done| BlockOp::WriteUnwritten { offset, data, done, + io_guard, }) .await } @@ -455,9 +449,15 @@ impl BlockIO for Guest { &self, snapshot_details: Option, ) -> Result<(), CrucibleError> { + let io_guard = self.io_limits.claim(0).await.map_err(|e| { + CrucibleError::IoError(format!( + "could not get IO guard for flush: {e:?}" + )) + })?; self.send_and_wait(|done| BlockOp::Flush { snapshot_details, done, + io_guard, }) .await } @@ -514,14 +514,15 @@ pub struct GuestIoHandle { /// Queue to receive new blockreqs req_rx: mpsc::Receiver, - /// Current backpressure (shared with the `Guest`) - backpressure: SharedBackpressureAmount, - - /// Backpressure configuration, as a starting point and max delay - backpressure_config: BackpressureConfig, + /// IO limiting (shared with the `Guest`) + io_limits: IOLimits, /// Log handle, mainly to pass it into the [`Upstairs`] pub log: Logger, + + /// Flag to disable backpressure during unit tests + #[cfg(test)] + disable_backpressure: bool, } impl GuestIoHandle { @@ -536,29 +537,17 @@ impl GuestIoHandle { } #[cfg(test)] - pub fn disable_queue_backpressure(&mut self) { - self.backpressure_config.queue.delay_scale = Duration::ZERO; + pub fn disable_backpressure(&mut self) { + self.disable_backpressure = true; } #[cfg(test)] - pub fn disable_byte_backpressure(&mut self) { - self.backpressure_config.bytes.delay_scale = Duration::ZERO; - } - - #[cfg(test)] - pub fn is_queue_backpressure_disabled(&self) -> bool { - self.backpressure_config.queue.delay_scale == Duration::ZERO - } - - /// Set `self.backpressure` based on outstanding IO ratio - pub fn set_backpressure(&self, bytes: u64, jobs: u64) { - let bp = self.backpressure_config.get_backpressure(bytes, jobs); - self.backpressure.store(bp); + pub fn is_backpressure_disabled(&self) -> bool { + self.disable_backpressure } - /// Looks up current backpressure - pub fn get_backpressure(&self) -> BackpressureAmount { - self.backpressure.load() + pub(crate) fn io_limits(&self) -> &IOLimits { + &self.io_limits } } diff --git a/upstairs/src/io_limits.rs b/upstairs/src/io_limits.rs new file mode 100644 index 000000000..14958f2a2 --- /dev/null +++ b/upstairs/src/io_limits.rs @@ -0,0 +1,158 @@ +// Copyright 2024 Oxide Computer Company +use crate::{ClientData, ClientId, ClientMap}; +use std::sync::Arc; +use tokio::sync::{ + AcquireError, OwnedSemaphorePermit, Semaphore, TryAcquireError, +}; + +/// Per-client IO limits +#[derive(Clone, Debug)] +pub struct ClientIOLimits { + /// Semaphore to claim IO bytes on behalf of a job + io_bytes: Arc, + + /// Semaphore to claim individual IO jobs + jobs: Arc, +} + +impl ClientIOLimits { + /// Builds a new `ClientIOLimits` object with the given limits + pub fn new(max_jobs: usize, max_io_bytes: usize) -> Self { + ClientIOLimits { + io_bytes: Semaphore::new(max_io_bytes).into(), + jobs: Semaphore::new(max_jobs).into(), + } + } + + /// Claims a certain number of bytes (and one job) + /// + /// This function waits until the given resources are available; as such, it + /// should not be called from the same task which is processing jobs (since + /// that could create a deadlock). + async fn claim( + &self, + bytes: u32, + ) -> Result { + let io_bytes = self.io_bytes.clone().acquire_many_owned(bytes).await?; + let jobs = self.jobs.clone().acquire_owned().await?; + Ok(ClientIOLimitGuard { io_bytes, jobs }) + } + + /// Tries to claim a certain number of bytes (and one job) + pub fn try_claim( + &self, + bytes: u32, + ) -> Result { + let io_bytes = self.io_bytes.clone().try_acquire_many_owned(bytes)?; + let jobs = self.jobs.clone().try_acquire_owned()?; + Ok(ClientIOLimitGuard { io_bytes, jobs }) + } +} + +/// Read-write handle for IO limits across all 3x clients +#[derive(Clone, Debug)] +pub struct IOLimits(ClientData); + +impl IOLimits { + /// Builds a new set of IO limits + pub fn new(max_jobs: usize, max_io_bytes: usize) -> Self { + Self(ClientData::from_fn(|_i| { + ClientIOLimits::new(max_jobs, max_io_bytes) + })) + } + + /// Returns a per-client IO limit handle + /// + /// This handle shares permits with the parent + pub fn client_limits(&self, i: ClientId) -> ClientIOLimits { + self.0[i].clone() + } + + /// Returns a view handle for the IO limits + pub fn view(&self) -> IOLimitView { + IOLimitView(self.clone()) + } + + /// Try to claim some number of bytes (and one job) + /// + /// Returns `Err((ClientId, e))` if any of the claims fail + pub fn try_claim( + &self, + bytes: u32, + ) -> Result { + let mut out = ClientData::from_fn(|_| None); + for i in ClientId::iter() { + out[i] = Some(self.0[i].try_claim(bytes).map_err(|e| (i, e))?); + } + + Ok(IOLimitGuard(out.map(Option::unwrap))) + } +} + +/// View into global IO limits +/// +/// This is equivalent to an [`IOLimits`], but exposes a different API. It +/// should be owned by a separate task, to avoid deadlocks when trying to claim +/// resources. +#[derive(Clone, Debug)] +pub struct IOLimitView(IOLimits); + +impl IOLimitView { + /// Claim some number of bytes (and one job) + /// + /// This function waits until the given resources are available; as such, it + /// should not be called from the same task which is processing jobs (since + /// that could create a deadlock). + pub async fn claim( + &self, + bytes: u32, + ) -> Result { + let mut out = ClientData::from_fn(|_| None); + let lim = &self.0; + for i in ClientId::iter() { + out[i] = Some(lim.0[i].claim(bytes).await?); + } + Ok(IOLimitGuard(out.map(Option::unwrap))) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +/// Handle owning some amount of per-client IO +/// +/// The IO permits are released when this handle is dropped +#[derive(Debug)] +pub struct ClientIOLimitGuard { + #[expect(unused)] + io_bytes: OwnedSemaphorePermit, + #[expect(unused)] + jobs: OwnedSemaphorePermit, +} + +impl ClientIOLimitGuard { + #[cfg(test)] + pub fn dummy() -> Self { + let a = Arc::new(Semaphore::new(1)); + let b = Arc::new(Semaphore::new(1)); + let io_bytes = a.try_acquire_owned().unwrap(); + let jobs = b.try_acquire_owned().unwrap(); + ClientIOLimitGuard { io_bytes, jobs } + } +} + +/// Handle which stores IO limit guards for all 3x clients +#[derive(Debug)] +pub struct IOLimitGuard(ClientData); + +impl From for ClientMap { + fn from(i: IOLimitGuard) -> Self { + i.0.into() + } +} + +impl IOLimitGuard { + #[cfg(test)] + pub fn dummy() -> Self { + Self(ClientData::from_fn(|_i| ClientIOLimitGuard::dummy())) + } +} diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index bc511deb5..4e798cfce 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -48,9 +48,6 @@ pub use in_memory::InMemoryBlockIO; pub mod block_io; pub use block_io::{FileBlockIO, ReqwestBlockIO}; -pub(crate) mod backpressure; -use backpressure::BackpressureGuard; - pub mod block_req; pub(crate) use block_req::{BlockOpWaiter, BlockRes}; @@ -86,17 +83,20 @@ mod downstairs; mod upstairs; use upstairs::{UpCounters, UpstairsAction}; +mod io_limits; +use io_limits::IOLimitGuard; + /// Max number of write bytes between the upstairs and an offline downstairs /// -/// If we exceed this value, the upstairs will give -/// up and mark the offline downstairs as faulted. -const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB +/// If we exceed this value, the upstairs will give up and mark the offline +/// downstairs as faulted. +const IO_OUTSTANDING_MAX_BYTES: u64 = 50 * 1024 * 1024; // 50 MiB /// Max number of outstanding IOs between the upstairs and an offline downstairs /// /// If we exceed this value, the upstairs will give up and mark that offline /// downstairs as faulted. -const IO_OUTSTANDING_MAX_JOBS: usize = 10000; +pub const IO_OUTSTANDING_MAX_JOBS: usize = 1000; /// Maximum of bytes to cache from complete (but un-flushed) IO /// @@ -448,6 +448,11 @@ impl ClientData { ]) } + /// Builds a new `ClientData` by applying a function to each item + pub fn map U>(self, f: F) -> ClientData { + ClientData(self.0.map(f)) + } + #[cfg(test)] pub fn get(&self) -> &[T; 3] { &self.0 @@ -926,11 +931,7 @@ struct DownstairsIO { /// consistency checking with subsequent replies. data: Option, - /// Handle for this job's contribution to guest backpressure - /// - /// Each of these guard handles will automatically decrement the - /// backpressure count for their respective Downstairs when dropped. - backpressure_guard: ClientMap, + io_limits: ClientMap, } impl DownstairsIO { @@ -1315,16 +1316,6 @@ impl IOop { _ => 0, } } - - /// Returns the number of bytes written - fn write_bytes(&self) -> u64 { - match &self { - IOop::Write { data, .. } | IOop::WriteUnwritten { data, .. } => { - data.len() as u64 - } - _ => 0, - } - } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -1502,20 +1493,24 @@ pub(crate) enum BlockOp { offset: BlockIndex, data: Buffer, done: BlockRes, + io_guard: IOLimitGuard, }, Write { offset: BlockIndex, data: BytesMut, done: BlockRes, + io_guard: IOLimitGuard, }, WriteUnwritten { offset: BlockIndex, data: BytesMut, done: BlockRes, + io_guard: IOLimitGuard, }, Flush { snapshot_details: Option, done: BlockRes, + io_guard: IOLimitGuard, }, GoActive { done: BlockRes, @@ -1584,8 +1579,6 @@ pub struct Arg { pub up_counters: UpCounters, /// Next JobID pub next_job_id: JobId, - /// Backpressure value - pub up_backpressure: u64, /// Jobs on the downstairs work queue. pub ds_count: u32, /// Number of write bytes in flight @@ -1664,7 +1657,7 @@ pub fn up_main( }; #[cfg(test)] - let disable_backpressure = guest.is_queue_backpressure_disabled(); + let disable_backpressure = guest.is_backpressure_disabled(); /* * Build the Upstairs struct that we use to share data between @@ -1675,7 +1668,7 @@ pub fn up_main( #[cfg(test)] if disable_backpressure { - up.disable_client_backpressure(); + up.downstairs.disable_client_backpressure(); } if let Some(pr) = producer_registry { diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index a19a504fa..31a518b7e 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -10,6 +10,7 @@ use crate::{ }, downstairs::{Downstairs, DownstairsAction}, extent_from_offset, + io_limits::IOLimitGuard, stats::UpStatOuter, BlockOp, BlockRes, Buffer, ClientId, ClientMap, CrucibleOpts, DsState, EncryptionContext, GuestIoHandle, Message, RegionDefinition, @@ -172,7 +173,7 @@ impl UpCounters { /// - Ack all ackable jobs to the guest /// - Step through the live-repair state machine (if it's running) /// - Check for client-side deactivation (if it's pending) -/// - Set backpressure time in the guest +/// - Set backpressure time in the clients /// /// Keeping the `Upstairs` "clean" through this invariant maintenance makes it /// easier to think about its state, because it's guaranteed to be clean when we @@ -389,6 +390,7 @@ impl Upstairs { ds_target, tls_context, stats.ds_stats(), + guest.io_limits(), log.new(o!("" => "downstairs")), ); let flush_timeout_secs = opt.flush_timeout.unwrap_or(0.5); @@ -423,11 +425,6 @@ impl Upstairs { } } - #[cfg(test)] - pub(crate) fn disable_client_backpressure(&mut self) { - self.downstairs.disable_client_backpressure(); - } - /// Build an Upstairs for simple tests #[cfg(test)] pub fn test_default(ddef: Option) -> Self { @@ -570,7 +567,8 @@ impl Upstairs { .counters .action_flush_check)); if self.need_flush { - self.submit_flush(None, None); + let io_guard = self.try_acquire_io(0); + self.submit_flush(None, None, io_guard); } self.flush_deadline = Instant::now() + self.flush_interval; } @@ -661,7 +659,7 @@ impl Upstairs { // For now, check backpressure after every event. We may want to make // this more nuanced in the future. - self.set_backpressure(); + self.downstairs.set_client_backpressure(); // We do this last because some of the code above can be slow // (especially during debug builds), and we don't want to set our flush @@ -671,6 +669,33 @@ impl Upstairs { } } + /// Attempts to acquire permits to perform an IO job with the given bytes + /// + /// Upon failure, logs an error and returns `None`. + /// + /// This function is used by messages generated internally to the Upstairs + /// for best-effort IO limiting. If the message would exceed our available + /// permits, it's still allowed (because to do otherwise would deadlock the + /// upstairs task). In other words, internally generated messages can limit + /// guest IO work, but not the other way around + fn try_acquire_io(&self, bytes: usize) -> Option { + let Ok(bytes) = u32::try_from(bytes) else { + warn!(self.log, "too many bytes for try_acquire_io"); + return None; + }; + match self.guest.io_limits().try_claim(bytes) { + Ok(v) => Some(v), + Err((i, e)) => { + warn!( + self.log, + "could not apply IO limits to upstairs work: \ + client {i} returned {e:?}" + ); + None + } + } + } + /// Helper function to await all deferred block requests /// /// This is only useful in tests because it **only** processes deferred @@ -726,7 +751,6 @@ impl Upstairs { up_count: self.downstairs.gw_active.len() as u32, up_counters: self.counters, next_job_id: self.downstairs.peek_next_id(), - up_backpressure: self.guest.get_backpressure().as_micros(), write_bytes_out: self.downstairs.write_bytes_outstanding(), ds_count: self.downstairs.active_count() as u32, ds_state: self.downstairs.collect_stats(|c| c.state()), @@ -919,11 +943,21 @@ impl Upstairs { match op { // All Write operations are deferred, because they will offload // encryption to a separate thread pool. - BlockOp::Write { offset, data, done } => { - self.submit_deferred_write(offset, data, done, false); + BlockOp::Write { + offset, + data, + done, + io_guard, + } => { + self.submit_deferred_write(offset, data, done, false, io_guard); } - BlockOp::WriteUnwritten { offset, data, done } => { - self.submit_deferred_write(offset, data, done, true); + BlockOp::WriteUnwritten { + offset, + data, + done, + io_guard, + } => { + self.submit_deferred_write(offset, data, done, true, io_guard); } // If we have any deferred requests in the FuturesOrdered, then we // have to keep using it for subsequent requests (even ones that are @@ -1090,15 +1124,19 @@ impl Upstairs { done.send_ok(self.show_all_work()); } - BlockOp::Read { offset, data, done } => { - self.submit_read(offset, data, done) - } + BlockOp::Read { + offset, + data, + done, + io_guard, + } => self.submit_read(offset, data, done, io_guard), BlockOp::Write { .. } | BlockOp::WriteUnwritten { .. } => { panic!("writes must always be deferred") } BlockOp::Flush { snapshot_details, done, + io_guard, } => { /* * Submit for read and write both check if the upstairs is @@ -1111,7 +1149,7 @@ impl Upstairs { done.send_err(CrucibleError::UpstairsInactive); return; } - self.submit_flush(Some(done), snapshot_details); + self.submit_flush(Some(done), snapshot_details, Some(io_guard)); } BlockOp::ReplaceDownstairs { id, old, new, done } => { let r = self.downstairs.replace(id, old, new, &self.state); @@ -1251,7 +1289,8 @@ impl Upstairs { } if !self.downstairs.can_deactivate_immediately() { debug!(self.log, "not ready to deactivate; submitting final flush"); - self.submit_flush(None, None); + let io_guard = self.try_acquire_io(0); + self.submit_flush(None, None, io_guard); } else { debug!(self.log, "ready to deactivate right away"); // Deactivation is handled in the invariant-checking portion of @@ -1265,6 +1304,7 @@ impl Upstairs { &mut self, res: Option, snapshot_details: Option, + io_guard: Option, ) { // Notice that unlike submit_read and submit_write, we do not check for // guest_io_ready here. The upstairs itself can call submit_flush @@ -1282,7 +1322,9 @@ impl Upstairs { if snapshot_details.is_some() { info!(self.log, "flush with snap requested"); } - let ds_id = self.downstairs.submit_flush(snapshot_details, res); + let ds_id = + self.downstairs + .submit_flush(snapshot_details, res, io_guard); cdt::up__to__ds__flush__start!(|| (ds_id.0)); } @@ -1305,8 +1347,7 @@ impl Upstairs { offset: BlockIndex, data: Buffer, ) { - let br = BlockRes::dummy(); - self.submit_read(offset, data, br) + self.submit_read(offset, data, BlockRes::dummy(), IOLimitGuard::dummy()) } /// Submit a read job to the downstairs @@ -1315,6 +1356,7 @@ impl Upstairs { offset: BlockIndex, data: Buffer, res: BlockRes, + io_guard: IOLimitGuard, ) { if !self.guest_io_ready() { res.send_err((data, CrucibleError::UpstairsInactive)); @@ -1353,7 +1395,9 @@ impl Upstairs { * Grab this ID after extent_from_offset: in case of Err we don't * want to create a gap in the IDs. */ - let ds_id = self.downstairs.submit_read(impacted_blocks, data, res); + let ds_id = + self.downstairs + .submit_read(impacted_blocks, data, res, io_guard); cdt::up__to__ds__read__start!(|| (ds_id.0)); } @@ -1373,6 +1417,7 @@ impl Upstairs { data, BlockRes::dummy(), is_write_unwritten, + IOLimitGuard::dummy(), ) { self.submit_write(DeferredWrite::run(w)) } @@ -1389,14 +1434,19 @@ impl Upstairs { data: BytesMut, res: BlockRes, is_write_unwritten: bool, + io_guard: IOLimitGuard, ) { // It's possible for the write to be invalid out of the gate, in which // case `compute_deferred_write` replies to the `res` itself and returns // `None`. Otherwise, we have to store a future to process the write // result. - if let Some(w) = - self.compute_deferred_write(offset, data, res, is_write_unwritten) - { + if let Some(w) = self.compute_deferred_write( + offset, + data, + res, + is_write_unwritten, + io_guard, + ) { let should_defer = !self.deferred_ops.is_empty() || w.data.len() > MIN_DEFER_SIZE_BYTES as usize; if should_defer { @@ -1418,6 +1468,7 @@ impl Upstairs { data: BytesMut, res: BlockRes, is_write_unwritten: bool, + io_guard: IOLimitGuard, ) -> Option { if !self.guest_io_ready() { res.send_err(CrucibleError::UpstairsInactive); @@ -1445,8 +1496,6 @@ impl Upstairs { let impacted_blocks = extent_from_offset(&ddef, offset, ddef.bytes_to_blocks(data.len())); - let guard = self.downstairs.early_write_backpressure(data.len() as u64); - // Fast-ack, pretending to be done immediately operations res.send_ok(()); @@ -1456,7 +1505,7 @@ impl Upstairs { data, is_write_unwritten, cfg: self.cfg.clone(), - guard, + io_guard, }) } @@ -1476,7 +1525,7 @@ impl Upstairs { write.impacted_blocks, write.data, write.is_write_unwritten, - write.guard, + write.io_guard, ); if write.is_write_unwritten { @@ -2024,16 +2073,6 @@ impl Upstairs { self.downstairs.reinitialize(client_id, &self.state); } - /// Sets both guest and per-client backpressure - fn set_backpressure(&self) { - self.guest.set_backpressure( - self.downstairs.write_bytes_outstanding(), - self.downstairs.jobs_outstanding(), - ); - - self.downstairs.set_client_backpressure(); - } - /// Returns the `RegionDefinition` /// /// # Panics @@ -2355,7 +2394,7 @@ pub(crate) mod test { ); // op 1 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // op 2 upstairs.submit_dummy_write( @@ -2400,7 +2439,7 @@ pub(crate) mod test { } // op 3 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // ops 4 to 6 for i in 3..6 { @@ -2731,7 +2770,7 @@ pub(crate) mod test { ); // op 1 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // op 2 upstairs.submit_dummy_read(BlockIndex(0), Buffer::new(2, 512)); @@ -2758,11 +2797,11 @@ pub(crate) mod test { let mut upstairs = make_upstairs(); upstairs.force_active().unwrap(); - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); let jobs = upstairs.downstairs.get_all_jobs(); assert_eq!(jobs.len(), 3); @@ -2792,7 +2831,7 @@ pub(crate) mod test { upstairs.force_active().unwrap(); // op 0 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // ops 1 to 2 for i in 0..2 { @@ -2804,7 +2843,7 @@ pub(crate) mod test { } // op 3 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // ops 4 to 6 for i in 0..3 { @@ -2816,7 +2855,7 @@ pub(crate) mod test { } // op 7 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); let jobs = upstairs.downstairs.get_all_jobs(); assert_eq!(jobs.len(), 8); @@ -3270,7 +3309,7 @@ pub(crate) mod test { upstairs.submit_dummy_read(BlockIndex(95), Buffer::new(2, 512)); // op 1 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // op 2 upstairs.submit_dummy_write( @@ -3489,10 +3528,21 @@ pub(crate) mod test { let offset = BlockIndex(7); let data = BytesMut::from([1; 512].as_slice()); let (_write_res, done) = BlockOpWaiter::pair(); + let io_guard = IOLimitGuard::dummy(); let op = if is_write_unwritten { - BlockOp::WriteUnwritten { offset, data, done } + BlockOp::WriteUnwritten { + offset, + data, + done, + io_guard, + } } else { - BlockOp::Write { offset, data, done } + BlockOp::Write { + offset, + data, + done, + io_guard, + } }; up.apply(UpstairsAction::Guest(op)); up.await_deferred_ops().await; @@ -3605,7 +3655,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will successfully decrypt let mut data = Vec::from([1u8; 512]); @@ -3653,7 +3709,13 @@ pub(crate) mod test { let data = Buffer::new(blocks, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let mut data = Vec::from([1u8; 512]); @@ -3711,7 +3773,13 @@ pub(crate) mod test { let data = Buffer::new(blocks, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will fail decryption let mut data = Vec::from([1u8; 512]); @@ -3787,7 +3855,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will fail decryption let mut data = Vec::from([1u8; 512]); @@ -3850,7 +3924,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will fail integrity hash // check @@ -3895,7 +3975,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let data = BytesMut::from([1u8; 512].as_slice()); let hash = integrity_hash(&[&data]); @@ -3956,7 +4042,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); for client_id in [ClientId::new(0), ClientId::new(1)] { let data = BytesMut::from([1u8; 512].as_slice()); @@ -4019,7 +4111,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let data = BytesMut::from([1u8; 512].as_slice()); let hash = integrity_hash(&[&data]); @@ -4079,7 +4177,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // The first read has no block contexts, because it was unwritten let data = BytesMut::from([0u8; 512].as_slice()); @@ -4135,7 +4239,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // The first read has no block contexts, because it was unwritten let data = BytesMut::from([0u8; 512].as_slice()); @@ -4195,7 +4305,13 @@ pub(crate) mod test { data.extend_from_slice(vec![1; NODEFER_SIZE].as_slice()); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Write { + offset, + data, + done, + io_guard, + })); assert_eq!(up.deferred_ops.len(), 0); // Submit a long write, which should be deferred @@ -4203,7 +4319,13 @@ pub(crate) mod test { data.extend_from_slice(vec![2; DEFER_SIZE].as_slice()); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Write { + offset, + data, + done, + io_guard, + })); assert_eq!(up.deferred_ops.len(), 1); assert_eq!(up.deferred_msgs.len(), 0); @@ -4213,7 +4335,13 @@ pub(crate) mod test { data.extend_from_slice(vec![3; NODEFER_SIZE].as_slice()); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Write { + offset, + data, + done, + io_guard, + })); assert_eq!(up.deferred_ops.len(), 2); assert_eq!(up.deferred_msgs.len(), 0); } @@ -4235,7 +4363,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let reply = res.wait_raw().await.unwrap(); match reply {