Skip to content

Commit

Permalink
chore: make IndexIter stateful to enhance readability
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 4, 2024
1 parent 60526e1 commit f57ab88
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 86 deletions.
184 changes: 100 additions & 84 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use crate::types::{ColumnId, LogicalType};
use crate::utils::lru::SharedLruCache;
use bytes::Bytes;
use itertools::Itertools;
use std::collections::{Bound, VecDeque};
use std::collections::Bound;
use std::io::Cursor;
use std::mem;
use std::ops::SubAssign;
use std::sync::Arc;
use std::vec::IntoIter;
use ulid::Generator;

pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>;
Expand Down Expand Up @@ -127,8 +128,8 @@ pub trait Transaction: Sized {
tx: self,
},
inner,
ranges: VecDeque::from(ranges),
scope_iter: None,
ranges: ranges.into_iter(),
state: IndexIterState::Init,
})
}

Expand Down Expand Up @@ -965,8 +966,14 @@ pub struct IndexIter<'a, T: Transaction> {
params: IndexImplParams<'a, T>,
inner: IndexImplEnum,
// for buffering data
ranges: VecDeque<Range>,
scope_iter: Option<T::IterType<'a>>,
ranges: IntoIter<Range>,
state: IndexIterState<'a, T>,
}

/// Progress of an [`IndexIter`] as `next_tuple` walks through its ranges.
pub enum IndexIterState<'a, T: Transaction + 'a> {
/// No range scan is open: the next step pulls the next `Range` from
/// `ranges`, switching to `Over` when none remain.
Init,
/// A range scan is open: entries are drained from the wrapped transaction
/// iterator, and the state returns to `Init` once it is exhausted.
Range(T::IterType<'a>),
/// Iteration has finished (no ranges left, or the limit reached zero);
/// `next_tuple` returns `Ok(None)`.
Over,
}

impl<'a, T: Transaction + 'a> IndexIter<'a, T> {
Expand All @@ -985,102 +992,111 @@ impl<'a, T: Transaction + 'a> IndexIter<'a, T> {
num.sub_assign(1);
}
}

fn is_empty(&self) -> bool {
self.scope_iter.is_none() && self.ranges.is_empty()
}
}

/// expression -> index value -> tuple
impl<T: Transaction> Iter for IndexIter<'_, T> {
fn next_tuple(&mut self) -> Result<Option<Tuple>, DatabaseError> {
if matches!(self.limit, Some(0)) || self.is_empty() {
self.scope_iter = None;
self.ranges.clear();
if matches!(self.limit, Some(0)) {
self.state = IndexIterState::Over;

return Ok(None);
}

if let Some(iter) = &mut self.scope_iter {
while let Some((_, bytes)) = iter.try_next()? {
if Self::offset_move(&mut self.offset) {
continue;
}
Self::limit_sub(&mut self.limit);
let tuple = self
.inner
.index_lookup(&bytes, &mut self.id_builder, &self.params)?;

return Ok(Some(tuple));
}
self.scope_iter = None;
}

if let Some(binary) = self.ranges.pop_front() {
match binary {
Range::Scope { min, max } => {
let table_name = self.params.table_name;
let index_meta = &self.params.index_meta;
let bound_encode = |bound: Bound<DataValue>| -> Result<_, DatabaseError> {
match bound {
Bound::Included(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Included(self.inner.bound_key(&self.params, &val)?))
}
Bound::Excluded(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Excluded(self.inner.bound_key(&self.params, &val)?))
}
Bound::Unbounded => Ok(Bound::Unbounded),
}
loop {
match &mut self.state {
IndexIterState::Init => {
let Some(binary) = self.ranges.next() else {
self.state = IndexIterState::Over;
continue;
};
let (bound_min, bound_max) =
if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) {
TableCodec::tuple_bound(table_name)
} else {
TableCodec::index_bound(table_name, &index_meta.id)?
};
let check_bound = |value: &mut Bound<Vec<u8>>, bound: Vec<u8>| {
if matches!(value, Bound::Unbounded) {
let _ = mem::replace(value, Bound::Included(bound));
match binary {
Range::Scope { min, max } => {
let table_name = self.params.table_name;
let index_meta = &self.params.index_meta;
let bound_encode =
|bound: Bound<DataValue>| -> Result<_, DatabaseError> {
match bound {
Bound::Included(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Included(
self.inner.bound_key(&self.params, &val)?,
))
}
Bound::Excluded(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Excluded(
self.inner.bound_key(&self.params, &val)?,
))
}
Bound::Unbounded => Ok(Bound::Unbounded),
}
};
let (bound_min, bound_max) =
if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) {
TableCodec::tuple_bound(table_name)
} else {
TableCodec::index_bound(table_name, &index_meta.id)?
};
let check_bound = |value: &mut Bound<Vec<u8>>, bound: Vec<u8>| {
if matches!(value, Bound::Unbounded) {
let _ = mem::replace(value, Bound::Included(bound));
}
};

let mut encode_min = bound_encode(min)?;
check_bound(&mut encode_min, bound_min);

let mut encode_max = bound_encode(max)?;
check_bound(&mut encode_max, bound_max);

let iter = self.params.tx.range(
encode_min.as_ref().map(Vec::as_slice),
encode_max.as_ref().map(Vec::as_slice),
)?;
self.state = IndexIterState::Range(iter);
}
};

let mut encode_min = bound_encode(min)?;
check_bound(&mut encode_min, bound_min);

let mut encode_max = bound_encode(max)?;
check_bound(&mut encode_max, bound_max);

let iter = self.params.tx.range(
encode_min.as_ref().map(Vec::as_slice),
encode_max.as_ref().map(Vec::as_slice),
)?;
self.scope_iter = Some(iter);
}
Range::Eq(mut val) => {
val = self.params.try_cast(val)?;

match self
.inner
.eq_to_res(&val, &mut self.id_builder, &self.params)?
{
IndexResult::Tuple(tuple) => {
if Self::offset_move(&mut self.offset) {
return self.next_tuple();
Range::Eq(mut val) => {
val = self.params.try_cast(val)?;

match self
.inner
.eq_to_res(&val, &mut self.id_builder, &self.params)?
{
IndexResult::Tuple(tuple) => {
if Self::offset_move(&mut self.offset) {
continue;
}
Self::limit_sub(&mut self.limit);
return Ok(tuple);
}
IndexResult::Scope(iter) => {
self.state = IndexIterState::Range(iter);
}
}
Self::limit_sub(&mut self.limit);
return Ok(tuple);
}
IndexResult::Scope(iter) => self.scope_iter = Some(iter),
_ => (),
}
}
IndexIterState::Range(iter) => {
while let Some((_, bytes)) = iter.try_next()? {
if Self::offset_move(&mut self.offset) {
continue;
}
Self::limit_sub(&mut self.limit);
let tuple =
self.inner
.index_lookup(&bytes, &mut self.id_builder, &self.params)?;

return Ok(Some(tuple));
}
self.state = IndexIterState::Init;
}
_ => (),
IndexIterState::Over => return Ok(None),
}
}
self.next_tuple()
}
}

Expand Down
4 changes: 2 additions & 2 deletions tpcc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use clap::Parser;
use fnck_sql::db::{DBTransaction, DataBaseBuilder, Statement};
use fnck_sql::errors::DatabaseError;
use fnck_sql::storage::Storage;
use fnck_sql::types::tuple::create_table;
use rand::prelude::ThreadRng;
use rand::Rng;
use std::time::{Duration, Instant};
use fnck_sql::types::tuple::create_table;

mod delivery;
mod load;
Expand Down Expand Up @@ -320,7 +320,7 @@ pub enum TpccError {
}

#[test]
fn debug_tpcc() -> Result<(), DatabaseError> {
fn explain_tpcc() -> Result<(), DatabaseError> {
let database = DataBaseBuilder::path("./fnck_sql_tpcc").build()?;
let mut tx = database.new_transaction()?;

Expand Down

0 comments on commit f57ab88

Please sign in to comment.