Skip to content

Commit

Permalink
Addresses are sinks POC (based on flume changes)
Browse files Browse the repository at this point in the history
  • Loading branch information
Restioson committed Jun 9, 2022
1 parent 559e471 commit e1a6f57
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 290 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ categories = ["asynchronous", "concurrency"]
async-trait = "0.1.36"
barrage = "0.2.1"
catty = "0.1.4"
flume = { version = "0.10.9", default-features = false, features = ["async"] }
flume = { git = "https://github.com/Restioson/flume", branch = "sink_to_sender", default-features = false, features = ["async"] }
futures-core = { version = "0.3.5", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false, features = ["sink"] }
Expand Down
79 changes: 53 additions & 26 deletions src/address.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
//! An address to an actor is a way to send it a message. An address allows an actor to be sent any
//! kind of message that it can receive.
use flume::r#async::SendSink;
use futures_core::Stream;
use futures_sink::Sink;
use futures_util::{future, StreamExt};
use std::fmt::{self, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp::Ordering, error::Error, hash::Hash};

use flume::Sender;
use futures_core::Stream;
use futures_util::{future, StreamExt};

use crate::envelope::ReturningEnvelope;
use crate::envelope::{NonReturningEnvelope, ReturningEnvelope};
use crate::manager::AddressMessage;
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::send_future::{NameableSending, ResolveToHandlerReturn, SendFuture};
use crate::sink::AddressSink;
use crate::{Handler, KeepRunning};

/// The actor is no longer running and disconnected from the sending address. For why this could
Expand All @@ -39,8 +40,8 @@ impl Error for Disconnected {}
/// should be used instead. An address is created by calling the
/// [`Actor::create`](../trait.Actor.html#method.create) or
/// [`Context::run`](../struct.Context.html#method.run) methods, or by cloning another `Address`.
pub struct Address<A, Rc: RefCounter = Strong> {
pub(crate) sender: Sender<AddressMessage<A>>,
pub struct Address<A: 'static, Rc: RefCounter = Strong> {
pub(crate) sink: SendSink<'static, AddressMessage<A>>,
pub(crate) ref_counter: Rc,
}

Expand All @@ -57,7 +58,7 @@ impl<A> Address<A, Strong> {
/// addresses exist.
pub fn downgrade(&self) -> WeakAddress<A> {
WeakAddress {
sender: self.sender.clone(),
sink: self.sink.clone(),
ref_counter: self.ref_counter.downgrade(),
}
}
Expand All @@ -68,7 +69,7 @@ impl<A> Address<A, Either> {
/// Converts this address into a weak address.
pub fn downgrade(&self) -> WeakAddress<A> {
WeakAddress {
sender: self.sender.clone(),
sink: self.sink.clone(),
ref_counter: self.ref_counter.clone().into_weak(),
}
}
Expand Down Expand Up @@ -104,17 +105,17 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
/// })
/// ```
pub fn is_connected(&self) -> bool {
!self.sender.is_disconnected()
!self.sink.is_disconnected()
}

/// Returns the number of messages in the actor's mailbox.
pub fn len(&self) -> usize {
self.sender.len()
self.sink.len()
}

/// The total capacity of the actor's mailbox.
pub fn capacity(&self) -> Option<usize> {
self.sender.capacity()
self.sink.capacity()
}

/// Returns whether the actor's mailbox is empty.
Expand All @@ -125,8 +126,8 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
/// Convert this address into a generic address which can be weak or strong.
pub fn as_either(&self) -> Address<A, Either> {
Address {
sink: self.sink.clone(),
ref_counter: self.ref_counter.clone().into_either(),
sender: self.sender.clone(),
}
}

Expand All @@ -152,8 +153,8 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
if self.is_connected() {
let (envelope, rx) = ReturningEnvelope::<A, M, <A as Handler<M>>::Return>::new(message);
let tx = self
.sender
.clone()
.sink
.sender()
.into_send_async(AddressMessage::Message(Box::new(envelope)));

SendFuture::sending_named(tx, rx)
Expand Down Expand Up @@ -196,25 +197,51 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
}
}

/// Converts this address into a [futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html).
pub fn into_sink(self) -> AddressSink<A, Rc> {
AddressSink {
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.clone(),
}
}

/// Waits until this address becomes disconnected.
pub fn join(&self) -> impl Future<Output = ()> + Send + Unpin {
self.ref_counter.disconnect_notice()
}
}

impl<A, M, Rc> Sink<M> for Address<A, Rc>
where
A: Handler<M>,
M: Send + 'static,
Rc: RefCounter,
{
type Error = Disconnected;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink)
.poll_ready(cx)
.map_err(|_| Disconnected)
}

fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
let item = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item)));
Pin::new(&mut self.sink)
.start_send(item)
.map_err(|_| Disconnected)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink)
.poll_flush(cx)
.map_err(|_| Disconnected)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink)
.poll_close(cx)
.map_err(|_| Disconnected)
}
}

// Required because #[derive] adds an A: Clone bound
impl<A, Rc: RefCounter> Clone for Address<A, Rc> {
fn clone(&self) -> Self {
Address {
sender: self.sender.clone(),
sink: self.sink.clone(),
ref_counter: self.ref_counter.clone(),
}
}
Expand All @@ -226,7 +253,7 @@ impl<A, Rc: RefCounter> Drop for Address<A, Rc> {
// We should notify the ActorManager that there are no more strong Addresses and the actor
// should be stopped.
if self.ref_counter.is_last_strong() {
let _ = self.sender.send(AddressMessage::LastAddress);
let _ = self.sink.sender().send(AddressMessage::LastAddress);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<A: Actor> Context<A> {
let weak = strong.downgrade();

let addr = Address {
sender: sender.clone(),
sink: sender.clone().into_sink(),
ref_counter: strong,
};

Expand Down Expand Up @@ -136,7 +136,7 @@ impl<A: Actor> Context<A> {
/// Get an address to the current actor if there are still external addresses to the actor.
pub fn address(&self) -> Result<Address<A>, ActorShutdown> {
Ok(Address {
sender: self.sender.clone(),
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.upgrade().ok_or(ActorShutdown)?,
})
}
Expand Down
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod receiver;
/// influences whether the address will keep the actor alive for as long as it lives.
pub mod refcount;
mod send_future;
pub mod sink;
/// This module contains a trait to spawn actors, implemented for all major async runtimes by default.
pub mod spawn;
#[cfg(feature = "with-tracing-0_1")]
Expand Down Expand Up @@ -251,7 +250,6 @@ impl From<()> for KeepRunning {

mod private {
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::sink::AddressSink;
use crate::{Actor, Address};

pub trait Sealed {}
Expand All @@ -260,5 +258,4 @@ mod private {
impl Sealed for Weak {}
impl Sealed for Either {}
impl<A: Actor, Rc: RefCounter> Sealed for Address<A, Rc> {}
impl<A: Actor, Rc: RefCounter> Sealed for AddressSink<A, Rc> {}
}
40 changes: 3 additions & 37 deletions src/message_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::private::Sealed;
use crate::receiver::Receiver;
use crate::refcount::{RefCounter, Shared, Strong};
use crate::send_future::{ResolveToHandlerReturn, SendFuture};
use crate::sink::{AddressSink, MessageSink, StrongMessageSink, WeakMessageSink};
use crate::{Handler, KeepRunning};

/// A message channel is a channel through which you can send only one kind of message, but to
Expand Down Expand Up @@ -77,7 +76,7 @@ use crate::{Handler, KeepRunning};
/// })
/// }
/// ```
pub trait MessageChannel<M>: Sealed + Unpin + Send + Sync {
pub trait MessageChannel<M>: Sealed + Unpin + Send {
/// The return value of the handler for `M`.
type Return: Send + 'static;

Expand Down Expand Up @@ -127,10 +126,6 @@ pub trait MessageChannel<M>: Sealed + Unpin + Send + Sync {
/// Clones this channel as a boxed trait object.
fn clone_channel(&self) -> Box<dyn MessageChannel<M, Return = Self::Return>>;

/// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html)
/// and asynchronously send messages through it.
fn sink(&self) -> Box<dyn MessageSink<M>>;

/// Determines whether this and the other message channel address the same actor mailbox.
fn eq(&self, other: &dyn MessageChannel<M, Return = Self::Return>) -> bool;

Expand Down Expand Up @@ -161,10 +156,6 @@ pub trait StrongMessageChannel<M>: MessageChannel<M> {

/// Clones this channel as a boxed trait object.
fn clone_channel(&self) -> Box<dyn StrongMessageChannel<M, Return = Self::Return>>;

/// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html)
/// and asynchronously send messages through it.
fn sink(&self) -> Box<dyn StrongMessageSink<M>>;
}

/// A message channel is a channel through which you can send only one kind of message, but to
Expand All @@ -185,10 +176,6 @@ pub trait WeakMessageChannel<M>: MessageChannel<M> {

/// Clones this channel as a boxed trait object.
fn clone_channel(&self) -> Box<dyn WeakMessageChannel<M, Return = Self::Return>>;

/// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html)
/// and asynchronously send messages through it.
fn sink(&self) -> Box<dyn WeakMessageSink<M>>;
}

impl<A, R, M, Rc: RefCounter> MessageChannel<M> for Address<A, Rc>
Expand Down Expand Up @@ -218,8 +205,8 @@ where
if self.is_connected() {
let (envelope, rx) = ReturningEnvelope::<A, M, R>::new(message);
let sending = self
.sender
.clone()
.sink
.sender()
.into_send_async(AddressMessage::Message(Box::new(envelope)));

#[allow(clippy::async_yields_async)] // We only want to await the sending.
Expand All @@ -245,13 +232,6 @@ where
Box::new(self.clone())
}

fn sink(&self) -> Box<dyn MessageSink<M>> {
Box::new(AddressSink {
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.clone(),
})
}

fn eq(&self, other: &dyn MessageChannel<M, Return = Self::Return>) -> bool {
other._ref_counter_eq(self.ref_counter.as_ptr())
}
Expand Down Expand Up @@ -285,13 +265,6 @@ where
fn clone_channel(&self) -> Box<dyn StrongMessageChannel<M, Return = Self::Return>> {
Box::new(self.clone())
}

fn sink(&self) -> Box<dyn StrongMessageSink<M>> {
Box::new(AddressSink {
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.clone(),
})
}
}

impl<A, M> WeakMessageChannel<M> for WeakAddress<A>
Expand All @@ -314,11 +287,4 @@ where
fn clone_channel(&self) -> Box<dyn WeakMessageChannel<M, Return = Self::Return>> {
Box::new(self.clone())
}

fn sink(&self) -> Box<dyn WeakMessageSink<M>> {
Box::new(AddressSink {
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.clone(),
})
}
}
Loading

0 comments on commit e1a6f57

Please sign in to comment.