
Commit

Merge branch 'main' into antoine/use-re-dataframe
abey79 committed Sep 10, 2024
2 parents 0a8630d + 175876a commit bbff7dc
Showing 9 changed files with 169 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
@@ -4794,6 +4794,7 @@ version = "0.19.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"bytemuck",
"criterion",
"crossbeam",
4 changes: 4 additions & 0 deletions crates/store/re_chunk/Cargo.toml
@@ -30,6 +30,9 @@ serde = [
"re_types_core/serde",
]

## Enable conversion to and from arrow-rs types
arrow = ["arrow2/arrow", "dep:arrow"]


[dependencies]

@@ -60,6 +63,7 @@ thiserror.workspace = true

# Optional dependencies:
serde = { workspace = true, optional = true, features = ["derive", "rc"] }
arrow = { workspace = true, optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
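For a downstream crate, the new conversions are opt-in via the `arrow` feature. A hypothetical consumer manifest (version and source are placeholders) would enable it like so:

[dependencies]
re_chunk = { version = "0.19.0-alpha.1", features = ["arrow"] }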
67 changes: 67 additions & 0 deletions crates/store/re_chunk/src/arrow.rs
@@ -0,0 +1,67 @@
use arrow::{
array::{make_array, RecordBatch},
datatypes::{Field, Schema},
error::ArrowError,
};

use crate::TransportChunk;

impl TransportChunk {
/// Create an arrow-rs [`RecordBatch`] containing the data from this [`TransportChunk`].
///
/// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
/// but does incur overhead of generating an alternative representation of the arrow-
/// related rust structures that refer to those data buffers.
pub fn try_to_arrow_record_batch(&self) -> Result<RecordBatch, ArrowError> {
let fields: Vec<Field> = self
.schema
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = self.schema.metadata.clone().into_iter().collect();

let schema = Schema::new(fields).with_metadata(metadata);

let columns: Vec<_> = self
.data
.columns()
.iter()
.map(|arr2_array| {
let data = arrow2::array::to_data(arr2_array.as_ref());
make_array(data)
})
.collect();

RecordBatch::try_new(std::sync::Arc::new(schema), columns)
}

/// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`].
///
/// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
/// but does incur overhead of generating an alternative representation of the arrow-
/// related rust structures that refer to those data buffers.
pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self {
let fields: Vec<arrow2::datatypes::Field> = batch
.schema()
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = batch.schema().metadata.clone().into_iter().collect();

let schema = arrow2::datatypes::Schema::from(fields).with_metadata(metadata);

let columns: Vec<_> = batch
.columns()
.iter()
.map(|array| arrow2::array::from_data(&array.to_data()))
.collect();

let data = arrow2::chunk::Chunk::new(columns);

Self { schema, data }
}
}
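The conversions above lean on arrow-rs and arrow2 sharing the `ArrayData` interchange representation: only the Rust-side wrappers are rebuilt, while the underlying buffers are shared. A minimal standalone sketch of that zero-copy hop, assuming both crates as dependencies (arrow2 with its `arrow` feature enabled):

use arrow::array::{make_array, ArrayRef};

// arrow2 -> arrow-rs: rebuild the Rust-side metadata around the same buffers.
fn arrow2_to_arrow_rs(arr: &dyn arrow2::array::Array) -> ArrayRef {
    make_array(arrow2::array::to_data(arr))
}

// arrow-rs -> arrow2: the same hop in reverse, again without copying the data.
fn arrow_rs_to_arrow2(arr: &dyn arrow::array::Array) -> Box<dyn arrow2::array::Array> {
    arrow2::array::from_data(&arr.to_data())
}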
65 changes: 65 additions & 0 deletions crates/store/re_chunk/src/chunk.rs
@@ -152,6 +152,71 @@ impl Chunk {
}
&& *components == rhs.components
}

/// Check for equality while ignoring possible `Extension` type information.
///
/// This is necessary because `arrow2` loses the `Extension` datatype
/// when deserializing back from the `arrow_schema::DataType` representation.
///
/// In theory we could fix this, but as we're moving away from arrow2 anyway,
/// it's unlikely to be worth the effort.
pub fn are_equal_ignoring_extension_types(&self, other: &Self) -> bool {
let Self {
id,
entity_path,
heap_size_bytes: _,
is_sorted,
row_ids,
timelines,
components,
} = self;

let row_ids_no_extension = arrow2::array::StructArray::new(
row_ids.data_type().to_logical_type().clone(),
row_ids.values().to_vec(),
row_ids.validity().cloned(),
);

let components_no_extension: BTreeMap<_, _> = components
.iter()
.map(|(name, arr)| {
let arr = arrow2::array::ListArray::new(
arr.data_type().to_logical_type().clone(),
arr.offsets().clone(),
arr.values().clone(),
arr.validity().cloned(),
);
(name, arr)
})
.collect();

let other_components_no_extension: BTreeMap<_, _> = other
.components
.iter()
.map(|(name, arr)| {
let arr = arrow2::array::ListArray::new(
arr.data_type().to_logical_type().clone(),
arr.offsets().clone(),
arr.values().clone(),
arr.validity().cloned(),
);
(name, arr)
})
.collect();

let other_row_ids_no_extension = arrow2::array::StructArray::new(
other.row_ids.data_type().to_logical_type().clone(),
other.row_ids.values().to_vec(),
other.row_ids.validity().cloned(),
);

*id == other.id
&& *entity_path == other.entity_path
&& *is_sorted == other.is_sorted
&& row_ids_no_extension == other_row_ids_no_extension
&& *timelines == other.timelines
&& components_no_extension == other_components_no_extension
}
}

impl Clone for Chunk {
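The equality workaround relies on arrow2's `to_logical_type`, which recursively strips `Extension` wrappers down to the underlying storage type. A small illustrative sketch (the extension name here is hypothetical):

use arrow2::datatypes::DataType;

// An `Extension` datatype is a named wrapper around a storage type, plus optional metadata.
let ext = DataType::Extension("rerun.example".to_owned(), Box::new(DataType::UInt64), None);

// `to_logical_type` unwraps it, so arrays rebuilt with the logical type compare equal
// even when one side lost its extension metadata during a round-trip.
assert_eq!(ext.to_logical_type(), &DataType::UInt64);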
3 changes: 3 additions & 0 deletions crates/store/re_chunk/src/lib.rs
@@ -20,6 +20,9 @@ pub mod util;
#[cfg(not(target_arch = "wasm32"))]
mod batcher;

#[cfg(feature = "arrow")]
mod arrow;

pub use self::builder::{ChunkBuilder, TimeColumnBuilder};
pub use self::chunk::{Chunk, ChunkError, ChunkResult, TimeColumn};
pub use self::helpers::{ChunkShared, UnitChunkShared};
22 changes: 20 additions & 2 deletions crates/store/re_chunk/src/transport.rs
@@ -720,7 +720,18 @@

for _ in 0..3 {
let chunk_in_transport = chunk_before.to_transport()?;
-let chunk_after = Chunk::from_transport(&chunk_in_transport)?;
+let chunk_roundtrip;
+#[cfg(feature = "arrow")]
+{
+    let chunk_in_record_batch = chunk_in_transport.try_to_arrow_record_batch()?;
+    chunk_roundtrip =
+        TransportChunk::from_arrow_record_batch(&chunk_in_record_batch);
+}
+#[cfg(not(feature = "arrow"))]
+{
+    chunk_roundtrip = &chunk_in_transport;
+}
+let chunk_after = Chunk::from_transport(&chunk_roundtrip)?;

assert_eq!(
chunk_in_transport.entity_path()?,
@@ -771,7 +782,14 @@
eprintln!("{chunk_in_transport}");
eprintln!("{chunk_after}");

-assert_eq!(chunk_before, chunk_after);
+#[cfg(not(feature = "arrow"))]
+{
+    // This would fail when round-tripping all the way through a record batch;
+    // the check below should always pass regardless.
+    assert_eq!(chunk_before, &chunk_after);
+}
+
+assert!(chunk_before.are_equal_ignoring_extension_types(&chunk_after));

chunk_before = chunk_after;
}
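Put together, the round trip this test exercises looks roughly as follows (a sketch assuming the `arrow` feature is enabled; `anyhow` is used here only as a convenient error umbrella):

use re_chunk::{Chunk, TransportChunk};

fn roundtrip_via_record_batch(chunk: &Chunk) -> anyhow::Result<Chunk> {
    let transport = chunk.to_transport()?;
    let batch = transport.try_to_arrow_record_batch()?; // arrow2 -> arrow-rs
    let transport = TransportChunk::from_arrow_record_batch(&batch); // arrow-rs -> arrow2
    Ok(Chunk::from_transport(&transport)?)
}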
8 changes: 4 additions & 4 deletions crates/store/re_chunk/src/util.rs
@@ -84,7 +84,7 @@ pub fn arrays_to_list_array(
pub fn arrays_to_dictionary<Idx: Copy + Eq>(
array_datatype: ArrowDatatype,
arrays: &[Option<(Idx, &dyn ArrowArray)>],
-) -> Option<ArrowDictionaryArray<u32>> {
+) -> Option<ArrowDictionaryArray<i32>> {
// Dedupe the input arrays based on the given primary key.
let arrays_dense_deduped = arrays
.iter()
@@ -96,7 +96,7 @@ pub fn arrays_to_dictionary<Idx: Copy + Eq>(

// Compute the keys for the final dictionary, using that same primary key.
let keys = {
-let mut cur_key = 0u32;
+let mut cur_key = 0i32;
arrays
.iter()
.dedup_by_with_count(|lhs, rhs| {
@@ -140,7 +140,7 @@
};

let datatype = ArrowDatatype::Dictionary(
-arrow2::datatypes::IntegerType::UInt32,
+arrow2::datatypes::IntegerType::Int32,
std::sync::Arc::new(data.data_type().clone()),
true, // is_sorted
);
@@ -149,7 +149,7 @@
// unique values.
ArrowDictionaryArray::try_new(
datatype,
-ArrowPrimitiveArray::<u32>::from(keys),
+ArrowPrimitiveArray::<i32>::from(keys),
data.to_boxed(),
)
.ok()
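The switch from `u32` to `i32` dictionary keys presumably lines these arrays up with arrow-rs consumers, where `Int32`-keyed dictionaries are the common default. A minimal sketch of an `i32`-keyed dictionary built the same way as above (the values are hypothetical; the real code dictionary-encodes deduplicated component arrays):

use std::sync::Arc;
use arrow2::array::{Array, DictionaryArray, PrimitiveArray, Utf8Array};
use arrow2::datatypes::{DataType, IntegerType};

let values = Utf8Array::<i32>::from_slice(["a", "b"]);
let keys = PrimitiveArray::<i32>::from(vec![Some(0), Some(1), Some(0)]);

let datatype = DataType::Dictionary(
    IntegerType::Int32,
    Arc::new(values.data_type().clone()),
    true, // is_sorted
);
assert!(DictionaryArray::try_new(datatype, keys, values.boxed()).is_ok());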
2 changes: 1 addition & 1 deletion crates/store/re_chunk_store/src/dataframe.rs
@@ -491,7 +491,7 @@ impl ChunkStore {
archetype_name: None,
archetype_field_name: None,
component_name: *component_name,
-datatype: datatype.clone(),
+datatype: ArrowListArray::<i32>::default_datatype(datatype.clone()),
is_static: true,
})
})
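`default_datatype` wraps the inner datatype in arrow2's standard `List` field, presumably so that static columns advertise the same list-shaped datatype as the other component columns. A tiny sketch of what the one-line change produces (the inner type is arbitrary):

use arrow2::array::ListArray;
use arrow2::datatypes::DataType;

let list_ty = ListArray::<i32>::default_datatype(DataType::Float32);

// The column schema now reports `List<Float32>` rather than the bare `Float32`.
assert!(matches!(list_ty, DataType::List(_)));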
8 changes: 4 additions & 4 deletions crates/store/re_dataframe/src/range.rs
@@ -91,7 +91,7 @@ impl RangeQueryHandle<'_> {
.map(|col| match col {
ColumnDescriptor::Component(mut descr) => {
descr.datatype = ArrowDatatype::Dictionary(
-arrow2::datatypes::IntegerType::UInt32,
+arrow2::datatypes::IntegerType::Int32,
descr.datatype.into(),
true,
);
@@ -316,7 +316,7 @@ impl RangeQueryHandle<'_> {
// see if this ever becomes an issue before going down this road.
//
// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
-let dict_arrays: HashMap<&ComponentColumnDescriptor, ArrowDictionaryArray<u32>> = {
+let dict_arrays: HashMap<&ComponentColumnDescriptor, ArrowDictionaryArray<i32>> = {
re_tracing::profile_scope!("queries");

columns
@@ -617,7 +617,7 @@ mod tests {
})
.unwrap()
.as_any()
-.downcast_ref::<ArrowDictionaryArray<u32>>()
+.downcast_ref::<ArrowDictionaryArray<i32>>()
.unwrap()
.values()
.clone()
@@ -644,7 +644,7 @@
})
.unwrap()
.as_any()
-.downcast_ref::<ArrowDictionaryArray<u32>>()
+.downcast_ref::<ArrowDictionaryArray<i32>>()
.unwrap()
.values()
.clone()
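On the read side, consumers recover the deduplicated values by downcasting, which with the new key type looks like this (a sketch; `array` stands in for a column pulled out of a query result):

use arrow2::array::{Array, DictionaryArray};

fn dictionary_values(array: &dyn Array) -> Option<Box<dyn Array>> {
    let dict = array.as_any().downcast_ref::<DictionaryArray<i32>>()?;
    Some(dict.values().clone()) // the shared, deduplicated value array
}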
