diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index c0018d4c2d8..020cc1e4ac2 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -92,7 +92,6 @@ time = [] tokio-macros = { version = "~2.2.0", path = "../tokio-macros", optional = true } pin-project-lite = "0.2.11" -rustversion = { git = "https://github.com/wenym1/rustversion", rev = "0b11410" } # Everything else is optional... bytes = { version = "1.0.0", optional = true } @@ -153,8 +152,7 @@ wasm-bindgen-test = "0.3.0" mio-aio = { version = "0.8.0", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] -loom = { git = "https://github.com/wenym1/loom", rev = "4edf9435", features = ["futures", "checkpoint"] } -#loom = { path = "/Users/william/repo/loom", features = ["futures", "checkpoint"] } +loom = { version = "0.7", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6b8c5675a19..f954811f0ab 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -627,32 +627,7 @@ impl Sender { ) }); - #[rustversion::since(1.70)] - fn consume_inner(inner: Arc>) -> Result<(), T> { - if let Some(inner) = Arc::into_inner(inner) { - if let Some(t) = inner.value.with_mut(|ptr| unsafe { - // SAFETY: we have successfully returned with `Some`, which means we are the - // only accessor to `ptr`. - // - // Note: value can be `None` even though we have previously set it as `Some`, - // because the value may have been consumed by receiver before we reach here. - (*ptr).take() - }) { - Err(t) - } else { - Ok(()) - } - } else { - Ok(()) - } - } - - #[rustversion::before(1.70)] - fn consume_inner(_inner: Arc>) -> Result<(), T> { - Ok(()) - } - - consume_inner(inner) + Ok(()) } /// Waits for the associated [`Receiver`] handle to close. @@ -1097,7 +1072,16 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { if let Some(inner) = self.inner.as_ref() { - inner.close(); + let state = inner.close(); + + if state.is_complete() { + drop(unsafe { + // SAFETY: we have ensured that the `VALUE_SENT` bit has been set, + // so only the receiver can access the value. + inner.consume_value() + }); + } + #[cfg(all(tokio_unstable, feature = "tracing"))] self.resource_span.in_scope(|| { tracing::trace!( @@ -1227,7 +1211,7 @@ impl Inner { } /// Called by `Receiver` to indicate that the value will never be received. - fn close(&self) { + fn close(&self) -> State { let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { @@ -1235,6 +1219,8 @@ impl Inner { self.tx_task.with_task(Waker::wake_by_ref); } } + + prev } /// Consumes the value. This function does not check `state`. @@ -1273,6 +1259,16 @@ impl Drop for Inner { self.tx_task.drop_task(); } } + + unsafe { + // SAFETY: we have `&mut self`, and therefore we have + // exclusive access to the value. + // + // Note: the assertion holds because if the value has been sent by sender, + // we must ensure that the value must have been consumed by the receiver before + // dropping the `Inner`. + debug_assert!(self.consume_value().is_none()); + } } }