-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prototype: Directly call parent node's function #161
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
[workspace] | ||
members = [ | ||
"wake", | ||
"prototype", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
[package] | ||
name = "prototype" | ||
version = "0.1.0" | ||
edition = "2021" | ||
autoexamples = false | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
concurrent-queue = "1.2.2" | ||
csv = "1.1" | ||
env_logger = "0.9.0" | ||
fixed-vec-deque = "0.1.9" | ||
generator = "0.6" | ||
getset = "0.1.2" | ||
itertools = "0.10" | ||
log = "0.4.14" | ||
nanoid = "0.4.0" | ||
quick-error = "2.0.1" | ||
rand = "0.8.5" | ||
rayon = "1.5.2" | ||
rustc-hash = "1.1.0" | ||
simple-error = "0.2.3" | ||
structopt = "0.3.26" | ||
uuid = { version = "0.8", features = ["v4"] } | ||
jemallocator = "0.3.2" | ||
glob = "0.3.0" | ||
alphanumeric-sort = "1.4.4" | ||
serde = { version = "1.0", features = ["derive"] } | ||
serde_json = "1.0.85" | ||
statrs = "0.16.0" | ||
|
||
[dev-dependencies] | ||
ctor = "0.1.21" | ||
env_logger = "0.9.0" | ||
criterion = "0.3.5" | ||
lazy_static = "1.4.0" | ||
regex = "1.6.0" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
mod node; | ||
mod operations; | ||
|
||
pub use node::*; | ||
pub use operations::*; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
use std::{sync::{Arc, RwLock}, cell::RefCell}; | ||
use getset::{Getters,Setters}; | ||
|
||
use crate::BufferedProcessor; | ||
|
||
#[derive(Getters, Setters)] | ||
pub struct ExecutionNode { | ||
parents: Vec<Arc<RwLock<ExecutionNode>>>, | ||
operation: Arc<RefCell<Box<dyn BufferedProcessor>>>, | ||
} | ||
|
||
unsafe impl Send for ExecutionNode {} | ||
unsafe impl Sync for ExecutionNode {} | ||
|
||
impl ExecutionNode { | ||
pub fn new(operation: Box<dyn BufferedProcessor>) -> Arc<RwLock<ExecutionNode>> { | ||
let execution_node = Arc::new(RwLock::new( | ||
ExecutionNode { | ||
parents: vec![], | ||
operation: Arc::new(RefCell::new(operation)), | ||
})); | ||
execution_node.as_ref().read().unwrap().operation.as_ref().borrow_mut().set_node(Arc::downgrade(&execution_node)); | ||
execution_node | ||
} | ||
|
||
pub fn get(&mut self) -> Option<i64> { | ||
todo!("User facing interface that allows to obtain the result for next partition"); | ||
} | ||
|
||
pub fn run(&mut self) { | ||
todo!("Run execution of all available partitions"); | ||
} | ||
|
||
// Node obtains ith output operation by invoking the operation's get_output_partition. | ||
pub fn get_output_partition(&self, partition_num: usize) -> Option<i64> { | ||
let result = self.operation.as_ref().borrow_mut().get_output_partition(partition_num); | ||
result | ||
} | ||
|
||
// Interface to obtain `partition_num` partition from the parent node at `seq_no`. | ||
// The operation panics if no such parent node is available else returns the partition. | ||
pub fn get_input_partition(&self, seq_no: usize, partition_num: usize) -> Option<i64> { | ||
if seq_no >= self.parents.len() { | ||
panic!("No parent node at seq_no: {}", seq_no); | ||
} else { | ||
let parent_node_rc = self.parents.get(seq_no).unwrap(); | ||
let result = parent_node_rc.as_ref().read().unwrap().get_output_partition(partition_num); | ||
result | ||
} | ||
} | ||
|
||
// Connect nodes through parent pointers. | ||
pub fn subscribe_to_node(&mut self, parent: &Arc<RwLock<ExecutionNode>>) { | ||
self.parents.push(parent.clone()); | ||
} | ||
|
||
pub fn get_all_results(&self) -> Vec<i64> { | ||
let mut count = 0; | ||
let mut output = vec![]; | ||
loop { | ||
let result = self.get_output_partition(count); | ||
match result { | ||
Some(x) => {output.push(x); count += 1;}, | ||
None => {break;} | ||
} | ||
} | ||
output | ||
} | ||
|
||
} | ||
|
||
#[cfg(test)]
mod tests {
    use std::thread;

    use crate::*;

    /// A reader feeding one `Add` mapper yields the running sums of the input, one
    /// partition at a time, and yields exactly as many partitions as there are inputs.
    #[test]
    fn test_single_mapper_node() {
        let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55];

        // Creating Operations
        let reader = Box::new(Reader::new(input_data));
        let add = Box::new(Mapper::new(0, MapperOp::Add));

        // Creating Nodes from these operations
        let reader_node = ExecutionNode::new(reader);
        let add_node = ExecutionNode::new(add);

        // Connecting the created nodes.
        add_node.write().unwrap().subscribe_to_node(&reader_node);

        // Obtain the result from the final node in the execution graph one by one.
        let mut count = 0;
        loop {
            // Bind the result first so the read guard is released before it is inspected.
            let result = add_node.read().unwrap().get_output_partition(count);
            match result {
                Some(x) => assert_eq!(Some(&x), expected_add_result.get(count)),
                None => break,
            }
            count += 1;
        }
        // Without this check the test would pass even if the node produced nothing.
        assert_eq!(count, expected_add_result.len());
    }

    /// Two mappers subscribed to the same reader can be drained from separate threads.
    #[test]
    fn test_multiple_nodes_in_parallel() {
        let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55];
        let expected_mul_result = vec![1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800];

        let reader = Box::new(Reader::new(input_data));
        let add = Box::new(Mapper::new(0, MapperOp::Add));
        let mul = Box::new(Mapper::new(1, MapperOp::Mul));

        let reader_node = ExecutionNode::new(reader);
        let add_node = ExecutionNode::new(add);
        let mul_node = ExecutionNode::new(mul);

        // Connect the nodes. write() since mut.
        add_node.write().unwrap().subscribe_to_node(&reader_node);
        mul_node.write().unwrap().subscribe_to_node(&reader_node);

        let handle_1 = thread::spawn(move || add_node.read().unwrap().get_all_results());
        let handle_2 = thread::spawn(move || mul_node.read().unwrap().get_all_results());

        assert_eq!(handle_1.join().unwrap(), expected_add_result);
        assert_eq!(handle_2.join().unwrap(), expected_mul_result);
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
use std::sync::{Weak, RwLock}; | ||
use std::convert::AsRef; | ||
|
||
use getset::{Getters, Setters}; | ||
|
||
use crate::ExecutionNode; | ||
|
||
pub trait BufferedProcessor { | ||
fn map(&mut self, input: i64) -> i64 { input } | ||
|
||
fn get_output_partition(&mut self, partition_num: usize) -> Option<i64>; | ||
|
||
fn set_node(&mut self, node: Weak<RwLock<ExecutionNode>>); | ||
} | ||
|
||
// Concrete operations that implement the BufferedProcessor trait. | ||
// Operation 1: Reader [] => This is a generator for input data. | ||
#[derive(Getters, Setters)] | ||
pub struct Reader { | ||
node: Option<Weak<RwLock<ExecutionNode>>>, | ||
input_data: Vec<i64>, | ||
} | ||
|
||
impl Reader { | ||
pub fn new(input_data: Vec<i64>) -> Self { | ||
Reader { | ||
node: None, | ||
input_data | ||
} | ||
} | ||
} | ||
|
||
impl BufferedProcessor for Reader { | ||
fn get_output_partition(&mut self, partition_num: usize) -> Option<i64> { | ||
// This operation takes the ith input_data (or file_name). | ||
// Returns the value read (or dataframe). | ||
let value = self.input_data.get(partition_num); | ||
if let Some(x) = value { | ||
Some(*x) | ||
} else { | ||
None | ||
} | ||
Comment on lines
+37
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, turns out |
||
} | ||
|
||
fn map(&mut self, input: i64) -> i64 { | ||
input | ||
} | ||
|
||
// Need to have this operation because `dyn BufferedProcessor` cannot do .node = <> | ||
fn set_node(&mut self, node: Weak<RwLock<ExecutionNode>>) { | ||
self.node = Some(node); | ||
} | ||
} | ||
|
||
/// Selects how a `Mapper` folds each incoming value into its accumulator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MapperOp {
    /// Add the incoming value to the accumulator.
    Add,
    /// Multiply the accumulator by the incoming value.
    Mul,
}
|
||
// Operation 2: Appender [] => Can have custom map implementations. | ||
#[derive(Getters, Setters)] | ||
pub struct Mapper { | ||
node: Option<Weak<RwLock<ExecutionNode>>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can get rid of the Option. |
||
acc: i64, | ||
op: MapperOp, | ||
} | ||
|
||
impl Mapper { | ||
pub fn new(acc: i64, op : MapperOp) -> Self { | ||
Mapper { | ||
node: None, | ||
acc, | ||
op, | ||
} | ||
} | ||
} | ||
|
||
impl BufferedProcessor for Mapper { | ||
fn get_output_partition(&mut self, partition_num: usize) -> Option<i64> { | ||
match &self.node { | ||
Some(node) => { | ||
let execution_node = node.upgrade().unwrap(); | ||
let input_partition = execution_node.as_ref().read().unwrap().get_input_partition(0, partition_num); | ||
match input_partition { | ||
Some(a) => Some(self.map(a)), | ||
None => None | ||
} | ||
}, | ||
None => panic!("ExecutionNode not created!") | ||
} | ||
} | ||
|
||
fn map(&mut self, input: i64) -> i64 { | ||
match self.op { | ||
MapperOp::Add => { self.acc += input; }, | ||
MapperOp::Mul => { self.acc *= input; }, | ||
} | ||
self.acc | ||
} | ||
|
||
fn set_node(&mut self, node: Weak<RwLock<ExecutionNode>>) { | ||
self.node = Some(node); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`node: Weak<RwLock<ExecutionNode>>,`?
Then we can check during upgrade like
`match node.upgrade() { Some(...) => ..., None => ... }`