diff --git a/pipeless/Cargo.lock b/pipeless/Cargo.lock
index a04e611..967b8b8 100644
--- a/pipeless/Cargo.lock
+++ b/pipeless/Cargo.lock
@@ -1374,7 +1374,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
 [[package]]
 name = "pipeless-ai"
-version = "1.1.2"
+version = "1.1.3"
 dependencies = [
  "clap",
  "env_logger",
diff --git a/pipeless/Cargo.toml b/pipeless/Cargo.toml
index 97bf08e..4eb5bfc 100644
--- a/pipeless/Cargo.toml
+++ b/pipeless/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "pipeless-ai"
-version = "1.1.2"
+version = "1.1.3"
 edition = "2021"
 authors = ["Miguel A. Cabrera Minagorri"]
 description = "An open-source computer vision framework to build and deploy applications in minutes"
diff --git a/pipeless/src/config/adapters/rest.rs b/pipeless/src/config/adapters/rest.rs
index 9f8c828..d00838e 100644
--- a/pipeless/src/config/adapters/rest.rs
+++ b/pipeless/src/config/adapters/rest.rs
@@ -53,10 +53,16 @@ async fn handle_add_stream(
     }
     let output_uri = stream.output_uri.clone();
     {
-        streams_table.write()
+        let res = streams_table.write()
             .await
-            .add(pipeless::config::streams::StreamsTableEntry::new(input_uri, output_uri, frame_path))
-            .expect("Error adding new stream to the table");
+            .add(pipeless::config::streams::StreamsTableEntry::new(input_uri, output_uri, frame_path));
+
+        if let Err(err) = res {
+            return Ok(warp::reply::with_status(
+                warp::reply::json(&json!({"error": format!("Error adding new stream to the table: {}", err)})),
+                warp::http::StatusCode::INTERNAL_SERVER_ERROR,
+            ));
+        }
     }
 
     match dispatcher_sender.send(pipeless::dispatcher::DispatcherEvent::TableChange) {
diff --git a/pipeless/src/config/video.rs b/pipeless/src/config/video.rs
index acff913..6bc56b2 100644
--- a/pipeless/src/config/video.rs
+++ b/pipeless/src/config/video.rs
@@ -1,3 +1,20 @@
+
+#[derive(Debug)]
+pub struct VideoConfigError {
+    msg: String
+}
+impl VideoConfigError {
+    fn new(msg: &str) -> Self {
+        Self { msg: msg.to_owned() }
+    }
+}
+impl std::error::Error for VideoConfigError {}
+impl std::fmt::Display for VideoConfigError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.msg.to_string())
+    }
+}
+
 #[derive(Clone)]
 pub struct Video {
     /// This can be input or output video, it is generic
@@ -6,7 +23,7 @@ pub struct Video {
     uri: String,
 }
 impl Video {
-    pub fn new(uri: String) -> Self {
+    pub fn new(uri: String) -> Result<Self, VideoConfigError> {
         let protocol: String;
         let location: String;
         if uri == "screen" {
@@ -15,17 +32,18 @@ impl Video {
             location = String::from("screen");
         } else if uri != "v4l2" {
             let uri_split: Vec<&str> = uri.split("://").collect();
-            protocol = uri_split.get(0).expect("Unable to get protocol from URI").to_string();
-            location = uri_split.get(1).expect("Unable to get location from URI. Ensure it contains the protocol followed by '//'.").to_string();
+            protocol = uri_split.get(0).ok_or_else(|| { VideoConfigError::new("Unable to get protocol from URI") })?.to_string();
+            location = uri_split.get(1)
+                .ok_or_else(|| { VideoConfigError::new("Unable to get location from URI. Ensure it contains the protocol followed by '://'. Example: file:///home/user/file.mp4") })?.to_string();
             if protocol == "file" && !location.starts_with('/') {
                 panic!("When using files you should indicate an absolute path. Ensure your path is on the format file:///home/user/file.mp4 (note there are 3 slashes)");
             }
         } else {
             protocol = String::from("v4l2");
             location = String::from("v4l2");
         }
 
-        Video { protocol, location, uri, }
+        Ok(Video { protocol, location, uri, })
     }
 
     pub fn get_protocol(&self) -> &str {
diff --git a/pipeless/src/data.rs b/pipeless/src/data.rs
index 4859447..94506ef 100644
--- a/pipeless/src/data.rs
+++ b/pipeless/src/data.rs
@@ -69,15 +69,9 @@ impl RgbFrame {
     pub fn get_original_pixels(&self) -> &ndarray::Array3<u8> {
         &self.original
     }
-    pub fn get_owned_original_pixels(&self) -> ndarray::Array3<u8> {
-        self.original.to_owned()
-    }
     pub fn get_modified_pixels(&self) -> &ndarray::Array3<u8> {
         &self.modified
     }
-    pub fn get_owned_modified_pixels(&self) -> ndarray::Array3<u8> {
-        self.modified.to_owned()
-    }
     pub fn get_mutable_pixels(&mut self) -> ndarray::ArrayViewMut3<u8> {
         self.modified.view_mut()
     }
diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs
index 90e02db..db9069f 100644
--- a/pipeless/src/dispatcher.rs
+++ b/pipeless/src/dispatcher.rs
@@ -133,23 +133,34 @@ pub fn start(
                         Ok(frame_path) => {
                             info!("New stream entry detected, creating pipeline");
                             let new_pipeless_bus = pipeless::events::Bus::new();
-                            let new_manager = pipeless::pipeline::Manager::new(
+                            let new_manager_result = pipeless::pipeline::Manager::new(
                                 input_uri, output_uri, frame_path,
                                 &new_pipeless_bus.get_sender(),
                                 dispatcher_event_sender.clone(),
                             );
-                            new_manager.start(new_pipeless_bus, frame_path_executor_arc.clone());
-                            streams_table_guard.set_stream_pipeline(
-                                entry.get_id(),
-                                new_manager.get_pipeline_id().await
-                            ).expect("Error adding new stream to the streams config table");
-                            let mut managers_map_guard = running_managers.write().await;
-                            managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager);
+                            match new_manager_result {
+                                Ok(new_manager) => {
+                                    new_manager.start(new_pipeless_bus, frame_path_executor_arc.clone());
+                                    if let Err(err) = streams_table_guard.set_stream_pipeline(
+                                        entry.get_id(),
+                                        new_manager.get_pipeline_id().await
+                                    ) {
+                                        error!("Error adding new stream to the streams config table: {}", err);
+                                    }
+                                    let mut managers_map_guard = running_managers.write().await;
+                                    managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager);
+                                },
+                                Err(err) => {
+                                    error!("Unable to create new pipeline: {}. Rolling back streams configuration.", err.to_string());
+                                    let removed = streams_table_guard.remove(entry.get_id());
+                                    if removed.is_none() { warn!("Error rolling back table, entry not found.") };
+                                }
+                            }
                         },
                         Err(err) => {
                             warn!("Rolling back streams table configuration due to error. Error: {}", err);
-                            streams_table_guard.remove(entry.get_id())
-                                .expect("Error removing new stream from the streams config table");
+                            let removed = streams_table_guard.remove(entry.get_id());
+                            if removed.is_none() { warn!("Error rolling back table, entry not found.") };
                         }
                     }
                 }
diff --git a/pipeless/src/gst/utils.rs b/pipeless/src/gst/utils.rs
index 67bf432..f5d38dd 100644
--- a/pipeless/src/gst/utils.rs
+++ b/pipeless/src/gst/utils.rs
@@ -1,11 +1,14 @@
+use glib::BoolError;
 use gstreamer as gst;
-use log::error;
+use log::{error, warn};
 
-pub fn create_generic_component(ctype: &str, cname: &str) -> gst::Element {
+pub fn create_generic_component(ctype: &str, cname: &str) -> Result<gst::Element, BoolError> {
     let component = gst::ElementFactory::make(ctype)
         .name(cname)
-        .build()
-        .expect(format!("Failed to create component {} of type {}", cname, ctype).as_str());
+        .build().or_else(|err| {
+            error!("Failed to create component {} of type {}", cname, ctype);
+            Err(err)
+        });
 
     component
 }
@@ -94,9 +97,14 @@ pub fn tag_list_to_string(tag_list: &gst::TagList) -> String {
         if n_tag_values == 1 {
             if let Some(tag_value) = tag_list.index_generic(tag_name, 0) {
                 match tag_value.get::<gst::DateTime>() {
-                    Ok(datetime) =>
-                        formatted_tags.push(format!("{}={}", tag_name, datetime.to_iso8601_string()
-                            .expect("Unable to get ISO string from tag"))),
+                    Ok(datetime) => {
+                        let datetime_tag_res = datetime.to_iso8601_string();
+                        if let Ok(tag_value) = datetime_tag_res {
+                            formatted_tags.push(format!("{}={}", tag_name, tag_value))
+                        } else {
+                            warn!("Unable to get ISO string from tag");
+                        }
+                    }
                     Err(_) => formatted_tags.push(format!("{}={:?}", tag_name, tag_value))
                 }
             }
@@ -105,10 +113,15 @@
                 .filter_map(|i| {
                     tag_list.index_generic(tag_name, i).map(|tag_value| {
                         match tag_value.get::<gst::DateTime>() {
-                            Ok(datetime) =>
-                                datetime.to_iso8601_string()
-                                    .expect("Unable to get ISO string from tag")
-                                    .to_string(),
+                            Ok(datetime) => {
+                                let datetime_tag_res = datetime.to_iso8601_string();
+                                if let Ok(tag_value) = datetime_tag_res {
+                                    tag_value.to_string()
+                                } else {
+                                    warn!("Unable to get ISO string from tag");
+                                    String::from("")
+                                }
+                            }
                             Err(_) => format!("{:?}", tag_value)
                         }
                     })
diff --git a/pipeless/src/input/pipeline.rs b/pipeless/src/input/pipeline.rs
index 45ebbdc..8df78f3 100644
--- a/pipeless/src/input/pipeline.rs
+++ b/pipeless/src/input/pipeline.rs
@@ -1,3 +1,4 @@
+use glib::BoolError;
 use log::{error, info, warn, debug};
 use std;
 use std::str::FromStr;
@@ -10,11 +11,32 @@
 use ndarray;
 use crate as pipeless;
 
 #[derive(Debug)]
-pub struct PipelineError;
-impl std::error::Error for PipelineError {}
-impl std::fmt::Display for PipelineError {
+pub struct InputPipelineError {
+    msg: String
+}
+impl InputPipelineError {
+    fn new(msg: &str) -> Self {
+        Self { msg: msg.to_owned() }
+    }
+}
+impl std::error::Error for InputPipelineError {}
+impl std::fmt::Display for InputPipelineError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "An error ocurred with the input pipeline")
+        write!(f, "{}", self.msg.to_string())
+    }
+}
+impl From<BoolError> for InputPipelineError {
+    fn from(error: BoolError) -> Self {
+        Self {
+            msg: error.to_string(),
+        }
+    }
+}
+impl From<pipeless::config::video::VideoConfigError> for InputPipelineError {
+    fn from(error: pipeless::config::video::VideoConfigError) -> Self {
+        Self {
+            msg: error.to_string(),
+        }
     }
 }
@@ -25,10 +47,9 @@ pub struct StreamDef {
     video: pipeless::config::video::Video,
 }
 impl StreamDef {
-    pub fn new(uri: String) -> Self {
-        Self {
-            video:
pipeless::config::video::Video::new(uri) - } + pub fn new(uri: String) -> Result { + let video = pipeless::config::video::Video::new(uri)?; + Ok(Self { video }) } pub fn get_video(&self) -> &pipeless::config::video::Video { @@ -93,7 +114,10 @@ fn on_new_sample( let ndframe = ndarray::Array3::from_shape_vec( (height, width, channels), buffer_info.to_vec() - ).expect("Failed to create ndarray from buffer data"); + ).map_err(|err| { + error!("Failed to create ndarray from buffer data: {}", err.to_string()); + gst::FlowError::Error + })?; let frame = pipeless::data::Frame::new_rgb( ndframe, width, height, @@ -135,36 +159,38 @@ fn on_pad_added ( fn create_input_bin( uri: &str, pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, -) -> gst::Bin { +) -> Result { let bin = gst::Bin::new(); if uri == "v4l2" { // Device webcam - let v4l2src = pipeless::gst::utils::create_generic_component("v4l2src", "v4l2src"); - let videoconvert = pipeless::gst::utils::create_generic_component("videoconvert", "videoconvert"); - let videoscale = pipeless::gst::utils::create_generic_component("videoscale", "videoscale"); + let v4l2src = pipeless::gst::utils::create_generic_component("v4l2src", "v4l2src")?; + let videoconvert = pipeless::gst::utils::create_generic_component("videoconvert", "videoconvert")?; + let videoscale = pipeless::gst::utils::create_generic_component("videoscale", "videoscale")?; // Webcam resolutions are not standard and we can't read the webcam caps, // force a hardcoded resolution so that we annouce a correct resolution to the output. let forced_size_str = "video/x-raw,width=1280,height=720"; - let forced_caps = gst::Caps::from_str(forced_size_str).expect("Unable to create caps from string"); + let forced_caps = gst::Caps::from_str(forced_size_str) + .map_err(|_| { InputPipelineError::new("Unable to create caps from string") })?; let capsfilter = gst::ElementFactory::make("capsfilter") .name("capsfilter") .property("caps", forced_caps) .build() - .expect("Failed to create capsfilter"); + .map_err(|_| { InputPipelineError::new("Failed to create capsfilter") })?; bin.add_many([&v4l2src, &videoconvert, &videoscale, &capsfilter]) - .expect("Unable to add elements to input bin"); + .map_err(|_| { InputPipelineError::new("Unable to add elements to input bin") })?; - v4l2src.link(&videoconvert).expect("Error linking v4l2src to videoconvert"); - videoconvert.link(&videoscale).expect("Error linking videoconvert to videoscale"); - videoscale.link(&capsfilter).expect("Error linking videoscale to capsfilter"); + v4l2src.link(&videoconvert).map_err(|_| { InputPipelineError::new("Error linking v4l2src to videoconvert") })?; + videoconvert.link(&videoscale).map_err(|_| { InputPipelineError::new("Error linking videoconvert to videoscale") })?; + videoscale.link(&capsfilter).map_err(|_| { InputPipelineError::new("Error linking videoscale to capsfilter") })?; // Create ghostpad to be able to plug other components to the bin let capsfilter_src_pad = capsfilter.static_pad("src") - .expect("Failed to create the pipeline. Unable to get capsfilter source pad."); + .ok_or_else(|| { InputPipelineError::new("Failed to create the pipeline. 
Unable to get capsfilter source pad.") })?; let ghostpath_src = gst::GhostPad::with_target(&capsfilter_src_pad) - .expect("Unable to create the ghost pad to link bin"); - bin.add_pad(&ghostpath_src).expect("Unable to add ghostpad to input bin"); + .map_err(|_| { InputPipelineError::new("Unable to create the ghost pad to link bin") })?; + bin.add_pad(&ghostpath_src) + .map_err(|_| { InputPipelineError::new("Unable to add ghostpad to input bin") })?; // v4l2src doesn't have caps property that we can handle. Notify the output about the new stream let forced_caps_str = format!("{},format=RGB,framerate=1/30", forced_size_str); @@ -174,37 +200,44 @@ fn create_input_bin( ); } else { // Use uridecodebin by default - let uridecodebin = pipeless::gst::utils::create_generic_component("uridecodebin3", "source"); - let videoconvert = pipeless::gst::utils::create_generic_component("videoconvert", "videoconvert"); + let uridecodebin = pipeless::gst::utils::create_generic_component("uridecodebin3", "source")?; + let videoconvert = pipeless::gst::utils::create_generic_component("videoconvert", "videoconvert")?; bin.add_many([&uridecodebin, &videoconvert]) - .expect("Unable to add elements to the input bin"); + .map_err(|_| { InputPipelineError::new("Unable to add elements to the input bin")})?; uridecodebin.set_property("uri", uri); // Uridecodebin uses dynamic linking (creates pads automatically for new detected streams) - let videoconvert_sink_pad = videoconvert.static_pad("sink").expect("Unable to get videoconvert pad"); - let link_new_pad_fn = move |pad: &gst::Pad| { + let videoconvert_sink_pad = videoconvert.static_pad("sink") + .ok_or_else(|| { InputPipelineError::new("Unable to get videoconvert pad") })?; + let link_new_pad_fn = move |pad: &gst::Pad| -> Result{ if !videoconvert_sink_pad.is_linked() { pad.link(&videoconvert_sink_pad) - .expect("Unable to link new pad to videoconvert sink pad"); + .map_err(|_| { InputPipelineError::new("Unable to link new pad to videoconvert sink pad") }) } else { warn!("Videoconvert pad already linked, skipping link."); + Ok(gst::PadLinkSuccess) } }; uridecodebin.connect_pad_added({ let pipeless_bus_sender = pipeless_bus_sender.clone(); move |_elem, pad| { - link_new_pad_fn(&pad); - //// Connect an async handler to the pad to be notified when caps are set - pad.add_probe( - gst::PadProbeType::EVENT_UPSTREAM, - { - let pipeless_bus_sender = pipeless_bus_sender.clone(); - move |pad: &gst::Pad, info: &mut gst::PadProbeInfo| { - on_pad_added(pad, info, &pipeless_bus_sender) - } - } - ); + let link_pad_res = link_new_pad_fn(&pad); + match link_pad_res { + Ok(_) => { + // Connect an async handler to the pad to be notified when caps are set + pad.add_probe( + gst::PadProbeType::EVENT_UPSTREAM, + { + let pipeless_bus_sender = pipeless_bus_sender.clone(); + move |pad: &gst::Pad, info: &mut gst::PadProbeInfo| { + on_pad_added(pad, info, &pipeless_bus_sender) + } + } + ); + }, + Err(err) => error!("{}", err) + } } }); @@ -212,15 +245,16 @@ fn create_input_bin( let videoconvert_src_pad = match videoconvert.static_pad("src") { Some(pad) => pad, None => { - panic!("Failed to create the pipeline. Unable to get videoconvert source pad."); + return Err(InputPipelineError::new("Failed to create the pipeline. 
Unable to get videoconvert source pad.")); } }; let ghostpath_src = gst::GhostPad::with_target(&videoconvert_src_pad) - .expect("Unable to create the ghost pad to link bin"); - bin.add_pad(&ghostpath_src).expect("Unable to add ghostpad to input bin"); + .map_err(|_| { InputPipelineError::new("Unable to create the ghost pad to link bin")})?; + bin.add_pad(&ghostpath_src) + .map_err(|_| { InputPipelineError::new("Unable to add ghostpad to input bin")})?; } - bin + Ok(bin) } fn on_bus_message( @@ -254,7 +288,7 @@ fn on_bus_message( // Communicate error pipeless::events::publish_input_stream_error_event_sync(pipeless_bus_sender, &err_msg); // Exit thread, thus glib pipeline mainloop. - panic!( + error!( "Error in input gst pipeline from element {}. Pipeline id: {}. Error: {}", err_src_name, pipeline_id, err_msg @@ -305,19 +339,20 @@ fn create_gst_pipeline( pipeless_pipeline_id: uuid::Uuid, input_uri: &str, pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, -) -> gst::Pipeline { +) -> Result { let pipeline = gst::Pipeline::new(); - let input_bin = create_input_bin(input_uri, pipeless_bus_sender); + let input_bin = create_input_bin(input_uri, pipeless_bus_sender)?; // Force RGB output since workers process RGB - let sink_caps = gst::Caps::from_str("video/x-raw,format=RGB").expect("Unable to create caps from string"); + let sink_caps = gst::Caps::from_str("video/x-raw,format=RGB") + .map_err(|_| { InputPipelineError::new("Unable to create caps from string") })?; let appsink = gst::ElementFactory::make("appsink") .name("appsink") .property("emit-signals", true) .property("caps", sink_caps) .build() - .expect("Failed to create appsink") + .map_err(|_| { InputPipelineError::new("Failed to create appsink") })? .dynamic_cast::() - .expect("Unable to cast element to AppSink"); + .map_err(|_| { InputPipelineError::new("Unable to cast element to AppSink") })?; let appsink_callbacks = gst_app::AppSinkCallbacks::builder() .new_sample( @@ -333,18 +368,18 @@ fn create_gst_pipeline( }).build(); appsink.set_callbacks(appsink_callbacks); - pipeline.add(&input_bin).expect("Failed to add input bin to input pipeline"); - pipeline.add(&appsink).expect("Failed to add app sink to input pipeline"); + pipeline.add(&input_bin).map_err(|_| InputPipelineError::new("Failed to add input bin to input pipeline"))?; + pipeline.add(&appsink).map_err(|_| InputPipelineError::new("Failed to add app sink to input pipeline"))?; // Link static elements - input_bin.link(&appsink).expect("Error linking input bin to appsink"); + input_bin.link(&appsink).map_err(|_| InputPipelineError::new("Error linking input bin to appsink"))?; - pipeline + Ok(pipeline) } pub struct Pipeline { id: uuid::Uuid, // Id of the parent pipeline (the one that groups input and output) - stream: pipeless::input::pipeline::StreamDef, + _stream: pipeless::input::pipeline::StreamDef, gst_pipeline: gst::Pipeline, } impl Pipeline { @@ -352,17 +387,17 @@ impl Pipeline { id: uuid::Uuid, stream: pipeless::input::pipeline::StreamDef, pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, - ) -> Self { + ) -> Result { let input_uri = stream.get_video().get_uri(); - let gst_pipeline = create_gst_pipeline(id, input_uri, pipeless_bus_sender); + let gst_pipeline = create_gst_pipeline(id, input_uri, pipeless_bus_sender)?; let pipeline = Pipeline { id, - stream, + _stream: stream, gst_pipeline, }; let bus = pipeline.gst_pipeline.bus() - .expect("Unable to get input gst pipeline bus"); + .ok_or_else(|| { InputPipelineError::new("Unable to get input gst pipeline bus") 
})?; bus.add_signal_watch(); let pipeline_id = pipeline.id.clone(); bus.connect_message( @@ -377,9 +412,9 @@ impl Pipeline { pipeline.gst_pipeline .set_state(gst::State::Playing) - .expect("Unable to start the input gst pipeline"); + .map_err(|_| { InputPipelineError::new("Unable to start the input gst pipeline") })?; - pipeline + Ok(pipeline) } pub fn get_pipeline_id(&self) -> uuid::Uuid { diff --git a/pipeless/src/kvs/store.rs b/pipeless/src/kvs/store.rs index 90a9444..77c50d0 100644 --- a/pipeless/src/kvs/store.rs +++ b/pipeless/src/kvs/store.rs @@ -14,7 +14,8 @@ struct LocalStore { impl LocalStore { fn new() -> Self { let db_path = "/tmp/.pipeless_kv_store"; - let db = sled::open(db_path).expect("Failed to open KV store"); + let db = sled::open(db_path) + .expect(&format!("Failed to open KV store. Ensure pipeless can write at {}", db_path)); Self { backend: db } } } @@ -46,6 +47,7 @@ impl StoreInterface for LocalStore { // TODO: setup Redis or any other distributed solution. // Important: Note that any type implementing StoreInterface must be thread safe +/* struct DistributedStore {} impl DistributedStore { fn new() -> Self { unimplemented!() } @@ -54,6 +56,7 @@ impl StoreInterface for DistributedStore { fn get(&self, key: &str) -> String { unimplemented!() } fn set(&self, key: &str, value: &str) { unimplemented!() } } +*/ lazy_static! { // TODO: Add support for distributed store do not hardcode the local one diff --git a/pipeless/src/output/pipeline.rs b/pipeless/src/output/pipeline.rs index 56bc8ab..771e0cd 100644 --- a/pipeless/src/output/pipeline.rs +++ b/pipeless/src/output/pipeline.rs @@ -1,18 +1,40 @@ use std; use std::str::FromStr; +use glib::BoolError; use gstreamer as gst; use gst::prelude::*; use gstreamer_app as gst_app; -use log::{info, error, warn, debug}; +use log::{info, warn, debug, error}; -use crate as pipeless; +use crate::{self as pipeless}; #[derive(Debug)] -pub struct PipelineError; -impl std::error::Error for PipelineError {} -impl std::fmt::Display for PipelineError { +pub struct OutputPipelineError { + msg: String +} +impl OutputPipelineError { + fn new(msg: &str) -> Self { + Self { msg: msg.to_owned() } + } +} +impl std::error::Error for OutputPipelineError {} +impl std::fmt::Display for OutputPipelineError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "An error ocurred with the output pipeline") + write!(f, "{}", self.msg.to_string()) + } +} +impl From for OutputPipelineError { + fn from(error: BoolError) -> Self { + Self { + msg: error.to_string(), + } + } +} +impl From for OutputPipelineError { + fn from(error: pipeless::config::video::VideoConfigError) -> Self { + Self { + msg: error.to_string(), + } } } @@ -25,11 +47,9 @@ pub struct StreamDef { initial_tags: Option, } impl StreamDef { - pub fn new(uri: String) -> Self { - StreamDef { - video: pipeless::config::video::Video::new(uri), - initial_tags: None, - } + pub fn new(uri: String) -> Result { + let video = pipeless::config::video::Video::new(uri)?; + Ok(Self { video, initial_tags: None }) } pub fn set_initial_tags(&mut self, tags: gst::TagList) { @@ -55,134 +75,134 @@ fn set_pipeline_tags(pipeline: &gst::Pipeline, new_tag_list: &gst::TagList) { }; } -fn create_processing_bin(stream: &StreamDef) -> gst::Bin { +fn create_processing_bin(stream: &StreamDef) -> Result { // The Pipeless output pipeline always receives RGB, so we only // worry about the output format let bin = gst::Bin::new(); if stream.video.get_protocol() == "file" { if 
stream.video.get_location().ends_with(".mp4") { let videoconvert = pipeless::gst::utils::create_generic_component( - "videoconvert", "videoconvert"); + "videoconvert", "videoconvert")?; let capsfilter = pipeless::gst::utils::create_generic_component( - "capsfilter", "capsfilter"); + "capsfilter", "capsfilter")?; let encoder = pipeless::gst::utils::create_generic_component( - "x264enc", "encoder"); + "x264enc", "encoder")?; let taginject = pipeless::gst::utils::create_generic_component( - "taginject", "taginject"); + "taginject", "taginject")?; let muxer = pipeless::gst::utils::create_generic_component( - "mp4mux", "muxer"); + "mp4mux", "muxer")?; bin.add_many([ &videoconvert, &capsfilter, &encoder, &taginject, &muxer - ]).expect("Unable to add components to processing bin"); + ])?; let caps = gst::Caps::from_str("video/x-raw,format=I420") - .expect("Unable to create caps from provided string"); + .map_err(|_| { OutputPipelineError::new("Unable to create caps from provided string") })?; capsfilter.set_property("caps", caps); videoconvert.link(&capsfilter) - .expect("Unable to link videoconvert to capsfilter"); + .map_err(|_| { OutputPipelineError::new("Unable to link videoconvert to capsfilter") })?; capsfilter.link(&encoder) - .expect("Unable to link capsfilter to encoder"); + .map_err(|_| { OutputPipelineError::new("Unable to link capsfilter to encoder") })?; encoder.link(&taginject) - .expect("Unable to link encoder to taginject"); + .map_err(|_| { OutputPipelineError::new("Unable to link encoder to taginject") })?; taginject.link(&muxer) - .expect("Unable to link taginject to muxer"); + .map_err(|_| { OutputPipelineError::new("Unable to link taginject to muxer") })?; // Ghost pads to be able to plug other components to the bin let videoconvert_sink_pad = videoconvert.static_pad("sink") - .expect("Failed to create the pipeline. Unable to get videoconvert sink pad."); + .ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get videoconvert sink pad.") })?; let ghostpath_sink = gst::GhostPad::with_target(&videoconvert_sink_pad) - .expect("Unable to create the sink ghost pad to link bin"); - bin.add_pad(&ghostpath_sink).expect("Unable to add sink pad"); + .map_err(|_| { OutputPipelineError::new("Unable to create the sink ghost pad to link bin") })?; + bin.add_pad(&ghostpath_sink).map_err(|_| { OutputPipelineError::new("Unable to add sink pad") })?; let muxer_src_pad = muxer.static_pad("src") - .expect("Failed to create the pipeline. Unable to get muxer source pad."); + .ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get muxer source pad.") })?; let ghostpath_src = gst::GhostPad::with_target(&muxer_src_pad) - .expect("Unable to create the ghost pad to link bin"); - bin.add_pad(&ghostpath_src).expect("Unable to add source pad"); + .map_err(|_| { OutputPipelineError::new("Unable to create the ghost pad to link bin") })?; + bin.add_pad(&ghostpath_src).map_err(|_| { OutputPipelineError::new("Unable to add source pad") })?; } else { - panic!("Unsupported file type. Currently supported output extensions: .mp4"); + return Err(OutputPipelineError::new("Unsupported file type. 
Currently supported output extensions: .mp4")); } } else if stream.video.get_protocol() == "rtmp" { let videoconvert = pipeless::gst::utils::create_generic_component( - "videoconvert", "videoconvert"); + "videoconvert", "videoconvert")?; let queue = pipeless::gst::utils::create_generic_component( - "queue", "queue"); + "queue", "queue")?; let encoder = pipeless::gst::utils::create_generic_component( - "x264enc", "encoder"); + "x264enc", "encoder")?; let taginject = pipeless::gst::utils::create_generic_component( - "taginject", "taginject"); + "taginject", "taginject")?; let muxer = pipeless::gst::utils::create_generic_component( - "flvmux", "muxer"); + "flvmux", "muxer")?; bin.add_many([ &videoconvert, &queue, &encoder, &taginject, &muxer - ]).expect("Unable to add elements to processing bin"); + ]).map_err(|_| { OutputPipelineError::new("Unable to add elements to processing bin") })?; muxer.set_property("streamable", true); videoconvert.link(&queue) - .expect("Unable to link videoconvert to queue"); + .map_err(|_| { OutputPipelineError::new("Unable to link videoconvert to queue") })?; queue.link(&encoder) - .expect("Unable to link queue to encoder"); + .map_err(|_| { OutputPipelineError::new("Unable to link queue to encoder") })?; encoder.link(&taginject) - .expect("Unable to link encoder to taginject"); + .map_err(|_| { OutputPipelineError::new("Unable to link encoder to taginject") })?; taginject.link(&muxer) - .expect("Unable to link taginject to muxer"); + .map_err(|_| { OutputPipelineError::new("Unable to link taginject to muxer") })?; // Ghost pads to be able to plug other components to the bin let videoconvert_sink_pad = videoconvert.static_pad("sink") - .expect("Failed to create the pipeline. Unable to get videoconvert sink pad."); + .ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get videoconvert sink pad.") })?; let ghostpath_sink = gst::GhostPad::with_target(&videoconvert_sink_pad) - .expect("Unable to create the sink ghost pad to link bin"); - bin.add_pad(&ghostpath_sink).expect("Unable to add ghostpad sink"); + .map_err(|_| { OutputPipelineError::new("Unable to create the sink ghost pad to link bin") })?; + bin.add_pad(&ghostpath_sink).map_err(|_| { OutputPipelineError::new("Unable to add ghostpad sink") })?; let muxer_src_pad = muxer.static_pad("src") - .expect("Failed to create the pipeline. Unable to get muxer source pad."); + .ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. 
Unable to get muxer source pad.") })?; let ghostpath_src = gst::GhostPad::with_target(&muxer_src_pad) - .expect("Unable to create the ghost pad to link bin"); - bin.add_pad(&ghostpath_src).expect("Unable to add ghostpad source"); + .map_err(|_| { OutputPipelineError::new("Unable to create the ghost pad to link bin") })?; + bin.add_pad(&ghostpath_src).map_err(|_| { OutputPipelineError::new("Unable to add ghostpad source") })?; } else if stream.video.get_protocol() == "screen" { let queue1 = pipeless::gst::utils::create_generic_component( - "queue", "queue1"); + "queue", "queue1")?; let videoconvert = pipeless::gst::utils::create_generic_component( - "videoconvert", "videoconvert"); + "videoconvert", "videoconvert")?; let queue2 = pipeless::gst::utils::create_generic_component( - "queue", "queue2"); + "queue", "queue2")?; bin.add_many([&queue1, &videoconvert, &queue2]) - .expect("Unable to add elements to processing bin"); + .map_err(|_| { OutputPipelineError::new("Unable to add elements to processing bin") })?; queue1.link(&videoconvert) - .expect("Unable to link queue1 to videoconvert"); + .map_err(|_| { OutputPipelineError::new("Unable to link queue1 to videoconvert") })?; videoconvert.link(&queue2) - .expect("Unable to link videoconvert to queue2"); + .map_err(|_| { OutputPipelineError::new("Unable to link videoconvert to queue2") })?; // Ghost pads to be able to plug other components to the bin let queue1_sink_pad = queue1.static_pad("sink") - .expect("Failed to create the pipeline. Unable to get queue1 sink pad."); + .ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get queue1 sink pad.") })?; let ghostpath_sink = gst::GhostPad::with_target(&queue1_sink_pad) - .expect("Unable to create the sink ghost pad to link bin"); - bin.add_pad(&ghostpath_sink).expect("Unable to add ghost pad to processing bin"); + .map_err(|_| { OutputPipelineError::new("Unable to create the sink ghost pad to link bin") })?; + bin.add_pad(&ghostpath_sink).map_err(|_| { OutputPipelineError::new("Unable to add ghost pad to processing bin") })?; let queue2_src_pad = queue2.static_pad("src") - .expect("Failed to create the pipeline. Unable to get queue2 source pad."); + .ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get queue2 source pad.") })?; let ghostpath_src = gst::GhostPad::with_target(&queue2_src_pad) - .expect("Unable to create the ghost pad to link bin"); - bin.add_pad(&ghostpath_src).expect("Unable to add ghost pad to processing bin"); + .map_err(|_| { OutputPipelineError::new("Unable to create the ghost pad to link bin") })?; + bin.add_pad(&ghostpath_src).map_err(|_| { OutputPipelineError::new("Unable to add ghost pad to processing bin") })?; } else { - panic!("Unsupported output protocol. Please contact us if you think a new protocol is needed or feel free send us a GitHub PR adding it"); + return Err(OutputPipelineError::new("Unsupported output protocol. 
Please contact us if you think a new protocol is needed or feel free send us a GitHub PR adding it")); } - bin + Ok(bin) } -fn get_sink(sink_type: &str, location: Option<&str>) -> gst::Element { +fn get_sink(sink_type: &str, location: Option<&str>) -> Result { let sink = pipeless::gst::utils::create_generic_component( - sink_type, "sink"); + sink_type, "sink")?; if let Some(l) = location { sink.set_property("location", l); } - sink + Ok(sink) } -fn create_sink(stream: &StreamDef) -> gst::Element { +fn create_sink(stream: &StreamDef) -> Result { let location = stream.video.get_location(); return match stream.video.get_protocol() { // TODO: implement processing bin for all the below protocols @@ -223,7 +243,7 @@ fn on_bus_message( // Communicate error pipeless::events::publish_output_stream_error_event_sync(pipeless_bus_sender, &err_msg); // Exit the the output thread with the error. This will stop the mainloop. - panic!( + error!( "Error in output gst pipeline from element {}. Pipeline id: {}. Error: {}", err_src_name, pipeline_id, err_msg @@ -265,22 +285,22 @@ fn on_bus_message( fn create_gst_pipeline( output_stream_def: &StreamDef, caps: &str -) -> (gst::Pipeline, gst::BufferPool) { +) -> Result<(gst::Pipeline, gst::BufferPool), OutputPipelineError> { let pipeline = gst::Pipeline::new(); - let input_stream_caps = gst::Caps::from_str(caps).expect(format!( - "Unable to create caps from provide string {}", caps).as_ref()); + let input_stream_caps = gst::Caps::from_str(caps) + .map_err(|_| { OutputPipelineError::new(&format!("Unable to create caps from provide string {}", caps)) })?; let caps_structure = input_stream_caps.structure(0) - .expect("Unable to get structure from capabilities"); + .ok_or_else(|| { OutputPipelineError::new("Unable to get structure from capabilities") })?; let caps_width = pipeless::gst::utils::i32_from_caps_structure( &caps_structure, "width" - ).expect("Unable to get width from original input caps") as u32; + ).map_err(|_| { OutputPipelineError::new("Unable to get width from original input caps") })? as u32; let caps_height = pipeless::gst::utils::i32_from_caps_structure( &caps_structure, "height" - ).expect("Unable to get height from original input caps") as u32; + ).map_err(|_| { OutputPipelineError::new("Unable to get height from original input caps") })? 
as u32; let caps_framerate_fraction = pipeless::gst::utils::fraction_from_caps_structure( &caps_structure, "framerate" - ).expect("Unable to get framerate from original input caps to create output"); + ).map_err(|_| { OutputPipelineError::new("Unable to get framerate from original input caps to create output") })?; // The appsrc caps will be the caps from the input stream but in RGB format (produced by the workers) let appsrc_caps_str = format!( "video/x-raw,format=RGB,width={},height={},framerate={}/{}", @@ -288,10 +308,8 @@ fn create_gst_pipeline( caps_framerate_fraction.0, caps_framerate_fraction.1 ); let appsrc_caps = - gst::Caps::from_str(&appsrc_caps_str).expect(format!( - "Unable to create appsrc caps from {}", - appsrc_caps_str).as_ref() - ); + gst::Caps::from_str(&appsrc_caps_str) + .map_err(|_| { OutputPipelineError::new(&format!("Unable to create appsrc caps from {}", appsrc_caps_str)) })?; let appsrc = gst::ElementFactory::make("appsrc") .name("appsrc") @@ -301,19 +319,19 @@ fn create_gst_pipeline( .property("max-bytes", 500000000 as u64) // Queue size .property("caps", &appsrc_caps) .build() - .expect("Failed to create appsrc"); + .map_err(|_| { OutputPipelineError::new("Failed to create appsrc") })?; - let processing_bin = create_processing_bin(output_stream_def); - let sink = create_sink(output_stream_def); + let processing_bin = create_processing_bin(output_stream_def)?; + let sink = create_sink(output_stream_def)?; pipeline.add_many([&appsrc, &sink]) - .expect("Unable to add elements to the pipeline"); + .map_err(|_| { OutputPipelineError::new("Unable to add elements to the pipeline") })?; pipeline.add(&processing_bin) - .expect("Unable to add processing bin to the pipeline"); + .map_err(|_| { OutputPipelineError::new("Unable to add processing bin to the pipeline") })?; appsrc.link(&processing_bin) - .expect("Unable to link appsrc to processing bin"); + .map_err(|_| { OutputPipelineError::new("Unable to link appsrc to processing bin") })?; processing_bin.link(&sink) - .expect("Unable to link processing bin to sink"); + .map_err(|_| { OutputPipelineError::new("Unable to link processing bin to sink") })?; // The tags can be sent by the input before the output // pipeline is created @@ -328,10 +346,10 @@ fn create_gst_pipeline( let mut bufferpool_config = bufferpool.config(); let frame_size = caps_width * caps_height * 3; bufferpool_config.set_params(Some(&appsrc_caps), frame_size,0, 0); - bufferpool.set_config(bufferpool_config).expect("Unable to set bufferpool config"); - bufferpool.set_active(true).expect("Could not activate buffer pool"); + bufferpool.set_config(bufferpool_config).map_err(|_| { OutputPipelineError::new("Unable to set bufferpool config") })?; + bufferpool.set_active(true).map_err(|_| { OutputPipelineError::new("Could not activate buffer pool") })?; - (pipeline, bufferpool) + Ok((pipeline, bufferpool)) } pub struct Pipeline { @@ -348,8 +366,8 @@ impl Pipeline { stream: pipeless::output::pipeline::StreamDef, caps: &str, pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, - ) -> Self { - let (gst_pipeline, buffer_pool) = create_gst_pipeline(&stream, caps); + ) -> Result { + let (gst_pipeline, buffer_pool) = create_gst_pipeline(&stream, caps)?; let pipeline = Pipeline { id, gst_pipeline, @@ -357,7 +375,7 @@ impl Pipeline { buffer_pool, }; let bus = pipeline.gst_pipeline.bus() - .expect("Unable to get output gst pipeline bus"); + .ok_or_else(|| { OutputPipelineError::new("Unable to get output gst pipeline bus") })?; bus.add_signal_watch(); let pipeline_id 
= pipeline.id.clone(); bus.connect_message( @@ -371,9 +389,9 @@ impl Pipeline { ); pipeline.gst_pipeline .set_state(gst::State::Playing) - .expect("Unable to start the output gst pipeline"); + .map_err(|_| { OutputPipelineError::new("Unable to start the output gst pipeline") })?; - pipeline + Ok(pipeline) } pub fn get_pipeline_id(&self) -> uuid::Uuid { @@ -386,34 +404,37 @@ impl Pipeline { /// Invoked by the pipeline manager when there is an EOS /// event on the input - pub fn on_eos(&self) { + pub fn on_eos(&self) -> Result<(), OutputPipelineError>{ let appsrc = self.gst_pipeline.by_name("appsrc") - .expect("Unable to get pipeline appsrc element") + .ok_or_else(|| { OutputPipelineError::new("Unable to get pipeline appsrc element") })? .dynamic_cast::() - .expect("Unable to cast element to AppSource"); + .map_err(|_| { OutputPipelineError::new("Unable to cast element to AppSource") })?; appsrc.end_of_stream() - .expect("Error sending EOS signal to output"); + .map_err(|_| { OutputPipelineError::new("Error sending EOS signal to output") })?; + + Ok(()) } - pub fn on_new_frame(&self, frame: pipeless::data::Frame) { + pub fn on_new_frame(&self, frame: pipeless::data::Frame) -> Result<(), OutputPipelineError>{ match frame { pipeless::data::Frame::RgbFrame(rgb_frame) => { let modified_pixels = rgb_frame.get_modified_pixels(); let out_frame_data = modified_pixels.as_slice() - .expect("Unable to get bytes data from RGB frame. Is your output image of the same shape as the input?"); + .ok_or_else(|| { OutputPipelineError::new("Unable to get bytes data from RGB frame. Is your output image of the same shape as the input?") })?; let appsrc = self.gst_pipeline.by_name("appsrc") - .expect("Unable to get pipeline appsrc element") + .ok_or_else(|| { OutputPipelineError::new("Unable to get pipeline appsrc element") })? .dynamic_cast::() - .expect("Unable to cast element to AppSource"); + .map_err(|_| { OutputPipelineError::new("Unable to cast element to AppSource") })?; let copy_timestamps = self.stream.get_video().get_protocol() != "screen"; // TODO: something we can do instead of having a buffer pool is to re-use the input gst buffer by storing it into the RgbFrame let mut gst_buffer = self.buffer_pool.acquire_buffer(None) - .expect("Unable to acquire buffer from pool"); + .map_err(|_| { OutputPipelineError::new("Unable to acquire buffer from pool") })?; - let gst_buffer_mut = gst_buffer.get_mut().expect("Unable to get mutable buffer"); + let gst_buffer_mut = gst_buffer.get_mut() + .ok_or_else(|| { OutputPipelineError::new("Unable to get mutable buffer") })?; // TODO: profile. Could this be faster by copying manually with rayon? 
// let data_slice = buffer_map.as_mut_slice(); @@ -422,7 +443,7 @@ impl Pipeline { // *byte = (*byte + 1) % 256; // }); gst_buffer_mut.copy_from_slice(0, out_frame_data) - .expect("Unable to copy slice into buffer"); + .map_err(|_| { OutputPipelineError::new("Unable to copy slice into buffer") })?; if copy_timestamps { let pts = rgb_frame.get_pts(); @@ -434,10 +455,12 @@ impl Pipeline { } if let Err(err) = appsrc.push_buffer(gst_buffer) { - error!("Failed to send the output buffer: {}", err); + return Err(OutputPipelineError::new(&format!("Failed to send the output buffer: {}", err))); } } } + + Ok(()) } pub fn on_new_tags(&self, new_tags: gst::TagList) { diff --git a/pipeless/src/pipeline.rs b/pipeless/src/pipeline.rs index 5be1457..213e7da 100644 --- a/pipeless/src/pipeline.rs +++ b/pipeless/src/pipeline.rs @@ -6,6 +6,31 @@ use log::{info, error, warn}; use crate as pipeless; +#[derive(Debug)] +pub struct PipelineError { + msg: String +} +impl std::error::Error for PipelineError {} +impl std::fmt::Display for PipelineError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.msg.to_string()) + } +} +impl From for PipelineError { + fn from(error: pipeless::input::pipeline::InputPipelineError) -> Self { + Self { + msg: error.to_string(), + } + } +} +impl From for PipelineError { + fn from(error: pipeless::output::pipeline::OutputPipelineError) -> Self { + Self { + msg: error.to_string(), + } + } +} + /// A Pipeless pipeline is an association of an input pipeline and an /// output pipeline, plus the stages the frames must pass through /// The input and output pipelines are handled via independent Gstreamer pipelines @@ -14,7 +39,7 @@ use crate as pipeless; /// avoiding inconsistences when a node fails in a cloud setup. 
struct Pipeline { id: uuid::Uuid, - input_stream_def: pipeless::input::pipeline::StreamDef, + _input_stream_def: pipeless::input::pipeline::StreamDef, output_stream_def: Option, input_pipeline: pipeless::input::pipeline::Pipeline, output_pipeline: Option, @@ -26,30 +51,30 @@ impl Pipeline { input_uri: String, output_uri: Option, frames_path: pipeless::stages::path::FramePath, - ) -> Self { + ) -> Result { let pipeline_id = uuid::Uuid::new_v4(); let input_stream_def = - pipeless::input::pipeline::StreamDef::new(input_uri.clone()); + pipeless::input::pipeline::StreamDef::new(input_uri.clone())?; let input_pipeline = pipeless::input::pipeline::Pipeline::new( pipeline_id, input_stream_def.clone(), pipeless_bus_sender, - ); + )?; - let output_stream_def = match output_uri { - Some(uri) => Some(pipeless::output::pipeline::StreamDef::new(uri)), - None => None, - }; + let mut output_stream_def = None; + if let Some(uri) = output_uri { + output_stream_def = Some(pipeless::output::pipeline::StreamDef::new(uri)?); + } - Pipeline { + Ok(Pipeline { id: pipeline_id, - input_stream_def, + _input_stream_def: input_stream_def, output_stream_def, input_pipeline, // The output pipeline can't be created until we have the input caps output_pipeline: None, frames_path, - } + }) } /// The output stream of a pipeline is created once we got the input capabilities @@ -58,7 +83,7 @@ impl Pipeline { &mut self, input_caps: String, pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, - ) { + ) -> Result<(), pipeless::output::pipeline::OutputPipelineError> { if let Some(stream_def) = &self.output_stream_def { // TODO: build streamdefs within pipelines and pass the uri only let output_pipeline = @@ -67,9 +92,11 @@ impl Pipeline { stream_def.clone(), &input_caps, pipeless_bus_sender - ); + )?; self.output_pipeline = Some(output_pipeline); } + + Ok(()) } /// Close only stops the gst pipelines. It does not send EOS. @@ -113,15 +140,15 @@ impl Manager { // The bus needs to be created before the pipeline pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, dispatcher_sender: tokio::sync::mpsc::UnboundedSender, - ) -> Self { + ) -> Result { let pipeline = Arc::new(RwLock::new(pipeless::pipeline::Pipeline::new( &pipeless_bus_sender, input_video_uri, output_video_uri, frames_path, - ))); + )?)); - Self {pipeline, dispatcher_sender } + Ok(Self {pipeline, dispatcher_sender }) } // Start takes ownership of self because we have to access the bus, @@ -171,7 +198,11 @@ impl Manager { if let Some(out_frame) = out_frame_opt { let read_guard = rw_pipeline.read().await; match &read_guard.output_pipeline { - Some(pipe) => pipe.on_new_frame(out_frame), + Some(pipe) => { + if let Err(err) = pipe.on_new_frame(out_frame) { + error!("{}", err); + } + } None => {} } } else { @@ -183,10 +214,12 @@ impl Manager { info!("New input caps. Creating output pipeline for caps: {}", caps); let mut write_guard = rw_pipeline.write().await; - write_guard.create_and_start_output_pipeline( + if let Err(err) = write_guard.create_and_start_output_pipeline( caps.to_string(), &pipeless_bus_sender - ); + ) { + error!("Error creating output: {}. The stream will be processed without the output", err); + }; } pipeless::events::Event::TagsChangeEvent(e) => { let tags = e.get_tags(); @@ -209,7 +242,21 @@ impl Manager { let write_guard = rw_pipeline.write().await; if let Some(out_pipe) = &write_guard.output_pipeline { info!("End of input stream reached. 
Pipeline id: {}", out_pipe.get_pipeline_id()); - out_pipe.on_eos(); + if let Err(err) = out_pipe.on_eos() { + error!("Error sending end of stream signal to output: {}", err); + } + } else { + // When there is no output, stop the stream as fast as the input EOS is reached + info!("End of stream reached for pipeline: {}", write_guard.input_pipeline.get_pipeline_id()); + if let Err(err) = dispatcher_sender + .send(pipeless::dispatcher::DispatcherEvent::PipelineFinished(write_guard.input_pipeline.get_pipeline_id())) { + warn!("Unable to send pipeline finished event to dispatcher. Error: {}", err); + }; + + // End the processing loop + if let Err(err) = end_signal.send(()).await { + error!("Error signaling stream event loop end: {}", err); + } } } } @@ -227,7 +274,9 @@ impl Manager { }; // End the processing loop - end_signal.send(()).await.expect("Error signaling stream event loop end"); + if let Err(err) = end_signal.send(()).await { + error!("Error signaling stream event loop end: {}", err); + } } pipeless::events::Event::InputStreamErrorEvent(e) => { let pipeline_id; @@ -246,7 +295,9 @@ impl Manager { }; // End the processing loop - end_signal.send(()).await.expect("Error signaling stream event loop end"); + if let Err(err) = end_signal.send(()).await { + error!("Error signaling stream event loop end: {}", err) + }; } pipeless::events::Event::OutputStreamErrorEvent(e) => { let pipeline_id; @@ -265,7 +316,9 @@ impl Manager { } // End the processing loop - end_signal.send(()).await.expect("Error signaling stream event loop end"); + if let Err(err) = end_signal.send(()).await { + error!("Error signaling stream event loop end: {}", err) + }; } } } diff --git a/pipeless/src/stages/languages/python.rs b/pipeless/src/stages/languages/python.rs index adb08ce..6608ce8 100644 --- a/pipeless/src/stages/languages/python.rs +++ b/pipeless/src/stages/languages/python.rs @@ -1,6 +1,6 @@ use log::{error, warn}; use pyo3::prelude::*; -use numpy::{self, ToPyArray}; +use numpy; use crate::{data::{RgbFrame, Frame}, stages::{hook::HookTrait, stage::ContextTrait}, stages::stage::Context, kvs::store}; @@ -29,8 +29,8 @@ impl IntoPy> for RgbFrame { fn into_py(self, py: Python) -> Py { let dict = pyo3::types::PyDict::new(py); dict.set_item("uuid", self.get_uuid().to_string()).unwrap(); - dict.set_item("original", numpy::PyArray3::from_owned_array(py, self.get_owned_original_pixels())).unwrap(); - dict.set_item("modified", numpy::PyArray3::from_owned_array(py, self.get_owned_modified_pixels())).unwrap(); + dict.set_item("original", numpy::PyArray3::from_array(py, self.get_original_pixels())).unwrap(); + dict.set_item("modified", numpy::PyArray3::from_array(py, self.get_modified_pixels())).unwrap(); dict.set_item("width", self.get_width()).unwrap(); dict.set_item("height", self.get_height()).unwrap(); dict.set_item("pts", self.get_pts().mseconds()).unwrap(); @@ -38,8 +38,8 @@ impl IntoPy> for RgbFrame { dict.set_item("duration", self.get_duration().mseconds()).unwrap(); dict.set_item("fps", self.get_fps()).unwrap(); dict.set_item("input_ts", self.get_input_ts()).unwrap(); - dict.set_item("inference_input", self.get_inference_input().to_owned().to_pyarray(py)).unwrap(); - dict.set_item("inference_output", self.get_inference_output().to_owned().to_pyarray(py)).unwrap(); + dict.set_item("inference_input", numpy::PyArrayDyn::from_array(py, self.get_inference_input())).unwrap(); + dict.set_item("inference_output", numpy::PyArrayDyn::from_array(py, self.get_inference_output())).unwrap(); dict.set_item("pipeline_id", 
self.get_pipeline_id().to_string()).unwrap();
         dict.into()
     }
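
Note (illustration only, not part of the patch): the diff above applies one pattern in every module it touches. It replaces panic!/expect calls with a module-level error type that implements Display, Error and a few From conversions, makes constructors return Result, and lets callers decide whether to log, roll back the streams table, or answer with an HTTP 500. The sketch below shows that pattern in isolation; all names in it (DemoError, parse_port) are hypothetical and are not taken from the pipeless codebase.

use std::fmt;

// Hypothetical error type mirroring the shape of VideoConfigError / InputPipelineError.
#[derive(Debug)]
pub struct DemoError {
    msg: String,
}
impl DemoError {
    fn new(msg: &str) -> Self {
        Self { msg: msg.to_owned() }
    }
}
impl std::error::Error for DemoError {}
impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.msg)
    }
}
// A From impl lets lower-level errors bubble up through the `?` operator,
// as the patch does for glib::BoolError and VideoConfigError.
impl From<std::num::ParseIntError> for DemoError {
    fn from(err: std::num::ParseIntError) -> Self {
        Self { msg: err.to_string() }
    }
}

// A fallible constructor in the style the patch introduces: instead of
// panicking inside the library, it returns Err and the caller reacts.
fn parse_port(uri: &str) -> Result<u16, DemoError> {
    let (_, port_str) = uri
        .split_once(':')
        .ok_or_else(|| DemoError::new("URI must contain ':' followed by a port"))?;
    let port = port_str.parse::<u16>()?; // ParseIntError converted via From
    Ok(port)
}

fn main() {
    match parse_port("localhost:8080") {
        Ok(port) => println!("port: {}", port),
        Err(err) => eprintln!("invalid URI: {}", err),
    }
}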