Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rerun auto-forwarding: actual forwarding #7958

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions crates/top/rerun/src/commands/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use itertools::Itertools;

use re_data_source::DataSource;
use re_log_types::LogMsg;
use re_sdk::sink::LogSink;
use re_smart_channel::{ReceiveSet, Receiver, SmartMessagePayload};

use crate::{commands::RrdCommands, CallSource};
Expand Down Expand Up @@ -656,7 +657,7 @@ fn run_impl(
};

// Where do we get the data from?
let rx: Vec<Receiver<LogMsg>> = {
let rxs: Vec<Receiver<LogMsg>> = {
let data_sources = args
.url_or_paths
.iter()
Expand Down Expand Up @@ -694,11 +695,10 @@ fn run_impl(
{
// Check if there is already a viewer running and if so, send the data to it.
use std::net::TcpStream;
let connect_addr = std::net::SocketAddr::new("127.0.0.1".parse().unwrap(), args.port);
if TcpStream::connect_timeout(&connect_addr, std::time::Duration::from_secs(1)).is_ok()
{
let addr = std::net::SocketAddr::new(re_sdk::default_server_addr().ip(), args.port);
if TcpStream::connect_timeout(&addr, std::time::Duration::from_secs(1)).is_ok() {
re_log::info!(
addr = %connect_addr,
%addr,
"A process is already listening at this address. Assuming it's a Rerun Viewer."
);
is_another_viewer_running = true;
Expand All @@ -719,15 +719,15 @@ fn run_impl(
// Now what do we do with the data?

if args.test_receive {
let rx = ReceiveSet::new(rx);
let rx = ReceiveSet::new(rxs);
assert_receive_into_entity_db(&rx).map(|_db| ())
} else if let Some(rrd_path) = args.save {
let rx = ReceiveSet::new(rx);
let rx = ReceiveSet::new(rxs);
Ok(stream_to_rrd_on_disk(&rx, &rrd_path.into())?)
} else if args.serve {
#[cfg(not(feature = "server"))]
{
_ = (call_source, rx);
_ = (call_source, rxs);
anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature");
}

Expand Down Expand Up @@ -757,7 +757,7 @@ fn run_impl(

// This is the server which the web viewer will talk to:
let _ws_server = re_ws_comms::RerunServer::new(
ReceiveSet::new(rx),
ReceiveSet::new(rxs),
&args.bind,
args.ws_server_port,
server_memory_limit,
Expand Down Expand Up @@ -786,7 +786,34 @@ fn run_impl(
return Ok(());
}
} else if is_another_viewer_running {
re_log::info!("Another viewer is already running, streaming data to it.");
let addr = std::net::SocketAddr::new(re_sdk::default_server_addr().ip(), args.port);
re_log::info!(%addr, "Another viewer is already running, streaming data to it.");

let sink = re_sdk::sink::TcpSink::new(addr, re_sdk::default_flush_timeout());

for rx in rxs {
while rx.is_connected() {
while let Ok(msg) = rx.recv() {
if let Some(log_msg) = msg.into_data() {
sink.send(log_msg);
}
}
}
}

// TODO(cmc): This is what I would have normally done, but this never terminates for some
// reason.
// let rx = ReceiveSet::new(rxs);
// while rx.is_connected() {
// while let Ok(msg) = rx.recv() {
// if let Some(log_msg) = msg.into_data() {
// sink.send(log_msg);
// }
// }
// }

sink.flush_blocking();

Ok(())
} else {
#[cfg(feature = "native_viewer")]
Expand All @@ -799,7 +826,7 @@ fn run_impl(
cc.egui_ctx.clone(),
cc.storage,
);
for rx in rx {
for rx in rxs {
app.add_receiver(rx);
}
app.set_profiler(profiler);
Expand All @@ -814,7 +841,7 @@ fn run_impl(

#[cfg(not(feature = "native_viewer"))]
{
_ = (call_source, rx);
_ = (call_source, rxs);
anyhow::bail!(
"Can't start viewer - rerun was compiled without the 'native_viewer' feature"
);
Expand Down
Loading