diff --git a/Cargo.toml b/Cargo.toml index 66e45cbf5..97f8cec63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ rand = "0.3" serde = "0.9" uuid = {version = "0.4", features = ["v4"]} fnv = "1.0.3" +futures = "0.1" [target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies] mio = "0.6.1" diff --git a/src/ipc.rs b/src/ipc.rs index b82435bf6..d08eb4fa2 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -10,12 +10,13 @@ use platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender}; use platform::{OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel}; +use futures::{Stream, Sink, Poll, Async, StartSend}; use bincode; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::cell::RefCell; use std::cmp::min; use std::fmt::{self, Debug, Formatter}; -use std::io::Error; +use std::io::{Error, ErrorKind}; use std::marker::PhantomData; use std::mem; use std::ops::Deref; @@ -86,6 +87,24 @@ impl IpcReceiver where T: Deserialize + Serialize { } } +impl Stream for IpcReceiver where T: Deserialize + Serialize { + type Item = T; + type Error = bincode::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.try_recv() { + Ok(msg) => Ok(Async::Ready(Some(msg))), + Err(e) => match *e { + bincode::ErrorKind::IoError(ref e) if e.kind() == ErrorKind::ConnectionReset => + Ok(Async::Ready(None)), + bincode::ErrorKind::IoError(ref e) if e.kind() == ErrorKind::WouldBlock => + Ok(Async::NotReady), + _ => Err(e), + }, + } + } +} + impl Deserialize for IpcReceiver where T: Deserialize + Serialize { fn deserialize(deserializer: D) -> Result where D: Deserializer { let index: usize = try!(Deserialize::deserialize(deserializer)); @@ -173,6 +192,19 @@ impl IpcSender where T: Serialize { } } +impl Sink for IpcSender where T: Serialize { + type SinkItem = T; + type SinkError = bincode::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + unimplemented!(); + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + unimplemented!(); + } +} + impl Deserialize for IpcSender where T: Serialize { fn deserialize(deserializer: D) -> Result where D: Deserializer { let os_sender = try!(deserialize_os_ipc_sender(deserializer)); diff --git a/src/lib.rs b/src/lib.rs index 58a7f2d34..901dcd799 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ extern crate fnv; target_os="linux"))] #[macro_use] extern crate syscall; +extern crate futures; pub mod ipc;