refactor: move dyn schema to runtime mod
ethe committed Dec 16, 2024
1 parent 9e80dba commit 69f639b
Showing 12 changed files with 95 additions and 109 deletions.
5 changes: 3 additions & 2 deletions src/inmem/immutable.rs
@@ -231,6 +231,7 @@ pub(crate) mod tests {

use super::{ArrowArrays, Builder};
use crate::{
magic,
record::{Record, Schema},
tests::{Test, TestRef},
timestamp::timestamped::Timestamped,
@@ -250,7 +251,7 @@ pub(crate) mod tests {
static SCHEMA: Lazy<Arc<ArrowSchema>> = Lazy::new(|| {
Arc::new(ArrowSchema::new(vec![
Field::new("_null", DataType::Boolean, false),
Field::new("_ts", DataType::UInt32, false),
Field::new(magic::TS, DataType::UInt32, false),
Field::new("vstring", DataType::Utf8, false),
Field::new("vu32", DataType::UInt32, false),
Field::new("vbool", DataType::Boolean, true),
@@ -271,7 +272,7 @@ pub(crate) mod tests {
Vec<parquet::format::SortingColumn>,
) {
(
ColumnPath::new(vec!["_ts".to_string(), "vstring".to_string()]),
ColumnPath::new(vec![magic::TS.to_string(), "vstring".to_string()]),
vec![
SortingColumn::new(1, true, true),
SortingColumn::new(2, false, true),
6 changes: 4 additions & 2 deletions src/lib.rs
@@ -121,6 +121,7 @@ mod compaction;
pub mod executor;
pub mod fs;
pub mod inmem;
pub mod magic;
mod ondisk;
pub mod option;
pub mod record;
@@ -147,6 +148,7 @@ use futures_core::Stream;
use futures_util::StreamExt;
use inmem::{immutable::Immutable, mutable::Mutable};
use lockable::LockableHashMap;
use magic::USER_COLUMN_OFFSET;
pub use once_cell;
pub use parquet;
use parquet::{
@@ -615,7 +617,7 @@
Projection::Parts(projection) => {
let mut fixed_projection: Vec<usize> = [0, 1, primary_key_index]
.into_iter()
.chain(projection.into_iter().map(|p| p + 2))
.chain(projection.into_iter().map(|p| p + USER_COLUMN_OFFSET))
.collect();
fixed_projection.dedup();

@@ -733,7 +735,7 @@
pub fn projection(self, mut projection: Vec<usize>) -> Self {
// skip two columns: _null and _ts
for p in &mut projection {
*p += 2;
*p += USER_COLUMN_OFFSET;
}
let primary_key_index = self.schema.record_schema.primary_key_index();
let mut fixed_projection = vec![0, 1, primary_key_index];
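Throughout `src/lib.rs` the literal `2` used to skip the `_null` and `_ts` metadata columns is replaced by `magic::USER_COLUMN_OFFSET`. A self-contained sketch of the fixed-projection computation shown above (the `fixed_projection` helper name is hypothetical, not part of the crate's API):

```rust
const USER_COLUMN_OFFSET: usize = 2;

// The `_null` (0), `_ts` (1), and primary-key columns are always projected;
// user-facing indices are shifted past the two metadata columns.
fn fixed_projection(user_projection: Vec<usize>, primary_key_index: usize) -> Vec<usize> {
    let mut fixed: Vec<usize> = [0, 1, primary_key_index]
        .into_iter()
        .chain(user_projection.into_iter().map(|p| p + USER_COLUMN_OFFSET))
        .collect();
    fixed.dedup();
    fixed
}

fn main() {
    // Primary key at physical index 2; the caller asks for user columns 0 and 3.
    assert_eq!(fixed_projection(vec![0, 3], 2), vec![0, 1, 2, 5]);
}
```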
2 changes: 2 additions & 0 deletions src/magic.rs
@@ -0,0 +1,2 @@
pub const TS: &str = "_ts";
pub const USER_COLUMN_OFFSET: usize = 2;
95 changes: 2 additions & 93 deletions src/record/mod.rs
@@ -4,13 +4,9 @@ pub mod runtime;
#[cfg(test)]
pub(crate) mod test;

use std::{collections::HashMap, error::Error, fmt::Debug, io, sync::Arc};
use std::{error::Error, fmt::Debug, io, sync::Arc};

use array::DynRecordImmutableArrays;
use arrow::{
array::RecordBatch,
datatypes::{DataType, Field, Schema as ArrowSchema},
};
use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema};
use internal::InternalRecordRef;
pub use key::{Key, KeyRef};
use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
@@ -22,35 +18,6 @@ use crate::{
serdes::{Decode, Encode},
};

// #[allow(unused)]
// pub(crate) enum RecordInstance {
// Normal,
// Runtime(DynRecord),
// }

// #[allow(unused)]
// impl RecordInstance {
// pub(crate) fn primary_key_index<R>(&self) -> usize
// where
// R: Record,
// {
// match self {
// RecordInstance::Normal => R::primary_key_index(),
// RecordInstance::Runtime(record) => record.primary_key_index(),
// }
// }

// pub(crate) fn arrow_schema<R>(&self) -> Arc<ArrowSchema>
// where
// R: Record,
// {
// match self {
// RecordInstance::Normal => R::arrow_schema().clone(),
// RecordInstance::Runtime(record) => record.arrow_schema(),
// }
// }
// }

pub trait Schema: Debug + Send + Sync {
type Record: Record<Schema = Self>;

@@ -65,64 +32,6 @@
fn primary_key_path(&self) -> (ColumnPath, Vec<SortingColumn>);
}

#[derive(Debug)]
pub struct DynSchema {
schema: Vec<ValueDesc>,
primary_index: usize,
arrow_schema: Arc<ArrowSchema>,
}

impl DynSchema {
pub fn new(schema: Vec<ValueDesc>, primary_index: usize) -> Self {
let mut metadata = HashMap::new();
metadata.insert("primary_key_index".to_string(), primary_index.to_string());
let arrow_schema = Arc::new(ArrowSchema::new_with_metadata(
[
Field::new("_null", DataType::Boolean, false),
Field::new("_ts", DataType::UInt32, false),
]
.into_iter()
.chain(schema.iter().map(|desc| desc.arrow_field()))
.collect::<Vec<_>>(),
metadata,
));
Self {
schema,
primary_index,
arrow_schema,
}
}
}

impl Schema for DynSchema {
type Record = DynRecord;

type Columns = DynRecordImmutableArrays;

type Key = Value;

fn arrow_schema(&self) -> &Arc<ArrowSchema> {
&self.arrow_schema
}

fn primary_key_index(&self) -> usize {
self.primary_index
}

fn primary_key_path(&self) -> (ColumnPath, Vec<SortingColumn>) {
(
ColumnPath::new(vec![
"_ts".to_string(),
self.schema[self.primary_index].name.clone(),
]),
vec![
SortingColumn::new(1_i32, true, true),
SortingColumn::new(self.primary_key_index() as i32, false, true),
],
)
}
}

pub trait Record: 'static + Sized + Decode + Debug + Send + Sync {
type Schema: Schema<Record = Self>;

5 changes: 3 additions & 2 deletions src/record/runtime/array.rs
@@ -15,6 +15,7 @@ use arrow::{
use super::{record::DynRecord, record_ref::DynRecordRef, value::Value, Datatype};
use crate::{
inmem::immutable::{ArrowArrays, Builder},
magic::USER_COLUMN_OFFSET,
record::{Key, Record, Schema},
timestamp::Timestamped,
};
@@ -117,7 +118,7 @@ impl ArrowArrays for DynRecordImmutableArrays {

let mut columns = vec![];
for (idx, col) in self.columns.iter().enumerate() {
if projection_mask.leaf_included(idx + 2) && !col.is_nullable {
if projection_mask.leaf_included(idx + USER_COLUMN_OFFSET) && !col.is_nullable {
let datatype = col.datatype;
let name = col.name.to_string();
let value: Arc<dyn Any + Send + Sync> = match datatype {
@@ -458,7 +459,7 @@ impl Builder<DynRecordImmutableArrays> for DynRecordBuilder {
.zip(self.datatypes.iter())
.enumerate()
{
let field = self.schema.field(idx + 2);
let field = self.schema.field(idx + USER_COLUMN_OFFSET);
let is_nullable = field.is_nullable();
match datatype {
Datatype::UInt8 => {
2 changes: 2 additions & 0 deletions src/record/runtime/mod.rs
@@ -1,11 +1,13 @@
pub(crate) mod array;
mod record;
mod record_ref;
mod schema;
mod value;

use arrow::datatypes::DataType;
pub use record::*;
pub use record_ref::*;
pub use schema::*;
pub use value::*;

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
8 changes: 4 additions & 4 deletions src/record/runtime/record.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;

use fusio::SeqRead;

use super::{Datatype, DynRecordRef, Value};
use super::{schema::DynSchema, Datatype, DynRecordRef, Value};
use crate::{
record::{DynSchema, Record, RecordDecodeError},
record::{Record, RecordDecodeError},
serdes::{Decode, Encode},
};

@@ -175,8 +175,8 @@ impl Record for DynRecord {
pub(crate) mod test {
use std::sync::Arc;

use super::DynRecord;
use crate::record::{Datatype, DynSchema, Value, ValueDesc};
use super::{DynRecord, DynSchema};
use crate::record::{Datatype, Value, ValueDesc};

#[allow(unused)]
pub(crate) fn test_dyn_item_schema() -> DynSchema {
4 changes: 3 additions & 1 deletion src/record/runtime/record_ref.rs
@@ -11,6 +11,7 @@ use fusio::Write;

use super::{Datatype, DynRecord, Value};
use crate::{
magic::USER_COLUMN_OFFSET,
record::{internal::InternalRecordRef, Key, Record, RecordEncodeError, RecordRef, Schema},
serdes::Encode,
};
@@ -215,7 +216,8 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> {

fn projection(&mut self, projection_mask: &parquet::arrow::ProjectionMask) {
for (idx, col) in self.columns.iter_mut().enumerate() {
if idx != self.primary_index && !projection_mask.leaf_included(idx + 2) {
if idx != self.primary_index && !projection_mask.leaf_included(idx + USER_COLUMN_OFFSET)
{
match col.datatype {
Datatype::UInt8 => col.value = Arc::<Option<u8>>::new(None),
Datatype::UInt16 => col.value = Arc::<Option<u16>>::new(None),
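`DynRecordRef::projection` applies the same offset when deciding which dynamic columns to null out: a column survives only if it is the primary key or its shifted leaf index is included in the Parquet projection mask. A simplified sketch of that rule, with the real `ProjectionMask` replaced by a hypothetical stand-in:

```rust
const USER_COLUMN_OFFSET: usize = 2;

// Hypothetical stand-in for parquet's ProjectionMask: the set of physical
// leaf indices the reader will materialize.
struct Mask(Vec<usize>);

impl Mask {
    fn leaf_included(&self, leaf: usize) -> bool {
        self.0.contains(&leaf)
    }
}

// Mirrors the condition in the diff: keep the column if it is the primary key
// or its index, shifted past `_null` and `_ts`, is in the mask.
fn keep_column(idx: usize, primary_index: usize, mask: &Mask) -> bool {
    idx == primary_index || mask.leaf_included(idx + USER_COLUMN_OFFSET)
}

fn main() {
    let mask = Mask(vec![0, 1, 3]); // `_null`, `_ts`, and user column 1
    assert!(keep_column(0, 0, &mask)); // primary key is always kept
    assert!(keep_column(1, 0, &mask)); // 1 + 2 = 3 is projected
    assert!(!keep_column(2, 0, &mask)); // 2 + 2 = 4 is not projected, so it is nulled
}
```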
65 changes: 65 additions & 0 deletions src/record/runtime/schema.rs
@@ -0,0 +1,65 @@
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use parquet::{format::SortingColumn, schema::types::ColumnPath};

use super::{array::DynRecordImmutableArrays, DynRecord, Value, ValueDesc};
use crate::{magic, record::Schema};

#[derive(Debug)]
pub struct DynSchema {
schema: Vec<ValueDesc>,
primary_index: usize,
arrow_schema: Arc<ArrowSchema>,
}

impl DynSchema {
pub fn new(schema: Vec<ValueDesc>, primary_index: usize) -> Self {
let mut metadata = HashMap::new();
metadata.insert("primary_key_index".to_string(), primary_index.to_string());
let arrow_schema = Arc::new(ArrowSchema::new_with_metadata(
[
Field::new("_null", DataType::Boolean, false),
Field::new(magic::TS, DataType::UInt32, false),
]
.into_iter()
.chain(schema.iter().map(|desc| desc.arrow_field()))
.collect::<Vec<_>>(),
metadata,
));
Self {
schema,
primary_index,
arrow_schema,
}
}
}

impl Schema for DynSchema {
type Record = DynRecord;

type Columns = DynRecordImmutableArrays;

type Key = Value;

fn arrow_schema(&self) -> &Arc<ArrowSchema> {
&self.arrow_schema
}

fn primary_key_index(&self) -> usize {
self.primary_index
}

fn primary_key_path(&self) -> (ColumnPath, Vec<SortingColumn>) {
(
ColumnPath::new(vec![
magic::TS.to_string(),
self.schema[self.primary_index].name.clone(),
]),
vec![
SortingColumn::new(1_i32, true, true),
SortingColumn::new(self.primary_key_index() as i32, false, true),
],
)
}
}
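`DynSchema` now lives in `record::runtime::schema`, unchanged apart from using `magic::TS`. For reference, a standalone sketch of the Arrow schema it assembles, using the arrow crate directly rather than tonbo's `ValueDesc` API, for a hypothetical two-column record with the primary key at user index 0:

```rust
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};

fn main() {
    // Hypothetical user columns: primary key "id" and a nullable "name".
    let user_fields = vec![
        Field::new("id", DataType::UInt64, false),
        Field::new("name", DataType::Utf8, true),
    ];
    let primary_index: usize = 0;

    // Mirror of DynSchema::new: two reserved metadata columns come first, and
    // the primary key's user index is recorded in the schema metadata.
    let mut metadata = HashMap::new();
    metadata.insert("primary_key_index".to_string(), primary_index.to_string());
    let schema = Arc::new(ArrowSchema::new_with_metadata(
        [
            Field::new("_null", DataType::Boolean, false),
            Field::new("_ts", DataType::UInt32, false),
        ]
        .into_iter()
        .chain(user_fields)
        .collect::<Vec<_>>(),
        metadata,
    ));

    assert_eq!(schema.fields().len(), 4);
    assert_eq!(schema.field(2).name(), "id"); // user columns start at offset 2
}
```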
5 changes: 3 additions & 2 deletions src/record/test.rs
@@ -13,6 +13,7 @@ use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::Colum
use super::{internal::InternalRecordRef, Key, Record, RecordRef, Schema};
use crate::{
inmem::immutable::{ArrowArrays, Builder},
magic,
timestamp::Timestamped,
};

@@ -32,7 +33,7 @@ impl Schema for StringSchema {
static SCHEMA: Lazy<Arc<ArrowSchema>> = Lazy::new(|| {
Arc::new(ArrowSchema::new(vec![
Field::new("_null", DataType::Boolean, false),
Field::new("_ts", DataType::UInt32, false),
Field::new(magic::TS, DataType::UInt32, false),
Field::new(PRIMARY_FIELD_NAME, DataType::Utf8, false),
]))
});
@@ -46,7 +47,7 @@

fn primary_key_path(&self) -> (ColumnPath, Vec<SortingColumn>) {
(
ColumnPath::new(vec!["_ts".to_string(), PRIMARY_FIELD_NAME.to_string()]),
ColumnPath::new(vec![magic::TS.to_string(), PRIMARY_FIELD_NAME.to_string()]),
vec![
SortingColumn::new(1, true, true),
SortingColumn::new(2, false, true),
3 changes: 2 additions & 1 deletion tests/macros_correctness.rs
@@ -21,6 +21,7 @@ mod tests {
use tokio::io::AsyncSeekExt;
use tonbo::{
inmem::immutable::{ArrowArrays, Builder},
magic,
record::{Record, RecordRef, Schema},
serdes::{Decode, Encode},
timestamp::timestamped::Timestamped,
@@ -42,7 +43,7 @@ mod tests {
assert_eq!(
UserSchema {}.primary_key_path(),
(
ColumnPath::new(vec!["_ts".to_string(), "name".to_string()]),
ColumnPath::new(vec![magic::TS.to_string(), "name".to_string()]),
vec![
SortingColumn::new(1, true, true),
SortingColumn::new(4, false, true),
4 changes: 2 additions & 2 deletions tonbo_macros/src/record.rs
@@ -374,7 +374,7 @@ fn struct_schema_codegen(

fn primary_key_path(&self) -> (::tonbo::parquet::schema::types::ColumnPath, Vec<::tonbo::parquet::format::SortingColumn>) {
(
::tonbo::parquet::schema::types::ColumnPath::new(vec!["_ts".to_string(), stringify!(#primary_key_name).to_string()]),
::tonbo::parquet::schema::types::ColumnPath::new(vec![::tonbo::magic::TS.to_string(), stringify!(#primary_key_name).to_string()]),
vec![::tonbo::parquet::format::SortingColumn::new(1_i32, true, true), ::tonbo::parquet::format::SortingColumn::new(#primary_key_index as i32, false, true)]
)
}
@@ -383,7 +383,7 @@
static SCHEMA: ::tonbo::once_cell::sync::Lazy<::std::sync::Arc<::tonbo::arrow::datatypes::Schema>> = ::tonbo::once_cell::sync::Lazy::new(|| {
::std::sync::Arc::new(::tonbo::arrow::datatypes::Schema::new(vec![
::tonbo::arrow::datatypes::Field::new("_null", ::tonbo::arrow::datatypes::DataType::Boolean, false),
::tonbo::arrow::datatypes::Field::new("_ts", ::tonbo::arrow::datatypes::DataType::UInt32, false),
::tonbo::arrow::datatypes::Field::new(::tonbo::magic::TS, ::tonbo::arrow::datatypes::DataType::UInt32, false),
#(#schema_fields)*
]))
});
