diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index bd9b1f86a..24c8fa12e 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -21,6 +21,7 @@ use std::marker::PhantomData; +use crate::logging::{TimelyLogger, MessagesEvent}; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; @@ -90,17 +91,26 @@ impl, C: Data+Container> Enter::new(); let ingress = IngressNub { targets: CounterCore::new(targets), - phantom: ::std::marker::PhantomData, + phantom: PhantomData, activator: scope.activator_for(&scope.addr()), active: false, }; let produced = ingress.targets.produced().clone(); - let input = scope.subgraph.borrow_mut().new_input(produced); - let channel_id = scope.clone().new_identifier(); - self.connect_to(input, ingress, channel_id); - StreamCore::new(Source::new(0, input.port), registrar, scope.clone()) + + if let Some(logger) = scope.logging() { + let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger); + self.connect_to(input, pusher, channel_id); + } else { + self.connect_to(input, ingress, channel_id); + } + + StreamCore::new( + Source::new(0, input.port), + registrar, + scope.clone(), + ) } } @@ -129,9 +139,17 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave let scope = self.scope(); let output = scope.subgraph.borrow_mut().new_output(); + let target = Target::new(0, output.port); let (targets, registrar) = TeeCore::::new(); + let egress = EgressNub { targets, phantom: PhantomData }; let channel_id = scope.clone().new_identifier(); - self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id); + + if let Some(logger) = scope.logging() { + let pusher = LogPusher::new(egress, channel_id, scope.index(), logger); + self.connect_to(target, pusher, channel_id); + } else { + self.connect_to(target, egress, channel_id); + } StreamCore::new( output, @@ -197,6 +215,61 @@ where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { } } +/// A pusher that logs messages passing through it. +/// +/// This type performs the same function as the `LogPusher` and `LogPuller` types in +/// `timely::dataflow::channels::pact`. We need a special implementation for `enter`/`leave` +/// channels because those don't have a puller connected. Thus, this pusher needs to log both the +/// send and the receive `MessageEvent`. +struct LogPusher

{ + pusher: P, + channel: usize, + counter: usize, + index: usize, + logger: TimelyLogger, +} + +impl

LogPusher

{ + fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self { + Self { + pusher, + channel, + counter: 0, + index, + logger, + } + } +} + +impl Push> for LogPusher

+where + D: Container, + P: Push>, +{ + fn push(&mut self, element: &mut Option>) { + if let Some(bundle) = element { + let send_event = MessagesEvent { + is_send: true, + channel: self.channel, + source: self.index, + target: self.index, + seq_no: self.counter, + length: bundle.data.len(), + }; + let recv_event = MessagesEvent { + is_send: false, + ..send_event + }; + + self.logger.log(send_event); + self.logger.log(recv_event); + self.counter += 1; + } + + self.pusher.push(element); + } +} + #[cfg(test)] mod test { /// Test that nested scopes with pass-through edges (no operators) correctly communicate progress.