Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow blocks wip #22

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft

Flow blocks wip #22

wants to merge 6 commits into from

Conversation

evren-okcu
Copy link
Contributor

Hi, I am stuck with the sort block implementation, in order to sort and compare the messages the messages should implement Eq and PartialOrd traits, i have trouble when I introduce these trait bounds, please check the execute method on the sort block. sort block

@evren-okcu evren-okcu marked this pull request as draft November 23, 2024 08:47
Comment on lines 76 to 101
// Wrap inputs and outputs in Arc for shared ownership
let input1 = Arc::new(self.input_1.clone());
let input2 = Arc::new(self.input_2.clone());
let output = Arc::new(self.output.clone());

// Spawn thread for input1
let input1_clone = Arc::clone(&input1);
let output_clone = Arc::clone(&output);
let handle1 = std::thread::spawn(move || {
while let Ok(Some(message)) = input1_clone.recv() {
output_clone.send(&message).unwrap();
}
});

// Spawn thread for input2
let input2_clone = Arc::clone(&input2);
let output_clone2 = Arc::clone(&output);
let handle2 = std::thread::spawn(move || {
while let Ok(Some(message)) = input2_clone.recv() {
output_clone2.send(&message).unwrap();
}
});

// Wait for both threads to finish
handle1.join().unwrap();
handle2.join().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concatanate is:

[...] Concatenate processor, mentioned above: it accepts and outputs all the IPs from its first input port, followed by all the IPs from the second, and so on.

So this would rather be:

Suggested change
// Wrap inputs and outputs in Arc for shared ownership
let input1 = Arc::new(self.input_1.clone());
let input2 = Arc::new(self.input_2.clone());
let output = Arc::new(self.output.clone());
// Spawn thread for input1
let input1_clone = Arc::clone(&input1);
let output_clone = Arc::clone(&output);
let handle1 = std::thread::spawn(move || {
while let Ok(Some(message)) = input1_clone.recv() {
output_clone.send(&message).unwrap();
}
});
// Spawn thread for input2
let input2_clone = Arc::clone(&input2);
let output_clone2 = Arc::clone(&output);
let handle2 = std::thread::spawn(move || {
while let Ok(Some(message)) = input2_clone.recv() {
output_clone2.send(&message).unwrap();
}
});
// Wait for both threads to finish
handle1.join().unwrap();
handle2.join().unwrap();
while let Some(message) = self.input_1.recv()? {
self.output.send(&message)?;
}
while let Some(message) = self.input_2.recv()? {
self.output.send(&message)?;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messages received from input_2 port would not be processed until input_1 receives one in this case?

Copy link
Contributor

@SamuelSarle SamuelSarle Nov 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they would be concatenated, consuming each input port to the end before moving on. Receiving from both ports would fit a merge/join/collate block but it's also possible to just have two blocks with output ports send to the same input port of another block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, it looks like i've implemented 'merge' logic instead of concatenate:

merge
concat

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following test fails with the suggested implementation. the splitter sends messages to the input 1 and 2 of concat in round robin approach. Concat's input 2 port receives multiple messages prior to receiving them. There seems to be an issue in the transport layer after the second message to the input 2 port. I think there is no buffer for the ports. It looks like the ports should be listened in parallel threads.

Copy link
Contributor Author

@evren-okcu evren-okcu Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[test]
    #[ignore = "requires stdin"]
    fn run_split_to_stdout() {
        //use super::*;
        use protoflow_core::SystemBuilding;
        if let Err(e) = System::run(|s| {
            let stdin = s.read_stdin();

            let split = s.split();
            s.connect(&stdin.output, &split.input);

            let concat = s.block(Concat::new(s.input(), s.input(), s.output()));

            s.connect(&split.output_1, &concat.input_1);
            s.connect(&split.output_2, &concat.input_2);

            let stdout_1 = s.write_stdout();
            s.connect(&concat.output, &stdout_1.input);
        }) {
            error!("{}", e)
        }
    }

/// ```
///
#[derive(Block, Clone)]
pub struct Sort<T: Message = Any> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prost_types::protobuf::Any isn't sortable until it's value field is decoded so I would suggest that this must become:

Suggested change
pub struct Sort<T: Message = Any> {
pub struct Sort<T: Message + PartialOrd = String> {

(default type = String optional)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had done this, the issue arises when you try to implement the BlockInstantiation trait for System.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this fine?:

    impl BlockInstantiation for FlowBlockConfig {
        fn instantiate(&self, system: &mut System) -> Box<dyn Block> {
            use super::SystemBuilding;
            use FlowBlockConfig::*;
            match self {
                // ...
                Sort { .. } => {
                    Box::new(super::Sort::<String>::new(system.input(), system.output()))
                }
                // ...
            }
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no, because you always instantiate the block with the String type. It could be any other proto type.

messages: Vec<T>,
}

impl<T: Message> Sort<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl<T: Message> Sort<T> {
impl<T: Message + PartialOrd> Sort<T> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depends on how we will provide the solution for the PartialOrd trait bound

}
}

impl<T: Message + 'static> Sort<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl<T: Message + 'static> Sort<T> {
impl<T: Message + PartialOrd + 'static> Sort<T> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depends on how we will provide the solution for the PartialOrd trait bound

}
}

impl<T: Message> Block for Sort<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl<T: Message> Block for Sort<T> {
impl<T: Message + PartialOrd> Block for Sort<T> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depends on how we will provide the solution for the PartialOrd trait bound

lib/protoflow-blocks/src/blocks/flow/replicate.rs Outdated Show resolved Hide resolved
}

#[cfg(test)]
pub mod split_tests {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for tests to be public.

Suggested change
pub mod split_tests {
mod concat_tests {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, fixed.

Comment on lines +80 to +92
// if let Some(x) = self.stop.recv()? {
// while let Some(message) = self.input.recv()? {
// if message == x {
// self.messages.sort_by(|x, y| x.partial_cmp(y).unwrap());
// for x in self.messages.iter() {
// self.output.send(x)?;
// }
// self.messages.clear();
// } else {
// self.messages.push(message);
// }
// }
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the .unwrap_or() default value should be when the partial_cmp fails as Ordering::Equal may not be desired in some cases like:

let result = f64::NAN.partial_cmp(&1.0);
assert_eq!(result, None);

Means the output could have NAN's scattered around, e.g. vec![0.0, 1.0, f64::NAN, 2.0] instead of at the front or at the end. Although that's something the Sort block's user can work around with a Filter block or similar.

But overall to keep true to the one-input-port definition of sort, and this:

Because the Sort pattern does not start to output any data until all the incoming data has been received [...]

May I suggest this?:

Suggested change
// if let Some(x) = self.stop.recv()? {
// while let Some(message) = self.input.recv()? {
// if message == x {
// self.messages.sort_by(|x, y| x.partial_cmp(y).unwrap());
// for x in self.messages.iter() {
// self.output.send(x)?;
// }
// self.messages.clear();
// } else {
// self.messages.push(message);
// }
// }
// }
while let Some(message) = self.input.recv()? {
self.messages.push(message);
}
self.messages
.sort_by(|x, y| x.partial_cmp(y).unwrap_or(Ordering::Equal));
for message in self.messages.iter() {
self.output.send(message)?;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will depend on how we proceed with the sorting issue

Comment on lines +57 to +60
pub fn new(input: InputPort<T>, stop: InputPort<T>, output: OutputPort<T>) -> Self {
Self {
input,
stop,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn new(input: InputPort<T>, stop: InputPort<T>, output: OutputPort<T>) -> Self {
Self {
input,
stop,
pub fn new(input: InputPort<T>, output: OutputPort<T>) -> Self {
Self {
input,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use it to identify the stop message, which triggers sending the buffered messages to the output port, what would be the alternative solution in this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the more natural behaviour would be that Sort block consumes its input to the end, sorts, and then sends on output port.
Assume that you're sorting integers, what is the "signal" value? Max int? Min int? 0? There's no reason why those can't also be valid data.

In any case you're mixing a signaling method inside the stream of data and at the very least it should not be comparing (commented implementation currently has if message == x) each of the data messages against the signal message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right about "I think the more natural behaviour would be that Sort block consumes its input to the end, sorts, and then sends on output port.". that would be better

Comment on lines +44 to +46
#[input]
pub stop: InputPort<T>,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[input]
pub stop: InputPort<T>,

Comment on lines +111 to +115
Sort { .. } => Box::new(super::Sort::new(
system.input_any(),
system.input(),
system.output(),
)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Sort { .. } => Box::new(super::Sort::new(
system.input_any(),
system.input(),
system.output(),
)),
Sort { .. } => {
Box::new(super::Sort::<String>::new(system.input(), system.output()))
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants