Skip to content

Commit

Permalink
Merge branch 'main' into jan/ws-perf-investigation
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk authored Oct 24, 2024
2 parents 4f1930a + 64a27ee commit 597da6b
Show file tree
Hide file tree
Showing 43 changed files with 844 additions and 458 deletions.
19 changes: 18 additions & 1 deletion .github/workflows/reusable_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,25 @@ jobs:

# ---------------------------------------------------------------------------

markdown-paths-filter:
runs-on: ubuntu-latest
outputs:
md_changes: ${{ steps.filter.outputs.md_changes }}
steps:
- uses: actions/checkout@v4
with:
ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.ref || '' }}
- uses: dorny/paths-filter@v3
id: filter
with:
filters: |
md_changes:
- '**/*.md'
link-checker:
name: Check links
needs: markdown-paths-filter
if: inputs.CHANNEL == 'nightly' || needs.markdown-paths-filter.outputs.md_changes == 'true'
runs-on: ubuntu-latest
# do not fail entire workflow (e.g. nightly) if this is the only failing check
continue-on-error: true
Expand All @@ -174,7 +191,7 @@ jobs:
id: lychee
uses: lycheeverse/[email protected]
with:
fail: ${{ inputs.CHANNEL == 'nightly' }}
fail: true
lycheeVersion: "0.15.1"
# When given a directory, lychee checks only markdown, html and text files, everything else we have to glob in manually.
# Pass --verbose, so that all considered links are printed, making it easier to debug.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/reusable_checks_rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
pixi-version: v0.25.0

- name: Download test assets
run: pixi run python ./scripts/ci/download_test_assets.py
run: pixi run python ./tests/assets/download_test_assets.py

- name: pixi run cargo test --all-targets --all-features
run: pixi run cargo test --all-targets --all-features
35 changes: 5 additions & 30 deletions crates/store/re_data_loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ pub struct DataLoaderSettings {
pub application_id: Option<re_log_types::ApplicationId>,

/// The [`re_log_types::ApplicationId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_application_id: Option<re_log_types::ApplicationId>,

/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
Expand All @@ -64,8 +62,6 @@ pub struct DataLoaderSettings {
pub store_id: re_log_types::StoreId,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,

/// What should the logged entity paths be prefixed with?
Expand Down Expand Up @@ -318,39 +314,18 @@ impl DataLoaderError {
/// most convenient for them, whether it is raw components, arrow chunks or even
/// full-on [`LogMsg`]s.
pub enum LoadedData {
Chunk(Chunk),
ArrowMsg(ArrowMsg),
Chunk(re_log_types::StoreId, Chunk),
ArrowMsg(re_log_types::StoreId, ArrowMsg),
LogMsg(LogMsg),
}

impl From<Chunk> for LoadedData {
#[inline]
fn from(value: Chunk) -> Self {
Self::Chunk(value)
}
}

impl From<ArrowMsg> for LoadedData {
#[inline]
fn from(value: ArrowMsg) -> Self {
Self::ArrowMsg(value)
}
}

impl From<LogMsg> for LoadedData {
#[inline]
fn from(value: LogMsg) -> Self {
Self::LogMsg(value)
}
}

impl LoadedData {
/// Pack the data into a [`LogMsg`].
pub fn into_log_msg(self, store_id: &re_log_types::StoreId) -> ChunkResult<LogMsg> {
pub fn into_log_msg(self) -> ChunkResult<LogMsg> {
match self {
Self::Chunk(chunk) => Ok(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?)),
Self::Chunk(store_id, chunk) => Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?)),

Self::ArrowMsg(msg) => Ok(LogMsg::ArrowMsg(store_id.clone(), msg)),
Self::ArrowMsg(store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),

Self::LogMsg(msg) => Ok(msg),
}
Expand Down
100 changes: 52 additions & 48 deletions crates/store/re_data_loader/src/load_file.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::borrow::Cow;

use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;

use crate::{extension, DataLoaderError, LoadedData};
use crate::{DataLoaderError, LoadedData};

// ---

Expand Down Expand Up @@ -36,16 +37,7 @@ pub fn load_from_path(

let rx = load(settings, path, None)?;

// TODO(cmc): should we always unconditionally set store info though?
// If we reach this point, then at least one compatible `DataLoader` has been found.
let store_info = prepare_store_info(&settings.store_id, file_source, path);
if let Some(store_info) = store_info {
if tx.send(store_info).is_err() {
return Ok(()); // other end has hung up.
}
}

send(&settings.store_id, rx, tx);
send(settings.clone(), file_source, path.to_owned(), rx, tx);

Ok(())
}
Expand All @@ -72,16 +64,7 @@ pub fn load_from_file_contents(

let data = load(settings, filepath, Some(contents))?;

// TODO(cmc): should we always unconditionally set store info though?
// If we reach this point, then at least one compatible `DataLoader` has been found.
let store_info = prepare_store_info(&settings.store_id, file_source, filepath);
if let Some(store_info) = store_info {
if tx.send(store_info).is_err() {
return Ok(()); // other end has hung up.
}
}

send(&settings.store_id, data, tx);
send(settings.clone(), file_source, filepath.to_owned(), data, tx);

Ok(())
}
Expand All @@ -93,35 +76,25 @@ pub(crate) fn prepare_store_info(
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
) -> Option<LogMsg> {
) -> LogMsg {
re_tracing::profile_function!(path.display().to_string());

use re_log_types::SetStoreInfo;

let app_id = re_log_types::ApplicationId(path.display().to_string());
let store_source = re_log_types::StoreSource::File { file_source };

let ext = extension(path);
let is_rrd = crate::SUPPORTED_RERUN_EXTENSIONS.contains(&ext.as_str());

(!is_rrd).then(|| {
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
started: re_log_types::Time::now(),
store_source,
// NOTE: If this is a natively supported file, it will go through one of the
// builtin dataloaders, i.e. the local version.
// Otherwise, it will go through an arbitrary external loader, at which point we
// have no certainty what the version is.
store_version: crate::is_supported_file_extension(ext.as_str())
.then_some(re_build_info::CrateVersion::LOCAL),
},
})
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
started: re_log_types::Time::now(),
store_source,
store_version: Some(re_build_info::CrateVersion::LOCAL),
},
})
}

Expand Down Expand Up @@ -288,32 +261,63 @@ pub(crate) fn load(
///
/// Runs asynchronously from another thread on native, synchronously on wasm.
pub(crate) fn send(
store_id: &re_log_types::StoreId,
settings: crate::DataLoaderSettings,
file_source: FileSource,
path: std::path::PathBuf,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
) {
spawn({
re_tracing::profile_function!();

let mut store_info_tracker: HashMap<re_log_types::StoreId, bool> = HashMap::new();

let tx = tx.clone();
let store_id = store_id.clone();
move || {
// ## Ignoring channel errors
//
// Not our problem whether or not the other end has hung up, but we still want to
// poll the channel in any case so as to make sure that the data producer
// doesn't get stuck.
for data in rx_loader {
let msg = match data.into_log_msg(&store_id) {
Ok(msg) => msg,
let msg = match data.into_log_msg() {
Ok(msg) => {
let store_info = match &msg {
LogMsg::SetStoreInfo(set_store_info) => {
Some((set_store_info.info.store_id.clone(), true))
}
LogMsg::ArrowMsg(store_id, _arrow_msg) => {
Some((store_id.clone(), false))
}
LogMsg::BlueprintActivationCommand(_) => None,
};

if let Some((store_id, store_info_created)) = store_info {
*store_info_tracker.entry(store_id).or_default() |= store_info_created;
}

msg
}
Err(err) => {
re_log::error!(%err, %store_id, "Couldn't serialize component data");
re_log::error!(%err, "Couldn't serialize component data");
continue;
}
};
tx.send(msg).ok();
}

for (store_id, store_info_already_created) in store_info_tracker {
let is_a_preexisting_recording =
Some(&store_id) == settings.opened_store_id.as_ref();

if store_info_already_created || is_a_preexisting_recording {
continue;
}

let store_info = prepare_store_info(&store_id, file_source.clone(), &path);
tx.send(store_info).ok();
}

tx.quit(None).ok();
}
});
Expand Down
9 changes: 7 additions & 2 deletions crates/store/re_data_loader/src/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl DataLoader for ArchetypeLoader {

fn load_from_file_contents(
&self,
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
Expand Down Expand Up @@ -133,8 +133,13 @@ impl DataLoader for ArchetypeLoader {
)?);
}

let store_id = settings
.opened_store_id
.clone()
.unwrap_or_else(|| settings.store_id.clone());
for row in rows {
if tx.send(row.into()).is_err() {
let data = LoadedData::Chunk(store_id.clone(), row);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/store/re_data_loader/src/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::{
use ahash::HashMap;
use once_cell::sync::Lazy;

use crate::LoadedData;

// ---

/// To register a new external data loader, simply add an executable in your $PATH whose name
Expand Down Expand Up @@ -318,7 +320,9 @@ fn decode_and_stream<R: std::io::Read>(
continue;
}
};
if tx.send(msg.into()).is_err() {

let data = LoadedData::LogMsg(msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use re_log_encoding::decoder::Decoder;
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

use crate::LoadedData;

// ---

/// Loads data from any `rrd` file or in-memory contents.
Expand Down Expand Up @@ -193,7 +195,8 @@ fn decode_and_stream<R: std::io::Read>(
msg
};

if tx.send(msg.into()).is_err() {
let data = LoadedData::LogMsg(msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
Expand Down
23 changes: 17 additions & 6 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ impl DataSource {
// or not.
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = re_data_loader::DataLoaderSettings::recommended(shared_store_id);
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_path(&settings, file_source, &path, &tx)
.with_context(|| format!("{path:?}"))?;

Expand All @@ -188,7 +192,11 @@ impl DataSource {
// or not.
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = re_data_loader::DataLoaderSettings::recommended(shared_store_id);
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_file_contents(
&settings,
file_source,
Expand Down Expand Up @@ -248,12 +256,15 @@ fn test_data_source_from_uri() {
];
let ws = ["ws://foo.zip", "wss://foo.zip", "127.0.0.1"];

let file_source = FileSource::DragAndDrop;
let file_source = FileSource::DragAndDrop {
recommended_application_id: None,
recommended_recording_id: None,
};

for uri in file {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::FilePath { .. }
),
"Expected {uri:?} to be categorized as FilePath"
Expand All @@ -263,7 +274,7 @@ fn test_data_source_from_uri() {
for uri in http {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::RrdHttpUrl { .. }
),
"Expected {uri:?} to be categorized as RrdHttpUrl"
Expand All @@ -273,7 +284,7 @@ fn test_data_source_from_uri() {
for uri in ws {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::WebSocketAddr(_)
),
"Expected {uri:?} to be categorized as WebSocketAddr"
Expand Down
Loading

0 comments on commit 597da6b

Please sign in to comment.