Skip to content

Commit

Permalink
Fixed bug in Subgraph::get_internal_summary() (#383)
Browse files Browse the repository at this point in the history
* Fixed antichain bug

* Added assertions after summary construction

* Added comment about summary length

* Simplified summary loop

* Fancy asserts

* Simplified summary init
  • Loading branch information
Kixiron authored Apr 15, 2021
1 parent 3671a3b commit 79ff074
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
5 changes: 5 additions & 0 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ impl<T: PartialOrder> Antichain<T> {
}
}

/// Reserves capacity for at least additional more elements to be inserted in the given `Antichain`
pub fn reserve(&mut self, additional: usize) {
self.elements.reserve(additional);
}

/// Performs a sequence of insertion and return true iff any insertion does.
///
/// # Examples
Expand Down
51 changes: 31 additions & 20 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,26 +530,28 @@ where
assert_eq!(self.children[0].outputs, self.inputs());
assert_eq!(self.children[0].inputs, self.outputs());

let mut internal_summary = Vec::with_capacity(self.inputs());
for summary in self.scope_summary.iter() {
let scope_summary = summary.iter()
.map(|output| {
output.elements()
.iter()
.cloned()
.fold(
Antichain::with_capacity(output.elements().len()),
|mut antichain, path_summary| {
antichain.insert(TInner::summarize(path_summary));
antichain
},
)
})
.collect();

internal_summary.push(scope_summary);
// Note that we need to have `self.inputs()` elements in the summary
// with each element containing `self.outputs()` antichains regardless
// of how long `self.scope_summary` is
let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()];
for (input_idx, input) in self.scope_summary.iter().enumerate() {
for (output_idx, output) in input.iter().enumerate() {
let antichain = &mut internal_summary[input_idx][output_idx];
antichain.reserve(output.elements().len());
antichain.extend(output.elements().iter().cloned().map(TInner::summarize));
}
}

debug_assert_eq!(
internal_summary.len(),
self.inputs(),
"the internal summary should have as many elements as there are inputs",
);
debug_assert!(
internal_summary.iter().all(|summary| summary.len() == self.outputs()),
"each element of the internal summary should have as many elements as there are outputs",
);

// Each child has expressed initial capabilities (their `shared_progress.internals`).
// We introduce these into the progress tracker to determine the scope's initial
// internal capabilities.
Expand Down Expand Up @@ -632,8 +634,17 @@ impl<T: Timestamp> PerOperatorState<T> {

let (internal_summary, shared_progress) = scope.get_internal_summary();

assert_eq!(internal_summary.len(), inputs);
assert!(!internal_summary.iter().any(|x| x.len() != outputs));
assert_eq!(
internal_summary.len(),
inputs,
"operator summary has {} inputs when {} were expected",
internal_summary.len(),
inputs,
);
assert!(
!internal_summary.iter().any(|x| x.len() != outputs),
"operator summary had too few outputs",
);

PerOperatorState {
name: scope.name().to_owned(),
Expand Down

0 comments on commit 79ff074

Please sign in to comment.