diff --git a/Cargo.toml b/Cargo.toml index 6aef255c..2d511a8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["asynchronous", "concurrency"] async-trait = "0.1.36" barrage = "0.2.1" catty = "0.1.4" -flume = { version = "0.10.9", default-features = false, features = ["async"] } +flume = { git = "https://github.com/Restioson/flume", branch = "sink_to_sender", default-features = false, features = ["async"] } futures-core = { version = "0.3.5", default-features = false, features = ["alloc"] } futures-sink = { version = "0.3.5", default-features = false } futures-util = { version = "0.3.5", default-features = false, features = ["sink"] } diff --git a/src/address.rs b/src/address.rs index 3699cd54..ebf9961a 100644 --- a/src/address.rs +++ b/src/address.rs @@ -1,19 +1,20 @@ //! An address to an actor is a way to send it a message. An address allows an actor to be sent any //! kind of message that it can receive. +use flume::r#async::SendSink; +use futures_core::Stream; +use futures_sink::Sink; +use futures_util::{future, StreamExt}; use std::fmt::{self, Display, Formatter}; use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{cmp::Ordering, error::Error, hash::Hash}; -use flume::Sender; -use futures_core::Stream; -use futures_util::{future, StreamExt}; - -use crate::envelope::ReturningEnvelope; +use crate::envelope::{NonReturningEnvelope, ReturningEnvelope}; use crate::manager::AddressMessage; use crate::refcount::{Either, RefCounter, Strong, Weak}; use crate::send_future::{NameableSending, ResolveToHandlerReturn, SendFuture}; -use crate::sink::AddressSink; use crate::{Handler, KeepRunning}; /// The actor is no longer running and disconnected from the sending address. For why this could @@ -39,8 +40,8 @@ impl Error for Disconnected {} /// should be used instead. An address is created by calling the /// [`Actor::create`](../trait.Actor.html#method.create) or /// [`Context::run`](../struct.Context.html#method.run) methods, or by cloning another `Address`. -pub struct Address { - pub(crate) sender: Sender>, +pub struct Address { + pub(crate) sink: SendSink<'static, AddressMessage>, pub(crate) ref_counter: Rc, } @@ -57,7 +58,7 @@ impl Address { /// addresses exist. pub fn downgrade(&self) -> WeakAddress { WeakAddress { - sender: self.sender.clone(), + sink: self.sink.clone(), ref_counter: self.ref_counter.downgrade(), } } @@ -68,7 +69,7 @@ impl Address { /// Converts this address into a weak address. pub fn downgrade(&self) -> WeakAddress { WeakAddress { - sender: self.sender.clone(), + sink: self.sink.clone(), ref_counter: self.ref_counter.clone().into_weak(), } } @@ -104,17 +105,17 @@ impl Address { /// }) /// ``` pub fn is_connected(&self) -> bool { - !self.sender.is_disconnected() + !self.sink.is_disconnected() } /// Returns the number of messages in the actor's mailbox. pub fn len(&self) -> usize { - self.sender.len() + self.sink.len() } /// The total capacity of the actor's mailbox. pub fn capacity(&self) -> Option { - self.sender.capacity() + self.sink.capacity() } /// Returns whether the actor's mailbox is empty. @@ -125,8 +126,8 @@ impl Address { /// Convert this address into a generic address which can be weak or strong. pub fn as_either(&self) -> Address { Address { + sink: self.sink.clone(), ref_counter: self.ref_counter.clone().into_either(), - sender: self.sender.clone(), } } @@ -152,8 +153,8 @@ impl Address { if self.is_connected() { let (envelope, rx) = ReturningEnvelope::>::Return>::new(message); let tx = self - .sender - .clone() + .sink + .sender() .into_send_async(AddressMessage::Message(Box::new(envelope))); SendFuture::sending_named(tx, rx) @@ -196,25 +197,51 @@ impl Address { } } - /// Converts this address into a [futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html). - pub fn into_sink(self) -> AddressSink { - AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - } - } - /// Waits until this address becomes disconnected. pub fn join(&self) -> impl Future + Send + Unpin { self.ref_counter.disconnect_notice() } } +impl Sink for Address +where + A: Handler, + M: Send + 'static, + Rc: RefCounter, +{ + type Error = Disconnected; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink) + .poll_ready(cx) + .map_err(|_| Disconnected) + } + + fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> { + let item = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item))); + Pin::new(&mut self.sink) + .start_send(item) + .map_err(|_| Disconnected) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink) + .poll_flush(cx) + .map_err(|_| Disconnected) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink) + .poll_close(cx) + .map_err(|_| Disconnected) + } +} + // Required because #[derive] adds an A: Clone bound impl Clone for Address { fn clone(&self) -> Self { Address { - sender: self.sender.clone(), + sink: self.sink.clone(), ref_counter: self.ref_counter.clone(), } } @@ -226,7 +253,7 @@ impl Drop for Address { // We should notify the ActorManager that there are no more strong Addresses and the actor // should be stopped. if self.ref_counter.is_last_strong() { - let _ = self.sender.send(AddressMessage::LastAddress); + let _ = self.sink.sender().send(AddressMessage::LastAddress); } } } diff --git a/src/context.rs b/src/context.rs index a59e5bd2..629b4b0f 100644 --- a/src/context.rs +++ b/src/context.rs @@ -88,7 +88,7 @@ impl Context { let weak = strong.downgrade(); let addr = Address { - sender: sender.clone(), + sink: sender.clone().into_sink(), ref_counter: strong, }; @@ -136,7 +136,7 @@ impl Context { /// Get an address to the current actor if there are still external addresses to the actor. pub fn address(&self) -> Result, ActorShutdown> { Ok(Address { - sender: self.sender.clone(), + sink: self.sender.clone().into_sink(), ref_counter: self.ref_counter.upgrade().ok_or(ActorShutdown)?, }) } diff --git a/src/lib.rs b/src/lib.rs index ac4c3a34..220f9835 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,6 @@ mod receiver; /// influences whether the address will keep the actor alive for as long as it lives. pub mod refcount; mod send_future; -pub mod sink; /// This module contains a trait to spawn actors, implemented for all major async runtimes by default. pub mod spawn; #[cfg(feature = "with-tracing-0_1")] @@ -251,7 +250,6 @@ impl From<()> for KeepRunning { mod private { use crate::refcount::{Either, RefCounter, Strong, Weak}; - use crate::sink::AddressSink; use crate::{Actor, Address}; pub trait Sealed {} @@ -260,5 +258,4 @@ mod private { impl Sealed for Weak {} impl Sealed for Either {} impl Sealed for Address {} - impl Sealed for AddressSink {} } diff --git a/src/message_channel.rs b/src/message_channel.rs index 9d2ad186..82f9295d 100644 --- a/src/message_channel.rs +++ b/src/message_channel.rs @@ -12,7 +12,6 @@ use crate::private::Sealed; use crate::receiver::Receiver; use crate::refcount::{RefCounter, Shared, Strong}; use crate::send_future::{ResolveToHandlerReturn, SendFuture}; -use crate::sink::{AddressSink, MessageSink, StrongMessageSink, WeakMessageSink}; use crate::{Handler, KeepRunning}; /// A message channel is a channel through which you can send only one kind of message, but to @@ -77,7 +76,7 @@ use crate::{Handler, KeepRunning}; /// }) /// } /// ``` -pub trait MessageChannel: Sealed + Unpin + Send + Sync { +pub trait MessageChannel: Sealed + Unpin + Send { /// The return value of the handler for `M`. type Return: Send + 'static; @@ -127,10 +126,6 @@ pub trait MessageChannel: Sealed + Unpin + Send + Sync { /// Clones this channel as a boxed trait object. fn clone_channel(&self) -> Box>; - /// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) - /// and asynchronously send messages through it. - fn sink(&self) -> Box>; - /// Determines whether this and the other message channel address the same actor mailbox. fn eq(&self, other: &dyn MessageChannel) -> bool; @@ -161,10 +156,6 @@ pub trait StrongMessageChannel: MessageChannel { /// Clones this channel as a boxed trait object. fn clone_channel(&self) -> Box>; - - /// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) - /// and asynchronously send messages through it. - fn sink(&self) -> Box>; } /// A message channel is a channel through which you can send only one kind of message, but to @@ -185,10 +176,6 @@ pub trait WeakMessageChannel: MessageChannel { /// Clones this channel as a boxed trait object. fn clone_channel(&self) -> Box>; - - /// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) - /// and asynchronously send messages through it. - fn sink(&self) -> Box>; } impl MessageChannel for Address @@ -218,8 +205,8 @@ where if self.is_connected() { let (envelope, rx) = ReturningEnvelope::::new(message); let sending = self - .sender - .clone() + .sink + .sender() .into_send_async(AddressMessage::Message(Box::new(envelope))); #[allow(clippy::async_yields_async)] // We only want to await the sending. @@ -245,13 +232,6 @@ where Box::new(self.clone()) } - fn sink(&self) -> Box> { - Box::new(AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - }) - } - fn eq(&self, other: &dyn MessageChannel) -> bool { other._ref_counter_eq(self.ref_counter.as_ptr()) } @@ -285,13 +265,6 @@ where fn clone_channel(&self) -> Box> { Box::new(self.clone()) } - - fn sink(&self) -> Box> { - Box::new(AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - }) - } } impl WeakMessageChannel for WeakAddress @@ -314,11 +287,4 @@ where fn clone_channel(&self) -> Box> { Box::new(self.clone()) } - - fn sink(&self) -> Box> { - Box::new(AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - }) - } } diff --git a/src/sink.rs b/src/sink.rs deleted file mode 100644 index fbbce09e..00000000 --- a/src/sink.rs +++ /dev/null @@ -1,221 +0,0 @@ -//! Module for the sink equivalents to [`Address`](../address/struct.Address.html) and -//! [`MessageChannel`](../message_channel/trait.MessageChannel.html). - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use flume::r#async::SendSink; -use futures_sink::Sink; -use futures_util::SinkExt; - -use crate::address::Disconnected; -use crate::envelope::NonReturningEnvelope; -use crate::manager::AddressMessage; -use crate::private::Sealed; -use crate::refcount::{RefCounter, Strong, Weak}; -use crate::{Actor, Handler}; - -/// An `AddressSink` is the [futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) -/// returned by [`Address::into_sink`](../address/struct.Address.html#method.into_sink). Similarly to with -/// addresses, the strong variety of `AddressSink` will prevent the actor from being dropped, whereas -/// the [weak variety](struct.AddressSink.html) will not. -pub struct AddressSink { - pub(crate) sink: SendSink<'static, AddressMessage>, - pub(crate) ref_counter: Rc, -} - -impl Clone for AddressSink { - fn clone(&self) -> Self { - AddressSink { - sink: self.sink.clone(), - ref_counter: self.ref_counter.clone(), - } - } -} - -/// This variety of `AddressSink` will not prevent the actor from being dropped. -pub type WeakAddressSink = AddressSink; - -impl AddressSink { - /// Returns whether the actor referred to by this address sink is running and accepting messages. - pub fn is_connected(&self) -> bool { - self.ref_counter.is_connected() - } -} - -impl AddressSink { - /// Create a weak address sink. Unlike with the strong variety of address sink (this kind), - /// an actor will not be prevented from being dropped if only weak sinks, channels, and - /// addresses exist. - pub fn downgrade(&self) -> WeakAddressSink { - AddressSink { - sink: self.sink.clone(), - ref_counter: self.ref_counter.downgrade(), - } - } -} - -impl Drop for AddressSink { - fn drop(&mut self) { - // We should notify the ActorManager that there are no more strong Addresses and the actor - // should be stopped. - if self.ref_counter.is_last_strong() { - let _ = pollster::block_on(self.sink.send(AddressMessage::LastAddress)); - } - } -} - -impl Sink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - type Error = Disconnected; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sink) - .poll_ready(cx) - .map_err(|_| Disconnected) - } - - fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> { - let item = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item))); - Pin::new(&mut self.sink) - .start_send(item) - .map_err(|_| Disconnected) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sink) - .poll_flush(cx) - .map_err(|_| Disconnected) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sink) - .poll_close(cx) - .map_err(|_| Disconnected) - } -} - -/// A `MessageSink` is similar to a [`MessageChannel`](../message_channel/trait.MessageChannel.html), -/// but it is a sink and operates asynchronously. -pub trait MessageSink: Sealed + Sink + Unpin + Send { - /// Returns whether the actor referred to by this message sink is running and accepting messages. - fn is_connected(&self) -> bool; - - /// Returns the number of messages in the actor's mailbox. Note that this does **not** - /// differentiate between types of messages; it will return the count of all messages in the - /// actor's mailbox, not only the messages sent by this message channel type. - fn len(&self) -> usize; - - /// The total capacity of the actor's mailbox. Note that this does **not** differentiate between - /// types of messages; it will return the total capacity of actor's mailbox, not only the - /// messages sent by this message channel type - fn capacity(&self) -> Option; - - /// Returns whether the actor's mailbox is empty. - fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Clones this message sink as a boxed trait object. - fn clone_message_sink(&self) -> Box>; -} - -/// A `WeakMessageSink` is a [`MessageSink`](trait.MessageSink.html) which does not inhibit the actor -/// from being dropped while it exists. -pub trait WeakMessageSink: MessageSink { - /// Upcasts this weak message sink into a boxed generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast(self) -> Box>; - - /// Upcasts this weak message sink into a reference to the generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast_ref(&self) -> &dyn MessageSink; - - /// Clones this message sink as a boxed trait object. - fn clone_message_sink(&self) -> Box>; -} - -/// A `StrongMessageSink` is a [`MessageSink`](trait.MessageSink.html) which does inhibit the actor -/// from being dropped while it exists. -pub trait StrongMessageSink: MessageSink { - /// Create a weak message sink. Unlike with the strong variety of message sink (this kind), - /// an actor will not be prevented from being dropped if only weak sinks, channels, and - /// addresses exist. - fn downgrade(self) -> Box>; - - /// Upcasts this strong message sink into a boxed generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast(self) -> Box>; - - /// Upcasts this strong message sink into a reference to the generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast_ref(&self) -> &dyn MessageSink; - - /// Clones this message sink as a boxed trait object. - fn clone_message_sink(&self) -> Box>; -} - -impl MessageSink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - fn is_connected(&self) -> bool { - self.ref_counter.is_connected() - } - - fn len(&self) -> usize { - self.sink.len() - } - - fn capacity(&self) -> Option { - self.sink.capacity() - } - - fn clone_message_sink(&self) -> Box> { - Box::new(self.clone()) - } -} - -impl StrongMessageSink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - fn downgrade(self) -> Box> { - Box::new(AddressSink::downgrade(&self)) - } - - fn upcast(self) -> Box> { - Box::new(self) - } - - fn upcast_ref(&self) -> &dyn MessageSink { - self - } - - fn clone_message_sink(&self) -> Box> { - Box::new(self.clone()) - } -} - -impl WeakMessageSink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - fn upcast(self) -> Box> { - Box::new(self) - } - - fn upcast_ref(&self) -> &dyn MessageSink { - self - } - - fn clone_message_sink(&self) -> Box> { - Box::new(self.clone()) - } -}