Merge branch 'main' into jan/update-mp4-0

jprochazk authored Sep 10, 2024
2 parents 1516f5a + 0bea9e7 commit 926c37a
Showing 10 changed files with 185 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
@@ -4784,6 +4784,7 @@ version = "0.19.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"bytemuck",
"criterion",
"crossbeam",
4 changes: 4 additions & 0 deletions crates/store/re_chunk/Cargo.toml
@@ -30,6 +30,9 @@ serde = [
"re_types_core/serde",
]

## Enable conversion to and from arrow-rs types.
arrow = ["arrow2/arrow", "dep:arrow"]


[dependencies]

@@ -60,6 +63,7 @@ thiserror.workspace = true

# Optional dependencies:
serde = { workspace = true, optional = true, features = ["derive", "rc"] }
arrow = { workspace = true, optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
67 changes: 67 additions & 0 deletions crates/store/re_chunk/src/arrow.rs
@@ -0,0 +1,67 @@
use arrow::{
array::{make_array, RecordBatch},
datatypes::{Field, Schema},
error::ArrowError,
};

use crate::TransportChunk;

impl TransportChunk {
/// Create an arrow-rs [`RecordBatch`] containing the data from this [`TransportChunk`].
///
/// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
/// but it does incur the overhead of generating an alternative representation of the
/// arrow-related Rust structures that refer to those data buffers.
pub fn try_to_arrow_record_batch(&self) -> Result<RecordBatch, ArrowError> {
let fields: Vec<Field> = self
.schema
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = self.schema.metadata.clone().into_iter().collect();

let schema = Schema::new(fields).with_metadata(metadata);

let columns: Vec<_> = self
.data
.columns()
.iter()
.map(|arr2_array| {
let data = arrow2::array::to_data(arr2_array.as_ref());
make_array(data)
})
.collect();

RecordBatch::try_new(std::sync::Arc::new(schema), columns)
}

/// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`].
///
/// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
/// but it does incur the overhead of generating an alternative representation of the
/// arrow-related Rust structures that refer to those data buffers.
pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self {
let fields: Vec<arrow2::datatypes::Field> = batch
.schema()
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = batch.schema().metadata.clone().into_iter().collect();

let schema = arrow2::datatypes::Schema::from(fields).with_metadata(metadata);

let columns: Vec<_> = batch
.columns()
.iter()
.map(|array| arrow2::array::from_data(&array.to_data()))
.collect();

let data = arrow2::chunk::Chunk::new(columns);

Self { schema, data }
}
}
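For illustration, here is a minimal sketch (not part of the diff) of how the two new conversions compose into a round-trip; it assumes the `arrow` feature of `re_chunk` is enabled and that a `TransportChunk` is already in hand:

use re_chunk::TransportChunk;

// Convert to an arrow-rs `RecordBatch` and back; per the doc comments above,
// neither direction copies the underlying arrow buffers.
fn roundtrip(chunk: &TransportChunk) -> Result<TransportChunk, arrow::error::ArrowError> {
    let batch = chunk.try_to_arrow_record_batch()?;
    Ok(TransportChunk::from_arrow_record_batch(&batch))
}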
65 changes: 65 additions & 0 deletions crates/store/re_chunk/src/chunk.rs
@@ -152,6 +152,71 @@ impl Chunk {
}
&& *components == rhs.components
}

/// Check for equality while ignoring possible `Extension` type information.
///
/// This is necessary because `arrow2` loses the `Extension` datatype
/// when deserializing back from the `arrow_schema::DataType` representation.
///
/// In theory we could fix this, but as we're moving away from arrow2 anyway,
/// it's unlikely to be worth the effort.
pub fn are_equal_ignoring_extension_types(&self, other: &Self) -> bool {
let Self {
id,
entity_path,
heap_size_bytes: _,
is_sorted,
row_ids,
timelines,
components,
} = self;

let row_ids_no_extension = arrow2::array::StructArray::new(
row_ids.data_type().to_logical_type().clone(),
row_ids.values().to_vec(),
row_ids.validity().cloned(),
);

let components_no_extension: BTreeMap<_, _> = components
.iter()
.map(|(name, arr)| {
let arr = arrow2::array::ListArray::new(
arr.data_type().to_logical_type().clone(),
arr.offsets().clone(),
arr.values().clone(),
arr.validity().cloned(),
);
(name, arr)
})
.collect();

let other_components_no_extension: BTreeMap<_, _> = other
.components
.iter()
.map(|(name, arr)| {
let arr = arrow2::array::ListArray::new(
arr.data_type().to_logical_type().clone(),
arr.offsets().clone(),
arr.values().clone(),
arr.validity().cloned(),
);
(name, arr)
})
.collect();

let other_row_ids_no_extension = arrow2::array::StructArray::new(
other.row_ids.data_type().to_logical_type().clone(),
other.row_ids.values().to_vec(),
other.row_ids.validity().cloned(),
);

*id == other.id
&& *entity_path == other.entity_path
&& *is_sorted == other.is_sorted
&& row_ids_no_extension == other_row_ids_no_extension
&& *timelines == other.timelines
&& components_no_extension == other_components_no_extension
}
}

impl Clone for Chunk {
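For context, a small sketch (with a hypothetical extension name) of the information loss that motivates `are_equal_ignoring_extension_types`: an arrow2 `Extension` datatype wraps an inner logical type, and only the inner type survives a round-trip through `arrow_schema::DataType`:

use arrow2::datatypes::DataType;

fn main() {
    // `Extension` carries a name plus an inner type; `to_logical_type` strips
    // the wrapper, which is what the comparison above relies on.
    let extension = DataType::Extension(
        "rerun.example".to_owned(), // hypothetical extension name
        Box::new(DataType::UInt64),
        None,
    );
    assert_eq!(extension.to_logical_type(), &DataType::UInt64);
}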
3 changes: 3 additions & 0 deletions crates/store/re_chunk/src/lib.rs
@@ -20,6 +20,9 @@ pub mod util;
#[cfg(not(target_arch = "wasm32"))]
mod batcher;

#[cfg(feature = "arrow")]
mod arrow;

pub use self::builder::{ChunkBuilder, TimeColumnBuilder};
pub use self::chunk::{Chunk, ChunkError, ChunkResult, TimeColumn};
pub use self::helpers::{ChunkShared, UnitChunkShared};
22 changes: 20 additions & 2 deletions crates/store/re_chunk/src/transport.rs
@@ -711,7 +711,18 @@

for _ in 0..3 {
let chunk_in_transport = chunk_before.to_transport()?;
-let chunk_after = Chunk::from_transport(&chunk_in_transport)?;
+let chunk_roundtrip;
+#[cfg(feature = "arrow")]
+{
+    let chunk_in_record_batch = chunk_in_transport.try_to_arrow_record_batch()?;
+    chunk_roundtrip =
+        TransportChunk::from_arrow_record_batch(&chunk_in_record_batch);
+}
+#[cfg(not(feature = "arrow"))]
+{
+    chunk_roundtrip = &chunk_in_transport;
+}
+let chunk_after = Chunk::from_transport(&chunk_roundtrip)?;

assert_eq!(
chunk_in_transport.entity_path()?,
@@ -762,7 +773,14 @@ mod tests {
eprintln!("{chunk_in_transport}");
eprintln!("{chunk_after}");

-assert_eq!(chunk_before, chunk_after);
+#[cfg(not(feature = "arrow"))]
+{
+    // This check would fail after round-tripping all the way through a record batch;
+    // the check below should always pass regardless.
+    assert_eq!(chunk_before, chunk_after);
+}
+
+assert!(chunk_before.are_equal_ignoring_extension_types(&chunk_after));

chunk_before = chunk_after;
}
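Assuming the feature wiring above, the arrow-rs branch of this round-trip test can be exercised with `cargo test -p re_chunk --features arrow`; without the feature, the test skips the record-batch conversion and round-trips the transport chunk directly.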
8 changes: 4 additions & 4 deletions crates/store/re_chunk/src/util.rs
@@ -84,7 +84,7 @@ pub fn arrays_to_list_array(
pub fn arrays_to_dictionary<Idx: Copy + Eq>(
array_datatype: ArrowDatatype,
arrays: &[Option<(Idx, &dyn ArrowArray)>],
-) -> Option<ArrowDictionaryArray<u32>> {
+) -> Option<ArrowDictionaryArray<i32>> {
// Dedupe the input arrays based on the given primary key.
let arrays_dense_deduped = arrays
.iter()
@@ -96,7 +96,7 @@ pub fn arrays_to_dictionary<Idx: Copy + Eq>(

// Compute the keys for the final dictionary, using that same primary key.
let keys = {
-let mut cur_key = 0u32;
+let mut cur_key = 0i32;
arrays
.iter()
.dedup_by_with_count(|lhs, rhs| {
@@ -140,7 +140,7 @@ };
};

let datatype = ArrowDatatype::Dictionary(
-arrow2::datatypes::IntegerType::UInt32,
+arrow2::datatypes::IntegerType::Int32,
std::sync::Arc::new(data.data_type().clone()),
true, // is_sorted
);
@@ -149,7 +149,7 @@ pub fn arrays_to_dictionary<Idx: Copy + Eq>(
// unique values.
ArrowDictionaryArray::try_new(
datatype,
-ArrowPrimitiveArray::<u32>::from(keys),
+ArrowPrimitiveArray::<i32>::from(keys),
data.to_boxed(),
)
.ok()
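For context, a hedged sketch of where these signed keys end up on the arrow-rs side: an arrow2 dictionary with `i32` keys maps onto arrow-rs's `DictionaryArray<Int32Type>`, a common default key type in the arrow ecosystem (the example values are illustrative, not from the diff):

use std::sync::Arc;

use arrow::array::{DictionaryArray, Int32Array, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Build an i32-keyed dictionary directly in arrow-rs, matching the key
    // type `arrays_to_dictionary` now produces on the arrow2 side.
    let keys = Int32Array::from(vec![0, 0, 1]);
    let values = Arc::new(StringArray::from(vec!["a", "b"]));
    let dict = DictionaryArray::<Int32Type>::try_new(keys, values).unwrap();
    assert_eq!(dict.keys().len(), 3);
}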
2 changes: 1 addition & 1 deletion crates/store/re_chunk_store/src/dataframe.rs
@@ -482,7 +482,7 @@ impl ChunkStore {
archetype_name: None,
archetype_field_name: None,
component_name: *component_name,
-datatype: datatype.clone(),
+datatype: ArrowListArray::<i32>::default_datatype(datatype.clone()),
is_static: true,
})
})
8 changes: 4 additions & 4 deletions crates/store/re_dataframe/src/range.rs
@@ -91,7 +91,7 @@ impl RangeQueryHandle<'_> {
.map(|col| match col {
ColumnDescriptor::Component(mut descr) => {
descr.datatype = ArrowDatatype::Dictionary(
-arrow2::datatypes::IntegerType::UInt32,
+arrow2::datatypes::IntegerType::Int32,
descr.datatype.into(),
true,
);
@@ -311,7 +311,7 @@ impl RangeQueryHandle<'_> {
// see if this ever becomes an issue before going down this road.
//
// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
-let dict_arrays: HashMap<&ComponentColumnDescriptor, ArrowDictionaryArray<u32>> = {
+let dict_arrays: HashMap<&ComponentColumnDescriptor, ArrowDictionaryArray<i32>> = {
re_tracing::profile_scope!("queries");

columns
@@ -612,7 +612,7 @@ mod tests {
})
.unwrap()
.as_any()
-.downcast_ref::<ArrowDictionaryArray<u32>>()
+.downcast_ref::<ArrowDictionaryArray<i32>>()
.unwrap()
.values()
.clone()
@@ -639,7 +639,7 @@ mod tests {
})
.unwrap()
.as_any()
-.downcast_ref::<ArrowDictionaryArray<u32>>()
+.downcast_ref::<ArrowDictionaryArray<i32>>()
.unwrap()
.values()
.clone()
17 changes: 16 additions & 1 deletion crates/viewer/re_renderer/src/renderer/video/decoder/web.rs
@@ -315,7 +315,9 @@ fn copy_video_frame_to_texture(
depth_or_array_layers: 1,
};
let source = {
-// TODO(jan): Remove this unsafe code when https://github.com/gfx-rs/wgpu/pull/6170 ships.
+// TODO(jan): The wgpu version we're using doesn't support `VideoFrame` yet.
+// This got fixed in https://github.com/gfx-rs/wgpu/pull/6170 but hasn't shipped yet,
+// so we just pretend this is an `HtmlVideoElement` instead.
// SAFETY: Depends on the fact that `wgpu` passes the object through as-is,
// and doesn't actually inspect it in any way. The browser then does its own
// typecheck that doesn't care what kind of image source wgpu gave it.
@@ -325,6 +327,19 @@ fn copy_video_frame_to_texture(
frame.clone().expect("Failed to clone the video frame"),
)
};
// Fake width & height to work around wgpu validating this as if it were an `HtmlVideoElement`.
// Since it thinks this is an `HtmlVideoElement`, it will read `videoWidth` and `videoHeight`
// off it to validate the size.
// We simply redirect `displayWidth`/`displayHeight` to `videoWidth`/`videoHeight` to make it work!
let display_width = js_sys::Reflect::get(&frame, &"displayWidth".into())
.expect("Failed to get displayWidth property from VideoFrame.");
js_sys::Reflect::set(&frame, &"videoWidth".into(), &display_width)
.expect("Failed to set videoWidth property.");
let display_height = js_sys::Reflect::get(&frame, &"displayHeight".into())
.expect("Failed to get displayHeight property from VideoFrame.");
js_sys::Reflect::set(&frame, &"videoHeight".into(), &display_height)
.expect("Failed to set videoHeight property.");

wgpu_types::ImageCopyExternalImage {
source: wgpu_types::ExternalImageSource::HTMLVideoElement(frame),
origin: wgpu_types::Origin2d { x: 0, y: 0 },
