diff --git a/src/macros/mod.rs b/src/macros/mod.rs index c973bc72..bb283799 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -103,7 +103,7 @@ macro_rules! scala_function { _index += 1; if value.logical_type() != $arg_ty { - value = Arc::new(::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?); + value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?; } value }, )*) @@ -135,8 +135,8 @@ macro_rules! scala_function { /// .map(|i| Ok(Tuple { /// id: None, /// values: vec![ -/// Arc::new(DataValue::Int32(Some(i))), -/// Arc::new(DataValue::Int32(Some(i))), +/// DataValue::Int32(Some(i)), +/// DataValue::Int32(Some(i)), /// ] /// }))) as Box>>) /// })); @@ -154,7 +154,7 @@ macro_rules! table_function { let mut columns = Vec::new(); $({ - columns.push(::fnck_sql::catalog::column::ColumnCatalog::new(stringify!($output_name).to_lowercase(), true, ::fnck_sql::catalog::column::ColumnDesc::new($output_ty, false, false, None).unwrap())); + columns.push(::fnck_sql::catalog::column::ColumnCatalog::new(stringify!($output_name).to_lowercase(), true, ::fnck_sql::catalog::column::ColumnDesc::new($output_ty, None, false, None).unwrap())); })* ::fnck_sql::catalog::table::TableCatalog::new(Arc::new(stringify!($function_name).to_lowercase()), columns).unwrap() }; @@ -199,7 +199,7 @@ macro_rules! table_function { _index += 1; if value.logical_type() != $arg_ty { - value = Arc::new(::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?); + value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?; } value }, )*) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f4657a1e..75c58fb3 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -21,6 +21,7 @@ use std::ops::SubAssign; use std::sync::Arc; use std::{mem, slice}; use ulid::Generator; +use crate::types::tuple_builder::TupleIdBuilder; pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>; pub(crate) type TableCache = SharedLruCache; @@ -58,6 +59,7 @@ pub trait Transaction: Sized { let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; + let id_builder = TupleIdBuilder::new(table.schema_ref()); let table_types = table.types(); if columns.is_empty() { let (i, column) = &table.primary_keys()[0]; @@ -78,6 +80,7 @@ pub trait Transaction: Sized { limit: bounds.1, table_types, tuple_columns: Arc::new(tuple_columns), + id_builder, projections, iter, }) @@ -98,6 +101,7 @@ pub trait Transaction: Sized { let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; + let id_builder = TupleIdBuilder::new(table.schema_ref()); let table_types = table.types(); let table_name = table.name.as_str(); let offset = offset_option.unwrap_or(0); @@ -113,6 +117,7 @@ pub trait Transaction: Sized { Ok(IndexIter { offset, limit: limit_option, + id_builder, params: IndexImplParams { tuple_schema_ref: Arc::new(tuple_columns), projections, @@ -584,12 +589,14 @@ trait IndexImpl { fn index_lookup( &self, bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result; fn eq_to_res<'a>( &self, value: &DataValue, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError>; @@ -650,12 +657,13 @@ impl IndexImplParams<'_, T> { Ok(val) } - fn get_tuple_by_id(&self, tuple_id: &TupleId) -> Result, DatabaseError> { + fn get_tuple_by_id(&self, id_builder: &mut TupleIdBuilder, tuple_id: &TupleId) -> Result, DatabaseError> { let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?; Ok(self.tx.get(&key)?.map(|bytes| { TableCodec::decode_tuple( &self.table_types, + id_builder, &self.projections, &self.tuple_schema_ref, &bytes, @@ -673,26 +681,28 @@ impl IndexImpl for IndexImplEnum { fn index_lookup( &self, bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result { match self { - IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, params), - IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, params), - IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, params), - IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, params), + IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, id_builder, params), + IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, id_builder, params), + IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, id_builder, params), + IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, id_builder, params), } } fn eq_to_res<'a>( &self, value: &DataValue, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { match self { - IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, params), - IndexImplEnum::Unique(inner) => inner.eq_to_res(value, params), - IndexImplEnum::Normal(inner) => inner.eq_to_res(value, params), - IndexImplEnum::Composite(inner) => inner.eq_to_res(value, params), + IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, id_builder, params), + IndexImplEnum::Unique(inner) => inner.eq_to_res(value, id_builder, params), + IndexImplEnum::Normal(inner) => inner.eq_to_res(value, id_builder, params), + IndexImplEnum::Composite(inner) => inner.eq_to_res(value, id_builder, params), } } @@ -715,10 +725,12 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn index_lookup( &self, bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result { Ok(TableCodec::decode_tuple( ¶ms.table_types, + id_builder, ¶ms.projections, ¶ms.tuple_schema_ref, bytes, @@ -728,6 +740,7 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn eq_to_res<'a>( &self, value: &DataValue, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let tuple = params @@ -736,6 +749,7 @@ impl IndexImpl for PrimaryKeyIndexImpl { .map(|bytes| { TableCodec::decode_tuple( ¶ms.table_types, + id_builder, ¶ms.projections, ¶ms.tuple_schema_ref, &bytes, @@ -756,11 +770,12 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn secondary_index_lookup( bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result { let tuple_id = TableCodec::decode_index(bytes, ¶ms.index_meta.pk_ty)?; params - .get_tuple_by_id(&tuple_id)? + .get_tuple_by_id(id_builder, &tuple_id)? .ok_or(DatabaseError::TupleIdNotFound(tuple_id)) } @@ -768,14 +783,16 @@ impl IndexImpl for UniqueIndexImpl { fn index_lookup( &self, bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result { - secondary_index_lookup(bytes, params) + secondary_index_lookup(bytes, id_builder, params) } fn eq_to_res<'a>( &self, value: &DataValue, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let Some(bytes) = params.tx.get(&self.bound_key(params, value, false)?)? else { @@ -783,7 +800,7 @@ impl IndexImpl for UniqueIndexImpl { }; let tuple_id = TableCodec::decode_index(&bytes, ¶ms.index_meta.pk_ty)?; let tuple = params - .get_tuple_by_id(&tuple_id)? + .get_tuple_by_id(id_builder, &tuple_id)? .ok_or(DatabaseError::TupleIdNotFound(tuple_id))?; Ok(IndexResult::Tuple(Some(tuple))) } @@ -808,14 +825,16 @@ impl IndexImpl for NormalIndexImpl { fn index_lookup( &self, bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result { - secondary_index_lookup(bytes, params) + secondary_index_lookup(bytes, id_builder, params) } fn eq_to_res<'a>( &self, value: &DataValue, + _: &mut TupleIdBuilder, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let min = self.bound_key(params, value, false)?; @@ -848,14 +867,16 @@ impl IndexImpl for CompositeIndexImpl { fn index_lookup( &self, bytes: &Bytes, + id_builder: &mut TupleIdBuilder, params: &IndexImplParams, ) -> Result { - secondary_index_lookup(bytes, params) + secondary_index_lookup(bytes, id_builder, params) } fn eq_to_res<'a>( &self, value: &DataValue, + _: &mut TupleIdBuilder, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let min = self.bound_key(params, value, false)?; @@ -890,6 +911,7 @@ pub struct TupleIter<'a, T: Transaction + 'a> { limit: Option, table_types: Vec, tuple_columns: Arc>, + id_builder: TupleIdBuilder, projections: Vec, iter: T::IterType<'a>, } @@ -911,6 +933,7 @@ impl<'a, T: Transaction + 'a> Iter for TupleIter<'a, T> { while let Some((_, value)) = self.iter.try_next()? { let tuple = TableCodec::decode_tuple( &self.table_types, + &mut self.id_builder, &self.projections, &self.tuple_columns, &value, @@ -931,6 +954,7 @@ pub struct IndexIter<'a, T: Transaction> { offset: usize, limit: Option, + id_builder: TupleIdBuilder, params: IndexImplParams<'a, T>, inner: IndexImplEnum, // for buffering data @@ -976,7 +1000,7 @@ impl Iter for IndexIter<'_, T> { continue; } Self::limit_sub(&mut self.limit); - let tuple = self.inner.index_lookup(&bytes, &self.params)?; + let tuple = self.inner.index_lookup(&bytes, &mut self.id_builder, &self.params)?; return Ok(Some(tuple)); } @@ -1039,7 +1063,7 @@ impl Iter for IndexIter<'_, T> { Range::Eq(mut val) => { val = self.params.try_cast(val)?; - match self.inner.eq_to_res(&val, &self.params)? { + match self.inner.eq_to_res(&val, &mut self.id_builder, &self.params)? { IndexResult::Tuple(tuple) => { if Self::offset_move(&mut self.offset) { return self.next_tuple(); diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 3d9b3693..94dfe984 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -149,6 +149,7 @@ mod test { use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; + use crate::types::tuple_builder::TupleIdBuilder; #[test] fn test_in_rocksdb_storage_works_with_data() -> Result<(), DatabaseError> { @@ -243,9 +244,11 @@ mod test { DataValue::Int32(Some(3)), DataValue::Int32(Some(4)), ]; + let id_builder = TupleIdBuilder::new(table.schema_ref()); let mut iter = IndexIter { offset: 0, limit: None, + id_builder, params: IndexImplParams { tuple_schema_ref: table.schema_ref().clone(), projections: vec![0], diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index 29c34564..b547fcb0 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use integer_encoding::FixedInt; use lazy_static::lazy_static; use std::io::{Cursor, Read, Seek, SeekFrom, Write}; +use crate::types::tuple_builder::TupleIdBuilder; const BOUND_MIN_TAG: u8 = 0; const BOUND_MAX_TAG: u8 = 1; @@ -250,11 +251,12 @@ impl TableCodec { pub fn decode_tuple( table_types: &[LogicalType], + id_builder: &mut TupleIdBuilder, projections: &[usize], schema: &Schema, bytes: &[u8], ) -> Tuple { - Tuple::deserialize_from(table_types, projections, schema, bytes) + Tuple::deserialize_from(table_types, id_builder, projections, schema, bytes) } /// Key: {TableName}{INDEX_META_TAG}{BOUND_MIN_TAG}{IndexID} @@ -508,6 +510,7 @@ mod tests { use std::slice; use std::sync::Arc; use ulid::Ulid; + use crate::types::tuple_builder::TupleIdBuilder; fn build_table_codec() -> TableCatalog { let columns = vec![ @@ -542,9 +545,10 @@ mod tests { &[LogicalType::Integer, LogicalType::Decimal(None, None)], )?; let schema = table_catalog.schema_ref(); + let mut id_builder = TupleIdBuilder::new(schema); debug_assert_eq!( - TableCodec::decode_tuple(&table_catalog.types(), &[0, 1], schema, &bytes), + TableCodec::decode_tuple(&table_catalog.types(), &mut id_builder, &[0, 1], schema, &bytes), tuple ); diff --git a/src/types/tuple.rs b/src/types/tuple.rs index 6c9ac67f..cb4ddd98 100644 --- a/src/types/tuple.rs +++ b/src/types/tuple.rs @@ -6,6 +6,7 @@ use comfy_table::{Cell, Table}; use itertools::Itertools; use lazy_static::lazy_static; use std::sync::Arc; +use crate::types::tuple_builder::TupleIdBuilder; lazy_static! { pub static ref EMPTY_TUPLE: Tuple = { @@ -38,6 +39,7 @@ pub struct Tuple { impl Tuple { pub fn deserialize_from( table_types: &[LogicalType], + id_builder: &mut TupleIdBuilder, projections: &[usize], schema: &Schema, bytes: &[u8], @@ -52,7 +54,6 @@ impl Tuple { let values_len = table_types.len(); let mut tuple_values = Vec::with_capacity(values_len); let bits_len = (values_len + BITS_MAX_INDEX) / BITS_MAX_INDEX; - let mut primary_keys = Vec::new(); let mut projection_i = 0; let mut pos = bits_len; @@ -64,13 +65,13 @@ impl Tuple { if is_none(bytes[i / BITS_MAX_INDEX], i % BITS_MAX_INDEX) { if projections[projection_i] == i { tuple_values.push(DataValue::none(logic_type)); - Self::values_push(schema, &tuple_values, &mut primary_keys, &mut projection_i); + Self::values_push(schema, &tuple_values, id_builder, &mut projection_i); } } else if let Some(len) = logic_type.raw_len() { /// fixed length (e.g.: int) if projections[projection_i] == i { tuple_values.push(DataValue::from_raw(&bytes[pos..pos + len], logic_type)); - Self::values_push(schema, &tuple_values, &mut primary_keys, &mut projection_i); + Self::values_push(schema, &tuple_values, id_builder, &mut projection_i); } pos += len; } else { @@ -80,21 +81,13 @@ impl Tuple { pos += 4; if projections[projection_i] == i { tuple_values.push(DataValue::from_raw(&bytes[pos..pos + len], logic_type)); - Self::values_push(schema, &tuple_values, &mut primary_keys, &mut projection_i); + Self::values_push(schema, &tuple_values, id_builder, &mut projection_i); } pos += len; } } - - let id = (!primary_keys.is_empty()).then(|| { - if primary_keys.len() == 1 { - primary_keys.pop().unwrap() - } else { - DataValue::Tuple(Some(primary_keys)) - } - }); Tuple { - id, + id: id_builder.build(), values: tuple_values, } } @@ -102,11 +95,11 @@ impl Tuple { fn values_push( tuple_columns: &Schema, tuple_values: &[DataValue], - primary_keys: &mut Vec, + id_builder: &mut TupleIdBuilder, projection_i: &mut usize, ) { if tuple_columns[*projection_i].desc().is_primary() { - primary_keys.push(tuple_values[*projection_i].clone()) + id_builder.append(tuple_values[*projection_i].clone()); } *projection_i += 1; } @@ -181,6 +174,7 @@ mod tests { use rust_decimal::Decimal; use sqlparser::ast::CharLengthUnits; use std::sync::Arc; + use crate::types::tuple_builder::TupleIdBuilder; #[test] fn test_tuple_serialize_to_and_deserialize_from() { @@ -372,15 +366,18 @@ mod tests { .map(|column| column.datatype().clone()) .collect_vec(); let columns = Arc::new(columns); + let mut id_builder = TupleIdBuilder::new(&columns); let tuple_0 = Tuple::deserialize_from( &types, + &mut id_builder, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], &columns, &tuples[0].serialize_to(&types).unwrap(), ); let tuple_1 = Tuple::deserialize_from( &types, + &mut id_builder, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], &columns, &tuples[1].serialize_to(&types).unwrap(), diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs index a5dc784b..4fb4cc84 100644 --- a/src/types/tuple_builder.rs +++ b/src/types/tuple_builder.rs @@ -4,7 +4,7 @@ use crate::types::value::{DataValue, Utf8Type}; use itertools::Itertools; use sqlparser::ast::CharLengthUnits; -pub(crate) struct TupleIdBuilder { +pub struct TupleIdBuilder { primary_indexes: Vec, tmp_keys: Vec>, } @@ -14,7 +14,7 @@ pub struct TupleBuilder<'a> { } impl TupleIdBuilder { - pub(crate) fn new(schema: &Schema) -> Self { + pub fn new(schema: &Schema) -> Self { let primary_indexes = schema .iter() .filter_map(|column| column.desc().primary()) @@ -29,16 +29,16 @@ impl TupleIdBuilder { } } - pub(crate) fn append(&mut self, value: DataValue) { + pub fn append(&mut self, value: DataValue) { self.tmp_keys.push(Some(value)); } - pub(crate) fn build(&mut self) -> Option { + pub fn build(&mut self) -> Option { (!self.tmp_keys.is_empty()).then(|| { if self.tmp_keys.len() == 1 { self.tmp_keys.pop().unwrap().unwrap() } else { - let mut primary_keys = Vec::new(); + let mut primary_keys = Vec::with_capacity(self.primary_indexes.len()); for i in self.primary_indexes.iter() { primary_keys.push(self.tmp_keys[*i].take().unwrap()); diff --git a/tests/macros-test/src/main.rs b/tests/macros-test/src/main.rs index f428d232..bd604326 100644 --- a/tests/macros-test/src/main.rs +++ b/tests/macros-test/src/main.rs @@ -11,7 +11,6 @@ mod test { use fnck_sql::expression::ScalarExpression; use fnck_sql::types::evaluator::EvaluatorFactory; use fnck_sql::types::tuple::{SchemaRef, Tuple}; - use fnck_sql::types::value::DataValue; use fnck_sql::types::value::{DataValue, Utf8Type}; use fnck_sql::types::LogicalType; use fnck_sql::{implement_from_tuple, scala_function, table_function}; @@ -23,14 +22,14 @@ mod test { ColumnRef::from(ColumnCatalog::new( "c1".to_string(), false, - ColumnDesc::new(LogicalType::Integer, true, false, None).unwrap(), + ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), )), ColumnRef::from(ColumnCatalog::new( "c2".to_string(), false, ColumnDesc::new( LogicalType::Varchar(None, CharLengthUnits::Characters), - false, + None, false, None, ) @@ -38,12 +37,12 @@ mod test { )), ]); let values = vec![ - Arc::new(DataValue::Int32(Some(9))), - Arc::new(DataValue::Utf8 { + DataValue::Int32(Some(9)), + DataValue::Utf8 { value: Some("LOL".to_string()), ty: Utf8Type::Variable(None), unit: CharLengthUnits::Characters, - }), + }, ]; (Tuple { id: None, values }, schema_ref) @@ -95,8 +94,8 @@ mod test { .map(|i| Ok(Tuple { id: None, values: vec![ - Arc::new(DataValue::Int32(Some(i))), - Arc::new(DataValue::Int32(Some(i))), + DataValue::Int32(Some(i)), + DataValue::Int32(Some(i)), ] }))) as Box>>) })); @@ -106,12 +105,12 @@ mod test { let function = MyScalaFunction::new(); let sum = function.eval( &[ - ScalarExpression::Constant(Arc::new(DataValue::Int8(Some(1)))), - ScalarExpression::Constant(Arc::new(DataValue::Utf8 { + ScalarExpression::Constant(DataValue::Int8(Some(1))), + ScalarExpression::Constant(DataValue::Utf8 { value: Some("1".to_string()), ty: Utf8Type::Variable(None), unit: CharLengthUnits::Characters, - })), + }), ], &Tuple { id: None, @@ -136,9 +135,7 @@ mod test { #[test] fn test_table_function() -> Result<(), DatabaseError> { let function = MyTableFunction::new(); - let mut numbers = function.eval(&[ScalarExpression::Constant(Arc::new( - DataValue::Int8(Some(2)), - ))])?; + let mut numbers = function.eval(&[ScalarExpression::Constant(DataValue::Int8(Some(2)))])?; println!("{:?}", function); @@ -153,20 +150,14 @@ mod test { numbers.next().unwrap().unwrap(), Tuple { id: None, - values: vec![ - Arc::new(DataValue::Int32(Some(0))), - Arc::new(DataValue::Int32(Some(0))), - ] + values: vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0)),] } ); assert_eq!( numbers.next().unwrap().unwrap(), Tuple { id: None, - values: vec![ - Arc::new(DataValue::Int32(Some(1))), - Arc::new(DataValue::Int32(Some(1))), - ] + values: vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(1)),] } ); assert!(numbers.next().is_none()); @@ -176,7 +167,7 @@ mod test { let mut c1 = ColumnCatalog::new( "c1".to_string(), true, - ColumnDesc::new(LogicalType::Integer, false, false, None)?, + ColumnDesc::new(LogicalType::Integer, None, false, None)?, ); c1.summary_mut().relation = ColumnRelation::Table { column_id: function_schema[0].id().unwrap(), @@ -186,7 +177,7 @@ mod test { let mut c2 = ColumnCatalog::new( "c2".to_string(), true, - ColumnDesc::new(LogicalType::Integer, false, false, None)?, + ColumnDesc::new(LogicalType::Integer, None, false, None)?, ); c2.summary_mut().relation = ColumnRelation::Table { column_id: function_schema[1].id().unwrap(),