From 79ff074f2b49b468cc3da59e9e62bdfeecc35b4c Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Wed, 14 Apr 2021 19:56:14 -0500 Subject: [PATCH] Fixed bug in Subgraph::get_internal_summary() (#383) * Fixed antichain bug * Added assertions after summary construction * Added comment about summary length * Simplified summary loop * Fancy asserts * Simplified summary init --- timely/src/progress/frontier.rs | 5 ++++ timely/src/progress/subgraph.rs | 51 ++++++++++++++++++++------------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 4d370b6aa..d8e23d5d3 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -43,6 +43,11 @@ impl Antichain { } } + /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain` + pub fn reserve(&mut self, additional: usize) { + self.elements.reserve(additional); + } + /// Performs a sequence of insertion and return true iff any insertion does. /// /// # Examples diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index b178532cb..309d9077c 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -530,26 +530,28 @@ where assert_eq!(self.children[0].outputs, self.inputs()); assert_eq!(self.children[0].inputs, self.outputs()); - let mut internal_summary = Vec::with_capacity(self.inputs()); - for summary in self.scope_summary.iter() { - let scope_summary = summary.iter() - .map(|output| { - output.elements() - .iter() - .cloned() - .fold( - Antichain::with_capacity(output.elements().len()), - |mut antichain, path_summary| { - antichain.insert(TInner::summarize(path_summary)); - antichain - }, - ) - }) - .collect(); - - internal_summary.push(scope_summary); + // Note that we need to have `self.inputs()` elements in the summary + // with each element containing `self.outputs()` antichains regardless + // of how long `self.scope_summary` is + let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()]; + for (input_idx, input) in self.scope_summary.iter().enumerate() { + for (output_idx, output) in input.iter().enumerate() { + let antichain = &mut internal_summary[input_idx][output_idx]; + antichain.reserve(output.elements().len()); + antichain.extend(output.elements().iter().cloned().map(TInner::summarize)); + } } + debug_assert_eq!( + internal_summary.len(), + self.inputs(), + "the internal summary should have as many elements as there are inputs", + ); + debug_assert!( + internal_summary.iter().all(|summary| summary.len() == self.outputs()), + "each element of the internal summary should have as many elements as there are outputs", + ); + // Each child has expressed initial capabilities (their `shared_progress.internals`). // We introduce these into the progress tracker to determine the scope's initial // internal capabilities. @@ -632,8 +634,17 @@ impl PerOperatorState { let (internal_summary, shared_progress) = scope.get_internal_summary(); - assert_eq!(internal_summary.len(), inputs); - assert!(!internal_summary.iter().any(|x| x.len() != outputs)); + assert_eq!( + internal_summary.len(), + inputs, + "operator summary has {} inputs when {} were expected", + internal_summary.len(), + inputs, + ); + assert!( + !internal_summary.iter().any(|x| x.len() != outputs), + "operator summary had too few outputs", + ); PerOperatorState { name: scope.name().to_owned(),