Skip to content

Commit

Permalink
Remove SizableContainer requirement from partition (#612)
Browse files Browse the repository at this point in the history
Removes the SizableContainer requirement from the partition operator. Also
removes the same requirement from some places where it's not longer needed.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Dec 18, 2024
1 parent f45b2aa commit 486c988
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 14 deletions.
1 change: 0 additions & 1 deletion container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ pub trait ContainerBuilder: Default + 'static {
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
where
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
Self::Container: SizableContainer,
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
{
for datum in container.drain() {
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
impl<CB, F> ExchangeCore<CB, F>
where
CB: LengthPreservingContainerBuilder,
CB::Container: SizableContainer,
for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
{
/// Allocates a new `Exchange` pact from a distribution function.
Expand Down Expand Up @@ -84,7 +83,7 @@ impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB::Container: Data + Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
Expand Down
5 changes: 1 addition & 4 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The exchange pattern distributes pushed data between many target pushees.
use crate::communication::Push;
use crate::container::{ContainerBuilder, SizableContainer, PushInto};
use crate::container::{ContainerBuilder, PushInto};
use crate::dataflow::channels::Message;
use crate::{Container, Data};

Expand All @@ -10,7 +10,6 @@ use crate::{Container, Data};
pub struct Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
Expand All @@ -23,7 +22,6 @@ where
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
Expand Down Expand Up @@ -53,7 +51,6 @@ where
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
Expand Down
14 changes: 7 additions & 7 deletions timely/src/dataflow/operators/core/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Partition a stream of records into multiple streams.
use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer};
use timely_container::{Container, ContainerBuilder, PushInto};

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
Expand Down Expand Up @@ -29,16 +29,16 @@ pub trait Partition<G: Scope, C: Container> {
/// ```
fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
CB: ContainerBuilder + PushInto<D2>,
CB::Container: Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
}

impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
CB: ContainerBuilder + PushInto<D2>,
CB::Container: Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
{
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
Expand All @@ -48,7 +48,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
let mut streams = Vec::with_capacity(parts as usize);

for _ in 0..parts {
let (output, stream) = builder.new_output();
let (output, stream) = builder.new_output::<CB>();
outputs.push(output);
streams.push(stream);
}
Expand All @@ -59,7 +59,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
input.for_each(|time, data| {
let mut sessions = handles
.iter_mut()
.map(|h| h.session(&time))
.map(|h| h.session_with_builder(&time))
.collect::<Vec<_>>();

for datum in data.drain() {
Expand Down

0 comments on commit 486c988

Please sign in to comment.