diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 88055f354b..2b123683e0 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -38,7 +38,7 @@ impl ParallelizationContract for Pipeline { // // ignore `&mut A` and use thread allocator // let (pusher, puller) = Thread::new::>(); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), - LogPuller::new(puller, allocator.index(), identifier, logging.clone())) + LogPuller::new(puller, allocator.index(), identifier, logging)) } } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index ceb709d093..7bb4c5553a 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -113,7 +113,7 @@ impl<'a, T, D, P: Push>+'a> Session<'a, T, D, P> where T: Eq+Clone /// new backing memory. #[inline] pub fn give_vec(&mut self, message: &mut Vec) { - if message.len() > 0 { + if !message.is_empty() { self.buffer.give_vec(message); } } @@ -144,7 +144,7 @@ impl<'a, T: Timestamp, D, P: Push>+'a> AutoflushSession<'a, T, D, P /// Transmits a pre-packed batch of data. #[inline] pub fn give_content(&mut self, message: &mut Vec) { - if message.len() > 0 { + if !message.is_empty() { self.buffer.give_vec(message); } } diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 9c0e959b1a..18cc5800c5 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -385,10 +385,16 @@ impl CapabilitySet { } } +impl Default for CapabilitySet { + fn default() -> Self { + Self::new() + } +} + impl Deref for CapabilitySet { type Target=[Capability]; fn deref(&self) -> &[Capability] { - &self.elements[..] + &self.elements } } diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index e581f4e04c..6153d7f505 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -136,7 +136,7 @@ impl<'a, G: Scope, D: Data, T: Timestamp+Refines> Leave for Stream::new( output, registrar, - scope.parent.clone() + scope.parent, ) } } @@ -206,7 +206,7 @@ mod test { fn test_nested() { use crate::dataflow::{InputHandle, ProbeHandle}; - use crate::dataflow::operators::{Input, Exchange, Inspect, Probe}; + use crate::dataflow::operators::{Input, Inspect, Probe}; use crate::dataflow::Scope; use crate::dataflow::operators::{Enter, Leave}; diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index a35346eda9..7034a9df09 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -29,7 +29,7 @@ impl Filter for Stream { input.for_each(|time, data| { data.swap(&mut vector); vector.retain(|x| predicate(x)); - if vector.len() > 0 { + if !vector.is_empty() { output.session(&time).give_vec(&mut vector); } }); diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 64e4d04bb2..ace34cb00c 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -163,7 +163,7 @@ impl OperatorBuilder { let operator = OperatorCore { shape: self.shape, address: self.address, - activations: self.scope.activations().clone(), + activations: self.scope.activations(), logic, shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))), summary: self.summary, diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 5f15878baf..e2e263dd57 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -137,11 +137,10 @@ impl OperatorBuilder { { // create capabilities, discard references to their creation. let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); - for output_index in 0 .. self.internal.borrow().len() { - let borrow = &self.internal.borrow()[output_index]; - capabilities.push(mint_capability(G::Timestamp::minimum(), borrow.clone())); + for batch in self.internal.borrow().iter() { + capabilities.push(mint_capability(G::Timestamp::minimum(), batch.clone())); // Discard evidence of creation, as we are assumed to start with one. - borrow.borrow_mut().clear(); + batch.borrow_mut().clear(); } let mut logic = constructor(capabilities); @@ -155,16 +154,16 @@ impl OperatorBuilder { move |progress: &mut SharedProgress| { // drain frontier changes - for index in 0 .. progress.frontiers.len() { - self_frontier[index].update_iter(progress.frontiers[index].drain()); + for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) { + frontier.update_iter(progress.drain()); } // invoke supplied logic let result = logic(&self_frontier[..]); // move batches of consumed changes. - for index in 0 .. progress.consumeds.len() { - self_consumed[index].borrow_mut().drain_into(&mut progress.consumeds[index]); + for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) { + consumed.borrow_mut().drain_into(progress); } // move batches of internal changes. @@ -175,8 +174,8 @@ impl OperatorBuilder { } // move batches of produced changes. - for index in 0 .. progress.produceds.len() { - self_produced[index].borrow_mut().drain_into(&mut progress.produceds[index]); + for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) { + produced.borrow_mut().drain_into(progress); } result diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6fb1e7e8cc..ded5ab7f2c 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -282,7 +282,7 @@ impl FrontierNotificator { /// }); /// ``` #[inline] - pub fn notify_at<'a>(&mut self, cap: Capability) { + pub fn notify_at(&mut self, cap: Capability) { self.pending.push((cap,1)); } @@ -398,7 +398,7 @@ impl FrontierNotificator { /// }); /// }); /// ``` - pub fn pending<'a>(&'a self) -> ::std::slice::Iter<'a, (Capability, u64)> { + pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability, u64)> { self.pending.iter() } } diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 120e44abee..e66d9a63e1 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -369,6 +369,12 @@ impl Handle { } } +impl Default for Handle { + fn default() -> Self { + Self::new() + } +} + impl Drop for Handle { fn drop(&mut self) { self.close_epoch(); diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 98f255e077..07789304c9 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -118,7 +118,7 @@ where let index = self.subgraph.borrow_mut().allocate_child_id(); let path = self.subgraph.borrow().path.clone(); - let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name)); + let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging(), self.progress_logging.clone(), name)); let result = { let mut builder = Child { subgraph: &subscope, diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 0be7f98d37..4a59e057f5 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -30,16 +30,16 @@ impl BatchLogger where P: EventPusher) { + pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, E, T)>) { if !data.is_empty() { self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect())); } - if &self.time < time { - let new_frontier = time.clone(); - let old_frontier = self.time.clone(); + if self.time < time { + let new_frontier = time; + let old_frontier = self.time; self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)])); } - self.time = time.clone(); + self.time = time; } } impl Drop for BatchLogger where P: EventPusher { diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 5eead8b92f..7c0b95dfe9 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -154,8 +154,8 @@ impl Progcaster { seq_no: counter, channel, addr: addr.clone(), - messages: messages, - internal: internal, + messages, + internal, }); }); diff --git a/timely/src/progress/change_batch.rs b/timely/src/progress/change_batch.rs index 09f4cb5ec6..6573a45c8d 100644 --- a/timely/src/progress/change_batch.rs +++ b/timely/src/progress/change_batch.rs @@ -17,7 +17,7 @@ pub struct ChangeBatch { clean: usize, } -impl ChangeBatch { +impl ChangeBatch { /// Allocates a new empty `ChangeBatch`. /// @@ -32,10 +32,61 @@ impl ChangeBatch { pub fn new() -> ChangeBatch { ChangeBatch { updates: Vec::new(), - clean: 0 + clean: 0, } } + /// Allocates a new empty `ChangeBatch` with space for `capacity` updates. + /// + /// # Examples + /// + ///``` + /// use timely::progress::ChangeBatch; + /// + /// let mut batch = ChangeBatch::::with_capacity(10); + /// assert!(batch.is_empty()); + ///``` + pub fn with_capacity(capacity: usize) -> ChangeBatch { + ChangeBatch { + updates: Vec::with_capacity(capacity), + clean: 0, + } + } + + /// Returns true if the change batch is not guaranteed compact. + pub fn is_dirty(&self) -> bool { + self.updates.len() > self.clean + } + + /// Expose the internal vector of updates. + pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates } + + /// Expose the internal value of `clean`. + pub fn unstable_internal_clean(&self) -> usize { self.clean } + + /// Clears the map. + /// + /// # Examples + /// + ///``` + /// use timely::progress::ChangeBatch; + /// + /// let mut batch = ChangeBatch::::new_from(17, 1); + /// batch.clear(); + /// assert!(batch.is_empty()); + ///``` + #[inline] + pub fn clear(&mut self) { + self.updates.clear(); + self.clean = 0; + } +} + +impl ChangeBatch +where + T: Ord, +{ + /// Allocates a new `ChangeBatch` with a single entry. /// /// # Examples @@ -52,11 +103,6 @@ impl ChangeBatch { result } - /// Returns true if the change batch is not guaranteed compact. - pub fn is_dirty(&self) -> bool { - self.updates.len() > self.clean - } - /// Adds a new update, for `item` with `value`. /// /// This could be optimized to perform compaction when the number of "dirty" elements exceeds @@ -157,23 +203,6 @@ impl ChangeBatch { self.updates.drain(..) } - /// Clears the map. - /// - /// # Examples - /// - ///``` - /// use timely::progress::ChangeBatch; - /// - /// let mut batch = ChangeBatch::::new_from(17, 1); - /// batch.clear(); - /// assert!(batch.is_empty()); - ///``` - #[inline] - pub fn clear(&mut self) { - self.updates.clear(); - self.clean = 0; - } - /// True iff all keys have value zero. /// /// This method requires mutable access to `self` because it may need to compact the representation @@ -266,17 +295,12 @@ impl ChangeBatch { self.updates[i].1 = 0; } } + self.updates.retain(|x| x.1 != 0); } self.clean = self.updates.len(); } - /// Expose the internal vector of updates. - pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates } - - /// Expose the internal value of `clean`. - pub fn unstable_internal_clean(&self) -> usize { self.clean } - /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data. /// This function tries to minimize work by only compacting if enough work has accumulated. fn maintain_bounds(&mut self) { @@ -286,3 +310,9 @@ impl ChangeBatch { } } } + +impl Default for ChangeBatch { + fn default() -> Self { + Self::new() + } +} diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index da1ff21df4..4d370b6aad 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -124,6 +124,21 @@ impl Antichain { ///``` pub fn new() -> Antichain { Antichain { elements: Vec::new() } } + /// Creates a new empty `Antichain` with space for `capacity` elements. + /// + /// # Examples + /// + ///``` + /// use timely::progress::frontier::Antichain; + /// + /// let mut frontier = Antichain::::with_capacity(10); + ///``` + pub fn with_capacity(capacity: usize) -> Self { + Self { + elements: Vec::with_capacity(capacity), + } + } + /// Creates a new singleton `Antichain`. /// /// # Examples @@ -161,7 +176,7 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(frontier.elements(), &[2]); ///``` - #[inline] pub fn elements(&self) -> &[T] { &self.elements[..] } + #[inline] pub fn elements(&self) -> &[T] { &self.elements } /// Reveals the elements in the antichain. /// @@ -173,7 +188,7 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(&*frontier.borrow(), &[2]); ///``` - #[inline] pub fn borrow(&self) -> AntichainRef { AntichainRef::new(&self.elements[..]) }} + #[inline] pub fn borrow(&self) -> AntichainRef { AntichainRef::new(&self.elements) }} impl PartialEq for Antichain { fn eq(&self, other: &Self) -> bool { @@ -223,14 +238,14 @@ impl From> for Antichain { /// but I strongly recommend against using them unless you must (on part of timely progress tracking seems /// to be greatly simplified by access to this) #[derive(Clone, Debug, Abomonation, Serialize, Deserialize)] -pub struct MutableAntichain { +pub struct MutableAntichain { dirty: usize, updates: Vec<(T, i64)>, frontier: Vec, changes: ChangeBatch, } -impl MutableAntichain { +impl MutableAntichain { /// Creates a new empty `MutableAntichain`. /// /// # Examples @@ -272,7 +287,10 @@ impl MutableAntichain { /// This method deletes the contents. Unlike `clear` it records doing so. pub fn empty(&mut self) { - for index in 0 .. self.updates.len() { self.updates[index].1 = 0; } + for (_, diff) in self.updates.iter_mut() { + *diff = 0; + } + self.dirty = self.updates.len(); } @@ -287,7 +305,7 @@ impl MutableAntichain { /// assert!(frontier.frontier().len() == 0); ///``` #[inline] - pub fn frontier(&self) -> AntichainRef { + pub fn frontier(&self) -> AntichainRef<'_, T> { debug_assert_eq!(self.dirty, 0); AntichainRef::new(&self.frontier) } @@ -303,7 +321,10 @@ impl MutableAntichain { /// assert!(frontier.frontier() == AntichainRef::new(&[0u64])); ///``` #[inline] - pub fn new_bottom(bottom: T) -> MutableAntichain { + pub fn new_bottom(bottom: T) -> MutableAntichain + where + T: Clone, + { MutableAntichain { dirty: 0, updates: vec![(bottom.clone(), 1)], @@ -341,7 +362,10 @@ impl MutableAntichain { /// assert!(frontier.less_than(&2)); ///``` #[inline] - pub fn less_than(&self, time: &T) -> bool { + pub fn less_than(&self, time: &T) -> bool + where + T: PartialOrder, + { debug_assert_eq!(self.dirty, 0); self.frontier().less_than(time) } @@ -359,7 +383,10 @@ impl MutableAntichain { /// assert!(frontier.less_equal(&2)); ///``` #[inline] - pub fn less_equal(&self, time: &T) -> bool { + pub fn less_equal(&self, time: &T) -> bool + where + T: PartialOrder, + { debug_assert_eq!(self.dirty, 0); self.frontier().less_equal(time) } @@ -393,10 +420,17 @@ impl MutableAntichain { /// assert!(changes == vec![(1, -1), (2, 1)]); ///``` #[inline] - pub fn update_iter<'a, I>(&'a mut self, updates: I) -> ::std::vec::Drain<'a, (T, i64)> + pub fn update_iter(&mut self, updates: I) -> ::std::vec::Drain<'_, (T, i64)> where + T: Clone + PartialOrder + Ord, I: IntoIterator, { + let updates = updates.into_iter(); + + // Attempt to pre-allocate for the new updates + let (min, max) = updates.size_hint(); + self.updates.reserve(max.unwrap_or(min)); + for (time, delta) in updates { self.updates.push((time, delta)); self.dirty += 1; @@ -431,7 +465,10 @@ impl MutableAntichain { /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do /// for single updates, but is meant to be an efficient way to process multiple updates together. This is /// especially true when we want to apply very large numbers of updates. - fn rebuild(&mut self) { + fn rebuild(&mut self) + where + T: Clone + PartialOrder + Ord, + { // sort and consolidate updates; retain non-zero accumulations. if !self.updates.is_empty() { @@ -463,7 +500,10 @@ impl MutableAntichain { } /// Reports the count for a queried time. - pub fn count_for(&self, query_time: &T) -> i64 { + pub fn count_for(&self, query_time: &T) -> i64 + where + T: Ord, + { self.updates .iter() .filter(|td| td.0.eq(query_time)) @@ -472,13 +512,19 @@ impl MutableAntichain { } } +impl Default for MutableAntichain { + fn default() -> Self { + Self::new() + } +} + /// Extension trait for filtering time changes through antichains. pub trait MutableAntichainFilter { /// Filters time changes through an antichain. /// /// # Examples /// - ///``` + /// ``` /// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter}; /// /// let mut frontier = MutableAntichain::new_bottom(1u64); @@ -488,7 +534,7 @@ pub trait MutableAntichainFilter { /// .collect::>(); /// /// assert!(changes == vec![(1, -1), (2, 1)]); - ///``` + /// ``` fn filter_through(self, antichain: &mut MutableAntichain) -> ::std::vec::Drain<(T,i64)>; } @@ -508,10 +554,11 @@ pub struct AntichainRef<'a, T: 'a> { impl<'a, T: 'a> Clone for AntichainRef<'a, T> { fn clone(&self) -> Self { Self { - frontier: self.frontier.clone(), + frontier: self.frontier, } } } + impl<'a, T: 'a> Copy for AntichainRef<'a, T> { } impl<'a, T: 'a> AntichainRef<'a, T> { diff --git a/timely/src/progress/mod.rs b/timely/src/progress/mod.rs index 912ce456d7..5d169c75d3 100644 --- a/timely/src/progress/mod.rs +++ b/timely/src/progress/mod.rs @@ -33,9 +33,9 @@ impl Location { Location { node, port: Port::Source(port) } } /// If the location is a target. - pub fn is_target(&self) -> bool { if let Port::Target(_) = self.port { true } else { false } } + pub fn is_target(&self) -> bool { matches!(self.port, Port::Target(_)) } /// If the location is a source. - pub fn is_source(&self) -> bool { if let Port::Source(_) = self.port { true } else { false } } + pub fn is_source(&self) -> bool { matches!(self.port, Port::Source(_)) } } impl From for Location { diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 9d2283d98f..8e51c839ff 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -343,6 +343,12 @@ impl Builder { } } +impl Default for Builder { + fn default() -> Self { + Self::new() + } +} + /// An interactive tracker of propagated reachability information. /// /// A `Tracker` tracks, for a fixed graph topology, the implications of @@ -439,6 +445,7 @@ impl PortInformation { output_summaries: Vec::new(), } } + /// True if updates at this pointstamp uniquely block progress. /// /// This method returns true if the currently maintained pointstamp @@ -455,6 +462,12 @@ impl PortInformation { } } +impl Default for PortInformation { + fn default() -> Self { + Self::new() + } +} + impl Tracker { /// Updates the count for a time at a location. @@ -742,7 +755,7 @@ fn summarize_outputs( } } - let mut results = HashMap::new(); + let mut results: HashMap>> = HashMap::new(); let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new(); let outputs = @@ -771,7 +784,11 @@ fn summarize_outputs( // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; - let antichains = results.entry(location).or_insert(Vec::new()); + let antichains = results + .entry(location) + .and_modify(|antichains| antichains.reserve(output)) + .or_insert_with(|| Vec::with_capacity(output)); + while antichains.len() <= output { antichains.push(Antichain::new()); } // Combine each operator-internal summary to the output with `summary`. @@ -791,12 +808,16 @@ fn summarize_outputs( Port::Target(_port) => { // Each target should have (at most) one source. - if let Some(source) = reverse.get(&location) { - let antichains = results.entry(*source).or_insert(Vec::new()); + if let Some(&source) = reverse.get(&location) { + let antichains = results + .entry(source) + .and_modify(|antichains| antichains.reserve(output)) + .or_insert_with(|| Vec::with_capacity(output)); + while antichains.len() <= output { antichains.push(Antichain::new()); } if antichains[output].insert(summary.clone()) { - worklist.push_back((*source, output, summary.clone())); + worklist.push_back((source, output, summary.clone())); } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 2d52f41a05..b178532cbb 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -187,7 +187,7 @@ where incomplete[0] = false; let incomplete_count = incomplete.len() - 1; - let activations = worker.activations().clone(); + let activations = worker.activations(); activations.borrow_mut().activate(&self.path[..]); @@ -530,13 +530,24 @@ where assert_eq!(self.children[0].outputs, self.inputs()); assert_eq!(self.children[0].inputs, self.outputs()); - let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()]; - for input in 0 .. self.scope_summary.len() { - for output in 0 .. self.scope_summary[input].len() { - for path_summary in self.scope_summary[input][output].elements().iter() { - internal_summary[input][output].insert(TInner::summarize(path_summary.clone())); - } - } + let mut internal_summary = Vec::with_capacity(self.inputs()); + for summary in self.scope_summary.iter() { + let scope_summary = summary.iter() + .map(|output| { + output.elements() + .iter() + .cloned() + .fold( + Antichain::with_capacity(output.elements().len()), + |mut antichain, path_summary| { + antichain.insert(TInner::summarize(path_summary)); + antichain + }, + ) + }) + .collect(); + + internal_summary.push(scope_summary); } // Each child has expressed initial capabilities (their `shared_progress.internals`). diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 49dafde7e3..d3d01fa2be 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -117,8 +117,7 @@ impl Config { #[cfg(feature = "getopts")] pub fn from_matches(matches: &getopts_dep::Matches) -> Result { let progress_mode = matches - .opt_get_default("progress-mode", ProgressMode::Eager) - .map_err(|e| e.to_string())?; + .opt_get_default("progress-mode", ProgressMode::Eager)?; Ok(Config::default().progress_mode(progress_mode)) } @@ -233,14 +232,14 @@ impl AsWorker for Worker { fn index(&self) -> usize { self.allocator.borrow().index() } fn peers(&self) -> usize { self.allocator.borrow().peers() } fn allocate(&mut self, identifier: usize, address: &[usize]) -> (Vec>>>, Box>>) { - if address.len() == 0 { panic!("Unacceptable address: Length zero"); } + if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); paths.insert(identifier, address.to_vec()); self.temp_channel_ids.borrow_mut().push(identifier); self.allocator.borrow_mut().allocate(identifier) } fn pipeline(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher>, ThreadPuller>) { - if address.len() == 0 { panic!("Unacceptable address: Length zero"); } + if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); paths.insert(identifier, address.to_vec()); self.temp_channel_ids.borrow_mut().push(identifier); @@ -266,14 +265,14 @@ impl Worker { let index = c.index(); Worker { config, - timer: now.clone(), + timer: now, paths: Default::default(), allocator: Rc::new(RefCell::new(c)), identifiers: Default::default(), dataflows: Default::default(), dataflow_counter: Default::default(), - logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now.clone(), index))), - activations: Rc::new(RefCell::new(Activations::new(now.clone()))), + logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))), + activations: Rc::new(RefCell::new(Activations::new(now))), active_dataflows: Default::default(), temp_channel_ids: Default::default(), } @@ -370,8 +369,10 @@ impl Worker { if !self.dataflows.borrow().is_empty() && delay != Some(Duration::new(0,0)) { // Log parking and flush log. - self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::park(delay))); - self.logging.borrow_mut().flush(); + if let Some(l) = self.logging().as_mut() { + l.log(crate::logging::ParkEvent::park(delay)); + l.flush(); + } self.allocator .borrow() @@ -633,20 +634,21 @@ impl Worker { subgraph: &subscope, parent: self.clone(), logging: logging.clone(), - progress_logging: progress_logging.clone(), + progress_logging, }; func(&mut resources, &mut builder) }; let mut operator = subscope.into_inner().build(self); - logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent { - id: identifier, - addr: operator.path().to_vec(), - name: operator.name().to_string(), - })); - - logging.as_mut().map(|l| l.flush()); + if let Some(l) = logging.as_mut() { + l.log(crate::logging::OperatesEvent { + id: identifier, + addr: operator.path().to_vec(), + name: operator.name().to_string(), + }); + l.flush(); + } operator.get_internal_summary(); operator.set_external_summary();