
Usability and performance tweaks (#379)
Kixiron authored Apr 10, 2021
1 parent 6d86538 commit 3671a3b
Showing 18 changed files with 227 additions and 105 deletions.
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pact.rs
@@ -38,7 +38,7 @@ impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
// // ignore `&mut A` and use thread allocator
// let (pusher, puller) = Thread::new::<Bundle<T, D>>();
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
-         LogPuller::new(puller, allocator.index(), identifier, logging.clone()))
+         LogPuller::new(puller, allocator.index(), identifier, logging))
}
}

4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
@@ -113,7 +113,7 @@ impl<'a, T, D, P: Push<Bundle<T, D>>+'a> Session<'a, T, D, P> where T: Eq+Clone
/// new backing memory.
#[inline]
pub fn give_vec(&mut self, message: &mut Vec<D>) {
-        if message.len() > 0 {
+        if !message.is_empty() {
self.buffer.give_vec(message);
}
}
@@ -144,7 +144,7 @@ impl<'a, T: Timestamp, D, P: Push<Bundle<T, D>>+'a> AutoflushSession<'a, T, D, P
/// Transmits a pre-packed batch of data.
#[inline]
pub fn give_content(&mut self, message: &mut Vec<D>) {
-        if message.len() > 0 {
+        if !message.is_empty() {
self.buffer.give_vec(message);
}
}
8 changes: 7 additions & 1 deletion timely/src/dataflow/operators/capability.rs
@@ -385,10 +385,16 @@ impl<T: Timestamp> CapabilitySet<T> {
}
}

+impl<T: Timestamp> Default for CapabilitySet<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
impl<T: Timestamp> Deref for CapabilitySet<T> {
type Target=[Capability<T>];

fn deref(&self) -> &[Capability<T>] {
-        &self.elements[..]
+        &self.elements
}
}
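
A note on the new impl above: `Default` for `CapabilitySet<T>` simply forwards to `CapabilitySet::new()`, so an empty set can now be obtained through `Default::default()` (convenient for `#[derive(Default)]` on operator state). A minimal sketch, assuming the type is reachable at the module path implied by the file above:

    use timely::dataflow::operators::capability::CapabilitySet;

    // Equivalent to CapabilitySet::<u64>::new(): starts with no capabilities.
    let caps: CapabilitySet<u64> = Default::default();
    assert!(caps.is_empty()); // via the Deref to [Capability<T>] shown above
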
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/enterleave.rs
@@ -136,7 +136,7 @@ impl<'a, G: Scope, D: Data, T: Timestamp+Refines<G::Timestamp>> Leave<G, D> for
Stream::new(
output,
registrar,
-            scope.parent.clone()
+            scope.parent,
)
}
}
@@ -206,7 +206,7 @@ mod test {
fn test_nested() {

use crate::dataflow::{InputHandle, ProbeHandle};
-        use crate::dataflow::operators::{Input, Exchange, Inspect, Probe};
+        use crate::dataflow::operators::{Input, Inspect, Probe};

use crate::dataflow::Scope;
use crate::dataflow::operators::{Enter, Leave};
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/filter.rs
@@ -29,7 +29,7 @@ impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
input.for_each(|time, data| {
data.swap(&mut vector);
vector.retain(|x| predicate(x));
-            if vector.len() > 0 {
+            if !vector.is_empty() {
output.session(&time).give_vec(&mut vector);
}
});
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/builder_raw.rs
@@ -163,7 +163,7 @@ impl<G: Scope> OperatorBuilder<G> {
let operator = OperatorCore {
shape: self.shape,
address: self.address,
-            activations: self.scope.activations().clone(),
+            activations: self.scope.activations(),
logic,
shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
summary: self.summary,
19 changes: 9 additions & 10 deletions timely/src/dataflow/operators/generic/builder_rc.rs
@@ -137,11 +137,10 @@ impl<G: Scope> OperatorBuilder<G> {
{
// create capabilities, discard references to their creation.
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
-        for output_index in 0 .. self.internal.borrow().len() {
-            let borrow = &self.internal.borrow()[output_index];
-            capabilities.push(mint_capability(G::Timestamp::minimum(), borrow.clone()));
+        for batch in self.internal.borrow().iter() {
+            capabilities.push(mint_capability(G::Timestamp::minimum(), batch.clone()));
// Discard evidence of creation, as we are assumed to start with one.
-            borrow.borrow_mut().clear();
+            batch.borrow_mut().clear();
}

let mut logic = constructor(capabilities);
@@ -155,16 +154,16 @@ impl<G: Scope> OperatorBuilder<G> {
move |progress: &mut SharedProgress<G::Timestamp>| {

// drain frontier changes
-            for index in 0 .. progress.frontiers.len() {
-                self_frontier[index].update_iter(progress.frontiers[index].drain());
+            for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
+                frontier.update_iter(progress.drain());
}

// invoke supplied logic
let result = logic(&self_frontier[..]);

// move batches of consumed changes.
-            for index in 0 .. progress.consumeds.len() {
-                self_consumed[index].borrow_mut().drain_into(&mut progress.consumeds[index]);
+            for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
+                consumed.borrow_mut().drain_into(progress);
}

// move batches of internal changes.
@@ -175,8 +174,8 @@ impl<G: Scope> OperatorBuilder<G> {
}

// move batches of produced changes.
-            for index in 0 .. progress.produceds.len() {
-                self_produced[index].borrow_mut().drain_into(&mut progress.produceds[index]);
+            for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
+                produced.borrow_mut().drain_into(progress);
}

result
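
The three loops above swap index arithmetic for zipped iterators, borrowing each (incoming, local) pair directly rather than indexing into both vectors on every iteration. The same pattern in isolation, with illustrative names rather than timely's types:

    // Drain each incoming buffer into its matching local buffer.
    let mut incoming: Vec<Vec<i64>> = vec![vec![1, 2], vec![3]];
    let mut local: Vec<Vec<i64>> = vec![Vec::new(), Vec::new()];

    for (incoming, local) in incoming.iter_mut().zip(local.iter_mut()) {
        local.extend(incoming.drain(..));
    }
    assert_eq!(local, vec![vec![1, 2], vec![3]]);
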
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/notificator.rs
@@ -282,7 +282,7 @@ impl<T: Timestamp> FrontierNotificator<T> {
/// });
/// ```
#[inline]
-    pub fn notify_at<'a>(&mut self, cap: Capability<T>) {
+    pub fn notify_at(&mut self, cap: Capability<T>) {
self.pending.push((cap,1));
}

@@ -398,7 +398,7 @@ impl<T: Timestamp> FrontierNotificator<T> {
/// });
/// });
/// ```
-    pub fn pending<'a>(&'a self) -> ::std::slice::Iter<'a, (Capability<T>, u64)> {
+    pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability<T>, u64)> {
self.pending.iter()
}
}
6 changes: 6 additions & 0 deletions timely/src/dataflow/operators/input.rs
@@ -369,6 +369,12 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
}
}

+impl<T: Timestamp, D: Data> Default for Handle<T, D> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
impl<T:Timestamp, D: Data> Drop for Handle<T, D> {
fn drop(&mut self) {
self.close_epoch();
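
As with `CapabilitySet`, `Default` for the input `Handle` just forwards to `Handle::new()`. A small sketch, assuming `InputHandle` (the re-export used in the test module earlier) names this type:

    use timely::dataflow::InputHandle;

    // Same as InputHandle::<u64, String>::new(): a handle not yet attached to any dataflow.
    let handle: InputHandle<u64, String> = Default::default();
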
2 changes: 1 addition & 1 deletion timely/src/dataflow/scopes/child.rs
@@ -118,7 +118,7 @@ where
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();

-        let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name));
+        let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging(), self.progress_logging.clone(), name));
let result = {
let mut builder = Child {
subgraph: &subscope,
10 changes: 5 additions & 5 deletions timely/src/logging.rs
@@ -30,16 +30,16 @@ impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E,
}
}
/// Publishes a batch of logged events and advances the capability.
-    pub fn publish_batch(&mut self, time: &Duration, data: &mut Vec<(Duration, E, T)>) {
+    pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, E, T)>) {
if !data.is_empty() {
self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect()));
}
-        if &self.time < time {
-            let new_frontier = time.clone();
-            let old_frontier = self.time.clone();
+        if self.time < time {
+            let new_frontier = time;
+            let old_frontier = self.time;
self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)]));
}
-        self.time = time.clone();
+        self.time = time;
}
}
impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
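
`publish_batch` now binds its timestamp argument by destructuring the reference (`&time: &Duration`), copying the `Copy`-able `Duration` at the call boundary so the comparison and assignments below can drop their `.clone()` calls. The parameter pattern in isolation, using a hypothetical function not taken from the commit:

    use std::time::Duration;

    // `&elapsed` in the parameter list dereferences the `&Duration` argument,
    // so the body works with an owned (copied) Duration.
    fn exceeded(&elapsed: &Duration, budget: Duration) -> bool {
        elapsed > budget
    }

    assert!(exceeded(&Duration::from_secs(3), Duration::from_secs(1)));
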
4 changes: 2 additions & 2 deletions timely/src/progress/broadcast.rs
@@ -154,8 +154,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
seq_no: counter,
channel,
addr: addr.clone(),
-                messages: messages,
-                internal: internal,
+                messages,
+                internal,
});
});

90 changes: 60 additions & 30 deletions timely/src/progress/change_batch.rs
@@ -17,7 +17,7 @@ pub struct ChangeBatch<T> {
clean: usize,
}

-impl<T:Ord> ChangeBatch<T> {
+impl<T> ChangeBatch<T> {

/// Allocates a new empty `ChangeBatch`.
///
@@ -32,10 +32,61 @@ impl<T:Ord> ChangeBatch<T> {
pub fn new() -> ChangeBatch<T> {
ChangeBatch {
updates: Vec::new(),
-            clean: 0
+            clean: 0,
}
}

+    /// Allocates a new empty `ChangeBatch` with space for `capacity` updates.
+    ///
+    /// # Examples
+    ///
+    ///```
+    /// use timely::progress::ChangeBatch;
+    ///
+    /// let mut batch = ChangeBatch::<usize>::with_capacity(10);
+    /// assert!(batch.is_empty());
+    ///```
+    pub fn with_capacity(capacity: usize) -> ChangeBatch<T> {
+        ChangeBatch {
+            updates: Vec::with_capacity(capacity),
+            clean: 0,
+        }
+    }
+
+    /// Returns true if the change batch is not guaranteed compact.
+    pub fn is_dirty(&self) -> bool {
+        self.updates.len() > self.clean
+    }
+
+    /// Expose the internal vector of updates.
+    pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates }
+
+    /// Expose the internal value of `clean`.
+    pub fn unstable_internal_clean(&self) -> usize { self.clean }
+
+    /// Clears the map.
+    ///
+    /// # Examples
+    ///
+    ///```
+    /// use timely::progress::ChangeBatch;
+    ///
+    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
+    /// batch.clear();
+    /// assert!(batch.is_empty());
+    ///```
+    #[inline]
+    pub fn clear(&mut self) {
+        self.updates.clear();
+        self.clean = 0;
+    }
+}
+
+impl<T> ChangeBatch<T>
+where
+    T: Ord,
+{

/// Allocates a new `ChangeBatch` with a single entry.
///
/// # Examples
@@ -52,11 +103,6 @@ impl<T:Ord> ChangeBatch<T> {
result
}

-    /// Returns true if the change batch is not guaranteed compact.
-    pub fn is_dirty(&self) -> bool {
-        self.updates.len() > self.clean
-    }

/// Adds a new update, for `item` with `value`.
///
/// This could be optimized to perform compaction when the number of "dirty" elements exceeds
@@ -157,23 +203,6 @@ impl<T:Ord> ChangeBatch<T> {
self.updates.drain(..)
}

-    /// Clears the map.
-    ///
-    /// # Examples
-    ///
-    ///```
-    /// use timely::progress::ChangeBatch;
-    ///
-    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
-    /// batch.clear();
-    /// assert!(batch.is_empty());
-    ///```
-    #[inline]
-    pub fn clear(&mut self) {
-        self.updates.clear();
-        self.clean = 0;
-    }

/// True iff all keys have value zero.
///
/// This method requires mutable access to `self` because it may need to compact the representation
@@ -266,17 +295,12 @@ impl<T:Ord> ChangeBatch<T> {
self.updates[i].1 = 0;
}
}

self.updates.retain(|x| x.1 != 0);
}
self.clean = self.updates.len();
}

-    /// Expose the internal vector of updates.
-    pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates }
-
-    /// Expose the internal value of `clean`.
-    pub fn unstable_internal_clean(&self) -> usize { self.clean }

/// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
/// This function tries to minimize work by only compacting if enough work has accumulated.
fn maintain_bounds(&mut self) {
Expand All @@ -286,3 +310,9 @@ impl<T:Ord> ChangeBatch<T> {
}
}
}

+impl<T> Default for ChangeBatch<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
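
Taken together, the reorganization above moves `is_dirty`, `clear`, and the `unstable_internal_*` accessors into an impl without the `T: Ord` bound (alongside `new` and the new `with_capacity`), and adds a `Default` impl. A brief usage sketch in the style of the doc examples above:

    use timely::progress::ChangeBatch;

    // Pre-allocates space for updates; construction no longer needs T: Ord.
    let mut batch = ChangeBatch::<usize>::with_capacity(8);
    batch.update(17, 1);            // adding and compacting still require T: Ord
    assert!(!batch.is_empty());

    // Default::default() builds the same empty batch as ChangeBatch::new().
    let mut empty = ChangeBatch::<usize>::default();
    assert!(empty.is_empty());
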
(Diffs for the remaining 5 changed files were not loaded on this page.)
