diff --git a/kafkaesque/src/bin/capture_recv.rs b/kafkaesque/src/bin/capture_recv.rs index 862d44cbf6..d3573b59ca 100644 --- a/kafkaesque/src/bin/capture_recv.rs +++ b/kafkaesque/src/bin/capture_recv.rs @@ -3,7 +3,6 @@ use timely::dataflow::operators::capture::Replay; use timely::dataflow::operators::Accumulate; use rdkafka::config::ClientConfig; - use kafkaesque::EventConsumer; fn main() { @@ -31,7 +30,7 @@ fn main() { .filter(|i| i % worker.peers() == worker.index()) .map(|i| { let topic = format!("{}-{:?}", topic, i); - EventConsumer::<_,u64>::new(consumer_config.clone(), topic) + EventConsumer::<_, Vec>::new(consumer_config.clone(), topic) }) .collect::>(); diff --git a/kafkaesque/src/bin/capture_send.rs b/kafkaesque/src/bin/capture_send.rs index ea25025076..346ff59077 100644 --- a/kafkaesque/src/bin/capture_send.rs +++ b/kafkaesque/src/bin/capture_send.rs @@ -2,7 +2,6 @@ use timely::dataflow::operators::ToStream; use timely::dataflow::operators::capture::Capture; use rdkafka::config::ClientConfig; - use kafkaesque::EventProducer; fn main() { @@ -20,7 +19,7 @@ fn main() { .set("bootstrap.servers", brokers); let topic = format!("{}-{:?}", topic, worker.index()); - let producer = EventProducer::new(producer_config, topic); + let producer = EventProducer::>::new(producer_config, topic); worker.dataflow::(|scope| (0 .. count) diff --git a/kafkaesque/src/kafka_source.rs b/kafkaesque/src/kafka_source.rs index 0167913ddc..1fa9af23b2 100644 --- a/kafkaesque/src/kafka_source.rs +++ b/kafkaesque/src/kafka_source.rs @@ -1,11 +1,10 @@ use timely::Data; use timely::dataflow::{Scope, Stream}; use timely::dataflow::operators::Capability; -use timely::dataflow::operators::generic::OutputHandle; -use timely::dataflow::channels::pushers::Tee; - use rdkafka::Message; use rdkafka::consumer::{ConsumerContext, BaseConsumer}; +use timely::dataflow::channels::pushers::Tee; +use timely::dataflow::operators::generic::OutputHandle; /// Constructs a stream of data from a Kafka consumer. /// @@ -89,14 +88,14 @@ pub fn kafka_source( name: &str, consumer: BaseConsumer, logic: L -) -> Stream +) -> Stream> where C: ConsumerContext+'static, G: Scope, D: Data, L: Fn(&[u8], &mut Capability, - &mut OutputHandle>) -> bool+'static, + &mut OutputHandle, Tee>>) -> bool+'static, { use timely::dataflow::operators::generic::source; source(scope, name, move |capability, info| { @@ -135,4 +134,4 @@ where } }) -} \ No newline at end of file +} diff --git a/kafkaesque/src/lib.rs b/kafkaesque/src/lib.rs index ba91061e6e..9ecaaa7b6f 100644 --- a/kafkaesque/src/lib.rs +++ b/kafkaesque/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicIsize, Ordering}; use abomonation::Abomonation; -use timely::dataflow::operators::capture::event::{EventCore, EventPusherCore, EventIteratorCore}; +use timely::dataflow::operators::capture::event::{Event, EventPusher, EventIterator}; use rdkafka::Message; use rdkafka::client::ClientContext; @@ -37,7 +37,7 @@ impl OutstandingCounterContext { } /// A wrapper for `W: Write` implementing `EventPusher`. -pub struct EventProducerCore { +pub struct EventProducer { topic: String, buffer: Vec, producer: BaseProducer, @@ -45,10 +45,7 @@ pub struct EventProducerCore { phant: ::std::marker::PhantomData<(T,D)>, } -/// [EventProducerCore] specialized to vector-based containers. -pub type EventProducer = EventProducerCore>; - -impl EventProducerCore { +impl EventProducer { /// Allocates a new `EventWriter` wrapping a supplied writer. pub fn new(config: ClientConfig, topic: String) -> Self { let counter = Arc::new(AtomicIsize::new(0)); @@ -65,8 +62,8 @@ impl EventProducerCore { } } -impl EventPusherCore for EventProducerCore { - fn push(&mut self, event: EventCore) { +impl EventPusher for EventProducer { + fn push(&mut self, event: Event) { unsafe { ::abomonation::encode(&event, &mut self.buffer).expect("Encode failure"); } // println!("sending {:?} bytes", self.buffer.len()); self.producer.send::<(),[u8]>(BaseRecord::to(self.topic.as_str()).payload(&self.buffer[..])).unwrap(); @@ -76,7 +73,7 @@ impl EventPusherCore for EventProducerCore } } -impl Drop for EventProducerCore { +impl Drop for EventProducer { fn drop(&mut self) { while self.counter.load(Ordering::SeqCst) > 0 { self.producer.poll(std::time::Duration::from_millis(10)); @@ -85,16 +82,13 @@ impl Drop for EventProducerCore { } /// A Wrapper for `R: Read` implementing `EventIterator`. -pub struct EventConsumerCore { +pub struct EventConsumer { consumer: BaseConsumer, buffer: Vec, phant: ::std::marker::PhantomData<(T,D)>, } -/// [EventConsumerCore] specialized to vector-based containers. -pub type EventConsumer = EventConsumerCore>; - -impl EventConsumerCore { +impl EventConsumer { /// Allocates a new `EventReader` wrapping a supplied reader. pub fn new(config: ClientConfig, topic: String) -> Self { println!("allocating consumer for topic {:?}", topic); @@ -108,14 +102,14 @@ impl EventConsumerCore { } } -impl EventIteratorCore for EventConsumerCore { - fn next(&mut self) -> Option<&EventCore> { +impl EventIterator for EventConsumer { + fn next(&mut self) -> Option<&Event> { if let Some(result) = self.consumer.poll(std::time::Duration::from_millis(0)) { match result { Ok(message) => { self.buffer.clear(); self.buffer.extend_from_slice(message.payload().unwrap()); - Some(unsafe { ::abomonation::decode::>(&mut self.buffer[..]).unwrap().0 }) + Some(unsafe { ::abomonation::decode::>(&mut self.buffer[..]).unwrap().0 }) }, Err(err) => { println!("KafkaConsumer error: {:?}", err); diff --git a/mdbook/src/chapter_2/chapter_2_1.md b/mdbook/src/chapter_2/chapter_2_1.md index 8f8fc6b949..aae2eac91c 100644 --- a/mdbook/src/chapter_2/chapter_2_1.md +++ b/mdbook/src/chapter_2/chapter_2_1.md @@ -16,7 +16,7 @@ fn main() { // initializes and runs a timely dataflow. timely::execute_from_args(std::env::args(), |worker| { - let mut input = InputHandle::<(), String>::new(); + let mut input = InputHandle::<(), Vec>::new(); // define a new dataflow worker.dataflow(|scope| { diff --git a/mdbook/src/chapter_4/chapter_4_4.md b/mdbook/src/chapter_4/chapter_4_4.md index 90e27bc62a..163f3906b4 100644 --- a/mdbook/src/chapter_4/chapter_4_4.md +++ b/mdbook/src/chapter_4/chapter_4_4.md @@ -52,7 +52,7 @@ One nice aspect of `capture_into` is that it really does reveal everything that At *its* core, `replay_into` takes some sequence of `Event` items and reproduces the stream, as it was recorded. It is also fairly simple, and we can just look at its implementation as well: ```rust,ignore - fn replay_into>(self, scope: &mut S) -> Stream{ + fn replay_into>(self, scope: &mut S) -> Stream>{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); let (targets, stream) = builder.new_output(); diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md index 5cd38763eb..bee026fbf4 100644 --- a/mdbook/src/chapter_5/chapter_5_3.md +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -8,15 +8,10 @@ Many parts of Timely assume that data is organized into `Vec`, i.e., batches This abstractions works well for many cases but precludes some advanced techniques, such as transferring translated or columnar data between operators. With the container abstraction, Timely specifies a minimal interface it requires tracking progress and provide data to operators. -## Core operators - -In Timely, we provide a set of `Core` operators that are generic on the container type they can handle. -In most cases, the `Core` operators are a immediate generalization of their non-core variant, providing the semantically equivalent functionality. - ## Limitations A challenge when genericizing Timely operators is that all interfaces need to be declared independent of a concrete type, for example as part of a trait. -For this reason, Timely doesn't currently support operators that require knowledge of the elements of a container or how to partition a container, with the only exception being the `Vec` type. +For this reason, Timely doesn't currently support operators that require knowledge of the elements of a container, with the only exception being the `Vec` type. ## A custom container diff --git a/timely/examples/barrier.rs b/timely/examples/barrier.rs index ff7f382838..f1d4c2a48f 100644 --- a/timely/examples/barrier.rs +++ b/timely/examples/barrier.rs @@ -11,7 +11,7 @@ fn main() { timely::execute_from_args(std::env::args().skip(2), move |worker| { worker.dataflow(move |scope| { - let (handle, stream) = scope.feedback::(1); + let (handle, stream) = scope.feedback::>(1); stream.unary_notify( Pipeline, "Barrier", diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index 0dc2d69876..2f1bcaddda 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -44,7 +44,7 @@ fn main() { .to_stream(scope); // define a loop variable, for the (node, worker) pairs. - let (handle, stream) = scope.feedback(1usize); + let (handle, stream) = scope.feedback::>(1usize); // use the stream of edges graph.binary_notify( @@ -58,7 +58,7 @@ fn main() { // receive edges, start to sort them input1.for_each(|time, data| { notify.notify_at(time.retain()); - edge_list.push(data.replace(Vec::new())); + edge_list.push(data.take()); }); // receive (node, worker) pairs, note any new ones. @@ -68,7 +68,7 @@ fn main() { notify.notify_at(time.retain()); Vec::new() }) - .push(data.replace(Vec::new())); + .push(data.take()); }); notify.for_each(|time, _num, _notify| { diff --git a/timely/examples/capture_recv.rs b/timely/examples/capture_recv.rs index d6f7291c11..bf616a351f 100644 --- a/timely/examples/capture_recv.rs +++ b/timely/examples/capture_recv.rs @@ -17,7 +17,7 @@ fn main() { .collect::>() .into_iter() .map(|l| l.incoming().next().unwrap().unwrap()) - .map(|r| EventReader::<_,u64,_>::new(r)) + .map(|r| EventReader::<_, Vec, _>::new(r)) .collect::>(); worker.dataflow::(|scope| { diff --git a/timely/examples/distinct.rs b/timely/examples/distinct.rs index 28971384a6..0336b63101 100644 --- a/timely/examples/distinct.rs +++ b/timely/examples/distinct.rs @@ -11,7 +11,7 @@ fn main() { // initializes and runs a timely dataflow. timely::execute_from_args(std::env::args(), |worker| { let index = worker.index(); - let mut input = InputHandle::new(); + let mut input = InputHandle::<_, Vec<_>>::new(); let mut probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index ad8ef8809a..09e5775de4 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -22,8 +22,8 @@ fn main() { let index = worker.index(); let peers = worker.peers(); - let mut input1 = InputHandle::new(); - let mut input2 = InputHandle::new(); + let mut input1 = InputHandle::<_, Vec<_>>::new(); + let mut input2 = InputHandle::<_, Vec<_>>::new(); let mut probe = ProbeHandle::new(); worker.dataflow(|scope| { diff --git a/timely/examples/logging-recv.rs b/timely/examples/logging-recv.rs index 1c2a5ccda8..9d23177aa2 100644 --- a/timely/examples/logging-recv.rs +++ b/timely/examples/logging-recv.rs @@ -20,7 +20,7 @@ fn main() { .collect::>() .into_iter() .map(|l| l.incoming().next().unwrap().unwrap()) - .map(|r| EventReader::::new(r)) + .map(|r| EventReader::, _>::new(r)) .collect::>(); worker.dataflow(|scope| { diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 0a2da23517..2fd239b2a2 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -13,7 +13,7 @@ fn main() { timely::execute_from_args(std::env::args().skip(3), move |worker| { - let mut input = InputHandle::new(); + let mut input = InputHandle::<_, Vec<_>>::new(); let mut probe = ProbeHandle::new(); worker.dataflow::(|scope| { @@ -22,7 +22,7 @@ fn main() { let edge_stream = input.to_stream(scope); // create a new feedback stream, which will be changes to ranks. - let (handle, rank_stream) = scope.feedback(1); + let (handle, rank_stream) = scope.feedback::>(1); // bring edges and ranks together! let changes = edge_stream.binary_frontier( diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index ab3cfa8144..3e425cffe5 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -54,8 +54,8 @@ trait UnionFind { fn union_find(&self) -> Self; } -impl UnionFind for Stream { - fn union_find(&self) -> Stream { +impl UnionFind for Stream> { + fn union_find(&self) -> Stream> { self.unary(Pipeline, "UnionFind", |_,_| { diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 84303a0391..5bced98cd1 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -11,10 +11,7 @@ pub mod pullers; pub mod pact; /// The input to and output from timely dataflow communication channels. -pub type BundleCore = crate::communication::Message>; - -/// The input to and output from timely dataflow communication channels specialized to vectors. -pub type Bundle = BundleCore>; +pub type Bundle = crate::communication::Message>; /// A serializable representation of timestamped data. #[derive(Clone, Abomonation, Serialize, Deserialize)] @@ -46,11 +43,11 @@ impl Message { /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher /// leaves in place, or the container's default element. #[inline] - pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); - let mut bundle = Some(BundleCore::from_typed(message)); + let mut bundle = Some(Bundle::from_typed(message)); pusher.push(&mut bundle); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 976023a19f..708c2a05ac 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -16,33 +16,28 @@ use crate::Container; use crate::worker::AsWorker; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use super::{BundleCore, Message}; +use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; -/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors. -pub trait ParallelizationContractCore { +/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. +pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. - type Pusher: Push>+'static; + type Pusher: Push>+'static; /// Type implementing `Pull` produced by this pact. - type Puller: Pull>+'static; + type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller); } -/// A `ParallelizationContractCore` specialized for `Vec` containers -/// TODO: Use trait aliases once stable. -pub trait ParallelizationContract: ParallelizationContractCore> { } -impl>> ParallelizationContract for P { } - /// A direct connection #[derive(Debug)] pub struct Pipeline; -impl ParallelizationContractCore for Pipeline { - type Pusher = LogPusher>>; - type Puller = LogPuller>>; +impl ParallelizationContract for Pipeline { + type Pusher = LogPusher>>; + type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); // // ignore `&mut A` and use thread allocator @@ -53,15 +48,12 @@ impl ParallelizationContractCore for Pipeline { } /// An exchange between multiple observers by data -pub struct ExchangeCore { hash_func: F, phantom: PhantomData<(C, D)> } - -/// [ExchangeCore] specialized to vector-based containers. -pub type Exchange = ExchangeCore, D, F>; +pub struct Exchange { hash_func: F, phantom: PhantomData<(C, D)> } -implu64+'static> ExchangeCore { +implu64+'static> Exchange { /// Allocates a new `Exchange` pact from a distribution function. - pub fn new(func: F) -> ExchangeCore { - ExchangeCore { + pub fn new(func: F) -> Exchange { + Self { hash_func: func, phantom: PhantomData, } @@ -69,12 +61,12 @@ implu64+'static> ExchangeCore { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContractCore for ExchangeCore +implu64+'static> ParallelizationContract for Exchange where C: Data + Container + PushPartitioned, { - type Pusher = ExchangePusher>>>, F>; - type Puller = LogPuller>>>; + type Pusher = ExchangePusher>>>, F>; + type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); @@ -83,7 +75,7 @@ where } } -impl Debug for ExchangeCore { +impl Debug for Exchange { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Exchange").finish() } @@ -91,7 +83,7 @@ impl Debug for ExchangeCore { /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. #[derive(Debug)] -pub struct LogPusher>> { +pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, @@ -101,7 +93,7 @@ pub struct LogPusher>> { logging: Option, } -impl>> LogPusher { +impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { LogPusher { @@ -116,9 +108,9 @@ impl>> LogPusher { } } -impl>> Push> for LogPusher { +impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; @@ -147,7 +139,7 @@ impl>> Push> for LogP /// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. #[derive(Debug)] -pub struct LogPuller>> { +pub struct LogPuller>> { puller: P, channel: usize, index: usize, @@ -155,7 +147,7 @@ pub struct LogPuller>> { logging: Option, } -impl>> LogPuller { +impl>> LogPuller { /// Allocates a new `Puller`. pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { LogPuller { @@ -168,9 +160,9 @@ impl>> LogPuller { } } -impl>> Pull> for LogPuller { +impl>> Pull> for LogPuller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option> { let result = self.puller.pull(); if let Some(bundle) = result { let channel = self.channel; diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 50f0c546a0..c0169312d3 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -3,13 +3,13 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use crate::progress::ChangeBatch; use crate::communication::Pull; use crate::Container; /// A wrapper which accounts records pulled past in a shared count map. -pub struct Counter>> { +pub struct Counter>> { pullable: P, consumed: Rc>>, phantom: ::std::marker::PhantomData, @@ -36,15 +36,15 @@ impl Drop for ConsumedGuard { } } -impl>> Counter { +impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] - pub fn next(&mut self) -> Option<&mut BundleCore> { + pub fn next(&mut self) -> Option<&mut Bundle> { self.next_guarded().map(|(_guard, bundle)| bundle) } #[inline] - pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut BundleCore)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Bundle)> { if let Some(message) = self.pullable.pull() { if message.data.len() > 0 { let guard = ConsumedGuard { @@ -60,7 +60,7 @@ impl>> Counter>> Counter { +impl>> Counter { /// Allocates a new `Counter` from a boxed puller. pub fn new(pullable: P) -> Self { Counter { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 44079f7a82..5509abd0f0 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -1,7 +1,7 @@ //! Buffering and session mechanisms to provide the appearance of record-at-a-time sending, //! with the performance of batched sends. -use crate::dataflow::channels::{Bundle, BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; use crate::communication::Push; @@ -12,7 +12,7 @@ use crate::{Container, Data}; /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. #[derive(Debug)] -pub struct BufferCore>> { +pub struct Buffer>> { /// the currently open time, if it is open time: Option, /// a buffer for records, to send at self.time @@ -20,10 +20,7 @@ pub struct BufferCore>> { pusher: P, } -/// A buffer specialized to vector-based containers. -pub type Buffer = BufferCore, P>; - -impl>> BufferCore where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { /// Creates a new `Buffer`. pub fn new(pusher: P) -> Self { @@ -41,10 +38,10 @@ impl>> BufferCore where T: Eq Session { buffer: self } } /// Allocates a new `AutoflushSession` which flushes itself on drop. - pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSessionCore where T: Timestamp { + pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSession where T: Timestamp { if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); } self.time = Some(cap.time().clone()); - AutoflushSessionCore { + AutoflushSession { buffer: self, _capability: cap, } @@ -79,7 +76,7 @@ impl>> BufferCore where T: Eq } } -impl>> Buffer where T: Eq+Clone { +impl>>> Buffer, P> where T: Eq+Clone { // internal method for use by `Session`. #[inline] fn give(&mut self, data: D) { @@ -110,18 +107,18 @@ impl>> Buffer where T: Eq+Clone { /// The `Session` struct provides the user-facing interface to an operator output, namely /// the `Buffer` type. A `Session` wraps a session of output at a specified time, and /// avoids what would otherwise be a constant cost of checking timestamp equality. -pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { - buffer: &'a mut BufferCore, +pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { + buffer: &'a mut Buffer, } -impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { /// Provide a container at the time specified by the [Session]. pub fn give_container(&mut self, container: &mut C) { self.buffer.give_container(container) } } -impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Provides one record at the time specified by the `Session`. #[inline] pub fn give(&mut self, data: D) { @@ -148,18 +145,15 @@ impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P } /// A session which will flush itself when dropped. -pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where +pub struct AutoflushSession<'a, T: Timestamp, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { /// A reference to the underlying buffer. - buffer: &'a mut BufferCore, + buffer: &'a mut Buffer, /// The capability being used to send the data. _capability: Capability, } -/// Auto-flush session specialized to vector-based containers. -pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec, P>; - -impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSession<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Transmits a single record. #[inline] pub fn give(&mut self, data: D) { @@ -181,7 +175,7 @@ impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSess } } -impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSession<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { fn drop(&mut self) { self.buffer.cease(); } diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 59ccacf321..c40e3ddec3 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -5,24 +5,21 @@ use std::rc::Rc; use std::cell::RefCell; use crate::progress::{ChangeBatch, Timestamp}; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use crate::communication::Push; use crate::Container; /// A wrapper which updates shared `produced` based on the number of records pushed. #[derive(Debug)] -pub struct CounterCore>> { +pub struct Counter>> { pushee: P, produced: Rc>>, phantom: PhantomData, } -/// A counter specialized to vector. -pub type Counter = CounterCore, P>; - -impl Push> for CounterCore where P: Push> { +impl Push> for Counter where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } @@ -34,10 +31,10 @@ impl Push> for CounterCore>> CounterCore where T : Ord+Clone+'static { +impl>> Counter where T : Ord+Clone+'static { /// Allocates a new `Counter` from a pushee and shared counts. - pub fn new(pushee: P) -> CounterCore { - CounterCore { + pub fn new(pushee: P) -> Counter { + Counter { pushee, produced: Rc::new(RefCell::new(ChangeBatch::new())), phantom: PhantomData, diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9ea271d310..4491084ba1 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -3,11 +3,11 @@ use timely_container::PushPartitioned; use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&D) -> u64> { +pub struct Exchange>, H: FnMut(&D) -> u64> { pushers: Vec

, buffers: Vec, current: Option, @@ -15,7 +15,7 @@ pub struct Exchange>, H: FnMut(&D) phantom: std::marker::PhantomData, } -impl>, H: FnMut(&D) -> u64> Exchange { +impl>, H: FnMut(&D) -> u64> Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; @@ -40,12 +40,12 @@ impl>, H: FnMut(&D) -> } } -impl>, H: FnMut(&D) -> u64> Push> for Exchange +impl>, H: FnMut(&D) -> u64> Push> for Exchange where C: PushPartitioned { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 2ab6d4f9f6..295d033cab 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -1,6 +1,6 @@ -pub use self::tee::{Tee, TeeCore, TeeHelper}; +pub use self::tee::{Tee, TeeHelper}; pub use self::exchange::Exchange; -pub use self::counter::{Counter, CounterCore}; +pub use self::counter::Counter; pub mod tee; pub mod exchange; diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 1ec05e4e75..210c9d1490 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -4,25 +4,22 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use std::rc::Rc; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::communication::Push; use crate::{Container, Data}; -type PushList = Rc>>>>>; +type PushList = Rc>>>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. -pub struct TeeCore { +pub struct Tee { buffer: D, shared: PushList, } -/// [TeeCore] specialized to `Vec`-based container. -pub type Tee = TeeCore>; - -impl Push> for TeeCore { +impl Push> for Tee { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { @@ -42,11 +39,11 @@ impl Push> for TeeCore { } } -impl TeeCore { +impl Tee { /// Allocates a new pair of `Tee` and `TeeHelper`. - pub fn new() -> (TeeCore, TeeHelper) { + pub fn new() -> (Tee, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); - let port = TeeCore { + let port = Tee { buffer: Default::default(), shared: shared.clone(), }; @@ -55,7 +52,7 @@ impl TeeCore { } } -impl Clone for TeeCore { +impl Clone for Tee { fn clone(&self) -> Self { Self { buffer: Default::default(), @@ -64,7 +61,7 @@ impl Clone for TeeCore { } } -impl Debug for TeeCore +impl Debug for Tee where D: Debug, { @@ -89,7 +86,7 @@ pub struct TeeHelper { impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. - pub fn add_pusher>+'static>(&self, pusher: P) { + pub fn add_pusher>+'static>(&self, pusher: P) { self.shared.borrow_mut().push(Box::new(pusher)); } } diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs index a6a3c33a95..54da3cb7e2 100644 --- a/timely/src/dataflow/mod.rs +++ b/timely/src/dataflow/mod.rs @@ -13,10 +13,9 @@ //! }); //! ``` -pub use self::stream::{StreamCore, Stream}; +pub use self::stream::Stream; pub use self::scopes::{Scope, ScopeParent}; -pub use self::operators::input::HandleCore as InputHandleCore; pub use self::operators::input::Handle as InputHandle; pub use self::operators::probe::Handle as ProbeHandle; diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index 55f7d50a47..d55a6b7d1d 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use std::collections::HashMap; use crate::{Data, ExchangeData}; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -64,16 +64,16 @@ pub trait Aggregate { &self, fold: F, emit: E, - hash: H) -> Stream where S::Timestamp: Eq; + hash: H) -> Stream> where S::Timestamp: Eq; } -impl Aggregate for Stream { +impl Aggregate for Stream> { fn aggregateR+'static, H: Fn(&K)->u64+'static>( &self, fold: F, emit: E, - hash: H) -> Stream where S::Timestamp: Eq { + hash: H) -> Stream> where S::Timestamp: Eq { let mut aggregates = HashMap::new(); let mut vector = Vec::new(); diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index 89e1cf4589..13ba32b20e 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use std::collections::HashMap; use crate::{Data, ExchangeData}; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -51,17 +51,17 @@ pub trait StateMachine { I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(&self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq ; + >(&self, fold: F, hash: H) -> Stream> where S::Timestamp : Hash+Eq ; } -impl StateMachine for Stream { +impl StateMachine for Stream> { fn state_machine< R: Data, // output type D: Default+'static, // per-key state (data) I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(&self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq { + >(&self, fold: F, hash: H) -> Stream> where S::Timestamp : Hash+Eq { let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state) let mut states = HashMap::new(); // keys -> state diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 70e087abde..a55d34acf2 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -2,7 +2,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, Stream, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::{Container, Data}; /// Extension trait for `Stream`. @@ -31,14 +31,14 @@ pub trait Branch { fn branch( &self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, - ) -> (Stream, Stream); + ) -> (Stream>, Stream>); } -impl Branch for Stream { +impl Branch for Stream> { fn branch( &self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, - ) -> (Stream, Stream) { + ) -> (Stream>, Stream>) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); let mut input = builder.new_input(self, Pipeline); @@ -94,7 +94,7 @@ pub trait BranchWhen: Sized { fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); } -impl BranchWhen for StreamCore { +impl BranchWhen for Stream { fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/broadcast.rs index 21d80d6da4..2286a5f621 100644 --- a/timely/src/dataflow/operators/broadcast.rs +++ b/timely/src/dataflow/operators/broadcast.rs @@ -1,7 +1,7 @@ //! Broadcast records to all workers. use crate::ExchangeData; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::{Map, Exchange}; /// Broadcast records to all workers. @@ -21,8 +21,8 @@ pub trait Broadcast { fn broadcast(&self) -> Self; } -impl Broadcast for Stream { - fn broadcast(&self) -> Stream { +impl Broadcast for Stream> { + fn broadcast(&self) -> Stream> { // NOTE: Simplified implementation due to underlying motion // in timely dataflow internals. Optimize once they have diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index c62b95417b..bfb1f4f97e 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -5,7 +5,7 @@ //! and there are several default implementations, including a linked-list, Rust's MPSC //! queue, and a binary serializer wrapping any `W: Write`. -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; @@ -14,7 +14,7 @@ use crate::Container; use crate::progress::ChangeBatch; use crate::progress::Timestamp; -use super::{EventCore, EventPusherCore}; +use super::{Event, EventPusher}; /// Capture a stream of timestamped data for later replay. pub trait Capture { @@ -30,7 +30,7 @@ pub trait Capture { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -42,7 +42,7 @@ pub trait Capture { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLinkCore::new()); + /// let handle1 = Rc::new(EventLink::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -70,7 +70,7 @@ pub trait Capture { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventReader, Replay, Extract, EventWriter}; /// /// // get send and recv endpoints, wrap send to share /// let (send0, recv0) = ::std::sync::mpsc::channel(); @@ -95,7 +95,7 @@ pub trait Capture { /// ); /// /// worker.dataflow::(|scope2| { - /// Some(EventReader::<_,u64,_>::new(recv)) + /// Some(EventReader::<_, Vec,_>::new(recv)) /// .replay_into(scope2) /// .capture_into(send0) /// }); @@ -103,18 +103,18 @@ pub trait Capture { /// /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); /// ``` - fn capture_into+'static>(&self, pusher: P); + fn capture_into+'static>(&self, pusher: P); /// Captures a stream using Rust's MPSC channels. - fn capture(&self) -> ::std::sync::mpsc::Receiver> { + fn capture(&self) -> ::std::sync::mpsc::Receiver> { let (send, recv) = ::std::sync::mpsc::channel(); self.capture_into(send); recv } } -impl Capture for StreamCore { - fn capture_into+'static>(&self, mut event_pusher: P) { +impl Capture for Stream { + fn capture_into+'static>(&self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -131,7 +131,7 @@ impl Capture for StreamCore { if !progress.frontiers[0].is_empty() { // transmit any frontier progress. let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new()); - event_pusher.push(EventCore::Progress(to_send.into_inner())); + event_pusher.push(Event::Progress(to_send.into_inner())); } use crate::communication::message::RefOrMut; @@ -143,7 +143,7 @@ impl Capture for StreamCore { RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)), }; let vector = data.replace(Default::default()); - event_pusher.push(EventCore::Messages(time.clone(), vector)); + event_pusher.push(Event::Messages(time.clone(), vector)); } input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); false diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index 252b7361cc..2dfb24ea20 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -6,55 +6,34 @@ /// Data and progress events of the captured stream. #[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] -pub enum EventCore { +pub enum Event { /// Progress received via `push_external_progress`. Progress(Vec<(T, i64)>), /// Messages received via the data stream. Messages(T, D), } -/// Data and progress events of the captured stream, specialized to vector-based containers. -pub type Event = EventCore>; - -/// Iterates over contained `EventCore`. +/// Iterates over contained `Event`. /// /// The `EventIterator` trait describes types that can iterate over references to events, /// and which can be used to replay a stream into a new timely dataflow computation. /// /// This method is not simply an iterator because of the lifetime in the result. -pub trait EventIteratorCore { - /// Iterates over references to `EventCore` elements. - fn next(&mut self) -> Option<&EventCore>; -} - -/// A [EventIteratorCore] specialized to vector-based containers. -// TODO: use trait aliases once stable. -pub trait EventIterator: EventIteratorCore> { +pub trait EventIterator { /// Iterates over references to `Event` elements. fn next(&mut self) -> Option<&Event>; } -impl>> EventIterator for E { - fn next(&mut self) -> Option<&Event> { - >::next(self) - } -} - -/// Receives `EventCore` events. -pub trait EventPusherCore { +/// Receives `Event` events. +pub trait EventPusher { /// Provides a new `Event` to the pusher. - fn push(&mut self, event: EventCore); + fn push(&mut self, event: Event); } -/// A [EventPusherCore] specialized to vector-based containers. -// TODO: use trait aliases once stable. -pub trait EventPusher: EventPusherCore> {} -impl>> EventPusher for E {} - // implementation for the linked list behind a `Handle`. -impl EventPusherCore for ::std::sync::mpsc::Sender> { - fn push(&mut self, event: EventCore) { +impl EventPusher for ::std::sync::mpsc::Sender> { + fn push(&mut self, event: Event) { // NOTE: An Err(x) result just means "data not accepted" most likely // because the receiver is gone. No need to panic. let _ = self.send(event); @@ -67,40 +46,37 @@ pub mod link { use std::rc::Rc; use std::cell::RefCell; - use super::{EventCore, EventPusherCore, EventIteratorCore}; + use super::{Event, EventPusher, EventIterator}; - /// A linked list of EventCore. - pub struct EventLinkCore { + /// A linked list of Event. + pub struct EventLink { /// An event, if one exists. /// /// An event might not exist, if either we want to insert a `None` and have the output iterator pause, /// or in the case of the very first linked list element, which has no event when constructed. - pub event: Option>, + pub event: Option>, /// The next event, if it exists. - pub next: RefCell>>>, + pub next: RefCell>>>, } - /// A [EventLinkCore] specialized to vector-based containers. - pub type EventLink = EventLinkCore>; - - impl EventLinkCore { + impl EventLink { /// Allocates a new `EventLink`. - pub fn new() -> EventLinkCore { - EventLinkCore { event: None, next: RefCell::new(None) } + pub fn new() -> EventLink { + EventLink { event: None, next: RefCell::new(None) } } } // implementation for the linked list behind a `Handle`. - impl EventPusherCore for Rc> { - fn push(&mut self, event: EventCore) { - *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) })); + impl EventPusher for Rc> { + fn push(&mut self, event: Event) { + *self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) })); let next = self.next.borrow().as_ref().unwrap().clone(); *self = next; } } - impl EventIteratorCore for Rc> { - fn next(&mut self) -> Option<&EventCore> { + impl EventIterator for Rc> { + fn next(&mut self) -> Option<&Event> { let is_some = self.next.borrow().is_some(); if is_some { let next = self.next.borrow().as_ref().unwrap().clone(); @@ -114,7 +90,7 @@ pub mod link { } // Drop implementation to prevent stack overflow through naive drop impl. - impl Drop for EventLinkCore { + impl Drop for EventLink { fn drop(&mut self) { while let Some(link) = self.next.replace(None) { if let Ok(head) = Rc::try_unwrap(link) { @@ -124,7 +100,7 @@ pub mod link { } } - impl Default for EventLinkCore { + impl Default for EventLink { fn default() -> Self { Self::new() } @@ -132,10 +108,10 @@ pub mod link { #[test] fn avoid_stack_overflow_in_drop() { - let mut event1 = Rc::new(EventLinkCore::<(),()>::new()); + let mut event1 = Rc::new(EventLink::<(),()>::new()); let _event2 = event1.clone(); for _ in 0 .. 1_000_000 { - event1.push(EventCore::Progress(vec![])); + event1.push(Event::Progress(vec![])); } } } @@ -145,18 +121,15 @@ pub mod binary { use std::io::Write; use abomonation::Abomonation; - use super::{EventCore, EventPusherCore, EventIteratorCore}; + use super::{Event, EventPusher, EventIterator}; /// A wrapper for `W: Write` implementing `EventPusherCore`. - pub struct EventWriterCore { + pub struct EventWriter { stream: W, phant: ::std::marker::PhantomData<(T,D)>, } - /// [EventWriterCore] specialized to vector-based containers. - pub type EventWriter = EventWriterCore, W>; - - impl EventWriterCore { + impl EventWriter { /// Allocates a new `EventWriter` wrapping a supplied writer. pub fn new(w: W) -> Self { Self { @@ -166,15 +139,15 @@ pub mod binary { } } - impl EventPusherCore for EventWriterCore { - fn push(&mut self, event: EventCore) { + impl EventPusher for EventWriter { + fn push(&mut self, event: Event) { // TODO: `push` has no mechanism to report errors, so we `unwrap`. unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); } } } /// A Wrapper for `R: Read` implementing `EventIterator`. - pub struct EventReaderCore { + pub struct EventReader { reader: R, bytes: Vec, buff1: Vec, @@ -184,10 +157,7 @@ pub mod binary { phant: ::std::marker::PhantomData<(T,D)>, } - /// [EventReaderCore] specialized to vector-based containers. - pub type EventReader = EventReaderCore, R>; - - impl EventReaderCore { + impl EventReader { /// Allocates a new `EventReader` wrapping a supplied reader. pub fn new(r: R) -> Self { Self { @@ -202,12 +172,12 @@ pub mod binary { } } - impl EventIteratorCore for EventReaderCore { - fn next(&mut self) -> Option<&EventCore> { + impl EventIterator for EventReader { + fn next(&mut self) -> Option<&Event> { // if we can decode something, we should just return it! :D - if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { - let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); + if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { + let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); self.consumed = self.valid - rest.len(); return Some(item); } diff --git a/timely/src/dataflow/operators/capture/extract.rs b/timely/src/dataflow/operators/capture/extract.rs index dcf57ae3bb..1f603a9958 100644 --- a/timely/src/dataflow/operators/capture/extract.rs +++ b/timely/src/dataflow/operators/capture/extract.rs @@ -1,6 +1,6 @@ //! Traits and types for extracting captured timely dataflow streams. -use super::EventCore; +use super::Event; use crate::Container; use crate::Data; @@ -18,7 +18,7 @@ pub trait Extract { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -30,7 +30,7 @@ pub trait Extract { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLinkCore::new()); + /// let handle1 = Rc::new(EventLink::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -49,7 +49,7 @@ pub trait Extract { fn extract(self) -> Vec<(T, Vec)>; } -impl Extract for ::std::sync::mpsc::Receiver>> { +impl Extract for ::std::sync::mpsc::Receiver>> { fn extract(self) -> Vec<(T, Vec)> { let mut result = self.extract_core(); @@ -86,7 +86,7 @@ pub trait ExtractCore { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, ExtractCore}; + /// use timely::dataflow::operators::capture::{EventLink, Replay, ExtractCore}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -98,7 +98,7 @@ pub trait ExtractCore { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLinkCore::new()); + /// let handle1 = Rc::new(EventLink::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -117,11 +117,11 @@ pub trait ExtractCore { fn extract_core(self) -> Vec<(T, C)>; } -impl ExtractCore for ::std::sync::mpsc::Receiver> { +impl ExtractCore for ::std::sync::mpsc::Receiver> { fn extract_core(self) -> Vec<(T, C)> { let mut result = Vec::new(); for event in self { - if let EventCore::Messages(time, data) = event { + if let Event::Messages(time, data) = event { result.push((time, data)); } } diff --git a/timely/src/dataflow/operators/capture/mod.rs b/timely/src/dataflow/operators/capture/mod.rs index 22d332ea02..5ff8cb81b0 100644 --- a/timely/src/dataflow/operators/capture/mod.rs +++ b/timely/src/dataflow/operators/capture/mod.rs @@ -22,10 +22,10 @@ //! use std::rc::Rc; //! use timely::dataflow::Scope; //! use timely::dataflow::operators::{Capture, ToStream, Inspect}; -//! use timely::dataflow::operators::capture::{EventLinkCore, Replay}; +//! use timely::dataflow::operators::capture::{EventLink, Replay}; //! //! timely::execute(timely::Config::thread(), |worker| { -//! let handle1 = Rc::new(EventLinkCore::new()); +//! let handle1 = Rc::new(EventLink::new()); //! let handle2 = Some(handle1.clone()); //! //! worker.dataflow::(|scope1| @@ -66,7 +66,7 @@ //! ); //! //! worker.dataflow::(|scope2| { -//! Some(EventReader::<_,u64,_>::new(recv)) +//! Some(EventReader::, TcpStream>::new(recv)) //! .replay_into(scope2) //! .inspect(|x| println!("replayed: {:?}", x)); //! }) @@ -76,10 +76,10 @@ pub use self::capture::Capture; pub use self::replay::Replay; pub use self::extract::{Extract, ExtractCore}; -pub use self::event::{Event, EventCore, EventPusher, EventPusherCore}; -pub use self::event::link::{EventLink, EventLinkCore}; -pub use self::event::binary::{EventReader, EventReaderCore}; -pub use self::event::binary::{EventWriter, EventWriterCore}; +pub use self::event::{Event, EventPusher}; +pub use self::event::link::EventLink; +pub use self::event::binary::EventReader; +pub use self::event::binary::EventWriter; pub mod capture; pub mod replay; diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 2cc3254196..b4a75545cc 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -38,20 +38,20 @@ //! allowing the replay to occur in a timely dataflow computation with more or fewer workers //! than that in which the stream was captured. -use crate::dataflow::{Scope, StreamCore}; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::{Scope, Stream}; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; -use super::EventCore; -use super::event::EventIteratorCore; +use super::Event; +use super::event::EventIterator; use crate::Container; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { /// Replays `self` into the provided scope, as a `Stream`. - fn replay_into>(self, scope: &mut S) -> StreamCore { + fn replay_into>(self, scope: &mut S) -> Stream { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } /// Replays `self` into the provided scope, as a `Stream'. @@ -59,13 +59,13 @@ pub trait Replay : Sized { /// The `period` argument allows the specification of a re-activation period, where the operator /// will re-activate itself every so often. The `None` argument instructs the operator not to /// re-activate itself.us - fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore; + fn replay_core>(self, scope: &mut S, period: Option) -> Stream; } impl Replay for I where I : IntoIterator, - ::Item: EventIteratorCore+'static { - fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore{ + ::Item: EventIterator+'static { + fn replay_core>(self, scope: &mut S, period: Option) -> Stream{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); @@ -93,10 +93,10 @@ where I : IntoIterator, for event_stream in event_streams.iter_mut() { while let Some(event) = event_stream.next() { match event { - EventCore::Progress(vec) => { + Event::Progress(vec) => { progress.internals[0].extend(vec.iter().cloned()); }, - EventCore::Messages(ref time, data) => { + Event::Messages(ref time, data) => { allocation.clone_from(data); output.session(time).give_container(&mut allocation); } diff --git a/timely/src/dataflow/operators/concat.rs b/timely/src/dataflow/operators/concat.rs index 449609f5bd..c65f14b9b0 100644 --- a/timely/src/dataflow/operators/concat.rs +++ b/timely/src/dataflow/operators/concat.rs @@ -3,7 +3,7 @@ use crate::Container; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::{Stream, Scope}; /// Merge the contents of two streams. pub trait Concat { @@ -20,11 +20,11 @@ pub trait Concat { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concat(&self, _: &StreamCore) -> StreamCore; + fn concat(&self, _: &Stream) -> Stream; } -impl Concat for StreamCore { - fn concat(&self, other: &StreamCore) -> StreamCore { +impl Concat for Stream { + fn concat(&self, other: &Stream) -> Stream { self.scope().concatenate([self.clone(), other.clone()]) } } @@ -47,15 +47,15 @@ pub trait Concatenate { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concatenate(&self, sources: I) -> StreamCore + fn concatenate(&self, sources: I) -> Stream where - I: IntoIterator>; + I: IntoIterator>; } -impl Concatenate for StreamCore { - fn concatenate(&self, sources: I) -> StreamCore +impl Concatenate for Stream { + fn concatenate(&self, sources: I) -> Stream where - I: IntoIterator> + I: IntoIterator> { let clone = self.clone(); self.scope().concatenate(Some(clone).into_iter().chain(sources)) @@ -63,9 +63,9 @@ impl Concatenate for StreamCore { } impl Concatenate for G { - fn concatenate(&self, sources: I) -> StreamCore + fn concatenate(&self, sources: I) -> Stream where - I: IntoIterator> + I: IntoIterator> { // create an operator builder. diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index 2a1290bb5d..2d0da96186 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -5,7 +5,7 @@ use crate::communication::message::RefOrMut; use crate::Data; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::operator::Operator; /// Accumulates records within a timestamp. @@ -27,7 +27,7 @@ pub trait Accumulate { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![45])]); /// ``` - fn accumulate(&self, default: A, logic: impl Fn(&mut A, RefOrMut>)+'static) -> Stream; + fn accumulate(&self, default: A, logic: impl Fn(&mut A, RefOrMut>)+'static) -> Stream>; /// Counts the number of records observed at each time. /// /// # Examples @@ -45,13 +45,13 @@ pub trait Accumulate { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![10])]); /// ``` - fn count(&self) -> Stream { + fn count(&self) -> Stream> { self.accumulate(0, |sum, data| *sum += data.len()) } } -impl Accumulate for Stream { - fn accumulate(&self, default: A, logic: impl Fn(&mut A, RefOrMut>)+'static) -> Stream { +impl Accumulate for Stream> { + fn accumulate(&self, default: A, logic: impl Fn(&mut A, RefOrMut>)+'static) -> Stream> { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index ddda2faaf9..53d792afc4 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use crate::Data; use crate::order::{PartialOrder, TotalOrder}; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::operator::Operator; /// Methods to advance the timestamps of records or batches of records. @@ -94,7 +94,7 @@ pub trait Delay { fn delay_batchG::Timestamp+'static>(&self, func: L) -> Self; } -impl Delay for Stream { +impl Delay for Stream> { fn delayG::Timestamp+'static>(&self, mut func: L) -> Self { let mut elements = HashMap::new(); let mut vector = Vec::new(); diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index bd9b1f86ad..56f7ae6b6b 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -27,11 +27,11 @@ use crate::progress::{Source, Target}; use crate::order::Product; use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::channels::pushers::{CounterCore, TeeCore}; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::pushers::{Counter, Tee}; +use crate::dataflow::channels::{Bundle, Message}; use crate::worker::AsWorker; -use crate::dataflow::{StreamCore, Scope, Stream}; +use crate::dataflow::{Stream, Scope}; use crate::dataflow::scopes::{Child, ScopeParent}; use crate::dataflow::operators::delay::Delay; @@ -51,7 +51,7 @@ pub trait Enter, C: Container> { /// }); /// }); /// ``` - fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore, C>; + fn enter<'a>(&self, _: &Child<'a, G, T>) -> Stream, C>; } use crate::dataflow::scopes::child::Iterative; @@ -72,24 +72,24 @@ pub trait EnterAt { /// }); /// }); /// ``` - fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream, D> ; + fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream, Vec>; } impl::Timestamp, T>, Vec>> EnterAt for E { fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) -> - Stream, D> { + Stream, Vec> { self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum))) } } -impl, C: Data+Container> Enter for StreamCore { - fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore, C> { +impl, C: Data+Container> Enter for Stream { + fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream, C> { use crate::scheduling::Scheduler; - let (targets, registrar) = TeeCore::::new(); + let (targets, registrar) = Tee::::new(); let ingress = IngressNub { - targets: CounterCore::new(targets), + targets: Counter::new(targets), phantom: ::std::marker::PhantomData, activator: scope.activator_for(&scope.addr()), active: false, @@ -100,7 +100,7 @@ impl, C: Data+Container> Enter { /// }); /// }); /// ``` - fn leave(&self) -> StreamCore; + fn leave(&self) -> Stream; } -impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, D> { - fn leave(&self) -> StreamCore { +impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave for Stream, D> { + fn leave(&self) -> Stream { let scope = self.scope(); let output = scope.subgraph.borrow_mut().new_output(); - let (targets, registrar) = TeeCore::::new(); + let (targets, registrar) = Tee::::new(); let channel_id = scope.clone().new_identifier(); self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id); - StreamCore::new( + Stream::new( output, registrar, scope.parent, @@ -143,18 +143,18 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave struct IngressNub, TData: Container> { - targets: CounterCore>, + targets: Counter>, phantom: ::std::marker::PhantomData, activator: crate::scheduling::Activator, active: bool, } -impl, TData: Container> Push> for IngressNub { - fn push(&mut self, element: &mut Option>) { +impl, TData: Container> Push> for IngressNub { + fn push(&mut self, element: &mut Option>) { if let Some(message) = element { let outer_message = message.as_mut(); let data = ::std::mem::take(&mut outer_message.data); - let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); + let mut inner_message = Some(Bundle::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); self.targets.push(&mut inner_message); if let Some(inner_message) = inner_message { if let Some(inner_message) = inner_message.if_typed() { @@ -175,17 +175,17 @@ impl, TData: Container> Pus struct EgressNub, TData: Data> { - targets: TeeCore, + targets: Tee, phantom: PhantomData, } -impl Push> for EgressNub +impl Push> for EgressNub where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { let inner_message = message.as_mut(); let data = ::std::mem::take(&mut inner_message.data); - let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); + let mut outer_message = Some(Bundle::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); self.targets.push(&mut outer_message); if let Some(outer_message) = outer_message { if let Some(outer_message) = outer_message.if_typed() { diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index 1f603df25d..72b09d611a 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -2,9 +2,9 @@ use crate::ExchangeData; use crate::container::PushPartitioned; -use crate::dataflow::channels::pact::ExchangeCore; +use crate::dataflow::channels::pact::Exchange as ExchangePact; use crate::dataflow::operators::generic::operator::Operator; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; /// Exchange records between workers. pub trait Exchange { @@ -26,14 +26,14 @@ pub trait Exchange { fn exchange(&self, route: impl FnMut(&D) -> u64 + 'static) -> Self; } -impl Exchange for StreamCore +impl Exchange for Stream where C: PushPartitioned + ExchangeData, C::Item: ExchangeData, { - fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> StreamCore { + fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> Stream { let mut container = Default::default(); - self.unary(ExchangeCore::new(route), "Exchange", |_, _| { + self.unary(ExchangePact::new(route), "Exchange", |_, _| { move |input, output| { input.for_each(|time, data| { data.swap(&mut container); diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index a7eb90c658..52e9a26488 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -1,44 +1,21 @@ //! Create cycles in a timely dataflow graph. -use crate::{Container, Data}; +use crate::Container; use crate::progress::{Timestamp, PathSummary}; use crate::progress::frontier::Antichain; use crate::order::Product; -use crate::dataflow::channels::pushers::TeeCore; +use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamCore, Scope, Stream}; +use crate::dataflow::{Stream, Scope}; use crate::dataflow::scopes::child::Iterative; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OutputWrapper; /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. pub trait Feedback { - /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. - /// - /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with - /// its `Handle` passed as an argument. Data passed through the stream will have their - /// timestamps advanced by `summary`. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; - /// - /// timely::example(|scope| { - /// // circulate 0..10 for 100 iterations. - /// let (handle, cycle) = scope.feedback(1); - /// (0..10).to_stream(scope) - /// .concat(&cycle) - /// .inspect(|x| println!("seen: {:?}", x)) - /// .branch_when(|t| t < &100).1 - /// .connect_loop(handle); - /// }); - /// ``` - fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); - - /// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`. + /// Creates a [Stream] and a [Handle] to later bind the source of that `Stream`. /// /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with /// its `Handle` passed as an argument. Data passed through the stream will have their @@ -51,7 +28,7 @@ pub trait Feedback { /// /// timely::example(|scope| { /// // circulate 0..10 for 100 iterations. - /// let (handle, cycle) = scope.feedback_core::>(1); + /// let (handle, cycle) = scope.feedback::>(1); /// (0..10).to_stream(scope) /// .concat(&cycle) /// .inspect(|x| println!("seen: {:?}", x)) @@ -59,7 +36,7 @@ pub trait Feedback { /// .connect_loop(handle); /// }); /// ``` - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); + fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); } /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. @@ -87,26 +64,22 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>); + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, D>, Stream, D>); } impl Feedback for G { - fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { - self.feedback_core(summary) - } - - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { + fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); - (HandleCore { builder, summary, output }, stream) + (Handle { builder, summary, output }, stream) } } impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>) { - self.feedback_core(Product::new(Default::default(), summary)) + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, D>, Stream, D>) { + self.feedback(Product::new(Default::default(), summary)) } } @@ -129,11 +102,11 @@ pub trait ConnectLoop { /// .connect_loop(handle); /// }); /// ``` - fn connect_loop(&self, _: HandleCore); + fn connect_loop(&self, _: Handle); } -impl ConnectLoop for StreamCore { - fn connect_loop(&self, helper: HandleCore) { +impl ConnectLoop for Stream { + fn connect_loop(&self, helper: Handle) { let mut builder = helper.builder; let summary = helper.summary; @@ -159,11 +132,8 @@ impl ConnectLoop for StreamCore { /// A handle used to bind the source of a loop variable. #[derive(Debug)] -pub struct HandleCore { +pub struct Handle { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper>, + output: OutputWrapper>, } - -/// A `HandleCore` specialized for using `Vec` as container -pub type Handle = HandleCore>; diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index 7034a9df09..391729f384 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -2,7 +2,7 @@ use crate::Data; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. @@ -22,8 +22,8 @@ pub trait Filter { fn filterbool+'static>(&self, predicate: P) -> Self; } -impl Filter for Stream { - fn filterbool+'static>(&self, mut predicate: P) -> Stream { +impl Filter for Stream> { + fn filterbool+'static>(&self, mut predicate: P) -> Stream> { let mut vector = Vec::new(); self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each(|time, data| { diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/flow_controlled.rs index 4f01bd8026..ea2ff65ede 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/flow_controlled.rs @@ -5,7 +5,7 @@ use crate::order::{PartialOrder, TotalOrder}; use crate::progress::timestamp::Timestamp; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::probe::Handle; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; /// Output of the input reading function for iterator_source. pub struct IteratorSourceInput, I: IntoIterator> { @@ -82,7 +82,7 @@ pub fn iterator_source< name: &str, mut input_f: F, probe: Handle, - ) -> Stream where G::Timestamp: TotalOrder { + ) -> Stream> where G::Timestamp: TotalOrder { let mut target = G::Timestamp::minimum(); source(scope, name, |cap, info| { diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8e97492afd..c4a35cf9b5 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -14,9 +14,9 @@ use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; use crate::Container; -use crate::dataflow::{StreamCore, Scope}; -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pact::ParallelizationContractCore; +use crate::dataflow::{Stream, Scope}; +use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::operators::generic::operator_info::OperatorInfo; /// Contains type-free information about the operator properties. @@ -105,17 +105,17 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller + pub fn new_input(&mut self, stream: &Stream, pact: P) -> P::Puller where - P: ParallelizationContractCore { + P: ParallelizationContract { let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs]; self.new_input_connection(stream, pact, connection) } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller + pub fn new_input_connection(&mut self, stream: &Stream, pact: P, connection: Vec::Summary>>) -> P::Puller where - P: ParallelizationContractCore { + P: ParallelizationContract { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); @@ -131,18 +131,18 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (TeeCore, StreamCore) { + pub fn new_output(&mut self) -> (Tee, Stream) { let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs]; self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (TeeCore, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, Stream) { - let (targets, registrar) = TeeCore::::new(); + let (targets, registrar) = Tee::::new(); let source = Source::new(self.index, self.shape.outputs); - let stream = StreamCore::new(source, registrar, self.scope.clone()); + let stream = Stream::new(source, registrar, self.scope.clone()); self.shape.outputs += 1; assert_eq!(self.shape.inputs, connection.len()); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index ca80e31828..ced9390eaa 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -9,14 +9,14 @@ use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::Container; -use crate::dataflow::{Scope, StreamCore}; -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; -use crate::dataflow::channels::pact::ParallelizationContractCore; +use crate::dataflow::{Scope, Stream}; +use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; +use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::capability::Capability; -use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper}; +use crate::dataflow::operators::generic::handles::{InputHandle, new_input_handle, OutputWrapper}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; @@ -59,9 +59,9 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> InputHandleCore + pub fn new_input(&mut self, stream: &Stream, pact: P) -> InputHandle where - P: ParallelizationContractCore { + P: ParallelizationContract { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()]; self.new_input_connection(stream, pact, connection) @@ -75,9 +75,9 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore + pub fn new_input_connection(&mut self, stream: &Stream, pact: P, connection: Vec::Summary>>) -> InputHandle where - P: ParallelizationContractCore { + P: ParallelizationContract { let puller = self.builder.new_input_connection(stream, pact, connection.clone()); @@ -92,7 +92,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (OutputWrapper>, Stream) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -105,7 +105,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, Stream) { let (tee, stream) = self.builder.new_output_connection(connection.clone()); diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index eeddf52952..fea24d77b9 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -11,9 +11,9 @@ use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pullers::Counter as PullCounter; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::{BufferCore, Session}; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; +use crate::dataflow::channels::Bundle; use crate::communication::{Push, Pull, message::RefOrMut}; use crate::Container; use crate::logging::TimelyLogger as Logger; @@ -22,7 +22,7 @@ use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. -pub struct InputHandleCore>> { +pub struct InputHandle>> { pull_counter: PullCounter, internal: Rc>>>>>, /// Timestamp summaries from this input to each output. @@ -33,21 +33,15 @@ pub struct InputHandleCore> logging: Option, } -/// Handle to an operator's input stream, specialized to vectors. -pub type InputHandle = InputHandleCore, P>; - /// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { +pub struct FrontieredInputHandle<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { /// The underlying input handle. - pub handle: &'a mut InputHandleCore, + pub handle: &'a mut InputHandle, /// The frontier as reported by timely progress tracking. pub frontier: &'a MutableAntichain, } -/// Handle to an operator's input stream and frontier, specialized to vectors. -pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; - -impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { +impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandle { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. @@ -99,10 +93,10 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< } -impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, D, P> { +impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandle<'a, T, D, P> { /// Allocate a new frontiered input handle. - pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { - FrontieredInputHandleCore { + pub fn new(handle: &'a mut InputHandle, frontier: &'a MutableAntichain) -> Self { + FrontieredInputHandle { handle, frontier, } @@ -146,19 +140,19 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp } } -pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { +pub fn _access_pull_counter>>(input: &mut InputHandle) -> &mut PullCounter { &mut input.pull_counter } /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>( +pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, summaries: Rc>>>, logging: Option -) -> InputHandleCore { - InputHandleCore { +) -> InputHandle { + InputHandle { pull_counter, internal, summaries, @@ -172,14 +166,14 @@ pub fn new_input_handle>>( /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: BufferCore>, +pub struct OutputWrapper>> { + push_buffer: Buffer>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: BufferCore>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { OutputWrapper { push_buffer, internal_buffer, @@ -189,8 +183,8 @@ impl>> OutputWrapper OutputHandleCore { - OutputHandleCore { + pub fn activate(&mut self) -> OutputHandle { + OutputHandle { push_buffer: &mut self.push_buffer, internal_buffer: &self.internal_buffer, } @@ -199,15 +193,12 @@ impl>> OutputWrapper>+'a> { - push_buffer: &'a mut BufferCore>, +pub struct OutputHandle<'a, T: Timestamp, C: Container+'a, P: Push>+'a> { + push_buffer: &'a mut Buffer>, internal_buffer: &'a Rc>>, } -/// Handle specialized to `Vec`-based container. -pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec, P>; - -impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandle<'a, T, C, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. /// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -241,7 +232,7 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore } } -impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandle<'a, T, C, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/operators/generic/mod.rs b/timely/src/dataflow/operators/generic/mod.rs index e54f5e4234..e22f0b548b 100644 --- a/timely/src/dataflow/operators/generic/mod.rs +++ b/timely/src/dataflow/operators/generic/mod.rs @@ -8,7 +8,7 @@ mod handles; mod notificator; mod operator_info; -pub use self::handles::{InputHandle, InputHandleCore, FrontieredInputHandle, FrontieredInputHandleCore, OutputHandle, OutputHandleCore, OutputWrapper}; +pub use self::handles::{InputHandle, FrontieredInputHandle, OutputHandle, OutputWrapper}; pub use self::notificator::{Notificator, FrontierNotificator}; pub use self::operator::{Operator, source}; diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6c8dd8ea94..445cbfaac2 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -193,8 +193,8 @@ fn notificator_delivers_notifications_in_topo_order() { /// /// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { -/// let (in1_handle, in1) = scope.new_input(); -/// let (in2_handle, in2) = scope.new_input(); +/// let (in1_handle, in1) = scope.new_input::>(); +/// let (in2_handle, in2) = scope.new_input::>(); /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::new(); /// let mut stash = HashMap::new(); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index afb7a25d11..736b758aaf 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -1,13 +1,13 @@ //! Methods to construct generic streaming and blocking unary operators. -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pact::ParallelizationContractCore; +use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pact::ParallelizationContract; -use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore}; +use crate::dataflow::operators::generic::handles::{InputHandle, FrontieredInputHandle, OutputHandle}; use crate::dataflow::operators::capability::Capability; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use super::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OperatorInfo; @@ -55,13 +55,13 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> Stream where D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore; + L: FnMut(&mut FrontieredInputHandle, + &mut OutputHandle>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -93,11 +93,11 @@ pub trait Operator { /// } /// ``` fn unary_notify, - &mut OutputHandleCore>, + L: FnMut(&mut InputHandle, + &mut OutputHandle>, &mut Notificator)+'static, - P: ParallelizationContractCore> - (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + P: ParallelizationContract> + (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> Stream; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -127,13 +127,13 @@ pub trait Operator { /// }); /// }); /// ``` - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(&self, pact: P, name: &str, constructor: B) -> Stream where D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore; + L: FnMut(&mut InputHandle, + &mut OutputHandle>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -148,8 +148,8 @@ pub trait Operator { /// /// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { - /// let (in1_handle, in1) = scope.new_input(); - /// let (in2_handle, in2) = scope.new_input(); + /// let (in1_handle, in1) = scope.new_input::>(); + /// let (in2_handle, in2) = scope.new_input::>(); /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::new(); /// let mut stash = HashMap::new(); @@ -185,16 +185,16 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where D2: Container, D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore; + L: FnMut(&mut FrontieredInputHandle, + &mut FrontieredInputHandle, + &mut OutputHandle>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -209,8 +209,8 @@ pub trait Operator { /// /// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { - /// let (in1_handle, in1) = scope.new_input(); - /// let (in2_handle, in2) = scope.new_input(); + /// let (in1_handle, in1) = scope.new_input::>(); + /// let (in2_handle, in2) = scope.new_input::>(); /// /// let mut vector1 = Vec::new(); /// let mut vector2 = Vec::new(); @@ -243,13 +243,13 @@ pub trait Operator { /// ``` fn binary_notify, - &mut InputHandleCore, - &mut OutputHandleCore>, + L: FnMut(&mut InputHandle, + &mut InputHandle, + &mut OutputHandle>, &mut Notificator)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + P1: ParallelizationContract, + P2: ParallelizationContract> + (&self, other: &Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> Stream; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -285,16 +285,16 @@ pub trait Operator { /// }).inspect(|x| println!("{:?}", x)); /// }); /// ``` - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where D2: Container, D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore; + L: FnMut(&mut InputHandle, + &mut InputHandle, + &mut OutputHandle>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream @@ -321,19 +321,19 @@ pub trait Operator { /// ``` fn sink(&self, pact: P, name: &str, logic: L) where - L: FnMut(&mut FrontieredInputHandleCore)+'static, - P: ParallelizationContractCore; + L: FnMut(&mut FrontieredInputHandle)+'static, + P: ParallelizationContract; } -impl Operator for StreamCore { +impl Operator for Stream { - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> Stream where D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore { + L: FnMut(&mut FrontieredInputHandle, + &mut OutputHandle>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -346,7 +346,7 @@ impl Operator for StreamCore { let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); move |frontiers| { - let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]); + let mut input_handle = FrontieredInputHandle::new(&mut input, &frontiers[0]); let mut output_handle = output.activate(); logic(&mut input_handle, &mut output_handle); } @@ -356,11 +356,11 @@ impl Operator for StreamCore { } fn unary_notify, - &mut OutputHandleCore>, + L: FnMut(&mut InputHandle, + &mut OutputHandle>, &mut Notificator)+'static, - P: ParallelizationContractCore> - (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + P: ParallelizationContract> + (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -377,13 +377,13 @@ impl Operator for StreamCore { }) } - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(&self, pact: P, name: &str, constructor: B) -> Stream where D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore { + L: FnMut(&mut InputHandle, + &mut OutputHandle>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -405,16 +405,16 @@ impl Operator for StreamCore { stream } - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where D2: Container, D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore { + L: FnMut(&mut FrontieredInputHandle, + &mut FrontieredInputHandle, + &mut OutputHandle>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -428,8 +428,8 @@ impl Operator for StreamCore { let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); move |frontiers| { - let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]); - let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]); + let mut input1_handle = FrontieredInputHandle::new(&mut input1, &frontiers[0]); + let mut input2_handle = FrontieredInputHandle::new(&mut input2, &frontiers[1]); let mut output_handle = output.activate(); logic(&mut input1_handle, &mut input2_handle, &mut output_handle); } @@ -440,13 +440,13 @@ impl Operator for StreamCore { fn binary_notify, - &mut InputHandleCore, - &mut OutputHandleCore>, + L: FnMut(&mut InputHandle, + &mut InputHandle, + &mut OutputHandle>, &mut Notificator)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + P1: ParallelizationContract, + P2: ParallelizationContract> + (&self, other: &Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -465,16 +465,16 @@ impl Operator for StreamCore { } - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where D2: Container, D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore { + L: FnMut(&mut InputHandle, + &mut InputHandle, + &mut OutputHandle>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -499,15 +499,15 @@ impl Operator for StreamCore { fn sink(&self, pact: P, name: &str, mut logic: L) where - L: FnMut(&mut FrontieredInputHandleCore)+'static, - P: ParallelizationContractCore { + L: FnMut(&mut FrontieredInputHandle)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let mut input = builder.new_input(self, pact); builder.build(|_capabilities| { move |frontiers| { - let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]); + let mut input_handle = FrontieredInputHandle::new(&mut input, &frontiers[0]); logic(&mut input_handle); } }); @@ -555,11 +555,11 @@ impl Operator for StreamCore { /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore +pub fn source(scope: &G, name: &str, constructor: B) -> Stream where D: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputHandleCore>)+'static { + L: FnMut(&mut OutputHandle>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); @@ -599,7 +599,7 @@ where /// /// }); /// ``` -pub fn empty(scope: &G) -> StreamCore { +pub fn empty(scope: &G) -> Stream { source(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 72719c5f85..fc55a319fd 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -11,8 +11,8 @@ use crate::progress::Source; use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore}; -use crate::dataflow::channels::pushers::{TeeCore, CounterCore}; +use crate::dataflow::{ScopeParent, Scope, Stream}; +use crate::dataflow::channels::pushers::{Tee, Counter}; use crate::dataflow::channels::Message; @@ -25,44 +25,9 @@ use crate::dataflow::channels::Message; /// Create a new `Stream` and `Handle` through which to supply input. pub trait Input : Scope { - /// Create a new `Stream` and `Handle` through which to supply input. + /// Create a new [Stream] and [Handle] through which to supply input. /// - /// The `new_input` method returns a pair `(Handle, Stream)` where the `Stream` can be used - /// immediately for timely dataflow construction, and the `Handle` is later used to introduce - /// data into the timely dataflow computation. - /// - /// The `Handle` also provides a means to indicate - /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely - /// to issue progress notifications. - /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = worker.dataflow(|scope| { - /// let (input, stream) = scope.new_input(); - /// stream.inspect(|x| println!("hello {:?}", x)); - /// input - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); - - /// Create a new [StreamCore] and [HandleCore] through which to supply input. - /// - /// The `new_input_core` method returns a pair `(HandleCore, StreamCore)` where the [StreamCore] can be used + /// The `new_input_core` method returns a pair `(HandleCore, Stream)` where the [Stream] can be used /// immediately for timely dataflow construction, and the `HandleCore` is later used to introduce /// data into the timely dataflow computation. /// @@ -80,7 +45,7 @@ pub trait Input : Scope { /// /// // add an input and base computation off of it /// let mut input = worker.dataflow(|scope| { - /// let (input, stream) = scope.new_input_core::>(); + /// let (input, stream) = scope.new_input(); /// stream.inspect(|x| println!("hello {:?}", x)); /// input /// }); @@ -93,7 +58,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); + fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); /// Create a new stream from a supplied interactive handle. /// @@ -125,60 +90,20 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; - - /// Create a new stream from a supplied interactive handle. - /// - /// This method creates a new timely stream whose data are supplied interactively through the `handle` - /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate - /// if it as attached to more than one stream. - /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from_core(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { - self.new_input_core() - } - - fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { - self.input_from_core(handle) - } - - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore) { - let mut handle = HandleCore::new(); - let stream = self.input_from_core(&mut handle); + fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { + let mut handle = Handle::new(); + let stream = self.input_from(&mut handle); (handle, stream) } - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { - let (output, registrar) = TeeCore::<::Timestamp, D>::new(); - let counter = CounterCore::new(output); + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { + let (output, registrar) = Tee::<::Timestamp, D>::new(); + let counter = Counter::new(output); let produced = counter.produced().clone(); let index = self.allocate_operator_index(); @@ -202,7 +127,7 @@ impl Input for G where ::Timestamp: TotalOrder { copies, }), index); - StreamCore::new(Source::new(index, 0), registrar, self.clone()) + Stream::new(Source::new(index, 0), registrar, self.clone()) } } @@ -246,19 +171,16 @@ impl Operate for Operator { /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct HandleCore { +pub struct Handle { activate: Vec, progress: Vec>>>, - pushers: Vec>>, + pushers: Vec>>, buffer1: C, buffer2: C, now_at: T, } -/// A handle specialized to vector-based containers. -pub type Handle = HandleCore>; - -impl HandleCore { +impl Handle { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -322,17 +244,17 @@ impl HandleCore { /// } /// }); /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> StreamCore + pub fn to_stream(&mut self, scope: &mut G) -> Stream where T: TotalOrder, G: ScopeParent, { - scope.input_from_core(self) + scope.input_from(self) } fn register( &mut self, - pusher: CounterCore>, + pusher: Counter>, progress: Rc>>, ) { // flush current contents, so new registrant does not see existing data. @@ -379,7 +301,7 @@ impl HandleCore { } } - /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. + /// Sends a batch of records into the corresponding timely dataflow [Stream], at the current epoch. /// /// This method flushes single elements previously sent with `send`, to keep the insertion order. /// @@ -387,15 +309,15 @@ impl HandleCore { /// ``` /// use timely::*; /// use timely::dataflow::operators::{Input, InspectCore}; - /// use timely::dataflow::operators::input::HandleCore; + /// use timely::dataflow::operators::input::Handle; /// /// // construct and execute a timely dataflow /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it - /// let mut input = HandleCore::new(); + /// let mut input = Handle::new(); /// worker.dataflow(|scope| { - /// scope.input_from_core(&mut input) + /// scope.input_from(&mut input) /// .inspect_container(|x| println!("hello {:?}", x)); /// }); /// @@ -463,7 +385,7 @@ impl HandleCore { } } -impl Handle { +impl Handle> { #[inline] /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. /// @@ -500,13 +422,13 @@ impl Handle { } } -impl Default for Handle { +impl Default for Handle> { fn default() -> Self { Self::new() } } -impl Drop for HandleCore { +impl Drop for Handle { fn drop(&mut self) { self.close_epoch(); } diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/inspect.rs index 6e26856d18..26f9580d9a 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/inspect.rs @@ -5,7 +5,7 @@ use timely_container::columnation::{Columnation, TimelyStack}; use crate::Container; use crate::Data; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. @@ -87,19 +87,19 @@ pub trait Inspect: InspectCore + Sized { fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static; } -impl Inspect> for StreamCore> { +impl Inspect> for Stream> { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) } } -impl Inspect> for StreamCore> { +impl Inspect> for Stream> { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) } } -impl Inspect> for StreamCore> +impl Inspect> for Stream> where C: AsRef<[C::Item]> { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static { @@ -128,12 +128,12 @@ pub trait InspectCore { /// }); /// }); /// ``` - fn inspect_container(&self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; + fn inspect_container(&self, func: F) -> Stream where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl InspectCore for StreamCore { +impl InspectCore for Stream { - fn inspect_container(&self, mut func: F) -> StreamCore + fn inspect_container(&self, mut func: F) -> Stream where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static { use crate::progress::timestamp::Timestamp; diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index 77eced3f68..c3fcecab67 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/map.rs @@ -1,7 +1,7 @@ //! Extension methods for `Stream` based on record-by-record transformation. use crate::Data; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; @@ -19,7 +19,7 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn mapD2+'static>(&self, logic: L) -> Stream; + fn mapD2+'static>(&self, logic: L) -> Stream>; /// Updates each element of the stream and yields the element, re-using memory where possible. /// /// # Examples @@ -32,7 +32,7 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_in_place(&self, logic: L) -> Stream; + fn map_in_place(&self, logic: L) -> Stream>; /// Consumes each element of the stream and yields some number of new elements. /// /// # Examples @@ -45,11 +45,11 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_mapI+'static>(&self, logic: L) -> Stream where I::Item: Data; + fn flat_mapI+'static>(&self, logic: L) -> Stream::Item>> where I::Item: Data; } -impl Map for Stream { - fn mapD2+'static>(&self, mut logic: L) -> Stream { +impl Map for Stream> { + fn mapD2+'static>(&self, mut logic: L) -> Stream> { let mut vector = Vec::new(); self.unary(Pipeline, "Map", move |_,_| move |input, output| { input.for_each(|time, data| { @@ -58,7 +58,7 @@ impl Map for Stream { }); }) } - fn map_in_place(&self, mut logic: L) -> Stream { + fn map_in_place(&self, mut logic: L) -> Stream> { let mut vector = Vec::new(); self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { input.for_each(|time, data| { @@ -71,7 +71,7 @@ impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_mapI+'static>(&self, mut logic: L) -> Stream where I::Item: Data { + fn flat_mapI+'static>(&self, mut logic: L) -> Stream::Item>> where I::Item: Data { let mut vector = Vec::new(); self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { input.for_each(|time, data| { diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 508d10ac61..ff30da8052 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -10,7 +10,7 @@ pub use self::enterleave::{Enter, EnterAt, Leave}; pub use self::input::Input; -pub use self::unordered_input::{UnorderedInput, UnorderedInputCore}; +pub use self::unordered_input::UnorderedInput; pub use self::feedback::{Feedback, LoopVariable, ConnectLoop}; pub use self::concat::{Concat, Concatenate}; pub use self::partition::Partition; diff --git a/timely/src/dataflow/operators/ok_err.rs b/timely/src/dataflow/operators/ok_err.rs index 36d7946813..614fbfe451 100644 --- a/timely/src/dataflow/operators/ok_err.rs +++ b/timely/src/dataflow/operators/ok_err.rs @@ -32,7 +32,7 @@ pub trait OkErr { fn ok_err( &self, logic: L, - ) -> (Stream, Stream) + ) -> (Stream>, Stream>) where D1: Data, @@ -41,11 +41,11 @@ pub trait OkErr { ; } -impl OkErr for Stream { +impl OkErr for Stream> { fn ok_err( &self, mut logic: L, - ) -> (Stream, Stream) + ) -> (Stream>, Stream>) where D1: Data, diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/partition.rs index cc5097530d..fb96639309 100644 --- a/timely/src/dataflow/operators/partition.rs +++ b/timely/src/dataflow/operators/partition.rs @@ -22,11 +22,11 @@ pub trait Partition (u64, D2)> { /// streams[2].inspect(|x| println!("seen 2: {:?}", x)); /// }); /// ``` - fn partition(&self, parts: u64, route: F) -> Vec>; + fn partition(&self, parts: u64, route: F) -> Vec>>; } -impl(u64, D2)+'static> Partition for Stream { - fn partition(&self, parts: u64, route: F) -> Vec> { +impl(u64, D2)+'static> Partition for Stream> { + fn partition(&self, parts: u64, route: F) -> Vec>> { let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); let mut input = builder.new_input(self, Pipeline); diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567ec..77b6534ecd 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -5,14 +5,14 @@ use std::cell::RefCell; use crate::progress::Timestamp; use crate::progress::frontier::{AntichainRef, MutableAntichain}; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; -use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::{Stream, Scope}; use crate::Container; /// Monitors progress at a `Stream`. @@ -76,10 +76,10 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &mut Handle) -> StreamCore; + fn probe_with(&self, handle: &mut Handle) -> Stream; } -impl Probe for StreamCore { +impl Probe for Stream { fn probe(&self) -> Handle { // the frontier is shared state; scope updates, handle reads. @@ -87,7 +87,7 @@ impl Probe for StreamCore { self.probe_with(&mut handle); handle } - fn probe_with(&self, handle: &mut Handle) -> StreamCore { + fn probe_with(&self, handle: &mut Handle) -> Stream { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -202,7 +202,7 @@ mod tests { // create a new input, and inspect its output let (mut input, probe) = worker.dataflow(move |scope| { - let (input, stream) = scope.new_input::(); + let (input, stream) = scope.new_input::>(); (input, stream.probe()) }); diff --git a/timely/src/dataflow/operators/rc.rs b/timely/src/dataflow/operators/rc.rs index eaae55093e..3ab83119ea 100644 --- a/timely/src/dataflow/operators/rc.rs +++ b/timely/src/dataflow/operators/rc.rs @@ -2,7 +2,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::Operator; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::Container; use std::rc::Rc; @@ -21,11 +21,11 @@ pub trait SharedStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn shared(&self) -> StreamCore>; + fn shared(&self) -> Stream>; } -impl SharedStream for StreamCore { - fn shared(&self) -> StreamCore> { +impl SharedStream for Stream { + fn shared(&self) -> Stream> { let mut container = Default::default(); self.unary(Pipeline, "Shared", move |_, _| { move |input, output| { diff --git a/timely/src/dataflow/operators/reclock.rs b/timely/src/dataflow/operators/reclock.rs index b656e0aaf8..d1bbbd3358 100644 --- a/timely/src/dataflow/operators/reclock.rs +++ b/timely/src/dataflow/operators/reclock.rs @@ -2,7 +2,7 @@ use crate::Container; use crate::order::PartialOrder; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock>(&self, clock: &StreamCore) -> Self; + fn reclock>(&self, clock: &Stream) -> Self; } -impl Reclock for StreamCore { - fn reclock>(&self, clock: &StreamCore) -> StreamCore { +impl Reclock for Stream { + fn reclock>(&self, clock: &Stream) -> Stream { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs index 3607f9f8ef..bdea2bb63a 100644 --- a/timely/src/dataflow/operators/result.rs +++ b/timely/src/dataflow/operators/result.rs @@ -18,7 +18,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn ok(&self) -> Stream; + fn ok(&self) -> Stream>; /// Returns a new instance of `self` containing only `err` records. /// @@ -32,7 +32,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn err(&self) -> Stream; + fn err(&self) -> Stream>; /// Returns a new instance of `self` applying `logic` on all `Ok` records. /// @@ -46,7 +46,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_ok T2 + 'static>(&self, logic: L) -> Stream>; + fn map_ok T2 + 'static>(&self, logic: L) -> Stream>>; /// Returns a new instance of `self` applying `logic` on all `Err` records. /// @@ -60,7 +60,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_err E2 + 'static>(&self, logic: L) -> Stream>; + fn map_err E2 + 'static>(&self, logic: L) -> Stream>>; /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err` /// records. @@ -78,7 +78,7 @@ pub trait ResultStream { fn and_then Result + 'static>( &self, logic: L, - ) -> Stream>; + ) -> Stream>>; /// Returns a new instance of `self` applying `logic` on all `Ok` records. /// @@ -92,31 +92,31 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn unwrap_or_else T + 'static>(&self, logic: L) -> Stream; + fn unwrap_or_else T + 'static>(&self, logic: L) -> Stream>; } -impl ResultStream for Stream> { - fn ok(&self) -> Stream { +impl ResultStream for Stream>> { + fn ok(&self) -> Stream> { self.flat_map(Result::ok) } - fn err(&self) -> Stream { + fn err(&self) -> Stream> { self.flat_map(Result::err) } - fn map_ok T2 + 'static>(&self, mut logic: L) -> Stream> { + fn map_ok T2 + 'static>(&self, mut logic: L) -> Stream>> { self.map(move |r| r.map(|x| logic(x))) } - fn map_err E2 + 'static>(&self, mut logic: L) -> Stream> { + fn map_err E2 + 'static>(&self, mut logic: L) -> Stream>> { self.map(move |r| r.map_err(|x| logic(x))) } - fn and_then Result + 'static>(&self, mut logic: L) -> Stream> { + fn and_then Result + 'static>(&self, mut logic: L) -> Stream>> { self.map(move |r| r.and_then(|x| logic(x))) } - fn unwrap_or_else T + 'static>(&self, mut logic: L) -> Stream { + fn unwrap_or_else T + 'static>(&self, mut logic: L) -> Stream> { self.map(move |r| r.unwrap_or_else(|err| logic(err))) } } diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 597ec9163c..eb44a62c33 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -7,7 +7,7 @@ use crate::Container; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::CapabilitySet; -use crate::dataflow::{StreamCore, Scope, Stream}; +use crate::dataflow::{Stream, Scope}; use crate::progress::Timestamp; use crate::Data; @@ -29,11 +29,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream>(self, scope: &mut S) -> Stream; + fn to_stream>(self, scope: &mut S) -> Stream>; } impl ToStream for I where I::Item: Data { - fn to_stream>(self, scope: &mut S) -> Stream { + fn to_stream>(self, scope: &mut S) -> Stream::Item>> { source(scope, "ToStream", |capability, info| { @@ -62,9 +62,9 @@ impl ToStream for I where I:: } } -/// Converts to a timely [StreamCore]. +/// Converts to a timely [Stream]. pub trait ToStreamCore { - /// Converts to a timely [StreamCore]. + /// Converts to a timely [Stream]. /// /// # Examples /// @@ -80,11 +80,11 @@ pub trait ToStreamCore { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream_core>(self, scope: &mut S) -> StreamCore; + fn to_stream_core>(self, scope: &mut S) -> Stream; } impl ToStreamCore for I where I::Item: Container { - fn to_stream_core>(self, scope: &mut S) -> StreamCore { + fn to_stream_core>(self, scope: &mut S) -> Stream { source(scope, "ToStreamCore", |capability, info| { @@ -124,7 +124,7 @@ pub enum Event { /// Converts to a timely `Stream`. pub trait ToStreamAsync { /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely - /// `Stream`](crate::dataflow::Stream). + /// `Stream`](Stream::<_, Vec<_>> ). /// /// # Examples /// @@ -150,7 +150,7 @@ pub trait ToStreamAsync { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream>(self, scope: &S) -> Stream; + fn to_stream>(self, scope: &S) -> Stream>; } impl ToStreamAsync for I @@ -160,7 +160,7 @@ where F: IntoIterator, I: futures_util::stream::Stream> + Unpin + 'static, { - fn to_stream>(mut self, scope: &S) -> Stream { + fn to_stream>(mut self, scope: &S) -> Stream> { source(scope, "ToStreamAsync", move |capability, info| { let activator = Arc::new(scope.sync_activator_for(&info.address[..])); diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index c7e6002341..0085651519 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -11,21 +11,20 @@ use crate::progress::{Operate, operate::SharedProgress, Timestamp}; use crate::progress::Source; use crate::progress::ChangeBatch; -use crate::Data; -use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore}; -use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore}; +use crate::dataflow::channels::pushers::{Counter as PushCounter, Tee}; +use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; use crate::dataflow::operators::{ActivateCapability, Capability}; -use crate::dataflow::{Stream, Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; /// Create a new `Stream` and `Handle` through which to supply input. pub trait UnorderedInput { - /// Create a new capability-based `Stream` and `Handle` through which to supply input. This + /// Create a new capability-based [Stream] and [UnorderedHandle] through which to supply input. This /// input supports multiple open epochs (timestamps) at the same time. /// - /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `Stream` can be used - /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce + /// The `new_unordered_input_core` method returns `((HandleCore, Capability), Stream)` where the `Stream` can be used + /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce /// data into the timely dataflow computation. /// /// The `Capability` returned is for the default value of the timestamp type in use. The @@ -42,9 +41,7 @@ pub trait UnorderedInput { /// /// use timely::*; /// use timely::dataflow::operators::*; - /// use timely::dataflow::operators::capture::Extract; - /// use timely::dataflow::Stream; - /// + /// use timely::dataflow::operators::capture::Extract; /// /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); /// let send = Arc::new(Mutex::new(send)); @@ -74,82 +71,14 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); } impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { - self.new_unordered_input_core() - } -} - -/// An unordered handle specialized to vectors. -pub type UnorderedHandle = UnorderedHandleCore>; - -/// Create a new `Stream` and `Handle` through which to supply input. -pub trait UnorderedInputCore { - /// Create a new capability-based [StreamCore] and [UnorderedHandleCore] through which to supply input. This - /// input supports multiple open epochs (timestamps) at the same time. - /// - /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used - /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce - /// data into the timely dataflow computation. - /// - /// The `Capability` returned is for the default value of the timestamp type in use. The - /// capability can be dropped to inform the system that the input has advanced beyond the - /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp - /// should be obtained first, via the `delayed` function for `Capability`. - /// - /// To communicate the end-of-input drop all available capabilities. - /// - /// # Examples - /// - /// ``` - /// use std::sync::{Arc, Mutex}; - /// - /// use timely::*; - /// use timely::dataflow::operators::*; - /// use timely::dataflow::operators::capture::Extract; - /// use timely::dataflow::Stream; - /// - /// // get send and recv endpoints, wrap send to share - /// let (send, recv) = ::std::sync::mpsc::channel(); - /// let send = Arc::new(Mutex::new(send)); - /// - /// timely::execute(Config::thread(), move |worker| { - /// - /// // this is only to validate the output. - /// let send = send.lock().unwrap().clone(); - /// - /// // create and capture the unordered input. - /// let (mut input, mut cap) = worker.dataflow::(|scope| { - /// let (input, stream) = scope.new_unordered_input_core(); - /// stream.capture_into(send); - /// input - /// }); - /// - /// // feed values 0..10 at times 0..10. - /// for round in 0..10 { - /// input.session(cap.clone()).give(round); - /// cap = cap.delayed(&(round + 1)); - /// worker.step(); - /// } - /// }).unwrap(); - /// - /// let extract = recv.extract(); - /// for i in 0..10 { - /// assert_eq!(extract[i], (i, vec![i])); - /// } - /// ``` - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); -} - - -impl UnorderedInputCore for G { - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { - let (output, registrar) = TeeCore::::new(); + let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); @@ -163,7 +92,7 @@ impl UnorderedInputCore for G { let cap = ActivateCapability::new(cap, &address, self.activations()); - let helper = UnorderedHandleCore::new(counter); + let helper = UnorderedHandle::new(counter); self.add_operator_with_index(Box::new(UnorderedOperator { name: "UnorderedInput".to_owned(), @@ -174,7 +103,7 @@ impl UnorderedInputCore for G { peers, }), index); - ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) + ((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone())) } } @@ -213,21 +142,21 @@ impl Operate for UnorderedOperator { fn notify_me(&self) -> bool { false } } -/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. +/// A handle to an input [Stream], used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct UnorderedHandleCore { - buffer: PushBuffer>>, +pub struct UnorderedHandle { + buffer: PushBuffer>>, } -impl UnorderedHandleCore { - fn new(pusher: PushCounter>) -> UnorderedHandleCore { - UnorderedHandleCore { +impl UnorderedHandle { + fn new(pusher: PushCounter>) -> UnorderedHandle { + Self { buffer: PushBuffer::new(pusher), } } /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) } } diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 50ee7b8fa0..786c12471e 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -83,7 +83,7 @@ pub trait Scope: ScopeParent { /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::(); + /// let (input, stream) = child1.new_input::>(); /// let output = child1.scoped::,_,_>("ScopeName", |child2| { /// stream.enter(child2).leave() /// }); @@ -110,7 +110,7 @@ pub trait Scope: ScopeParent { /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::(); + /// let (input, stream) = child1.new_input::>(); /// let output = child1.iterative::(|child2| { /// stream.enter(child2).leave() /// }); @@ -140,7 +140,7 @@ pub trait Scope: ScopeParent { /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::(); + /// let (input, stream) = child1.new_input::>(); /// let output = child1.region(|child2| { /// stream.enter(child2).leave() /// }); @@ -172,7 +172,7 @@ pub trait Scope: ScopeParent { /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::(); + /// let (input, stream) = child1.new_input::>(); /// let output = child1.region_named("region", |child2| { /// stream.enter(child2).leave() /// }); diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index aa5e48601f..8b6c836eb5 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -9,7 +9,7 @@ use crate::progress::{Source, Target}; use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use std::fmt::{self, Debug}; use crate::Container; @@ -20,7 +20,7 @@ use crate::Container; /// Internally `Stream` maintains a list of data recipients who should be presented with data /// produced by the source of the stream. #[derive(Clone)] -pub struct StreamCore { +pub struct Stream { /// The progress identifier of the stream's data source. name: Source, /// The `Scope` containing the stream. @@ -29,15 +29,12 @@ pub struct StreamCore { ports: TeeHelper, } -/// A stream batching data in vectors. -pub type Stream = StreamCore>; - -impl StreamCore { +impl Stream { /// Connects the stream to a destination. /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { @@ -60,7 +57,7 @@ impl StreamCore { pub fn scope(&self) -> S { self.scope.clone() } } -impl Debug for StreamCore +impl Debug for Stream where S: Scope, { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 3564d35928..9a8340dbfe 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -235,12 +235,12 @@ where use ::std::net::TcpStream; use crate::logging::BatchLogger; - use crate::dataflow::operators::capture::EventWriterCore; + use crate::dataflow::operators::capture::EventWriter; eprintln!("enabled COMM logging to {}", addr); if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriterCore::new(stream); + let writer = EventWriter::new(stream); let mut logger = BatchLogger::new(writer); result = Some(crate::logging_core::Logger::new( ::std::time::Instant::now(), @@ -269,10 +269,10 @@ where use ::std::net::TcpStream; use crate::logging::{BatchLogger, TimelyEvent}; - use crate::dataflow::operators::capture::EventWriterCore; + use crate::dataflow::operators::capture::EventWriter; if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriterCore::new(stream); + let writer = EventWriter::new(stream); let mut logger = BatchLogger::new(writer); worker.log_register() .insert::("timely", move |time, data| diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ca9868fbe5..39944ade05 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -13,14 +13,14 @@ use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; /// Logs events as a timely stream, with progress statements. -pub struct BatchLogger where P: EventPusher { +pub struct BatchLogger where P: EventPusher> { // None when the logging stream is closed time: Duration, event_pusher: P, _phantom: ::std::marker::PhantomData<(E, T)>, } -impl BatchLogger where P: EventPusher { +impl BatchLogger where P: EventPusher> { /// Creates a new batch logger. pub fn new(event_pusher: P) -> Self { BatchLogger { @@ -42,7 +42,7 @@ impl BatchLogger where P: EventPusher Drop for BatchLogger where P: EventPusher { +impl Drop for BatchLogger where P: EventPusher> { fn drop(&mut self) { self.event_pusher.push(Event::Progress(vec![(self.time, -1)])); } diff --git a/timely/src/synchronization/barrier.rs b/timely/src/synchronization/barrier.rs index e7d1fe460a..5538ae70f1 100644 --- a/timely/src/synchronization/barrier.rs +++ b/timely/src/synchronization/barrier.rs @@ -6,7 +6,7 @@ use crate::worker::Worker; /// A re-usable barrier synchronization mechanism. pub struct Barrier { - input: InputHandle, + input: InputHandle>, probe: ProbeHandle, worker: Worker, } @@ -17,7 +17,7 @@ impl Barrier { pub fn new(worker: &mut Worker) -> Self { use crate::dataflow::operators::{Input, Probe}; let (input, probe) = worker.dataflow(|scope| { - let (handle, stream) = scope.new_input::<()>(); + let (handle, stream) = scope.new_input::>(); (handle, stream.probe()) }); Barrier { input, probe, worker: worker.clone() } @@ -51,4 +51,3 @@ impl Barrier { !self.probe.less_than(self.input.time()) } } - diff --git a/timely/tests/barrier.rs b/timely/tests/barrier.rs index 9e627762bd..f9bfbeb22f 100644 --- a/timely/tests/barrier.rs +++ b/timely/tests/barrier.rs @@ -17,7 +17,7 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) { }; timely::execute(config, move |worker| { worker.dataflow(move |scope| { - let (handle, stream) = scope.feedback::(1); + let (handle, stream) = scope.feedback::>(1); stream.unary_notify( Pipeline, "Barrier",