Remove core #515

Closed · antiguru wants to merge 35 commits

Commits (all authored by antiguru on Mar 17, 2023):
- 45c1ec2 Replace Stream with StreamCore
- 81a4fa5 Rename StreamCore to Stream
- 567f3c1 Inline Bundle
- 68fcdc7 Rename BundleCore to Bundle
- 2b322a3 Remove ParallelizationContract
- 99e37ad Rename ParallelizationContractCore to ParallelizationContract
- a0110e3 Inline push Counter
- 2374fa9 Rename push CounterCore to Counter
- 46d3f0c Inline Buffer
- b79f8f0 Rename BufferCore to Buffer
- d554902 Replace Exchange with container-variant, remove ExchangeCore
- dc86120 Inline FrontieredInputHandle
- 0c6fd14 Rename FrontieredInputHandleCore to FrontieredInputHandle
- 3017c61 Inline InputHandle
- bca00ab Rename InputHandleCore to InputHandle
- 71f2257 Inline OutputHandle
- 3093ecf Rename OutputHandleCore to OutputHandle
- e172764 Inline Tee
- 3379b47 Rename TeeCore to Tee
- 963a37c Update AutoflushSession to non-core
- 9bb7dbd Replace InputHandle by core variant
- 48897dc Replace UnorderedHandle by core variant
- 867d32d Replace Kafka EventProducer by core variant
- 43bee87 Replace EventPusher by core variant
- 91a15f6 Replace Event by core variant
- 126787d Replace EventLink by core variant
- 835e034 Replace EventConsumer by core variant
- a42dfa8 Replace EventIterator by core variant
- 3eb06a8 Replace EventWriter by core variant
- 48dd610 Replace EventReader by core variant
- 586430d Replace feedback Handle by core variant
- ecb666f Promote UnorderedInputCore to main variant
- 60ae900 InputHandle only supports inputting containers
- 5987d4c Promote feedback core-variants
- 127fb98 Cleanup documentation
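
Every commit follows the same mechanical pattern, visible concretely in the kafkaesque/src/lib.rs diff below: delete the `Vec`-specialized alias, then let the formerly `*Core` type take over the plain name. A minimal sketch of that pattern with placeholder types (not the actual timely definitions):

```rust
use std::marker::PhantomData;

// Before: a container-generic `*Core` type, plus an alias that pins the
// container to `Vec<D>` and owns the short name.
pub struct EventProducerCore<T, C> { phant: PhantomData<(T, C)> }
pub type EventProducerOld<T, D> = EventProducerCore<T, Vec<D>>;

// After: a single type, generic over the container `C`; callers name the
// container explicitly, e.g. `EventProducer::<u64, Vec<u64>>`.
pub struct EventProducer<T, C> { phant: PhantomData<(T, C)> }

fn main() {
    let _old: EventProducerOld<u64, u64> = EventProducerCore { phant: PhantomData };
    let _new: EventProducer<u64, Vec<u64>> = EventProducer { phant: PhantomData };
}
```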
3 changes: 1 addition & 2 deletions kafkaesque/src/bin/capture_recv.rs
@@ -3,7 +3,6 @@ use timely::dataflow::operators::capture::Replay;
 use timely::dataflow::operators::Accumulate;
 
 use rdkafka::config::ClientConfig;
-
 use kafkaesque::EventConsumer;
 
 fn main() {
@@ -31,7 +30,7 @@ fn main() {
         .filter(|i| i % worker.peers() == worker.index())
         .map(|i| {
             let topic = format!("{}-{:?}", topic, i);
-            EventConsumer::<_,u64>::new(consumer_config.clone(), topic)
+            EventConsumer::<_, Vec<u64>>::new(consumer_config.clone(), topic)
         })
         .collect::<Vec<_>>();
 
3 changes: 1 addition & 2 deletions kafkaesque/src/bin/capture_send.rs
@@ -2,7 +2,6 @@ use timely::dataflow::operators::ToStream;
 use timely::dataflow::operators::capture::Capture;
 
 use rdkafka::config::ClientConfig;
-
 use kafkaesque::EventProducer;
 
 fn main() {
@@ -20,7 +19,7 @@ fn main() {
         .set("bootstrap.servers", brokers);
 
     let topic = format!("{}-{:?}", topic, worker.index());
-    let producer = EventProducer::new(producer_config, topic);
+    let producer = EventProducer::<u64, Vec<u64>>::new(producer_config, topic);
 
     worker.dataflow::<u64,_,_>(|scope|
         (0 .. count)
11 changes: 5 additions & 6 deletions kafkaesque/src/kafka_source.rs
@@ -1,11 +1,10 @@
 use timely::Data;
 use timely::dataflow::{Scope, Stream};
 use timely::dataflow::operators::Capability;
-use timely::dataflow::operators::generic::OutputHandle;
-use timely::dataflow::channels::pushers::Tee;
 
 use rdkafka::Message;
 use rdkafka::consumer::{ConsumerContext, BaseConsumer};
+use timely::dataflow::channels::pushers::Tee;
+use timely::dataflow::operators::generic::OutputHandle;
 
 /// Constructs a stream of data from a Kafka consumer.
 ///
@@ -89,14 +88,14 @@ pub fn kafka_source<C, G, D, L>(
     name: &str,
     consumer: BaseConsumer<C>,
     logic: L
-) -> Stream<G, D>
+) -> Stream<G, Vec<D>>
 where
     C: ConsumerContext+'static,
     G: Scope,
     D: Data,
     L: Fn(&[u8],
           &mut Capability<G::Timestamp>,
-          &mut OutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>) -> bool+'static,
+          &mut OutputHandle<G::Timestamp, Vec<D>, Tee<G::Timestamp, Vec<D>>>) -> bool+'static,
 {
     use timely::dataflow::operators::generic::source;
     source(scope, name, move |capability, info| {
@@ -135,4 +134,4 @@ where
     }
 
     })
-}
+}
28 changes: 11 additions & 17 deletions kafkaesque/src/lib.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicIsize, Ordering};
 
 use abomonation::Abomonation;
-use timely::dataflow::operators::capture::event::{EventCore, EventPusherCore, EventIteratorCore};
+use timely::dataflow::operators::capture::event::{Event, EventPusher, EventIterator};
 
 use rdkafka::Message;
 use rdkafka::client::ClientContext;
@@ -37,18 +37,15 @@ impl OutstandingCounterContext {
 }
 
 /// A wrapper for `W: Write` implementing `EventPusher<T, D>`.
-pub struct EventProducerCore<T, D> {
+pub struct EventProducer<T, D> {
     topic: String,
     buffer: Vec<u8>,
     producer: BaseProducer<OutstandingCounterContext>,
    counter: Arc<AtomicIsize>,
     phant: ::std::marker::PhantomData<(T,D)>,
 }
 
-/// [EventProducerCore] specialized to vector-based containers.
-pub type EventProducer<T, D> = EventProducerCore<T, Vec<D>>;
-
-impl<T, D> EventProducerCore<T, D> {
+impl<T, D> EventProducer<T, D> {
     /// Allocates a new `EventWriter` wrapping a supplied writer.
     pub fn new(config: ClientConfig, topic: String) -> Self {
         let counter = Arc::new(AtomicIsize::new(0));
@@ -65,8 +62,8 @@ impl<T, D> EventProducerCore<T, D> {
     }
 }
 
-impl<T: Abomonation, D: Abomonation> EventPusherCore<T, D> for EventProducerCore<T, D> {
-    fn push(&mut self, event: EventCore<T, D>) {
+impl<T: Abomonation, D: Abomonation> EventPusher<T, D> for EventProducer<T, D> {
+    fn push(&mut self, event: Event<T, D>) {
         unsafe { ::abomonation::encode(&event, &mut self.buffer).expect("Encode failure"); }
         // println!("sending {:?} bytes", self.buffer.len());
         self.producer.send::<(),[u8]>(BaseRecord::to(self.topic.as_str()).payload(&self.buffer[..])).unwrap();
@@ -76,7 +73,7 @@ impl<T: Abomonation, D: Abomonation> EventPusherCore<T, D> for EventProducerCore
     }
 }
 
-impl<T, D> Drop for EventProducerCore<T, D> {
+impl<T, D> Drop for EventProducer<T, D> {
     fn drop(&mut self) {
         while self.counter.load(Ordering::SeqCst) > 0 {
             self.producer.poll(std::time::Duration::from_millis(10));
@@ -85,16 +82,13 @@ impl<T, D> Drop for EventProducerCore<T, D> {
 }
 
 /// A Wrapper for `R: Read` implementing `EventIterator<T, D>`.
-pub struct EventConsumerCore<T, D> {
+pub struct EventConsumer<T, D> {
     consumer: BaseConsumer<DefaultConsumerContext>,
     buffer: Vec<u8>,
     phant: ::std::marker::PhantomData<(T,D)>,
 }
 
-/// [EventConsumerCore] specialized to vector-based containers.
-pub type EventConsumer<T, D> = EventConsumerCore<T, Vec<D>>;
-
-impl<T, D> EventConsumerCore<T, D> {
+impl<T, D> EventConsumer<T, D> {
     /// Allocates a new `EventReader` wrapping a supplied reader.
     pub fn new(config: ClientConfig, topic: String) -> Self {
         println!("allocating consumer for topic {:?}", topic);
@@ -108,14 +102,14 @@ impl<T, D> EventConsumerCore<T, D> {
     }
 }
 
-impl<T: Abomonation, D: Abomonation> EventIteratorCore<T, D> for EventConsumerCore<T, D> {
-    fn next(&mut self) -> Option<&EventCore<T, D>> {
+impl<T: Abomonation, D: Abomonation> EventIterator<T, D> for EventConsumer<T, D> {
+    fn next(&mut self) -> Option<&Event<T, D>> {
         if let Some(result) = self.consumer.poll(std::time::Duration::from_millis(0)) {
             match result {
                 Ok(message) => {
                     self.buffer.clear();
                     self.buffer.extend_from_slice(message.payload().unwrap());
-                    Some(unsafe { ::abomonation::decode::<EventCore<T,D>>(&mut self.buffer[..]).unwrap().0 })
+                    Some(unsafe { ::abomonation::decode::<Event<T,D>>(&mut self.buffer[..]).unwrap().0 })
                 },
                 Err(err) => {
                     println!("KafkaConsumer error: {:?}", err);
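
With the aliases gone, both capture binaries above construct their endpoints with the container named explicitly. A hedged construction sketch (broker address and topic are placeholders; a live broker is assumed for anything beyond construction):

```rust
use kafkaesque::{EventConsumer, EventProducer};
use rdkafka::config::ClientConfig;

fn main() {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "localhost:9092"); // placeholder broker

    // The second type parameter is now the container itself (`Vec<u64>`),
    // not the element type as it was before this PR.
    let producer = EventProducer::<u64, Vec<u64>>::new(config.clone(), "events-0".to_string());
    let consumer = EventConsumer::<u64, Vec<u64>>::new(config, "events-0".to_string());
    let _ = (producer, consumer);
}
```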
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_1.md
@@ -16,7 +16,7 @@ fn main() {
     // initializes and runs a timely dataflow.
     timely::execute_from_args(std::env::args(), |worker| {
 
-        let mut input = InputHandle::<(), String>::new();
+        let mut input = InputHandle::<(), Vec<String>>::new();
 
         // define a new dataflow
         worker.dataflow(|scope| {
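
The book's hello-world example changes only in the handle's type parameters. A type-level sketch of the new spelling (timestamp `u64` chosen arbitrarily here; the book's own example uses `()`):

```rust
use timely::dataflow::InputHandle;

fn main() {
    // Pre-PR this was `InputHandle::<u64, String>`; the second parameter
    // is now the container, not the record type.
    let mut input = InputHandle::<u64, Vec<String>>::new();
    input.advance_to(1);
    println!("input is at time {:?}", input.time());
}
```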
2 changes: 1 addition & 1 deletion mdbook/src/chapter_4/chapter_4_4.md
@@ -52,7 +52,7 @@ One nice aspect of `capture_into` is that it really does reveal everything that
 At *its* core, `replay_into` takes some sequence of `Event<T, D>` items and reproduces the stream, as it was recorded. It is also fairly simple, and we can just look at its implementation as well:
 
 ```rust,ignore
-fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D>{
+fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, Vec<D>>{
 
     let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
     let (targets, stream) = builder.new_output();
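
The chapter's prose is unchanged; only the returned container type moves from `D` to `Vec<D>`. As a reminder of how capture and replay fit together, a round-trip sketch adapted from the `Replay` operator's documentation (single in-process event channel, one worker assumed):

```rust
use std::rc::Rc;
use timely::dataflow::operators::capture::{Capture, EventLink, Replay};
use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // One end pushes captured events, the other replays them.
        let pusher = Rc::new(EventLink::new());
        let replayer = Some(pusher.clone());

        worker.dataflow::<u64, _, _>(|scope| {
            (0..10u64).to_stream(scope).capture_into(pusher);
        });

        worker.dataflow::<u64, _, _>(|scope| {
            replayer.replay_into(scope)
                    .inspect(|x| println!("replayed: {:?}", x));
        });
    }).unwrap();
}
```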
7 changes: 1 addition & 6 deletions mdbook/src/chapter_5/chapter_5_3.md
@@ -8,15 +8,10 @@ Many parts of Timely assume that data is organized into `Vec<T>`, i.e., batches
 This abstractions works well for many cases but precludes some advanced techniques, such as transferring translated or columnar data between operators.
 With the container abstraction, Timely specifies a minimal interface it requires tracking progress and provide data to operators.
 
-## Core operators
-
-In Timely, we provide a set of `Core` operators that are generic on the container type they can handle.
-In most cases, the `Core` operators are a immediate generalization of their non-core variant, providing the semantically equivalent functionality.
-
 ## Limitations
 
 A challenge when genericizing Timely operators is that all interfaces need to be declared independent of a concrete type, for example as part of a trait.
-For this reason, Timely doesn't currently support operators that require knowledge of the elements of a container or how to partition a container, with the only exception being the `Vec` type.
+For this reason, Timely doesn't currently support operators that require knowledge of the elements of a container, with the only exception being the `Vec` type.
 
 ## A custom container
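
Because element-wise operators remain `Vec`-only, extension traits in the examples below pin the container explicitly (unionfind.rs further down is the fullest instance). A small illustrative sketch in that style, with a made-up `Doubled` trait:

```rust
use timely::dataflow::operators::{Inspect, Map, ToStream};
use timely::dataflow::{Scope, Stream};

// Element-wise logic is written against the `Vec` container specifically.
trait Doubled {
    fn doubled(&self) -> Self;
}

impl<G: Scope> Doubled for Stream<G, Vec<u64>> {
    fn doubled(&self) -> Self {
        self.map(|x| 2 * x)
    }
}

fn main() {
    timely::example(|scope| {
        (0..5u64).to_stream(scope)
                 .doubled()
                 .inspect(|x| println!("doubled: {}", x));
    });
}
```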
2 changes: 1 addition & 1 deletion timely/examples/barrier.rs
@@ -11,7 +11,7 @@ fn main() {
     timely::execute_from_args(std::env::args().skip(2), move |worker| {
 
         worker.dataflow(move |scope| {
-            let (handle, stream) = scope.feedback::<usize>(1);
+            let (handle, stream) = scope.feedback::<Vec<usize>>(1);
             stream.unary_notify(
                 Pipeline,
                 "Barrier",
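
As barrier.rs shows, `feedback` now names the container of the loop stream; the wiring via `connect_loop` is otherwise unchanged. A sketch adapted from the `Feedback` operator's documentation, with the container spelled out:

```rust
use timely::dataflow::operators::{BranchWhen, Concat, ConnectLoop, Feedback, Inspect, ToStream};
use timely::dataflow::Scope;

fn main() {
    timely::example(|scope| {
        // Circulate 0..10 for 100 iterations; the loop stream is `Vec<u64>`.
        let (handle, cycle) = scope.feedback::<Vec<u64>>(1);
        (0..10u64).to_stream(scope)
                  .concat(&cycle)
                  .inspect(|x| println!("seen: {:?}", x))
                  .branch_when(|t| t < &100).1
                  .connect_loop(handle);
    });
}
```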
6 changes: 3 additions & 3 deletions timely/examples/bfs.rs
@@ -44,7 +44,7 @@ fn main() {
             .to_stream(scope);
 
         // define a loop variable, for the (node, worker) pairs.
-        let (handle, stream) = scope.feedback(1usize);
+        let (handle, stream) = scope.feedback::<Vec<_>>(1usize);
 
         // use the stream of edges
         graph.binary_notify(
@@ -58,7 +58,7 @@ fn main() {
                 // receive edges, start to sort them
                 input1.for_each(|time, data| {
                     notify.notify_at(time.retain());
-                    edge_list.push(data.replace(Vec::new()));
+                    edge_list.push(data.take());
                 });
 
                 // receive (node, worker) pairs, note any new ones.
@@ -68,7 +68,7 @@ fn main() {
                         notify.notify_at(time.retain());
                         Vec::new()
                     })
-                    .push(data.replace(Vec::new()));
+                    .push(data.take());
                 });
 
                 notify.for_each(|time, _num, _notify| {
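
The `data.replace(Vec::new())` idiom gives way to `data.take()`. The move-out-and-leave-empty behavior is the same as `std::mem::take` on a plain vector, sketched here for intuition (the handle's `take` is assumed analogous, operating on the message's buffer):

```rust
fn main() {
    let mut buffer = vec![1, 2, 3];
    // Swap in the default (empty) container and get the contents back.
    let contents = std::mem::take(&mut buffer);
    assert_eq!(contents, vec![1, 2, 3]);
    assert!(buffer.is_empty());
}
```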
2 changes: 1 addition & 1 deletion timely/examples/capture_recv.rs
@@ -17,7 +17,7 @@ fn main() {
         .collect::<Vec<_>>()
         .into_iter()
         .map(|l| l.incoming().next().unwrap().unwrap())
-        .map(|r| EventReader::<_,u64,_>::new(r))
+        .map(|r| EventReader::<_, Vec<u64>, _>::new(r))
         .collect::<Vec<_>>();
 
     worker.dataflow::<u64,_,_>(|scope| {
2 changes: 1 addition & 1 deletion timely/examples/distinct.rs
@@ -11,7 +11,7 @@ fn main() {
     // initializes and runs a timely dataflow.
     timely::execute_from_args(std::env::args(), |worker| {
         let index = worker.index();
-        let mut input = InputHandle::new();
+        let mut input = InputHandle::<_, Vec<_>>::new();
         let mut probe = ProbeHandle::new();
 
         // create a new input, exchange data, and inspect its output
4 changes: 2 additions & 2 deletions timely/examples/hashjoin.rs
@@ -22,8 +22,8 @@ fn main() {
         let index = worker.index();
         let peers = worker.peers();
 
-        let mut input1 = InputHandle::new();
-        let mut input2 = InputHandle::new();
+        let mut input1 = InputHandle::<_, Vec<_>>::new();
+        let mut input2 = InputHandle::<_, Vec<_>>::new();
         let mut probe = ProbeHandle::new();
 
         worker.dataflow(|scope| {
2 changes: 1 addition & 1 deletion timely/examples/logging-recv.rs
@@ -20,7 +20,7 @@ fn main() {
         .collect::<Vec<_>>()
         .into_iter()
         .map(|l| l.incoming().next().unwrap().unwrap())
-        .map(|r| EventReader::<Duration,(Duration,TimelySetup,TimelyEvent),_>::new(r))
+        .map(|r| EventReader::<Duration, Vec<(Duration, TimelySetup, TimelyEvent)>, _>::new(r))
         .collect::<Vec<_>>();
 
     worker.dataflow(|scope| {
4 changes: 2 additions & 2 deletions timely/examples/pagerank.rs
@@ -13,7 +13,7 @@ fn main() {
 
     timely::execute_from_args(std::env::args().skip(3), move |worker| {
 
-        let mut input = InputHandle::new();
+        let mut input = InputHandle::<_, Vec<_>>::new();
         let mut probe = ProbeHandle::new();
 
         worker.dataflow::<usize,_,_>(|scope| {
@@ -22,7 +22,7 @@ fn main() {
             let edge_stream = input.to_stream(scope);
 
             // create a new feedback stream, which will be changes to ranks.
-            let (handle, rank_stream) = scope.feedback(1);
+            let (handle, rank_stream) = scope.feedback::<Vec<_>>(1);
 
             // bring edges and ranks together!
             let changes = edge_stream.binary_frontier(
4 changes: 2 additions & 2 deletions timely/examples/unionfind.rs
@@ -54,8 +54,8 @@ trait UnionFind {
     fn union_find(&self) -> Self;
 }
 
-impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
-    fn union_find(&self) -> Stream<G, (usize, usize)> {
+impl<G: Scope> UnionFind for Stream<G, Vec<(usize, usize)>> {
+    fn union_find(&self) -> Stream<G, Vec<(usize, usize)>> {
 
         self.unary(Pipeline, "UnionFind", |_,_| {
 
9 changes: 3 additions & 6 deletions timely/src/dataflow/channels/mod.rs
@@ -11,10 +11,7 @@ pub mod pullers;
 pub mod pact;
 
 /// The input to and output from timely dataflow communication channels.
-pub type BundleCore<T, D> = crate::communication::Message<Message<T, D>>;
-
-/// The input to and output from timely dataflow communication channels specialized to vectors.
-pub type Bundle<T, D> = BundleCore<T, Vec<D>>;
+pub type Bundle<T, D> = crate::communication::Message<Message<T, D>>;
 
 /// A serializable representation of timestamped data.
 #[derive(Clone, Abomonation, Serialize, Deserialize)]
@@ -46,11 +43,11 @@ impl<T, D: Container> Message<T, D> {
     /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
     /// leaves in place, or the container's default element.
     #[inline]
-    pub fn push_at<P: Push<BundleCore<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
+    pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
 
         let data = ::std::mem::take(buffer);
         let message = Message::new(time, data, 0, 0);
-        let mut bundle = Some(BundleCore::from_typed(message));
+        let mut bundle = Some(Bundle::from_typed(message));
 
         pusher.push(&mut bundle);
 