-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flow blocks wip #22
base: master
Are you sure you want to change the base?
Flow blocks wip #22
Conversation
// Wrap inputs and outputs in Arc for shared ownership | ||
let input1 = Arc::new(self.input_1.clone()); | ||
let input2 = Arc::new(self.input_2.clone()); | ||
let output = Arc::new(self.output.clone()); | ||
|
||
// Spawn thread for input1 | ||
let input1_clone = Arc::clone(&input1); | ||
let output_clone = Arc::clone(&output); | ||
let handle1 = std::thread::spawn(move || { | ||
while let Ok(Some(message)) = input1_clone.recv() { | ||
output_clone.send(&message).unwrap(); | ||
} | ||
}); | ||
|
||
// Spawn thread for input2 | ||
let input2_clone = Arc::clone(&input2); | ||
let output_clone2 = Arc::clone(&output); | ||
let handle2 = std::thread::spawn(move || { | ||
while let Ok(Some(message)) = input2_clone.recv() { | ||
output_clone2.send(&message).unwrap(); | ||
} | ||
}); | ||
|
||
// Wait for both threads to finish | ||
handle1.join().unwrap(); | ||
handle2.join().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concatanate is:
[...] Concatenate processor, mentioned above: it accepts and outputs all the IPs from its first input port, followed by all the IPs from the second, and so on.
So this would rather be:
// Wrap inputs and outputs in Arc for shared ownership | |
let input1 = Arc::new(self.input_1.clone()); | |
let input2 = Arc::new(self.input_2.clone()); | |
let output = Arc::new(self.output.clone()); | |
// Spawn thread for input1 | |
let input1_clone = Arc::clone(&input1); | |
let output_clone = Arc::clone(&output); | |
let handle1 = std::thread::spawn(move || { | |
while let Ok(Some(message)) = input1_clone.recv() { | |
output_clone.send(&message).unwrap(); | |
} | |
}); | |
// Spawn thread for input2 | |
let input2_clone = Arc::clone(&input2); | |
let output_clone2 = Arc::clone(&output); | |
let handle2 = std::thread::spawn(move || { | |
while let Ok(Some(message)) = input2_clone.recv() { | |
output_clone2.send(&message).unwrap(); | |
} | |
}); | |
// Wait for both threads to finish | |
handle1.join().unwrap(); | |
handle2.join().unwrap(); | |
while let Some(message) = self.input_1.recv()? { | |
self.output.send(&message)?; | |
} | |
while let Some(message) = self.input_2.recv()? { | |
self.output.send(&message)?; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
messages received from input_2 port would not be processed until input_1 receives one in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they would be concatenated, consuming each input port to the end before moving on. Receiving from both ports would fit a merge/join/collate
block but it's also possible to just have two blocks with output ports send to the same input port of another block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the following test fails with the suggested implementation. the splitter sends messages to the input 1 and 2 of concat in round robin approach. Concat's input 2 port receives multiple messages prior to receiving them. There seems to be an issue in the transport layer after the second message to the input 2 port. I think there is no buffer for the ports. It looks like the ports should be listened in parallel threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[test]
#[ignore = "requires stdin"]
fn run_split_to_stdout() {
//use super::*;
use protoflow_core::SystemBuilding;
if let Err(e) = System::run(|s| {
let stdin = s.read_stdin();
let split = s.split();
s.connect(&stdin.output, &split.input);
let concat = s.block(Concat::new(s.input(), s.input(), s.output()));
s.connect(&split.output_1, &concat.input_1);
s.connect(&split.output_2, &concat.input_2);
let stdout_1 = s.write_stdout();
s.connect(&concat.output, &stdout_1.input);
}) {
error!("{}", e)
}
}
/// ``` | ||
/// | ||
#[derive(Block, Clone)] | ||
pub struct Sort<T: Message = Any> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prost_types::protobuf::Any
isn't sortable until it's value
field is decoded so I would suggest that this must become:
pub struct Sort<T: Message = Any> { | |
pub struct Sort<T: Message + PartialOrd = String> { |
(default type = String
optional)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had done this, the issue arises when you try to implement the BlockInstantiation trait for System.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this fine?:
impl BlockInstantiation for FlowBlockConfig {
fn instantiate(&self, system: &mut System) -> Box<dyn Block> {
use super::SystemBuilding;
use FlowBlockConfig::*;
match self {
// ...
Sort { .. } => {
Box::new(super::Sort::<String>::new(system.input(), system.output()))
}
// ...
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no, because you always instantiate the block with the String type. It could be any other proto type.
messages: Vec<T>, | ||
} | ||
|
||
impl<T: Message> Sort<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl<T: Message> Sort<T> { | |
impl<T: Message + PartialOrd> Sort<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
depends on how we will provide the solution for the PartialOrd trait bound
} | ||
} | ||
|
||
impl<T: Message + 'static> Sort<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl<T: Message + 'static> Sort<T> { | |
impl<T: Message + PartialOrd + 'static> Sort<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
depends on how we will provide the solution for the PartialOrd trait bound
} | ||
} | ||
|
||
impl<T: Message> Block for Sort<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl<T: Message> Block for Sort<T> { | |
impl<T: Message + PartialOrd> Block for Sort<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
depends on how we will provide the solution for the PartialOrd trait bound
} | ||
|
||
#[cfg(test)] | ||
pub mod split_tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for tests to be public.
pub mod split_tests { | |
mod concat_tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right, fixed.
// if let Some(x) = self.stop.recv()? { | ||
// while let Some(message) = self.input.recv()? { | ||
// if message == x { | ||
// self.messages.sort_by(|x, y| x.partial_cmp(y).unwrap()); | ||
// for x in self.messages.iter() { | ||
// self.output.send(x)?; | ||
// } | ||
// self.messages.clear(); | ||
// } else { | ||
// self.messages.push(message); | ||
// } | ||
// } | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what the .unwrap_or()
default value should be when the partial_cmp
fails as Ordering::Equal
may not be desired in some cases like:
let result = f64::NAN.partial_cmp(&1.0);
assert_eq!(result, None);
Means the output could have NAN
's scattered around, e.g. vec![0.0, 1.0, f64::NAN, 2.0]
instead of at the front or at the end. Although that's something the Sort
block's user can work around with a Filter
block or similar.
But overall to keep true to the one-input-port definition of sort, and this:
Because the Sort pattern does not start to output any data until all the incoming data has been received [...]
May I suggest this?:
// if let Some(x) = self.stop.recv()? { | |
// while let Some(message) = self.input.recv()? { | |
// if message == x { | |
// self.messages.sort_by(|x, y| x.partial_cmp(y).unwrap()); | |
// for x in self.messages.iter() { | |
// self.output.send(x)?; | |
// } | |
// self.messages.clear(); | |
// } else { | |
// self.messages.push(message); | |
// } | |
// } | |
// } | |
while let Some(message) = self.input.recv()? { | |
self.messages.push(message); | |
} | |
self.messages | |
.sort_by(|x, y| x.partial_cmp(y).unwrap_or(Ordering::Equal)); | |
for message in self.messages.iter() { | |
self.output.send(message)?; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will depend on how we proceed with the sorting issue
pub fn new(input: InputPort<T>, stop: InputPort<T>, output: OutputPort<T>) -> Self { | ||
Self { | ||
input, | ||
stop, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub fn new(input: InputPort<T>, stop: InputPort<T>, output: OutputPort<T>) -> Self { | |
Self { | |
input, | |
stop, | |
pub fn new(input: InputPort<T>, output: OutputPort<T>) -> Self { | |
Self { | |
input, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use it to identify the stop message, which triggers sending the buffered messages to the output port, what would be the alternative solution in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the more natural behaviour would be that Sort
block consumes its input to the end, sorts, and then sends on output port.
Assume that you're sorting integers, what is the "signal" value? Max int? Min int? 0? There's no reason why those can't also be valid data.
In any case you're mixing a signaling method inside the stream of data and at the very least it should not be comparing (commented implementation currently has if message == x
) each of the data messages against the signal message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right about "I think the more natural behaviour would be that Sort block consumes its input to the end, sorts, and then sends on output port.". that would be better
#[input] | ||
pub stop: InputPort<T>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[input] | |
pub stop: InputPort<T>, |
Sort { .. } => Box::new(super::Sort::new( | ||
system.input_any(), | ||
system.input(), | ||
system.output(), | ||
)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sort { .. } => Box::new(super::Sort::new( | |
system.input_any(), | |
system.input(), | |
system.output(), | |
)), | |
Sort { .. } => { | |
Box::new(super::Sort::<String>::new(system.input(), system.output())) | |
} |
Hi, I am stuck with the sort block implementation, in order to sort and compare the messages the messages should implement Eq and PartialOrd traits, i have trouble when I introduce these trait bounds, please check the execute method on the sort block. sort block