From 0868720454fa9228f3c8c2bad56ed78f1ee7881f Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Fri, 11 Oct 2024 15:16:54 +0100 Subject: [PATCH 1/2] Added budget consumer. --- .../swimos_byte_channel/src/coop/mod.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/swimos_utilities/swimos_byte_channel/src/coop/mod.rs b/swimos_utilities/swimos_byte_channel/src/coop/mod.rs index c8ee1d3d4..a79f0be0a 100644 --- a/swimos_utilities/swimos_byte_channel/src/coop/mod.rs +++ b/swimos_utilities/swimos_byte_channel/src/coop/mod.rs @@ -19,7 +19,7 @@ use std::{ task::{Context, Poll}, }; -use futures::Future; +use futures::{ready, Future}; use pin_project::pin_project; #[cfg(test)] @@ -133,3 +133,19 @@ impl Future for RunWithBudget { projected.fut.poll(cx) } } + +#[pin_project] +pub struct BudgetConsumer { + #[pin] + fut: F, +} + +impl Future for BudgetConsumer { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + ready!(consume_budget(cx)); + let projected = self.project(); + track_progress(projected.fut.poll(cx)) + } +} \ No newline at end of file From 156b645f6669ac4aa937656e7042fb5a89ee5561 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Fri, 11 Oct 2024 16:08:56 +0100 Subject: [PATCH 2/2] Budget consumer future. --- .../swimos_byte_channel/src/coop/mod.rs | 26 ++++++--- .../swimos_byte_channel/src/coop/tests.rs | 53 ++++++++++++++++++- .../swimos_byte_channel/src/lib.rs | 2 +- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/swimos_utilities/swimos_byte_channel/src/coop/mod.rs b/swimos_utilities/swimos_byte_channel/src/coop/mod.rs index a79f0be0a..a9b677829 100644 --- a/swimos_utilities/swimos_byte_channel/src/coop/mod.rs +++ b/swimos_utilities/swimos_byte_channel/src/coop/mod.rs @@ -71,7 +71,7 @@ fn set_budget(n: usize) { }) } -/// Wraps a futures and ensures that the byte channel budget is reset each time it is polled. +/// Wraps a futures and ensures that the task budget is reset each time it is polled. #[pin_project] #[derive(Debug, Clone, Copy)] pub struct RunWithBudget { @@ -95,22 +95,27 @@ impl RunWithBudget { } } -/// Extension trait to allow futures to be run with a byte channel budget. +/// Extension trait to allow futures to be run with a task budget. pub trait BudgetedFutureExt: Sized + Future { - /// Run this future with the default byte channel budget. + /// Run this future with the default task budget. fn budgeted(self) -> RunWithBudget { RunWithBudget::new(self) } - /// Run this future with the specified byte channel budget. + /// Run this future with the specified task budget. fn with_budget(self, budget: NonZeroUsize) -> RunWithBudget { RunWithBudget::with_budget(budget, self) } - /// Run this future wit a specified byte channel budget or the default if not is specified. + /// Run this future wit a specified task budget or the default if not is specified. fn with_budget_or_default(self, budget: Option) -> RunWithBudget { RunWithBudget::with_budget(budget.unwrap_or(DEFAULT_START_BUDGET), self) } + + /// Run this future, consuming budget if it does work. + fn consuming(self) -> BudgetConsumer { + BudgetConsumer::new(self) + } } impl BudgetedFutureExt for F {} @@ -134,12 +139,19 @@ impl Future for RunWithBudget { } } +/// Wraps a future to consume the task budget when it performs work. #[pin_project] pub struct BudgetConsumer { - #[pin] + #[pin] fut: F, } +impl BudgetConsumer { + pub fn new(fut: F) -> Self { + BudgetConsumer { fut } + } +} + impl Future for BudgetConsumer { type Output = F::Output; @@ -148,4 +160,4 @@ impl Future for BudgetConsumer { let projected = self.project(); track_progress(projected.fut.poll(cx)) } -} \ No newline at end of file +} diff --git a/swimos_utilities/swimos_byte_channel/src/coop/tests.rs b/swimos_utilities/swimos_byte_channel/src/coop/tests.rs index 4c03cc799..290cbb27a 100644 --- a/swimos_utilities/swimos_byte_channel/src/coop/tests.rs +++ b/swimos_utilities/swimos_byte_channel/src/coop/tests.rs @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::task::{waker, ArcWake}; +use futures::{ + future::{pending, ready}, + task::{waker, ArcWake}, +}; use std::{ cell::Cell, + future::Future, num::NonZeroUsize, sync::{atomic::AtomicBool, Arc}, task::{Context, Poll}, @@ -25,6 +29,8 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::{byte_channel, RunWithBudget}; +use super::BudgetedFutureExt; + struct TestWaker(AtomicBool); impl TestWaker { @@ -165,3 +171,48 @@ async fn with_budget_sets_budget() { fut.await; } + +#[test] +fn consume_budget_consumes() { + let w = Arc::new(TestWaker::default()); + let waker = waker(w.clone()); + let mut cx = Context::from_waker(&waker); + super::set_budget(2); + + let fut = pin!(ready(0).consuming()); + + assert_eq!(fut.poll(&mut cx), Poll::Ready(0)); + assert_eq!(super::TASK_BUDGET.with(Cell::get), Some(1)); + + assert!(!w.triggered()); +} + +#[test] +fn consume_budget_pending_no_consume() { + let w = Arc::new(TestWaker::default()); + let waker = waker(w.clone()); + let mut cx = Context::from_waker(&waker); + super::set_budget(2); + + let fut = pin!(pending::().consuming()); + + assert_eq!(fut.poll(&mut cx), Poll::Pending); + assert_eq!(super::TASK_BUDGET.with(Cell::get), Some(2)); + assert!(!w.triggered()); +} + +#[test] +fn consume_budget_yields_on_exhaustion() { + let w = Arc::new(TestWaker::default()); + let waker = waker(w.clone()); + let mut cx = Context::from_waker(&waker); + super::set_budget(1); + + let mut fut = pin!(ready(0).consuming()); + + assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending); + assert_eq!(super::TASK_BUDGET.with(Cell::get), None); + assert!(w.triggered()); + + assert_eq!(fut.poll(&mut cx), Poll::Ready(0)); +} diff --git a/swimos_utilities/swimos_byte_channel/src/lib.rs b/swimos_utilities/swimos_byte_channel/src/lib.rs index 3593c3cdb..7911f2d49 100644 --- a/swimos_utilities/swimos_byte_channel/src/lib.rs +++ b/swimos_utilities/swimos_byte_channel/src/lib.rs @@ -38,4 +38,4 @@ mod coop; pub use channel::{are_connected, byte_channel, ByteReader, ByteWriter}; #[cfg(feature = "coop")] -pub use coop::{BudgetedFutureExt, RunWithBudget}; +pub use coop::{BudgetConsumer, BudgetedFutureExt, RunWithBudget};