Skip to content

Commit

Permalink
Merge pull request #88 from pipeless-ai/errors
Browse files Browse the repository at this point in the history
fix: Improve error handling
  • Loading branch information
miguelaeh authored Nov 23, 2023
2 parents 785fcc8 + 7d10306 commit c31c857
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 229 deletions.
2 changes: 1 addition & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.1.2"
version = "1.1.3"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down
12 changes: 9 additions & 3 deletions pipeless/src/config/adapters/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ async fn handle_add_stream(
}
let output_uri = stream.output_uri.clone();
{
streams_table.write()
let res = streams_table.write()
.await
.add(pipeless::config::streams::StreamsTableEntry::new(input_uri, output_uri, frame_path))
.expect("Error adding new stream to the table");
.add(pipeless::config::streams::StreamsTableEntry::new(input_uri, output_uri, frame_path));

if let Err(err) = res {
return Ok(warp::reply::with_status(
warp::reply::json(&json!({"error": format!("Error adding new stream to the table: {}", err)})),
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
));
}
}

match dispatcher_sender.send(pipeless::dispatcher::DispatcherEvent::TableChange) {
Expand Down
28 changes: 23 additions & 5 deletions pipeless/src/config/video.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@

#[derive(Debug)]
pub struct VideoConfigError {
msg: String
}
impl VideoConfigError {
fn new(msg: &str) -> Self {
Self { msg: msg.to_owned() }
}
}
impl std::error::Error for VideoConfigError {}
impl std::fmt::Display for VideoConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.msg.to_string())
}
}

#[derive(Clone)]
pub struct Video {
/// This can be input or output video, it is generic
Expand All @@ -6,7 +23,7 @@ pub struct Video {
uri: String,
}
impl Video {
pub fn new(uri: String) -> Self {
pub fn new(uri: String) -> Result<Self, VideoConfigError> {
let protocol: String;
let location: String;
if uri == "screen" {
Expand All @@ -15,17 +32,18 @@ impl Video {
location = String::from("screen");
} else if uri != "v4l2" {
let uri_split: Vec<&str> = uri.split("://").collect();
protocol = uri_split.get(0).expect("Unable to get protocol from URI").to_string();
location = uri_split.get(1).expect("Unable to get location from URI. Ensure it contains the protocol followed by '//'.").to_string();
protocol = uri_split.get(0).ok_or_else(|| { VideoConfigError::new("Unable to get protocol from URI") })?.to_string();
location = uri_split.get(1)
.ok_or_else(|| { VideoConfigError::new("Unable to get location from URI. Ensure it contains the protocol followed by '://'. Example: file:///home/user/file.mp4") })?.to_string();
if protocol == "file" && !location.starts_with('/') {
panic!("When using files you should indicate an absolute path. Ensure your path is on the format file:///some/path (note there are 3 slashes)");
panic!("When using files you should indicate an absolute path. Ensure your path is on the format file:///home/user/file.mp4 (note there are 3 slashes)");
}
} else {
protocol = String::from("v4l2");
location = String::from("v4l2");
}

Video { protocol, location, uri, }
Ok(Video { protocol, location, uri, })
}

pub fn get_protocol(&self) -> &str {
Expand Down
6 changes: 0 additions & 6 deletions pipeless/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,9 @@ impl RgbFrame {
pub fn get_original_pixels(&self) -> &ndarray::Array3<u8> {
&self.original
}
pub fn get_owned_original_pixels(&self) -> ndarray::Array3<u8> {
self.original.to_owned()
}
pub fn get_modified_pixels(&self) -> &ndarray::Array3<u8> {
&self.modified
}
pub fn get_owned_modified_pixels(&self) -> ndarray::Array3<u8> {
self.modified.to_owned()
}
pub fn get_mutable_pixels(&mut self) -> ndarray::ArrayViewMut3<u8> {
self.modified.view_mut()
}
Expand Down
31 changes: 21 additions & 10 deletions pipeless/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,34 @@ pub fn start(
Ok(frame_path) => {
info!("New stream entry detected, creating pipeline");
let new_pipeless_bus = pipeless::events::Bus::new();
let new_manager = pipeless::pipeline::Manager::new(
let new_manager_result = pipeless::pipeline::Manager::new(
input_uri, output_uri, frame_path,
&new_pipeless_bus.get_sender(),
dispatcher_event_sender.clone(),
);
new_manager.start(new_pipeless_bus, frame_path_executor_arc.clone());
streams_table_guard.set_stream_pipeline(
entry.get_id(),
new_manager.get_pipeline_id().await
).expect("Error adding new stream to the streams config table");
let mut managers_map_guard = running_managers.write().await;
managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager);
match new_manager_result {
Ok(new_manager) => {
new_manager.start(new_pipeless_bus, frame_path_executor_arc.clone());
if let Err(err) = streams_table_guard.set_stream_pipeline(
entry.get_id(),
new_manager.get_pipeline_id().await
) {
error!("Error adding new stream to the streams config table: {}", err);
}
let mut managers_map_guard = running_managers.write().await;
managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager);
},
Err(err) => {
error!("Unable to create new pipeline: {}. Rolling back streams configuration.", err.to_string());
let removed = streams_table_guard.remove(entry.get_id());
if removed.is_none() { warn!("Error rolling back table, entry not found.") };
}
}
},
Err(err) => {
warn!("Rolling back streams table configuration due to error. Error: {}", err);
streams_table_guard.remove(entry.get_id())
.expect("Error removing new stream from the streams config table");
let removed = streams_table_guard.remove(entry.get_id());
if removed.is_none() { warn!("Error rolling back table, entry not found.") };
}
}
}
Expand Down
35 changes: 24 additions & 11 deletions pipeless/src/gst/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use glib::BoolError;
use gstreamer as gst;
use log::error;
use log::{error, warn};

pub fn create_generic_component(ctype: &str, cname: &str) -> gst::Element {
pub fn create_generic_component(ctype: &str, cname: &str) -> Result<gst::Element, BoolError> {
let component = gst::ElementFactory::make(ctype)
.name(cname)
.build()
.expect(format!("Failed to create component {} of type {}", cname, ctype).as_str());
.build().or_else(|err| {
error!("Failed to create component {} of type {}", cname, ctype);
Err(err)
});

component
}
Expand Down Expand Up @@ -94,9 +97,14 @@ pub fn tag_list_to_string(tag_list: &gst::TagList) -> String {
if n_tag_values == 1 {
if let Some(tag_value) = tag_list.index_generic(tag_name, 0) {
match tag_value.get::<gst::DateTime>() {
Ok(datetime) =>
formatted_tags.push(format!("{}={}", tag_name, datetime.to_iso8601_string()
.expect("Unable to get ISO string from tag"))),
Ok(datetime) => {
let datetime_tag_res = datetime.to_iso8601_string();
if let Ok(tag_value) = datetime_tag_res {
formatted_tags.push(format!("{}={}", tag_name, tag_value))
} else {
warn!("Unable to get ISO string from tag");
}
}
Err(_) => formatted_tags.push(format!("{}={:?}", tag_name, tag_value))
}
}
Expand All @@ -105,10 +113,15 @@ pub fn tag_list_to_string(tag_list: &gst::TagList) -> String {
.filter_map(|i| {
tag_list.index_generic(tag_name, i).map(|tag_value| {
match tag_value.get::<gst::DateTime>() {
Ok(datetime) =>
datetime.to_iso8601_string()
.expect("Unable to get ISO string from tag")
.to_string(),
Ok(datetime) => {
let datetime_tag_res = datetime.to_iso8601_string();
if let Ok(tag_value) = datetime_tag_res {
tag_value.to_string()
} else {
warn!("Unable to get ISO string from tag");
String::from("")
}
}
Err(_) => format!("{:?}", tag_value)
}
})
Expand Down
Loading

0 comments on commit c31c857

Please sign in to comment.