Skip to content

Commit

Permalink
Compute proper latest-at query for data_ui + fix warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
abey79 committed Sep 9, 2024
1 parent 5ae68b1 commit 8b5a82d
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 20 deletions.
4 changes: 1 addition & 3 deletions crates/store/re_chunk/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,7 @@ impl TransportChunk {
}

#[inline]
pub fn all_columns<'a>(
&'a self,
) -> impl Iterator<Item = (&ArrowField, &'a Box<dyn ArrowArray>)> + 'a {
pub fn all_columns(&self) -> impl Iterator<Item = (&ArrowField, &Box<dyn ArrowArray>)> + '_ {
self.schema
.fields
.iter()
Expand Down
76 changes: 61 additions & 15 deletions crates/viewer/re_space_view_dataframe/src/dataframe_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use std::collections::BTreeMap;
use std::ops::Range;

use anyhow::Context;
use itertools::Itertools;

use re_chunk_store::{ColumnDescriptor, LatestAtQuery, RowId};
use re_dataframe::{LatestAtQueryHandle, RangeQueryHandle, RecordBatch};
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_types_core::ComponentName;
use re_ui::UiExt as _;
use re_viewer_context::ViewerContext;

Expand All @@ -16,7 +18,7 @@ pub(crate) fn dataframe_ui<'a>(
ui: &mut egui::Ui,
query: impl Into<QueryHandle<'a>>,
) {
dataframe_ui_impl(ctx, ui, query.into());
dataframe_ui_impl(ctx, ui, &query.into());
}

/// A query handle for either a latest-at or range query.
Expand Down Expand Up @@ -73,18 +75,27 @@ impl<'a> From<RangeQueryHandle<'a>> for QueryHandle<'a> {
///
/// Row data is stored in a bunch of [`DisplayRecordBatch`], which are created from
/// [`RecordBatch`]s. We also maintain a mapping for each row number to the corresponding record
/// batch and the inedex inside it.
/// batch and the index inside it.
struct RowsDisplayData {
//row_ids: Vec<RowId>,
/// The [`DisplayRecordBatch`]s to display.
display_record_batches: Vec<DisplayRecordBatch>,

/// For each row to be displayed, the batch index and the row index with that batch.
batch_and_row_from_row_nr: BTreeMap<u64, (usize, usize)>,

/// The index of the time column corresponding to the query timeline.
time_column_index: Option<usize>,

/// The index of the time column corresponding the row IDs.
row_id_column_index: Option<usize>,
}

impl RowsDisplayData {
fn try_new(
row_indices: &Range<u64>,
record_batches: Vec<RecordBatch>,
schema: &[ColumnDescriptor],
query_timeline: &Timeline,
) -> Result<Self, DisplayRecordBatchError> {
let display_record_batches = record_batches
.into_iter()
Expand All @@ -101,10 +112,34 @@ impl RowsDisplayData {
offset += batch_len;
}

// find the time column
let time_column_index = schema
.iter()
.find_position(|desc| match desc {
ColumnDescriptor::Time(time_column_desc) => {
&time_column_desc.timeline == query_timeline
}
_ => false,
})
.map(|(pos, _)| pos);

// find the row id column
let row_id_column_index = schema
.iter()
.find_position(|desc| match desc {
ColumnDescriptor::Control(control_column_desc) => {
control_column_desc.component_name
== ComponentName::from("rerun.controls.RowId")
}
_ => false,
})
.map(|(pos, _)| pos);

Ok(Self {
//row_ids: row_indices.collect(),
display_record_batches,
batch_and_row_from_row_nr,
time_column_index,
row_id_column_index,
})
}
}
Expand All @@ -116,11 +151,8 @@ struct DataframeTableDelegate<'a> {
schema: &'a [ColumnDescriptor],
header_entity_paths: Vec<Option<EntityPath>>,
display_data: anyhow::Result<RowsDisplayData>,
//display_record_batches: Option<Vec<DisplayRecordBatch>>,
query_timeline: Timeline,

num_rows: u64,
//batch_and_row_from_row_nr: BTreeMap<u64, (usize, usize)>,
}

impl<'a> egui_table::TableDelegate for DataframeTableDelegate<'a> {
Expand All @@ -134,6 +166,7 @@ impl<'a> egui_table::TableDelegate for DataframeTableDelegate<'a> {
info.visible_rows.end - info.visible_rows.start,
),
self.schema,
&self.query_handle.timeline(),
);

self.display_data = data.with_context(|| "Failed to create display data");
Expand Down Expand Up @@ -188,11 +221,6 @@ impl<'a> egui_table::TableDelegate for DataframeTableDelegate<'a> {
egui::Frame::none()
.inner_margin(egui::Margin::symmetric(4.0, 0.0))
.show(ui, |ui| {
//TODO: wrong, we must pass the actual timestamp of the row
let latest_at_query = LatestAtQuery::new(self.query_timeline, TimeInt::MAX);
//TODO: wrong, we must pass the actual row_id (if we have it)
let row_id = RowId::ZERO;

if let Some((batch_nr, batch_index)) = display_data
.batch_and_row_from_row_nr
.get(&cell.row_nr)
Expand All @@ -201,6 +229,24 @@ impl<'a> egui_table::TableDelegate for DataframeTableDelegate<'a> {
let batch = &display_data.display_record_batches[batch_nr];
let column = &batch.columns()[cell.col_nr];

// compute the latest at query for this row (used to display tooltips)
let timestamp = display_data
.time_column_index
.and_then(|index| {
display_data.display_record_batches[batch_nr].columns()[index]
.try_decode_time(batch_index)
})
.unwrap_or(TimeInt::MAX);
let latest_at_query =
LatestAtQuery::new(self.query_handle.timeline(), timestamp);
let row_id = display_data
.row_id_column_index
.and_then(|index| {
display_data.display_record_batches[batch_nr].columns()[index]
.try_decode_row_id(batch_index)
})
.unwrap_or(RowId::ZERO);

if ui.is_sizing_pass() {
ui.style_mut().wrap_mode = Some(egui::TextWrapMode::Extend);
} else {
Expand All @@ -218,7 +264,7 @@ impl<'a> egui_table::TableDelegate for DataframeTableDelegate<'a> {
}

/// Display the result of a [`QueryHandle`] in a table.
fn dataframe_ui_impl(ctx: &ViewerContext<'_>, ui: &mut egui::Ui, query_handle: QueryHandle<'_>) {
fn dataframe_ui_impl(ctx: &ViewerContext<'_>, ui: &mut egui::Ui, query_handle: &QueryHandle<'_>) {
re_tracing::profile_function!();

let schema = query_handle.schema();
Expand All @@ -228,10 +274,9 @@ fn dataframe_ui_impl(ctx: &ViewerContext<'_>, ui: &mut egui::Ui, query_handle: Q

let mut table_delegate = DataframeTableDelegate {
ctx,
query_handle: &query_handle,
query_handle,
schema,
header_entity_paths,
query_timeline: query_handle.timeline(),
num_rows,
display_data: Err(anyhow::anyhow!(
"No row data, `fetch_columns_and_rows` not called."
Expand Down Expand Up @@ -276,6 +321,7 @@ fn column_groups_for_entity(
if columns.is_empty() {
(vec![], vec![])
} else if columns.len() == 1 {
#[allow(clippy::single_range_in_vec_init)]
(vec![0..1], vec![columns[0].entity_path().cloned()])
} else {
let mut groups = vec![];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) enum ComponentData {
}

impl ComponentData {
#[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940
fn try_new(
descriptor: &ComponentColumnDescriptor,
column_data: &Box<dyn ArrowArray>,
Expand Down Expand Up @@ -75,6 +76,7 @@ impl ComponentData {
}
}

#[allow(clippy::too_many_arguments)]
fn data_ui(
&self,
ctx: &ViewerContext<'_>,
Expand Down Expand Up @@ -133,6 +135,7 @@ pub(crate) enum DisplayColumn {
}

impl DisplayColumn {
#[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940
fn try_new(
column_schema: &ColumnDescriptor,
column_data: &Box<dyn ArrowArray>,
Expand All @@ -143,7 +146,7 @@ impl DisplayColumn {
let row_ids = column_data
.as_any()
.downcast_ref::<ArrowStructArray>()
.unwrap();
.expect("sanity checked");
let [times, counters] = row_ids.values() else {
panic!("RowIds are corrupt -- this should be impossible (sanity checked)");
};
Expand Down Expand Up @@ -249,6 +252,36 @@ impl DisplayColumn {
}
}
}

/// Try to decode the row ID from the given row index.
///
/// Succeeds only if the column is a `RowId` column.
pub(crate) fn try_decode_row_id(&self, row_index: usize) -> Option<RowId> {
match self {
Self::RowId {
row_id_times,
row_id_counters,
} => {
let time = row_id_times.value(row_index);
let counter = row_id_counters.value(row_index);
Some(RowId::from_u128((time as u128) << 64 | (counter as u128)))
}
_ => None,
}
}

/// Try to decode the time from the given row index.
///
/// Succeeds only if the column is a `Timeline` column.
pub(crate) fn try_decode_time(&self, row_index: usize) -> Option<TimeInt> {
match self {
Self::Timeline { time_data, .. } => {
let timestamp = time_data.value(row_index);
TimeInt::try_from(timestamp).ok()
}
_ => None,
}
}
}

pub(crate) struct DisplayRecordBatch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl DataframeSpaceView {
ctx.blueprint_db(),
ctx.blueprint_query,
Self::identifier(),
&re_log_types::EntityPathSubs::new_with_origin(&space_origin),
&re_log_types::EntityPathSubs::new_with_origin(space_origin),
)
.entity_path_filter
}
Expand Down

0 comments on commit 8b5a82d

Please sign in to comment.