Skip to content

Commit

Permalink
Support for having multiple viewers open at the same time.
Browse files Browse the repository at this point in the history
  • Loading branch information
zrezke committed Feb 23, 2024
1 parent 215904e commit e8d290d
Show file tree
Hide file tree
Showing 9 changed files with 749 additions and 632 deletions.
1,128 changes: 584 additions & 544 deletions Cargo.lock

Large diffs are not rendered by default.

54 changes: 27 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,39 @@ include = ["../../LICENSE-APACHE", "../../LICENSE-MIT", "**/*.rs", "Cargo.toml"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rerun-io/rerun"
rust-version = "1.74"
version = "0.1.6"
version = "0.1.7"

[workspace.dependencies]
# When using alpha-release, always use exact version, e.g. `version = "=0.x.y-alpha.z"
# This is because we treat alpha-releases as incompatible, but semver doesn't.
# In particular: if we compile rerun 0.3.0-alpha.0 we only want it to use
# re_log_types 0.3.0-alpha.0, NOT 0.3.0-alpha.4 even though it is newer and semver-compatible.
re_sdk_comms = { path = "crates/re_sdk_comms", version = "0.1.6" }
re_analytics = { path = "crates/re_analytics", version = "0.1.6" }
re_arrow_store = { path = "crates/re_arrow_store", version = "0.1.6" }
re_build_build_info = { path = "crates/re_build_build_info", version = "0.1.6" }
re_build_info = { path = "crates/re_build_info", version = "0.1.6" }
re_build_web_viewer = { path = "crates/re_build_web_viewer", version = "0.1.6", default-features = false }
re_data_store = { path = "crates/re_data_store", version = "0.1.6" }
re_error = { path = "crates/re_error", version = "0.1.6" }
re_format = { path = "crates/re_format", version = "0.1.6" }
re_int_histogram = { path = "crates/re_int_histogram", version = "0.1.6" }
re_log = { path = "crates/re_log", version = "0.1.6" }
re_log_encoding = { path = "crates/re_log_encoding", version = "0.1.6" }
re_log_types = { path = "crates/re_log_types", version = "0.1.6" }
re_memory = { path = "crates/re_memory", version = "0.1.6" }
re_query = { path = "crates/re_query", version = "0.1.6" }
re_renderer = { path = "crates/re_renderer", version = "0.1.6", default-features = false }
re_sdk = { path = "crates/re_sdk", version = "0.1.6" }
re_smart_channel = { path = "crates/re_smart_channel", version = "0.1.6" }
re_string_interner = { path = "crates/re_string_interner", version = "0.1.6" }
re_tensor_ops = { path = "crates/re_tensor_ops", version = "0.1.6" }
re_tuid = { path = "crates/re_tuid", version = "0.1.6" }
re_ui = { path = "crates/re_ui", version = "0.1.6" }
re_viewer = { path = "crates/re_viewer", version = "0.1.6", default-features = false }
re_web_viewer_server = { path = "crates/re_web_viewer_server", version = "0.1.6" }
re_ws_comms = { path = "crates/re_ws_comms", version = "0.1.6" }
depthai-viewer = { path = "crates/rerun", version = "0.1.6" }
re_sdk_comms = { path = "crates/re_sdk_comms", version = "0.1.7" }
re_analytics = { path = "crates/re_analytics", version = "0.1.7" }
re_arrow_store = { path = "crates/re_arrow_store", version = "0.1.7" }
re_build_build_info = { path = "crates/re_build_build_info", version = "0.1.7" }
re_build_info = { path = "crates/re_build_info", version = "0.1.7" }
re_build_web_viewer = { path = "crates/re_build_web_viewer", version = "0.1.7", default-features = false }
re_data_store = { path = "crates/re_data_store", version = "0.1.7" }
re_error = { path = "crates/re_error", version = "0.1.7" }
re_format = { path = "crates/re_format", version = "0.1.7" }
re_int_histogram = { path = "crates/re_int_histogram", version = "0.1.7" }
re_log = { path = "crates/re_log", version = "0.1.7" }
re_log_encoding = { path = "crates/re_log_encoding", version = "0.1.7" }
re_log_types = { path = "crates/re_log_types", version = "0.1.7" }
re_memory = { path = "crates/re_memory", version = "0.1.7" }
re_query = { path = "crates/re_query", version = "0.1.7" }
re_renderer = { path = "crates/re_renderer", version = "0.1.7", default-features = false }
re_sdk = { path = "crates/re_sdk", version = "0.1.7" }
re_smart_channel = { path = "crates/re_smart_channel", version = "0.1.7" }
re_string_interner = { path = "crates/re_string_interner", version = "0.1.7" }
re_tensor_ops = { path = "crates/re_tensor_ops", version = "0.1.7" }
re_tuid = { path = "crates/re_tuid", version = "0.1.7" }
re_ui = { path = "crates/re_ui", version = "0.1.7" }
re_viewer = { path = "crates/re_viewer", version = "0.1.7", default-features = false }
re_web_viewer_server = { path = "crates/re_web_viewer_server", version = "0.1.7" }
re_ws_comms = { path = "crates/re_ws_comms", version = "0.1.7" }
depthai-viewer = { path = "crates/rerun", version = "0.1.7" }

ahash = "0.8"
anyhow = "1.0"
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ re_renderer = { workspace = true, default-features = false, features = [
"import-obj",
"serde",
] }
re_sdk_comms.workspace = true
re_smart_channel.workspace = true
re_tensor_ops.workspace = true
re_ui.workspace = true
Expand Down
66 changes: 50 additions & 16 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use re_renderer::WgpuResourcePoolStatistics;
use re_smart_channel::Receiver;
use re_ui::{toasts, Command};
use sentry;
use re_sdk_comms::DEFAULT_SERVER_PORT;

use crate::{
app_icon::setup_app_icon,
Expand Down Expand Up @@ -49,6 +50,7 @@ enum TimeControlCommand {
pub struct StartupOptions {
pub memory_limit: re_memory::MemoryLimit,
pub persist_state: bool,
pub sdk_port: u32,
}

#[derive(Clone, Default)]
Expand Down Expand Up @@ -119,28 +121,52 @@ pub struct App {
backend_environment: BackendEnvironment,

#[cfg(not(target_arch = "wasm32"))]
backend_handle: Option<std::process::Child>,
backend_handle: Option<(std::process::Child, u32)>,

#[cfg(not(target_arch = "wasm32"))]
dependency_installer: Option<DependencyInstaller>,
}

impl App {
#[cfg(not(target_arch = "wasm32"))]
fn spawn_backend(environment: &BackendEnvironment) -> Option<std::process::Child> {
fn spawn_backend(&self, environment: &BackendEnvironment) -> Option<(std::process::Child, u32)> {
// It is necessary to install the requirements before starting the backend

use re_smart_channel::Source;
let Some(site_packages_directory) = environment.venv_site_packages.clone() else {
return None;
};
const DEFAULT_BACKEND_PORT: u32 = 9001;
let mut port: u32=DEFAULT_BACKEND_PORT; // The default depthai_viewer._backend port
let max_retries = 100;
let mut port_found = false;
for _ in 0..max_retries {
if std::net::TcpListener::bind(format!("localhost:{port}")).is_ok() {
port_found = true;
break;
} else {
port+=1;
}
}
if !port_found {
re_log::warn!("Port probing was unsuccessful, using default port: {DEFAULT_BACKEND_PORT}");
}

let backend_handle = match std::process::Command::new(environment.python_path.clone())
.args(["-m", "depthai_viewer._backend.main"])
.args(["-m", "depthai_viewer._backend.main", "--port", &port.to_string(), "--sdk-port", &match &self.rx.source() {
Source::TcpServer { port } => *port,
Source::File { .. } => DEFAULT_SERVER_PORT,
Source::RrdHttpStream { .. } => DEFAULT_SERVER_PORT,
Source::RrdWebEventListener => DEFAULT_SERVER_PORT,
Source::Sdk => DEFAULT_SERVER_PORT,
Source::WsClient { .. } => DEFAULT_SERVER_PORT
}.to_string()])
.env("PYTHONPATH", site_packages_directory)
.spawn()
{
Ok(child) => {
println!("Backend started successfully.");
Some(child)
Some((child, port))
}
Err(err) => {
eprintln!("Failed to start depthai viewer backend: {err}.");
Expand Down Expand Up @@ -228,7 +254,7 @@ impl App {
#[cfg(not(target_arch = "wasm32"))]
backend_environment: backend_environment.clone(),
#[cfg(not(target_arch = "wasm32"))]
backend_handle: App::spawn_backend(&backend_environment),
backend_handle: None,
#[cfg(not(target_arch = "wasm32"))]
dependency_installer: None,
}
Expand Down Expand Up @@ -336,7 +362,7 @@ impl App {
#[cfg(not(target_arch = "wasm32"))]
Command::Quit => {
self.state.depthai_state.shutdown();
if let Some(backend_handle) = &mut self.backend_handle {
if let Some((backend_handle, _)) = &mut self.backend_handle {
let _ = backend_handle.kill();
}
_frame.close();
Expand Down Expand Up @@ -503,7 +529,7 @@ impl eframe::App for App {
#[cfg(not(target_arch = "wasm32"))]
fn on_close_event(&mut self) -> bool {
self.state.depthai_state.shutdown();
if let Some(backend_handle) = &mut self.backend_handle {
if let Some((backend_handle, _)) = &mut self.backend_handle {
let _ = backend_handle.kill();
}
true
Expand Down Expand Up @@ -546,20 +572,28 @@ impl eframe::App for App {
dependency_installer.update();
}
match &mut self.backend_handle {
Some(handle) => match handle.try_wait() {
Some((handle, port)) => match handle.try_wait() {
Ok(status) => {
if status.is_some() {
let _ = handle.kill(); // It will only Err in case the process is already dead (which is what we want anyway)
self.state.depthai_state.reset();
re_log::debug!("Backend process has exited, restarting!");
self.backend_handle = App::spawn_backend(&self.backend_environment);
match status {
Some(_) => {
let _ = handle.kill(); // It will only Err in case the process is already dead (which is what we want anyway)
self.state.depthai_state.reset();
re_log::debug!("Backend process has exited, restarting!");
self.backend_handle = self.spawn_backend(&self.backend_environment);
}
None => {
if !self.state.depthai_state.backend_comms.ws.is_initialized() {
self.state.depthai_state.backend_comms.ws.connect(*port);
}
}
}
}
Err(_) => {}
Err(_) => {
}
},
None => {
if self.backend_environment.are_requirements_installed() {
self.backend_handle = App::spawn_backend(&self.backend_environment);
self.backend_handle = self.spawn_backend(&self.backend_environment);
} else {
re_log::debug!(
"Backend requirements not installed, starting dependency installer!"
Expand All @@ -583,7 +617,7 @@ impl eframe::App for App {
self.state.depthai_state.shutdown();
#[cfg(not(target_arch = "wasm32"))]
{
if let Some(backend_handle) = &mut self.backend_handle {
if let Some((backend_handle, _)) = &mut self.backend_handle {
let _ = backend_handle.kill();
}
frame.close();
Expand Down
100 changes: 67 additions & 33 deletions crates/re_viewer/src/depthai/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::Arc;
use super::depthai;

async fn spawn_ws_client(
port: u32,
recv_tx: crossbeam_channel::Sender<WsMessage>,
send_rx: crossbeam_channel::Receiver<WsMessage>,
shutdown: Arc<AtomicBool>,
Expand All @@ -22,7 +23,7 @@ async fn spawn_ws_client(
let error_tx = error_tx.clone();
let connected = connected.clone();
if let Ok(sender) = ewebsock::ws_connect(
String::from("ws://localhost:9001"),
String::from(format!("ws://localhost:{port}")),
Box::new(move |event| {
match event {
WsEvent::Opened => {
Expand Down Expand Up @@ -179,22 +180,31 @@ impl Default for BackWsMessage {
}
}

pub struct WebSocket {
struct WsInner {
receiver: crossbeam_channel::Receiver<WsMessage>,
sender: crossbeam_channel::Sender<WsMessage>,
shutdown: Arc<AtomicBool>,
task: tokio::task::JoinHandle<()>,
connected: Arc<AtomicBool>,
}

pub struct WebSocket {
inner: Option<WsInner>,
}

impl Default for WebSocket {
fn default() -> Self {
Self::new()
Self {
inner: None
}
}
}

impl WebSocket {
pub fn new() -> Self {

pub fn is_initialized(&self) -> bool {self.inner.is_some()}

pub fn connect(&mut self, port: u32) {
re_log::debug!("Creating websocket client");
let (recv_tx, recv_rx) = crossbeam_channel::unbounded();
let (send_tx, send_rx) = crossbeam_channel::unbounded();
Expand All @@ -206,6 +216,7 @@ impl WebSocket {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
re_log::debug!("Using current tokio runtime");
task = handle.spawn(spawn_ws_client(
port,
recv_tx,
send_rx,
shutdown_clone,
Expand All @@ -217,58 +228,81 @@ impl WebSocket {
.build()
.unwrap()
.spawn(spawn_ws_client(
port,
recv_tx,
send_rx,
shutdown_clone,
connected_clone,
));
}
Self {
receiver: recv_rx,
sender: send_tx,
shutdown,
task,
connected,
}

self.inner = Some(
WsInner {
receiver: recv_rx,
sender: send_tx,
shutdown,
task,
connected
}
);
}

pub fn is_connected(&self) -> bool {
self.connected.load(std::sync::atomic::Ordering::SeqCst)
match &self.inner {
Some(ws_state) => {
ws_state.connected.load(std::sync::atomic::Ordering::SeqCst)
},
None => false
}
}

pub fn shutdown(&mut self) {
self.shutdown
.store(true, std::sync::atomic::Ordering::SeqCst);
match &mut self.inner {
Some(ws_state) => ws_state.shutdown.store(true, std::sync::atomic::Ordering::SeqCst),
None => ()
}
}

pub fn receive(&self) -> Option<BackWsMessage> {
if let Ok(message) = self.receiver.try_recv() {
match message {
WsMessage::Text(text) => {
re_log::debug!("Received: {:?}", text);
match serde_json::from_str::<BackWsMessage>(&text.as_str()) {
Ok(back_message) => {
return Some(back_message);
match &self.inner {
Some(ws_state) => {
if let Ok(message) = ws_state.receiver.try_recv() {
match message {
WsMessage::Text(text) => {
re_log::debug!("Received: {:?}", text);
match serde_json::from_str::<BackWsMessage>(&text.as_str()) {
Ok(back_message) => {
return Some(back_message);
}
Err(err) => {
re_log::error!("Error: {:}", err);
return None;
}
}
}
Err(err) => {
re_log::error!("Error: {:}", err);
_ => {
return None;
}
}
} else {
None
}
_ => {
return None;
}
}
},
None => None
}
None
}

pub fn send(&self, message: String) {
self.sender.send(WsMessage::Text(message));
// TODO(filip): This is a hotfix for the websocket not sending the message
// This makes the websocket actually send the previous msg
// It has to be something related to tokio::spawn, because it works fine when just running in the current thread
self.sender.send(WsMessage::Text("".to_string()));
match &self.inner {
Some(ws_state) => {
ws_state.sender.send(WsMessage::Text(message));
// TODO(filip): This is a hotfix for the websocket not sending the message
// This makes the websocket actually send the previous msg
// It has to be something related to tokio::spawn, because it works fine when just running in the current thread
ws_state.sender.send(WsMessage::Text("".to_string()));
},
None => ()
}

}
}
Loading

0 comments on commit e8d290d

Please sign in to comment.