Skip to content

Commit

Permalink
Log messages over enter/leave channels (TimelyDataflow#507)
Browse files Browse the repository at this point in the history
This commit adds `MessageEvent` logging for messages traveling over
`enter`/`leave` channels.
  • Loading branch information
teskje authored Mar 28, 2023
1 parent 036b4da commit 134842a
Showing 1 changed file with 79 additions and 6 deletions.
85 changes: 79 additions & 6 deletions timely/src/dataflow/operators/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use std::marker::PhantomData;

use crate::logging::{TimelyLogger, MessagesEvent};
use crate::progress::Timestamp;
use crate::progress::timestamp::Refines;
use crate::progress::{Source, Target};
Expand Down Expand Up @@ -90,17 +91,26 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T
let (targets, registrar) = TeeCore::<T, C>::new();
let ingress = IngressNub {
targets: CounterCore::new(targets),
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
activator: scope.activator_for(&scope.addr()),
active: false,
};
let produced = ingress.targets.produced().clone();

let input = scope.subgraph.borrow_mut().new_input(produced);

let channel_id = scope.clone().new_identifier();
self.connect_to(input, ingress, channel_id);
StreamCore::new(Source::new(0, input.port), registrar, scope.clone())

if let Some(logger) = scope.logging() {
let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger);
self.connect_to(input, pusher, channel_id);
} else {
self.connect_to(input, ingress, channel_id);
}

StreamCore::new(
Source::new(0, input.port),
registrar,
scope.clone(),
)
}
}

Expand Down Expand Up @@ -129,9 +139,17 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave
let scope = self.scope();

let output = scope.subgraph.borrow_mut().new_output();
let target = Target::new(0, output.port);
let (targets, registrar) = TeeCore::<G::Timestamp, D>::new();
let egress = EgressNub { targets, phantom: PhantomData };
let channel_id = scope.clone().new_identifier();
self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id);

if let Some(logger) = scope.logging() {
let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
self.connect_to(target, pusher, channel_id);
} else {
self.connect_to(target, egress, channel_id);
}

StreamCore::new(
output,
Expand Down Expand Up @@ -197,6 +215,61 @@ where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Data {
}
}

/// A pusher that logs messages passing through it.
///
/// This type performs the same function as the `LogPusher` and `LogPuller` types in
/// `timely::dataflow::channels::pact`. We need a special implementation for `enter`/`leave`
/// channels because those don't have a puller connected. Thus, this pusher needs to log both the
/// send and the receive `MessageEvent`.
struct LogPusher<P> {
pusher: P,
channel: usize,
counter: usize,
index: usize,
logger: TimelyLogger,
}

impl<P> LogPusher<P> {
fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
Self {
pusher,
channel,
counter: 0,
index,
logger,
}
}
}

impl<T, D, P> Push<BundleCore<T, D>> for LogPusher<P>
where
D: Container,
P: Push<BundleCore<T, D>>,
{
fn push(&mut self, element: &mut Option<BundleCore<T, D>>) {
if let Some(bundle) = element {
let send_event = MessagesEvent {
is_send: true,
channel: self.channel,
source: self.index,
target: self.index,
seq_no: self.counter,
length: bundle.data.len(),
};
let recv_event = MessagesEvent {
is_send: false,
..send_event
};

self.logger.log(send_event);
self.logger.log(recv_event);
self.counter += 1;
}

self.pusher.push(element);
}
}

#[cfg(test)]
mod test {
/// Test that nested scopes with pass-through edges (no operators) correctly communicate progress.
Expand Down

0 comments on commit 134842a

Please sign in to comment.