Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype: Directly call parent node's function #161

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deepola/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
members = [
"wake",
"prototype",
]
38 changes: 38 additions & 0 deletions deepola/prototype/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "prototype"
version = "0.1.0"
edition = "2021"
autoexamples = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
concurrent-queue = "1.2.2"
csv = "1.1"
env_logger = "0.9.0"
fixed-vec-deque = "0.1.9"
generator = "0.6"
getset = "0.1.2"
itertools = "0.10"
log = "0.4.14"
nanoid = "0.4.0"
quick-error = "2.0.1"
rand = "0.8.5"
rayon = "1.5.2"
rustc-hash = "1.1.0"
simple-error = "0.2.3"
structopt = "0.3.26"
uuid = { version = "0.8", features = ["v4"] }
jemallocator = "0.3.2"
glob = "0.3.0"
alphanumeric-sort = "1.4.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.85"
statrs = "0.16.0"

[dev-dependencies]
ctor = "0.1.21"
env_logger = "0.9.0"
criterion = "0.3.5"
lazy_static = "1.4.0"
regex = "1.6.0"
5 changes: 5 additions & 0 deletions deepola/prototype/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod node;
mod operations;

pub use node::*;
pub use operations::*;
136 changes: 136 additions & 0 deletions deepola/prototype/src/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use std::{sync::{Arc, RwLock}, cell::RefCell};
use getset::{Getters,Setters};

use crate::BufferedProcessor;

#[derive(Getters, Setters)]
pub struct ExecutionNode {
parents: Vec<Arc<RwLock<ExecutionNode>>>,
operation: Arc<RefCell<Box<dyn BufferedProcessor>>>,
}

unsafe impl Send for ExecutionNode {}
unsafe impl Sync for ExecutionNode {}

impl ExecutionNode {
pub fn new(operation: Box<dyn BufferedProcessor>) -> Arc<RwLock<ExecutionNode>> {
let execution_node = Arc::new(RwLock::new(
ExecutionNode {
parents: vec![],
operation: Arc::new(RefCell::new(operation)),
}));
execution_node.as_ref().read().unwrap().operation.as_ref().borrow_mut().set_node(Arc::downgrade(&execution_node));
execution_node
}

pub fn get(&mut self) -> Option<i64> {
todo!("User facing interface that allows to obtain the result for next partition");
}

pub fn run(&mut self) {
todo!("Run execution of all available partitions");
}

// Node obtains ith output operation by invoking the operation's get_output_partition.
pub fn get_output_partition(&self, partition_num: usize) -> Option<i64> {
let result = self.operation.as_ref().borrow_mut().get_output_partition(partition_num);
result
}

// Interface to obtain `partition_num` partition from the parent node at `seq_no`.
// The operation panics if no such parent node is available else returns the partition.
pub fn get_input_partition(&self, seq_no: usize, partition_num: usize) -> Option<i64> {
if seq_no >= self.parents.len() {
panic!("No parent node at seq_no: {}", seq_no);
} else {
let parent_node_rc = self.parents.get(seq_no).unwrap();
let result = parent_node_rc.as_ref().read().unwrap().get_output_partition(partition_num);
result
}
}

// Connect nodes through parent pointers.
pub fn subscribe_to_node(&mut self, parent: &Arc<RwLock<ExecutionNode>>) {
self.parents.push(parent.clone());
}

pub fn get_all_results(&self) -> Vec<i64> {
let mut count = 0;
let mut output = vec![];
loop {
let result = self.get_output_partition(count);
match result {
Some(x) => {output.push(x); count += 1;},
None => {break;}
}
}
output
}

}

#[cfg(test)]
mod tests {
use std::thread;

use crate::*;

#[test]
fn test_single_mapper_node() {
let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55];

// Creating Operations
let reader = Box::new(Reader::new(input_data));
let add = Box::new(Mapper::new(0, MapperOp::Add));

// Creating Nodes from these operations
let reader_node = ExecutionNode::new(reader);
let add_node = ExecutionNode::new(add);

// Connecting the created nodes.
add_node.as_ref().write().unwrap().subscribe_to_node(&reader_node);

// Obtain the result from the final node in the execution graph one by one.
let mut count = 0;
loop {
let result = add_node.as_ref().read().unwrap().get_output_partition(count);
match result {
Some(x) => { assert_eq!(x, *expected_add_result.get(count).unwrap()); },
None => { break; }
}
count += 1;
}
}

#[test]
fn test_multiple_nodes_in_parallel() {
let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55];
let expected_mul_result = vec![1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800];

let reader = Box::new(Reader::new(input_data));
let add = Box::new(Mapper::new(0, MapperOp::Add));
let mul = Box::new(Mapper::new(1, MapperOp::Mul));

let reader_node = ExecutionNode::new(reader);
let add_node = ExecutionNode::new(add);
let mul_node = ExecutionNode::new(mul);

// Connect the nodes. write() since mut.
add_node.as_ref().write().unwrap().subscribe_to_node(&reader_node);
mul_node.as_ref().write().unwrap().subscribe_to_node(&reader_node);

let handle_1 = thread::spawn(move || {
add_node.as_ref().read().unwrap().get_all_results()
});

let handle_2 = thread::spawn(move || {
mul_node.as_ref().read().unwrap().get_all_results()
});

assert_eq!(handle_1.join().unwrap(), expected_add_result);
assert_eq!(handle_2.join().unwrap(), expected_mul_result);
}

}
104 changes: 104 additions & 0 deletions deepola/prototype/src/operations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::sync::{Weak, RwLock};
use std::convert::AsRef;

use getset::{Getters, Setters};

use crate::ExecutionNode;

pub trait BufferedProcessor {
fn map(&mut self, input: i64) -> i64 { input }

fn get_output_partition(&mut self, partition_num: usize) -> Option<i64>;

fn set_node(&mut self, node: Weak<RwLock<ExecutionNode>>);
}

// Concrete operations that implement the BufferedProcessor trait.
// Operation 1: Reader [] => This is a generator for input data.
#[derive(Getters, Setters)]
pub struct Reader {
node: Option<Weak<RwLock<ExecutionNode>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node: Weak<RwLock<ExecutionNode>>, ?

Then we can check during upgrade like match node.upgrade() {Some(...) => ..., None => ...}

input_data: Vec<i64>,
}

impl Reader {
pub fn new(input_data: Vec<i64>) -> Self {
Reader {
node: None,
input_data
}
}
}

impl BufferedProcessor for Reader {
fn get_output_partition(&mut self, partition_num: usize) -> Option<i64> {
// This operation takes the ith input_data (or file_name).
// Returns the value read (or dataframe).
let value = self.input_data.get(partition_num);
if let Some(x) = value {
Some(*x)
} else {
None
}
Comment on lines +37 to +42
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return self.input_data.get(partition_num) ?

Copy link
Collaborator Author

@nikhil96sher nikhil96sher Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, turns out .get() on a Vec<Item> returns Option<&Item>.
Directly returning the value mismatches the signature (Option<i64> vs Option<&i64>). That's why did the reconstruction.
But there should be some better way.

}

fn map(&mut self, input: i64) -> i64 {
input
}

// Need to have this operation because `dyn BufferedProcessor` cannot do .node = <>
fn set_node(&mut self, node: Weak<RwLock<ExecutionNode>>) {
self.node = Some(node);
}
}

pub enum MapperOp {
Add,
Mul
}

// Operation 2: Appender [] => Can have custom map implementations.
#[derive(Getters, Setters)]
pub struct Mapper {
node: Option<Weak<RwLock<ExecutionNode>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node: Weak<RwLock<ExecutionNode>>, ?

Copy link
Collaborator Author

@nikhil96sher nikhil96sher Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can get rid of the Option.
I initially added it to first create operation without a node. Then create the execution node with this operation while updating the node in the operation.
But Weak itself should be good enough to support that.

acc: i64,
op: MapperOp,
}

impl Mapper {
pub fn new(acc: i64, op : MapperOp) -> Self {
Mapper {
node: None,
acc,
op,
}
}
}

impl BufferedProcessor for Mapper {
fn get_output_partition(&mut self, partition_num: usize) -> Option<i64> {
match &self.node {
Some(node) => {
let execution_node = node.upgrade().unwrap();
let input_partition = execution_node.as_ref().read().unwrap().get_input_partition(0, partition_num);
match input_partition {
Some(a) => Some(self.map(a)),
None => None
}
},
None => panic!("ExecutionNode not created!")
}
}

fn map(&mut self, input: i64) -> i64 {
match self.op {
MapperOp::Add => { self.acc += input; },
MapperOp::Mul => { self.acc *= input; },
}
self.acc
}

fn set_node(&mut self, node: Weak<RwLock<ExecutionNode>>) {
self.node = Some(node);
}
}