Skip to content

Commit

Permalink
perf: use TupleIdBuilder for Tuple::deserialize_from
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 10, 2024
1 parent 4c65d5d commit ee53f4d
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 67 deletions.
10 changes: 5 additions & 5 deletions src/macros/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ macro_rules! scala_function {
_index += 1;

if value.logical_type() != $arg_ty {
value = Arc::new(::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?);
value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?;
}
value
}, )*)
Expand Down Expand Up @@ -135,8 +135,8 @@ macro_rules! scala_function {
/// .map(|i| Ok(Tuple {
/// id: None,
/// values: vec![
/// Arc::new(DataValue::Int32(Some(i))),
/// Arc::new(DataValue::Int32(Some(i))),
/// DataValue::Int32(Some(i)),
/// DataValue::Int32(Some(i)),
/// ]
/// }))) as Box<dyn Iterator<Item = Result<Tuple, DatabaseError>>>)
/// }));
Expand All @@ -154,7 +154,7 @@ macro_rules! table_function {
let mut columns = Vec::new();

$({
columns.push(::fnck_sql::catalog::column::ColumnCatalog::new(stringify!($output_name).to_lowercase(), true, ::fnck_sql::catalog::column::ColumnDesc::new($output_ty, false, false, None).unwrap()));
columns.push(::fnck_sql::catalog::column::ColumnCatalog::new(stringify!($output_name).to_lowercase(), true, ::fnck_sql::catalog::column::ColumnDesc::new($output_ty, None, false, None).unwrap()));
})*
::fnck_sql::catalog::table::TableCatalog::new(Arc::new(stringify!($function_name).to_lowercase()), columns).unwrap()
};
Expand Down Expand Up @@ -199,7 +199,7 @@ macro_rules! table_function {
_index += 1;

if value.logical_type() != $arg_ty {
value = Arc::new(::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?);
value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?;
}
value
}, )*)
Expand Down
56 changes: 40 additions & 16 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::ops::SubAssign;
use std::sync::Arc;
use std::{mem, slice};
use ulid::Generator;
use crate::types::tuple_builder::TupleIdBuilder;

pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>;
pub(crate) type TableCache = SharedLruCache<TableName, TableCatalog>;
Expand Down Expand Up @@ -58,6 +59,7 @@ pub trait Transaction: Sized {
let table = self
.table(table_cache, table_name.clone())?
.ok_or(DatabaseError::TableNotFound)?;
let id_builder = TupleIdBuilder::new(table.schema_ref());
let table_types = table.types();
if columns.is_empty() {
let (i, column) = &table.primary_keys()[0];
Expand All @@ -78,6 +80,7 @@ pub trait Transaction: Sized {
limit: bounds.1,
table_types,
tuple_columns: Arc::new(tuple_columns),
id_builder,
projections,
iter,
})
Expand All @@ -98,6 +101,7 @@ pub trait Transaction: Sized {
let table = self
.table(table_cache, table_name.clone())?
.ok_or(DatabaseError::TableNotFound)?;
let id_builder = TupleIdBuilder::new(table.schema_ref());
let table_types = table.types();
let table_name = table.name.as_str();
let offset = offset_option.unwrap_or(0);
Expand All @@ -113,6 +117,7 @@ pub trait Transaction: Sized {
Ok(IndexIter {
offset,
limit: limit_option,
id_builder,
params: IndexImplParams {
tuple_schema_ref: Arc::new(tuple_columns),
projections,
Expand Down Expand Up @@ -584,12 +589,14 @@ trait IndexImpl<T: Transaction> {
fn index_lookup(
&self,
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError>;

fn eq_to_res<'a>(
&self,
value: &DataValue,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<'a, T>,
) -> Result<IndexResult<'a, T>, DatabaseError>;

Expand Down Expand Up @@ -650,12 +657,13 @@ impl<T: Transaction> IndexImplParams<'_, T> {
Ok(val)
}

fn get_tuple_by_id(&self, tuple_id: &TupleId) -> Result<Option<Tuple>, DatabaseError> {
fn get_tuple_by_id(&self, id_builder: &mut TupleIdBuilder, tuple_id: &TupleId) -> Result<Option<Tuple>, DatabaseError> {
let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?;

Ok(self.tx.get(&key)?.map(|bytes| {
TableCodec::decode_tuple(
&self.table_types,
id_builder,
&self.projections,
&self.tuple_schema_ref,
&bytes,
Expand All @@ -673,26 +681,28 @@ impl<T: Transaction> IndexImpl<T> for IndexImplEnum {
fn index_lookup(
&self,
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
match self {
IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, params),
IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, params),
IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, params),
IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, params),
IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, id_builder, params),
IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, id_builder, params),
IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, id_builder, params),
IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, id_builder, params),
}
}

fn eq_to_res<'a>(
&self,
value: &DataValue,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<'a, T>,
) -> Result<IndexResult<'a, T>, DatabaseError> {
match self {
IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, params),
IndexImplEnum::Unique(inner) => inner.eq_to_res(value, params),
IndexImplEnum::Normal(inner) => inner.eq_to_res(value, params),
IndexImplEnum::Composite(inner) => inner.eq_to_res(value, params),
IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, id_builder, params),
IndexImplEnum::Unique(inner) => inner.eq_to_res(value, id_builder, params),
IndexImplEnum::Normal(inner) => inner.eq_to_res(value, id_builder, params),
IndexImplEnum::Composite(inner) => inner.eq_to_res(value, id_builder, params),
}
}

Expand All @@ -715,10 +725,12 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
Ok(TableCodec::decode_tuple(
&params.table_types,
id_builder,
&params.projections,
&params.tuple_schema_ref,
bytes,
Expand All @@ -728,6 +740,7 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
fn eq_to_res<'a>(
&self,
value: &DataValue,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<'a, T>,
) -> Result<IndexResult<'a, T>, DatabaseError> {
let tuple = params
Expand All @@ -736,6 +749,7 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
.map(|bytes| {
TableCodec::decode_tuple(
&params.table_types,
id_builder,
&params.projections,
&params.tuple_schema_ref,
&bytes,
Expand All @@ -756,34 +770,37 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {

fn secondary_index_lookup<T: Transaction>(
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
let tuple_id = TableCodec::decode_index(bytes, &params.index_meta.pk_ty)?;
params
.get_tuple_by_id(&tuple_id)?
.get_tuple_by_id(id_builder, &tuple_id)?
.ok_or(DatabaseError::TupleIdNotFound(tuple_id))
}

impl<T: Transaction> IndexImpl<T> for UniqueIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
secondary_index_lookup(bytes, params)
secondary_index_lookup(bytes, id_builder, params)
}

fn eq_to_res<'a>(
&self,
value: &DataValue,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<'a, T>,
) -> Result<IndexResult<'a, T>, DatabaseError> {
let Some(bytes) = params.tx.get(&self.bound_key(params, value, false)?)? else {
return Ok(IndexResult::Tuple(None));
};
let tuple_id = TableCodec::decode_index(&bytes, &params.index_meta.pk_ty)?;
let tuple = params
.get_tuple_by_id(&tuple_id)?
.get_tuple_by_id(id_builder, &tuple_id)?
.ok_or(DatabaseError::TupleIdNotFound(tuple_id))?;
Ok(IndexResult::Tuple(Some(tuple)))
}
Expand All @@ -808,14 +825,16 @@ impl<T: Transaction> IndexImpl<T> for NormalIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
secondary_index_lookup(bytes, params)
secondary_index_lookup(bytes, id_builder, params)
}

fn eq_to_res<'a>(
&self,
value: &DataValue,
_: &mut TupleIdBuilder,
params: &IndexImplParams<'a, T>,
) -> Result<IndexResult<'a, T>, DatabaseError> {
let min = self.bound_key(params, value, false)?;
Expand Down Expand Up @@ -848,14 +867,16 @@ impl<T: Transaction> IndexImpl<T> for CompositeIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
id_builder: &mut TupleIdBuilder,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
secondary_index_lookup(bytes, params)
secondary_index_lookup(bytes, id_builder, params)
}

fn eq_to_res<'a>(
&self,
value: &DataValue,
_: &mut TupleIdBuilder,
params: &IndexImplParams<'a, T>,
) -> Result<IndexResult<'a, T>, DatabaseError> {
let min = self.bound_key(params, value, false)?;
Expand Down Expand Up @@ -890,6 +911,7 @@ pub struct TupleIter<'a, T: Transaction + 'a> {
limit: Option<usize>,
table_types: Vec<LogicalType>,
tuple_columns: Arc<Vec<ColumnRef>>,
id_builder: TupleIdBuilder,
projections: Vec<usize>,
iter: T::IterType<'a>,
}
Expand All @@ -911,6 +933,7 @@ impl<'a, T: Transaction + 'a> Iter for TupleIter<'a, T> {
while let Some((_, value)) = self.iter.try_next()? {
let tuple = TableCodec::decode_tuple(
&self.table_types,
&mut self.id_builder,
&self.projections,
&self.tuple_columns,
&value,
Expand All @@ -931,6 +954,7 @@ pub struct IndexIter<'a, T: Transaction> {
offset: usize,
limit: Option<usize>,

id_builder: TupleIdBuilder,
params: IndexImplParams<'a, T>,
inner: IndexImplEnum,
// for buffering data
Expand Down Expand Up @@ -976,7 +1000,7 @@ impl<T: Transaction> Iter for IndexIter<'_, T> {
continue;
}
Self::limit_sub(&mut self.limit);
let tuple = self.inner.index_lookup(&bytes, &self.params)?;
let tuple = self.inner.index_lookup(&bytes, &mut self.id_builder, &self.params)?;

return Ok(Some(tuple));
}
Expand Down Expand Up @@ -1039,7 +1063,7 @@ impl<T: Transaction> Iter for IndexIter<'_, T> {
Range::Eq(mut val) => {
val = self.params.try_cast(val)?;

match self.inner.eq_to_res(&val, &self.params)? {
match self.inner.eq_to_res(&val, &mut self.id_builder, &self.params)? {
IndexResult::Tuple(tuple) => {
if Self::offset_move(&mut self.offset) {
return self.next_tuple();
Expand Down
3 changes: 3 additions & 0 deletions src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ mod test {
use std::hash::RandomState;
use std::sync::Arc;
use tempfile::TempDir;
use crate::types::tuple_builder::TupleIdBuilder;

#[test]
fn test_in_rocksdb_storage_works_with_data() -> Result<(), DatabaseError> {
Expand Down Expand Up @@ -243,9 +244,11 @@ mod test {
DataValue::Int32(Some(3)),
DataValue::Int32(Some(4)),
];
let id_builder = TupleIdBuilder::new(table.schema_ref());
let mut iter = IndexIter {
offset: 0,
limit: None,
id_builder,
params: IndexImplParams {
tuple_schema_ref: table.schema_ref().clone(),
projections: vec![0],
Expand Down
8 changes: 6 additions & 2 deletions src/storage/table_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bytes::Bytes;
use integer_encoding::FixedInt;
use lazy_static::lazy_static;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use crate::types::tuple_builder::TupleIdBuilder;

const BOUND_MIN_TAG: u8 = 0;
const BOUND_MAX_TAG: u8 = 1;
Expand Down Expand Up @@ -250,11 +251,12 @@ impl TableCodec {

pub fn decode_tuple(
table_types: &[LogicalType],
id_builder: &mut TupleIdBuilder,
projections: &[usize],
schema: &Schema,
bytes: &[u8],
) -> Tuple {
Tuple::deserialize_from(table_types, projections, schema, bytes)
Tuple::deserialize_from(table_types, id_builder, projections, schema, bytes)
}

/// Key: {TableName}{INDEX_META_TAG}{BOUND_MIN_TAG}{IndexID}
Expand Down Expand Up @@ -508,6 +510,7 @@ mod tests {
use std::slice;
use std::sync::Arc;
use ulid::Ulid;
use crate::types::tuple_builder::TupleIdBuilder;

fn build_table_codec() -> TableCatalog {
let columns = vec![
Expand Down Expand Up @@ -542,9 +545,10 @@ mod tests {
&[LogicalType::Integer, LogicalType::Decimal(None, None)],
)?;
let schema = table_catalog.schema_ref();
let mut id_builder = TupleIdBuilder::new(schema);

debug_assert_eq!(
TableCodec::decode_tuple(&table_catalog.types(), &[0, 1], schema, &bytes),
TableCodec::decode_tuple(&table_catalog.types(), &mut id_builder, &[0, 1], schema, &bytes),
tuple
);

Expand Down
Loading

0 comments on commit ee53f4d

Please sign in to comment.