Skip to content

Commit

Permalink
Added debug implementations for a bunch of types (TimelyDataflow#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kixiron authored Jun 5, 2021
1 parent 687d2e4 commit 209af99
Show file tree
Hide file tree
Showing 16 changed files with 144 additions and 39 deletions.
23 changes: 20 additions & 3 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::cell::RefCell;
use std::any::Any;
use std::collections::HashMap;
use std::time::{Instant, Duration};
use std::fmt::{self, Debug};

pub struct Registry<Id> {
/// A worker-specific identifier.
Expand Down Expand Up @@ -31,7 +32,7 @@ impl<Id: Clone+'static> Registry<Id> {
name: &str,
action: F) -> Option<Box<dyn Any>>
{
let logger = Logger::<T, Id>::new(self.time.clone(), Duration::default(), self.id.clone(), action);
let logger = Logger::<T, Id>::new(self.time, Duration::default(), self.id.clone(), action);
self.insert_logger(name, logger)
}

Expand Down Expand Up @@ -99,7 +100,7 @@ impl<T, E: Clone> Clone for Logger<T, E> {
Logger {
id: self.id.clone(),
time: self.time,
offset: self.offset.clone(),
offset: self.offset,
action: self.action.clone(),
buffer: self.buffer.clone(),
}
Expand Down Expand Up @@ -153,7 +154,7 @@ impl<T, E: Clone> Logger<T, E> {
let mut buffer = self.buffer.borrow_mut();
let elapsed = self.time.elapsed() + self.offset;
for event in events {
buffer.push((elapsed.clone(), self.id.clone(), event.into()));
buffer.push((elapsed, self.id.clone(), event.into()));
if buffer.len() == buffer.capacity() {
// Would call `self.flush()`, but for `RefCell` panic.
let mut action = self.action.borrow_mut();
Expand Down Expand Up @@ -186,6 +187,22 @@ impl<T, E> Drop for Logger<T, E> {
}
}

impl<T, E> Debug for Logger<T, E>
where
E: Debug,
T: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Logger")
.field("id", &self.id)
.field("time", &self.time)
.field("offset", &self.offset)
.field("action", &Rc::as_ptr(&self.action))
.field("buffer", &self.buffer)
.finish()
}
}

/// Types that can be flushed.
trait Flush {
/// Flushes buffered data.
Expand Down
71 changes: 46 additions & 25 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
//! The progress tracking logic assumes that this number is independent of the pact used.
use std::marker::PhantomData;
use std::{fmt::{self, Debug}, marker::PhantomData};

use crate::communication::{Push, Pull, Data};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
Expand All @@ -16,7 +16,7 @@ use crate::worker::AsWorker;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use super::{Bundle, Message};

use crate::logging::TimelyLogger as Logger;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T: 'static, D: 'static> {
Expand All @@ -29,7 +29,9 @@ pub trait ParallelizationContract<T: 'static, D: 'static> {
}

/// A direct connection
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
Expand All @@ -43,8 +45,9 @@ impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
}

/// An exchange between multiple observers by data
pub struct Exchange<D, F: FnMut(&D)->u64+'static> { hash_func: F, phantom: PhantomData<D>, }
impl<D, F: FnMut(&D)->u64> Exchange<D, F> {
pub struct Exchange<D, F> { hash_func: F, phantom: PhantomData<D> }

impl<D, F: FnMut(&D)->u64+'static> Exchange<D, F> {
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new(func: F) -> Exchange<D, F> {
Exchange {
Expand All @@ -67,16 +70,24 @@ impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> Parallelization
}
}

impl<D, F> Debug for Exchange<D, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Exchange").finish()
}
}

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pusher: P,
channel: usize,
counter: usize,
source: usize,
target: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
Expand All @@ -86,7 +97,7 @@ impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
counter: 0,
source,
target,
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
logging,
}
}
Expand All @@ -97,42 +108,48 @@ impl<T, D, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
if let Some(bundle) = pair {
self.counter += 1;

// Stamp the sequence number and source.
// FIXME: Awkward moment/logic.
if let Some(message) = bundle.if_mut() {
message.seq = self.counter-1;
message.seq = self.counter - 1;
message.from = self.source;
}

self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent {
is_send: true,
channel: self.channel,
source: self.source,
target: self.target,
seq_no: self.counter-1,
length: bundle.data.len(),
}));
if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
is_send: true,
channel: self.channel,
source: self.source,
target: self.target,
seq_no: self.counter - 1,
length: bundle.data.len(),
})
}
}

self.pusher.push(pair);
}
}

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
puller: P,
channel: usize,
index: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
puller,
channel,
index,
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
logging,
}
}
Expand All @@ -145,15 +162,19 @@ impl<T, D, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
if let Some(bundle) = result {
let channel = self.channel;
let target = self.index;
self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent {
is_send: false,
channel,
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
}));

if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
is_send: false,
channel,
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
});
}
}

result
}
}
1 change: 1 addition & 0 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::communication::Push;
///
/// The `Buffer` type should be used by calling `session` with a time, which checks whether
/// data must be flushed and creates a `Session` object which allows sending at the given time.
#[derive(Debug)]
pub struct Buffer<T, D, P: Push<Bundle<T, D>>> {
time: Option<T>, // the currently open time, if it is open
buffer: Vec<D>, // a buffer for records, to send at self.time
Expand Down
6 changes: 4 additions & 2 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A wrapper which counts the number of records pushed past and updates a shared count map.
use std::marker::PhantomData;
use std::rc::Rc;
use std::cell::RefCell;

Expand All @@ -8,10 +9,11 @@ use crate::dataflow::channels::Bundle;
use crate::communication::Push;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct Counter<T: Ord, D, P: Push<Bundle<T, D>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
phantom: PhantomData<D>,
}

impl<T, D, P> Push<Bundle<T, D>> for Counter<T, D, P> where T : Ord+Clone+'static, P: Push<Bundle<T, D>> {
Expand All @@ -34,7 +36,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
}
}
/// A references to shared changes in counts, for cloning or draining.
Expand Down
45 changes: 40 additions & 5 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! A `Push` implementor with a list of `Box<Push>` to forward pushes to.
use std::rc::Rc;
use std::cell::RefCell;
use std::fmt::{self, Debug};
use std::rc::Rc;

use crate::Data;
use crate::dataflow::channels::{Bundle, Message};
use crate::Data;

use crate::communication::Push;

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T: 'static, D: 'static> {
buffer: Vec<D>,
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>,
shared: PushList<T, D>,
}

impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
Expand Down Expand Up @@ -58,9 +61,27 @@ impl<T, D> Clone for Tee<T, D> {
}
}

impl<T, D> Debug for Tee<T, D>
where
D: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("Tee");
debug.field("buffer", &self.buffer);

if let Ok(shared) = self.shared.try_borrow() {
debug.field("shared", &format!("{} pushers", shared.len()));
} else {
debug.field("shared", &"...");
}

debug.finish()
}
}

/// A shared list of `Box<Push>` used to add `Push` implementors.
pub struct TeeHelper<T, D> {
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>
shared: PushList<T, D>,
}

impl<T, D> TeeHelper<T, D> {
Expand All @@ -73,7 +94,21 @@ impl<T, D> TeeHelper<T, D> {
impl<T, D> Clone for TeeHelper<T, D> {
fn clone(&self) -> Self {
TeeHelper {
shared: self.shared.clone()
shared: self.shared.clone(),
}
}
}

impl<T, D> Debug for TeeHelper<T, D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("TeeHelper");

if let Ok(shared) = self.shared.try_borrow() {
debug.field("shared", &format!("{} pushers", shared.len()));
} else {
debug.field("shared", &"...");
}

debug.finish()
}
}
6 changes: 6 additions & 0 deletions timely/src/dataflow/operators/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ pub mod link {
}
}

impl<T, D> Default for EventLink<T, D> {
fn default() -> Self {
Self::new()
}
}

#[test]
fn avoid_stack_overflow_in_drop() {
let mut event1 = Rc::new(EventLink::<(),()>::new());
Expand Down
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl<G: Scope, D: Data> ConnectLoop<G, D> for Stream<G, D> {
}

/// A handle used to bind the source of a loop variable.
#[derive(Debug)]
pub struct Handle<G: Scope, D: Data> {
builder: OperatorBuilder<G>,
summary: <G::Timestamp as Timestamp>::Summary,
Expand Down
2 changes: 2 additions & 0 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::operators::generic::operator_info::OperatorInfo;

/// Contains type-free information about the operator properties.
#[derive(Debug)]
pub struct OperatorShape {
name: String, // A meaningful name for the operator.
notify: bool, // Does the operator require progress notifications.
Expand Down Expand Up @@ -53,6 +54,7 @@ impl OperatorShape {
}

/// Builds operators with generic shape.
#[derive(Debug)]
pub struct OperatorBuilder<G: Scope> {
scope: G,
index: usize,
Expand Down
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::logging::TimelyLogger as Logger;
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;

/// Builds operators with generic shape.
#[derive(Debug)]
pub struct OperatorBuilder<G: Scope> {
builder: OperatorBuilderRaw<G>,
frontier: Vec<MutableAntichain<G::Timestamp>>,
Expand Down
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ pub fn new_input_handle<T: Timestamp, D, P: Pull<Bundle<T, D>>>(pull_counter: Pu
/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
/// pusher is flushed (via the `cease` method) once it is no longer used.
#[derive(Debug)]
pub struct OutputWrapper<T: Timestamp, D, P: Push<Bundle<T, D>>> {
push_buffer: Buffer<T, D, PushCounter<T, D, P>>,
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
Expand Down
Loading

0 comments on commit 209af99

Please sign in to comment.