From 28280a42ee162032fb43814b92f87907628448ec Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 30 Mar 2024 23:22:30 +0800 Subject: [PATCH] refactor: optimize the implementation of `IndexIter` (#193) * refactor: optimize the implementation of `IndexIter` * style: code fmt --- src/storage/kip.rs | 55 ++++-- src/storage/mod.rs | 480 ++++++++++++++++++++++++++++++--------------- 2 files changed, 361 insertions(+), 174 deletions(-) diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 50d27239..ddf4263a 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -3,7 +3,9 @@ use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta}; use crate::storage::table_codec::TableCodec; -use crate::storage::{Bounds, IndexIter, Iter, Storage, Transaction}; +use crate::storage::{ + Bounds, IndexImplEnum, IndexImplParams, IndexIter, Iter, Storage, Transaction, +}; use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType}; use crate::types::tuple::{Tuple, TupleId}; use crate::types::{ColumnId, LogicalType}; @@ -118,6 +120,8 @@ impl Transaction for KipTransaction { let table = self .table(table_name.clone()) .ok_or(DatabaseError::TableNotFound)?; + let table_types = table.types(); + let table_name = table.name.as_str(); let offset = offset_option.unwrap_or(0); let mut tuple_columns = Vec::with_capacity(columns.len()); @@ -126,18 +130,22 @@ impl Transaction for KipTransaction { tuple_columns.push(column); projections.push(projection); } + let inner = IndexImplEnum::instance(index_meta.ty); Ok(IndexIter { offset, limit: limit_option, - tuple_schema_ref: Arc::new(tuple_columns), - index_meta, - table, - index_values: VecDeque::new(), + params: IndexImplParams { + tuple_schema_ref: Arc::new(tuple_columns), + projections, + index_meta, + table_name, + table_types, + tx: &self.tx, + }, + inner, ranges: VecDeque::from(ranges), - tx: &self.tx, scope_iter: 
None, - projections, }) } @@ -565,7 +573,9 @@ mod test { use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; use crate::storage::kip::KipStorage; - use crate::storage::{IndexIter, Iter, Storage, Transaction}; + use crate::storage::{ + IndexImplEnum, IndexImplParams, IndexIter, Iter, PrimaryKeyIndexImpl, Storage, Transaction, + }; use crate::types::index::{IndexMeta, IndexType}; use crate::types::tuple::Tuple; use crate::types::value::DataValue; @@ -671,16 +681,21 @@ mod test { let mut iter = IndexIter { offset: 0, limit: None, - tuple_schema_ref: table.schema_ref().clone(), - index_meta: Arc::new(IndexMeta { - id: 0, - column_ids: vec![0], - table_name, - pk_ty: LogicalType::Integer, - name: "pk_a".to_string(), - ty: IndexType::PrimaryKey, - }), - table: &table, + params: IndexImplParams { + tuple_schema_ref: table.schema_ref().clone(), + projections: vec![0], + index_meta: Arc::new(IndexMeta { + id: 0, + column_ids: vec![0], + table_name, + pk_ty: LogicalType::Integer, + name: "pk_a".to_string(), + ty: IndexType::PrimaryKey, + }), + table_name: &table.name, + table_types: table.types(), + tx: &transaction.tx, + }, ranges: VecDeque::from(vec![ Range::Eq(Arc::new(DataValue::Int32(Some(0)))), Range::Scope { @@ -688,10 +703,8 @@ mod test { max: Bound::Included(Arc::new(DataValue::Int32(Some(4)))), }, ]), - index_values: VecDeque::new(), - tx: &transaction.tx, scope_iter: None, - projections: vec![0], + inner: IndexImplEnum::PrimaryKey(PrimaryKeyIndexImpl), }; let mut result = Vec::new(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1cb9d938..d434fc21 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -10,6 +10,7 @@ use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType}; use crate::types::tuple::{Tuple, TupleId}; use crate::types::value::{DataValue, ValueRef}; use crate::types::{ColumnId, LogicalType}; +use bytes::Bytes; use kip_db::kernel::lsm::iterator::Iter as DBIter; use kip_db::kernel::lsm::mvcc; use 
std::collections::{Bound, VecDeque}; @@ -120,25 +121,302 @@ pub trait Transaction: Sync + Send + 'static { async fn commit(self) -> Result<(), DatabaseError>; } -#[derive(Debug)] -enum IndexValue { - PrimaryKey(Tuple), - Normal(TupleId), +trait IndexImpl { + fn index_lookup(&self, bytes: &Bytes, params: &IndexImplParams) + -> Result; + + fn eq_to_res<'a>( + &self, + value: &ValueRef, + params: &IndexImplParams<'a>, + ) -> Result, DatabaseError>; + + fn bound_key( + &self, + params: &IndexImplParams, + value: &ValueRef, + is_upper: bool, + ) -> Result, DatabaseError>; } -// TODO: Table return optimization -pub struct IndexIter<'a> { - offset: usize, - limit: Option, +enum IndexImplEnum { + PrimaryKey(PrimaryKeyIndexImpl), + Unique(UniqueIndexImpl), + Normal(NormalIndexImpl), + Composite(CompositeIndexImpl), +} + +impl IndexImplEnum { + fn instance(index_type: IndexType) -> IndexImplEnum { + match index_type { + IndexType::PrimaryKey => IndexImplEnum::PrimaryKey(PrimaryKeyIndexImpl), + IndexType::Unique => IndexImplEnum::Unique(UniqueIndexImpl), + IndexType::Normal => IndexImplEnum::Normal(NormalIndexImpl), + IndexType::Composite => IndexImplEnum::Composite(CompositeIndexImpl), + } + } +} + +struct PrimaryKeyIndexImpl; +struct UniqueIndexImpl; +struct NormalIndexImpl; +struct CompositeIndexImpl; + +struct IndexImplParams<'a> { tuple_schema_ref: Arc>, projections: Vec, index_meta: IndexMetaRef, - table: &'a TableCatalog, + table_name: &'a str, + table_types: Vec, tx: &'a mvcc::Transaction, +} + +impl IndexImplParams<'_> { + fn get_tuple_by_id(&self, tuple_id: &TupleId) -> Result, DatabaseError> { + let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?; + + Ok(self.tx.get(&key)?.map(|bytes| { + TableCodec::decode_tuple( + &self.table_types, + &self.projections, + &self.tuple_schema_ref, + &bytes, + ) + })) + } +} + +enum IndexResult<'a> { + Tuple(Tuple), + Scope(mvcc::TransactionIter<'a>), +} + +impl IndexImpl for IndexImplEnum { + fn index_lookup( + 
&self, + bytes: &Bytes, + params: &IndexImplParams, + ) -> Result { + match self { + IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, params), + IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, params), + IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, params), + IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, params), + } + } + + fn eq_to_res<'a>( + &self, + value: &ValueRef, + params: &IndexImplParams<'a>, + ) -> Result, DatabaseError> { + match self { + IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, params), + IndexImplEnum::Unique(inner) => inner.eq_to_res(value, params), + IndexImplEnum::Normal(inner) => inner.eq_to_res(value, params), + IndexImplEnum::Composite(inner) => inner.eq_to_res(value, params), + } + } + + fn bound_key( + &self, + params: &IndexImplParams, + value: &ValueRef, + is_upper: bool, + ) -> Result, DatabaseError> { + match self { + IndexImplEnum::PrimaryKey(inner) => inner.bound_key(params, value, is_upper), + IndexImplEnum::Unique(inner) => inner.bound_key(params, value, is_upper), + IndexImplEnum::Normal(inner) => inner.bound_key(params, value, is_upper), + IndexImplEnum::Composite(inner) => inner.bound_key(params, value, is_upper), + } + } +} + +impl IndexImpl for PrimaryKeyIndexImpl { + fn index_lookup( + &self, + bytes: &Bytes, + params: &IndexImplParams, + ) -> Result { + Ok(TableCodec::decode_tuple( + &params.table_types, + &params.projections, + &params.tuple_schema_ref, + bytes, + )) + } + + fn eq_to_res<'a>( + &self, + value: &ValueRef, + params: &IndexImplParams<'a>, + ) -> Result, DatabaseError> { + let bytes = params + .tx + .get(&TableCodec::encode_tuple_key(params.table_name, value)?)?
+ .ok_or_else(|| { + DatabaseError::NotFound("secondary index", format!("tuple_id -> {}", value)) + })?; + let tuple = TableCodec::decode_tuple( + &params.table_types, + &params.projections, + &params.tuple_schema_ref, + &bytes, + ); + Ok(IndexResult::Tuple(tuple)) + } + + fn bound_key( + &self, + params: &IndexImplParams, + val: &ValueRef, + _: bool, + ) -> Result, DatabaseError> { + TableCodec::encode_tuple_key(params.table_name, val) + } +} + +fn secondary_index_lookup(bytes: &Bytes, params: &IndexImplParams) -> Result { + let tuple_id = TableCodec::decode_index(bytes, &params.index_meta.pk_ty); + params + .get_tuple_by_id(&tuple_id)? + .ok_or_else(|| DatabaseError::NotFound("index's tuple_id", tuple_id.to_string())) +} + +impl IndexImpl for UniqueIndexImpl { + fn index_lookup( + &self, + bytes: &Bytes, + params: &IndexImplParams, + ) -> Result { + secondary_index_lookup(bytes, params) + } + + fn eq_to_res<'a>( + &self, + value: &ValueRef, + params: &IndexImplParams<'a>, + ) -> Result, DatabaseError> { + let bytes = params + .tx + .get(&self.bound_key(params, value, false)?)?
+ .ok_or_else(|| { + DatabaseError::NotFound("secondary index", format!("index_value -> {}", value)) + })?; + let tuple_id = TableCodec::decode_index(&bytes, &params.index_meta.pk_ty); + let tuple = params.get_tuple_by_id(&tuple_id)?.ok_or_else(|| { + DatabaseError::NotFound("secondary index", format!("tuple_id -> {}", value)) + })?; + Ok(IndexResult::Tuple(tuple)) + } + + fn bound_key( + &self, + params: &IndexImplParams, + value: &ValueRef, + _: bool, + ) -> Result, DatabaseError> { + let index = Index::new( + params.index_meta.id, + slice::from_ref(value), + IndexType::Unique, + ); + + TableCodec::encode_index_key(params.table_name, &index, None) + } +} + +impl IndexImpl for NormalIndexImpl { + fn index_lookup( + &self, + bytes: &Bytes, + params: &IndexImplParams, + ) -> Result { + secondary_index_lookup(bytes, params) + } + fn eq_to_res<'a>( + &self, + value: &ValueRef, + params: &IndexImplParams<'a>, + ) -> Result, DatabaseError> { + let min = self.bound_key(params, value, false)?; + let max = self.bound_key(params, value, true)?; + + let iter = params.tx.iter( + Bound::Included(min.as_slice()), + Bound::Included(max.as_slice()), + )?; + Ok(IndexResult::Scope(iter)) + } + + fn bound_key( + &self, + params: &IndexImplParams, + value: &ValueRef, + is_upper: bool, + ) -> Result, DatabaseError> { + let index = Index::new( + params.index_meta.id, + slice::from_ref(value), + IndexType::Normal, + ); + + TableCodec::encode_index_bound_key(params.table_name, &index, is_upper) + } +} + +impl IndexImpl for CompositeIndexImpl { + fn index_lookup( + &self, + bytes: &Bytes, + params: &IndexImplParams, + ) -> Result { + secondary_index_lookup(bytes, params) + } + + fn eq_to_res<'a>( + &self, + value: &ValueRef, + params: &IndexImplParams<'a>, + ) -> Result, DatabaseError> { + let min = self.bound_key(params, value, false)?; + let max = self.bound_key(params, value, true)?; + + let iter = params.tx.iter( + Bound::Included(min.as_slice()), + Bound::Included(max.as_slice()), + )?;
+ Ok(IndexResult::Scope(iter)) + } + + fn bound_key( + &self, + params: &IndexImplParams, + value: &ValueRef, + is_upper: bool, + ) -> Result, DatabaseError> { + let values = if let DataValue::Tuple(Some(values)) = value.as_ref() { + values.as_slice() + } else { + slice::from_ref(value) + }; + let index = Index::new(params.index_meta.id, values, IndexType::Composite); + + TableCodec::encode_index_bound_key(params.table_name, &index, is_upper) + } +} + +// TODO: Table return optimization +pub struct IndexIter<'a> { + offset: usize, + limit: Option, + + params: IndexImplParams<'a>, + inner: IndexImplEnum, // for buffering data - index_values: VecDeque, ranges: VecDeque, scope_iter: Option>, } @@ -154,141 +432,73 @@ impl IndexIter<'_> { } } - fn bound_key(&self, val: &ValueRef, is_upper: bool) -> Result, DatabaseError> { - match self.index_meta.ty { - IndexType::PrimaryKey => TableCodec::encode_tuple_key(&self.table.name, val), - IndexType::Unique => { - let index = - Index::new(self.index_meta.id, slice::from_ref(val), self.index_meta.ty); - - TableCodec::encode_index_key(&self.table.name, &index, None) - } - IndexType::Normal => { - let index = - Index::new(self.index_meta.id, slice::from_ref(val), self.index_meta.ty); - - TableCodec::encode_index_bound_key(&self.table.name, &index, is_upper) - } - IndexType::Composite => { - let values = if let DataValue::Tuple(Some(values)) = val.as_ref() { - values.as_slice() - } else { - slice::from_ref(val) - }; - let index = Index::new(self.index_meta.id, values, self.index_meta.ty); - - TableCodec::encode_index_bound_key(&self.table.name, &index, is_upper) - } + fn limit_sub(limit: &mut Option) { + if let Some(num) = limit.as_mut() { + num.sub_assign(1); } } - fn get_tuple_by_id(&mut self, tuple_id: &TupleId) -> Result, DatabaseError> { - let key = TableCodec::encode_tuple_key(&self.table.name, tuple_id)?; - - Ok(self.tx.get(&key)?.map(|bytes| { - TableCodec::decode_tuple( - &self.table.types(), - &self.projections, - 
&self.tuple_schema_ref, - &bytes, - ) - })) - } - fn is_empty(&self) -> bool { - self.scope_iter.is_none() && self.index_values.is_empty() && self.ranges.is_empty() + self.scope_iter.is_none() && self.ranges.is_empty() } } /// expression -> index value -> tuple impl Iter for IndexIter<'_> { fn next_tuple(&mut self) -> Result, DatabaseError> { - // 1. check limit if matches!(self.limit, Some(0)) || self.is_empty() { self.scope_iter = None; self.ranges.clear(); return Ok(None); } - // 2. try get tuple on index_values and until it empty - while let Some(value) = self.index_values.pop_front() { - if Self::offset_move(&mut self.offset) { - continue; - } - let tuple = match value { - IndexValue::PrimaryKey(tuple) => { - if let Some(num) = self.limit.as_mut() { - num.sub_assign(1); - } - return Ok(Some(tuple)); - } - IndexValue::Normal(tuple_id) => { - self.get_tuple_by_id(&tuple_id)?.ok_or_else(|| { - DatabaseError::NotFound("index's tuple_id", tuple_id.to_string()) - })? - } - }; - return Ok(Some(tuple)); - } - assert!(self.index_values.is_empty()); - - // 3. If the current expression is a Scope, - // an iterator will be generated for reading the IndexValues of the Scope. if let Some(iter) = &mut self.scope_iter { while let Some((_, value_option)) = iter.try_next()? 
{ - if let Some(value) = value_option { - let index = if matches!(self.index_meta.ty, IndexType::PrimaryKey) { - let tuple = TableCodec::decode_tuple( - &self.table.types(), - &self.projections, - &self.tuple_schema_ref, - &value, - ); - - IndexValue::PrimaryKey(tuple) - } else { - IndexValue::Normal(TableCodec::decode_index(&value, &self.index_meta.pk_ty)) - }; - self.index_values.push_back(index); - break; + if let Some(bytes) = value_option { + if Self::offset_move(&mut self.offset) { + continue; + } + Self::limit_sub(&mut self.limit); + let tuple = self.inner.index_lookup(&bytes, &self.params)?; + + return Ok(Some(tuple)); } } - if self.index_values.is_empty() { - self.scope_iter = None; - } - return self.next_tuple(); + self.scope_iter = None; } - // 4. When `scope_iter` and `index_values` do not have a value, use the next expression to iterate if let Some(binary) = self.ranges.pop_front() { match binary { Range::Scope { min, max } => { - let table_name = &self.table.name; - let index_meta = &self.index_meta; - + let table_name = self.params.table_name; + let index_meta = &self.params.index_meta; let bound_encode = |bound: Bound, is_upper: bool| -> Result<_, DatabaseError> { match bound { - Bound::Included(val) => { - Ok(Bound::Included(self.bound_key(&val, is_upper)?)) - } - Bound::Excluded(val) => { - Ok(Bound::Excluded(self.bound_key(&val, is_upper)?)) - } + Bound::Included(val) => Ok(Bound::Included(self.inner.bound_key( + &self.params, + &val, + is_upper, + )?)), + Bound::Excluded(val) => Ok(Bound::Excluded(self.inner.bound_key( + &self.params, + &val, + is_upper, + )?)), Bound::Unbounded => Ok(Bound::Unbounded), } }; - let check_bound = |value: &mut Bound>, bound: Vec| { - if matches!(value, Bound::Unbounded) { - let _ = mem::replace(value, Bound::Included(bound)); - } - }; let (bound_min, bound_max) = if matches!(index_meta.ty, IndexType::PrimaryKey) { TableCodec::tuple_bound(table_name) } else { TableCodec::index_bound(table_name, &index_meta.id) }; + 
let check_bound = |value: &mut Bound>, bound: Vec| { + if matches!(value, Bound::Unbounded) { + let _ = mem::replace(value, Bound::Included(bound)); + } + }; let mut encode_min = bound_encode(min, false)?; check_bound(&mut encode_min, bound_min); @@ -296,58 +506,22 @@ impl Iter for IndexIter<'_> { let mut encode_max = bound_encode(max, true)?; check_bound(&mut encode_max, bound_max); - let iter = self.tx.iter( + let iter = self.params.tx.iter( encode_min.as_ref().map(Vec::as_slice), encode_max.as_ref().map(Vec::as_slice), )?; self.scope_iter = Some(iter); } - Range::Eq(val) => { - match self.index_meta.ty { - IndexType::PrimaryKey => { - let bytes = - self.tx.get(&self.bound_key(&val, false)?)?.ok_or_else(|| { - DatabaseError::NotFound( - "secondary index", - format!("value -> {}", val), - ) - })?; - - let tuple = TableCodec::decode_tuple( - &self.table.types(), - &self.projections, - &self.tuple_schema_ref, - &bytes, - ); - self.index_values.push_back(IndexValue::PrimaryKey(tuple)); - self.scope_iter = None; - } - IndexType::Unique => { - let bytes = - self.tx.get(&self.bound_key(&val, false)?)?.ok_or_else(|| { - DatabaseError::NotFound( - "secondary index", - format!("value -> {}", val), - ) - })?; - - self.index_values.push_back(IndexValue::Normal( - TableCodec::decode_index(&bytes, &self.index_meta.pk_ty), - )); - self.scope_iter = None; - } - IndexType::Normal | IndexType::Composite => { - let min = self.bound_key(&val, false)?; - let max = self.bound_key(&val, true)?; - - let iter = self.tx.iter( - Bound::Included(min.as_slice()), - Bound::Included(max.as_slice()), - )?; - self.scope_iter = Some(iter); + Range::Eq(val) => match self.inner.eq_to_res(&val, &self.params)? { + IndexResult::Tuple(tuple) => { + if Self::offset_move(&mut self.offset) { + return self.next_tuple(); } + Self::limit_sub(&mut self.limit); + return Ok(Some(tuple)); } - } + IndexResult::Scope(iter) => self.scope_iter = Some(iter), + }, _ => (), } }