From cb657b8653e2fad9456f745fa53091dc27522083 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 31 Oct 2024 16:01:20 +0100 Subject: [PATCH] actually forward data --- crates/top/rerun/src/commands/entrypoint.rs | 51 ++++++++++++++++----- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs index 6c3b6389907e..b21ca6cf74e9 100644 --- a/crates/top/rerun/src/commands/entrypoint.rs +++ b/crates/top/rerun/src/commands/entrypoint.rs @@ -3,6 +3,7 @@ use itertools::Itertools; use re_data_source::DataSource; use re_log_types::LogMsg; +use re_sdk::sink::LogSink; use re_smart_channel::{ReceiveSet, Receiver, SmartMessagePayload}; use crate::{commands::RrdCommands, CallSource}; @@ -656,7 +657,7 @@ fn run_impl( }; // Where do we get the data from? - let rx: Vec> = { + let rxs: Vec> = { let data_sources = args .url_or_paths .iter() @@ -694,11 +695,10 @@ fn run_impl( { // Check if there is already a viewer running and if so, send the data to it. use std::net::TcpStream; - let connect_addr = std::net::SocketAddr::new("127.0.0.1".parse().unwrap(), args.port); - if TcpStream::connect_timeout(&connect_addr, std::time::Duration::from_secs(1)).is_ok() - { + let addr = std::net::SocketAddr::new(re_sdk::default_server_addr().ip(), args.port); + if TcpStream::connect_timeout(&addr, std::time::Duration::from_secs(1)).is_ok() { re_log::info!( - addr = %connect_addr, + %addr, "A process is already listening at this address. Assuming it's a Rerun Viewer." ); is_another_viewer_running = true; @@ -719,15 +719,15 @@ fn run_impl( // Now what do we do with the data? if args.test_receive { - let rx = ReceiveSet::new(rx); + let rx = ReceiveSet::new(rxs); assert_receive_into_entity_db(&rx).map(|_db| ()) } else if let Some(rrd_path) = args.save { - let rx = ReceiveSet::new(rx); + let rx = ReceiveSet::new(rxs); Ok(stream_to_rrd_on_disk(&rx, &rrd_path.into())?) } else if args.serve { #[cfg(not(feature = "server"))] { - _ = (call_source, rx); + _ = (call_source, rxs); anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature"); } @@ -757,7 +757,7 @@ fn run_impl( // This is the server which the web viewer will talk to: let _ws_server = re_ws_comms::RerunServer::new( - ReceiveSet::new(rx), + ReceiveSet::new(rxs), &args.bind, args.ws_server_port, server_memory_limit, @@ -786,7 +786,34 @@ fn run_impl( return Ok(()); } } else if is_another_viewer_running { - re_log::info!("Another viewer is already running, streaming data to it."); + let addr = std::net::SocketAddr::new(re_sdk::default_server_addr().ip(), args.port); + re_log::info!(%addr, "Another viewer is already running, streaming data to it."); + + let sink = re_sdk::sink::TcpSink::new(addr, re_sdk::default_flush_timeout()); + + for rx in rxs { + while rx.is_connected() { + while let Ok(msg) = rx.recv() { + if let Some(log_msg) = msg.into_data() { + sink.send(log_msg); + } + } + } + } + + // TODO(cmc): This is what I would have normally done, but this never terminates for some + // reason. + // let rx = ReceiveSet::new(rxs); + // while rx.is_connected() { + // while let Ok(msg) = rx.recv() { + // if let Some(log_msg) = msg.into_data() { + // sink.send(log_msg); + // } + // } + // } + + sink.flush_blocking(); + Ok(()) } else { #[cfg(feature = "native_viewer")] @@ -799,7 +826,7 @@ fn run_impl( cc.egui_ctx.clone(), cc.storage, ); - for rx in rx { + for rx in rxs { app.add_receiver(rx); } app.set_profiler(profiler); @@ -814,7 +841,7 @@ fn run_impl( #[cfg(not(feature = "native_viewer"))] { - _ = (call_source, rx); + _ = (call_source, rxs); anyhow::bail!( "Can't start viewer - rerun was compiled without the 'native_viewer' feature" );