perf: encode the tablename prefix in TableCodec into a hash, enabling it to apply rocksdb prefix range
KKould committed Dec 16, 2024
1 parent 6cb4190 commit 10f1eac
Showing 11 changed files with 246 additions and 195 deletions.
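Why this helps: FnckSQL's keys previously began with the variable-length table name, which a RocksDB fixed-size prefix extractor cannot index; hashing the name down to a fixed-width prefix gives every key of a table identical leading bytes. A minimal sketch of the idea, assuming a 4-byte prefix and an arbitrary hasher (the actual TableCodec encoding is not shown in this diff):

```rust
// Illustrative only: table_prefix is a hypothetical helper, not FnckSQL's API.
use std::hash::{DefaultHasher, Hash, Hasher};

/// Derive a fixed 4-byte key prefix from a table name.
fn table_prefix(table_name: &str) -> [u8; 4] {
    let mut hasher = DefaultHasher::new();
    table_name.hash(&mut hasher);
    (hasher.finish() as u32).to_be_bytes()
}

fn main() {
    // Every key belonging to `t1` now starts with the same 4 bytes, so a
    // table scan can run as a RocksDB prefix range instead of seek+filter.
    println!("keys of t1 start with {:?}", table_prefix("t1"));
}
```

The trade-off of hashing is that two distinct names could map to the same prefix, which is what the new DuplicateSourceHash error and the check_name_hash guard below protect against.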
12 changes: 6 additions & 6 deletions README.md
@@ -73,13 +73,13 @@ run `cargo run -p tpcc --release` to run tpcc
 - Tips: TPC-C currently only supports single thread
 ```shell
 <90th Percentile RT (MaxRT)>
-New-Order : 0.002 (0.122)
-Payment : 0.001 (0.027)
-Order-Status : 0.054 (0.144)
-Delivery : 0.020 (0.056)
-Stock-Level : 0.003 (0.015)
+New-Order : 0.002 (0.004)
+Payment : 0.001 (0.025)
+Order-Status : 0.053 (0.175)
+Delivery : 0.022 (0.027)
+Stock-Level : 0.003 (0.019)
 <TpmC>
-7755 Tpmc
+7815 tpmC
 ```
 #### 👉[check more](tpcc/README.md)

4 changes: 2 additions & 2 deletions src/binder/copy.rs
@@ -6,6 +6,7 @@ use super::*;
 use crate::errors::DatabaseError;
 use crate::planner::operator::copy_from_file::CopyFromFileOperator;
 use crate::planner::operator::copy_to_file::CopyToFileOperator;
+use crate::planner::operator::table_scan::TableScanOperator;
 use crate::planner::operator::Operator;
 use crate::planner::Childrens;
 use fnck_sql_serde_macros::ReferenceSerialization;
@@ -96,11 +97,10 @@ impl<T: Transaction> Binder<'_, '_, T> {
             // COPY <source_table> TO <dest_file>
             Ok(LogicalPlan::new(
                 Operator::CopyToFile(CopyToFileOperator {
-                    table: table.name.to_string(),
                     target: ext_source,
                     schema_ref,
                 }),
-                Childrens::None,
+                Childrens::Only(TableScanOperator::build(table_name, table)),
             ))
         } else {
             // COPY <dest_table> FROM <source_file>
2 changes: 2 additions & 0 deletions src/errors.rs
@@ -41,6 +41,8 @@
     DefaultNotExist,
     #[error("column: {0} already exists")]
     DuplicateColumn(String),
+    #[error("table or view: {0} hash already exists")]
+    DuplicateSourceHash(String),
     #[error("index: {0} already exists")]
     DuplicateIndex(String),
     #[error("duplicate primary key")]
47 changes: 25 additions & 22 deletions src/execution/dml/copy_to_file.rs
@@ -1,19 +1,23 @@
 use crate::binder::copy::FileFormat;
 use crate::errors::DatabaseError;
-use crate::execution::{Executor, ReadExecutor};
+use crate::execution::{build_read, Executor, ReadExecutor};
 use crate::planner::operator::copy_to_file::CopyToFileOperator;
-use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction, ViewCache};
+use crate::planner::LogicalPlan;
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
 use crate::throw;
 use crate::types::tuple_builder::TupleBuilder;
-use std::sync::Arc;
+use std::ops::Coroutine;
+use std::ops::CoroutineState;
+use std::pin::Pin;
 
 pub struct CopyToFile {
-    pub op: CopyToFileOperator,
+    op: CopyToFileOperator,
+    input: LogicalPlan,
 }
 
-impl From<CopyToFileOperator> for CopyToFile {
-    fn from(op: CopyToFileOperator) -> Self {
-        CopyToFile { op }
+impl From<(CopyToFileOperator, LogicalPlan)> for CopyToFile {
+    fn from((op, input): (CopyToFileOperator, LogicalPlan)) -> Self {
+        CopyToFile { op, input }
     }
 }

@@ -27,20 +31,13 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for CopyToFile {
         #[coroutine]
         move || {
             let mut writer = throw!(self.create_writer());
+            let CopyToFile { input, .. } = self;
+
+            let mut coroutine = build_read(input, cache, transaction);
+
+            while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
+                let tuple = throw!(tuple);
 
-            let mut iter = throw!(unsafe { &mut (*transaction) }.read(
-                cache.0,
-                Arc::new(self.op.table.clone()),
-                (None, None),
-                self.op
-                    .schema_ref
-                    .iter()
-                    .enumerate()
-                    .map(|(index, column_ref)| (index, column_ref.clone()))
-                    .collect()
-            ));
-
-            while let Some(tuple) = throw!(iter.next_tuple()) {
                 throw!(writer
                     .write_record(
                         tuple
@@ -96,6 +93,7 @@ mod tests {
     use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnRelation, ColumnSummary};
     use crate::db::{DataBaseBuilder, ResultIter};
     use crate::errors::DatabaseError;
+    use crate::planner::operator::table_scan::TableScanOperator;
     use crate::storage::Storage;
     use crate::types::LogicalType;
     use sqlparser::ast::CharLengthUnits;
@@ -158,7 +156,6 @@
         let file_path = tmp_dir.path().join("test.csv");
 
         let op = CopyToFileOperator {
-            table: "t1".to_string(),
             target: ExtSource {
                 path: file_path.clone(),
                 format: FileFormat::Csv {
@@ -181,8 +178,14 @@
 
         let storage = db.storage;
         let mut transaction = storage.transaction()?;
+        let table = transaction
+            .table(&db.state.table_cache(), Arc::new("t1".to_string()))?
+            .unwrap();
 
-        let executor = CopyToFile { op: op.clone() };
+        let executor = CopyToFile {
+            op: op.clone(),
+            input: TableScanOperator::build(Arc::new("t1".to_string()), table),
+        };
         let mut coroutine = executor.execute(
             (
                 db.state.table_cache(),
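A side effect worth noting: CopyToFile no longer scans storage itself; it drains its child plan through build_read, which yields tuples from a coroutine. A nightly-Rust sketch of that consumption pattern, with a plain string standing in for the tuple type and all FnckSQL plumbing omitted:

```rust
// Nightly only: FnckSQL builds with the coroutines feature enabled.
#![feature(coroutines, coroutine_trait, stmt_expr_attributes)]

use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;

fn main() {
    // Stand-in for the coroutine returned by build_read(input, cache, transaction).
    let mut source = #[coroutine]
    || {
        for row in ["a,1", "b,2", "c,3"] {
            yield row; // each resume() hands the consumer one tuple
        }
    };

    // Stand-in for the writer loop in CopyToFile::execute.
    while let CoroutineState::Yielded(row) = Pin::new(&mut source).resume(()) {
        println!("write_record({row})");
    }
}
```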
6 changes: 5 additions & 1 deletion src/execution/mod.rs
@@ -194,7 +194,11 @@ pub fn build_write<'a, T: Transaction + 'a>(
         Operator::DropView(op) => DropView::from(op).execute_mut(cache, transaction),
         Operator::Truncate(op) => Truncate::from(op).execute_mut(cache, transaction),
         Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(cache, transaction),
-        Operator::CopyToFile(op) => CopyToFile::from(op).execute(cache, transaction),
+        Operator::CopyToFile(op) => {
+            let input = childrens.pop_only();
+
+            CopyToFile::from((op, input)).execute(cache, transaction)
+        }
 
         Operator::Analyze(op) => {
             let input = childrens.pop_only();
9 changes: 1 addition & 8 deletions src/planner/operator/copy_to_file.rs
@@ -7,7 +7,6 @@ use std::fmt::Formatter;
 
 #[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)]
 pub struct CopyToFileOperator {
-    pub table: String,
     pub target: ExtSource,
     pub schema_ref: SchemaRef,
 }
@@ -19,13 +18,7 @@ impl fmt::Display for CopyToFileOperator {
             .iter()
             .map(|column| column.name().to_string())
             .join(", ");
-        write!(
-            f,
-            "Copy {} -> {} [{}]",
-            self.table,
-            self.target.path.display(),
-            columns
-        )?;
+        write!(f, "Copy To {} [{}]", self.target.path.display(), columns)?;
 
         Ok(())
     }
27 changes: 23 additions & 4 deletions src/storage/mod.rs
@@ -317,9 +317,13 @@
     ) -> Result<(), DatabaseError> {
         let (view_key, value) = unsafe { &*self.table_codec() }.encode_view(&view)?;
 
-        if !or_replace && self.get(&view_key)?.is_some() {
+        let already_exists = self.get(&view_key)?.is_some();
+        if !or_replace && already_exists {
             return Err(DatabaseError::ViewExists);
         }
+        if !already_exists {
+            self.check_name_hash(&view.name)?;
+        }
         self.set(view_key, value)?;
         let _ = view_cache.put(view.name.clone(), view);
@@ -347,6 +351,7 @@
             }
             return Err(DatabaseError::TableExists);
         }
+        self.check_name_hash(&table_name)?;
         self.create_index_meta_from_column(&mut table_catalog)?;
         self.set(table_key, value)?;

@@ -362,13 +367,26 @@
         Ok(table_name)
     }
 
+    fn check_name_hash(&mut self, table_name: &TableName) -> Result<(), DatabaseError> {
+        let (hash_key, value) = unsafe { &*self.table_codec() }.encode_table_hash(table_name);
+        if self.get(&hash_key)?.is_some() {
+            return Err(DatabaseError::DuplicateSourceHash(table_name.to_string()));
+        }
+        self.set(hash_key, value)
+    }
+
+    fn drop_name_hash(&mut self, table_name: &TableName) -> Result<(), DatabaseError> {
+        self.remove(&unsafe { &*self.table_codec() }.encode_table_hash_key(table_name))
+    }
+
     fn drop_view(
         &mut self,
         view_cache: &ViewCache,
         table_cache: &TableCache,
         view_name: TableName,
         if_exists: bool,
     ) -> Result<(), DatabaseError> {
+        self.drop_name_hash(&view_name)?;
         if self
             .view(table_cache, view_cache, view_name.clone())?
             .is_none()
@@ -392,6 +410,7 @@
         table_name: TableName,
         if_exists: bool,
     ) -> Result<(), DatabaseError> {
+        self.drop_name_hash(&table_name)?;
         if self.table(table_cache, table_name.clone())?.is_none() {
             if if_exists {
                 return Ok(());
@@ -590,9 +609,9 @@
         let table_name = table.name.clone();
        let mut primary_keys = Vec::new();
 
-        // FIXME: no clone
-        for col in table.columns().cloned().collect_vec() {
-            let col_id = col.id().unwrap();
+        let schema_ref = table.schema_ref().clone();
+        for col in schema_ref.iter() {
+            let col_id = col.id().ok_or(DatabaseError::PrimaryKeyNotFound)?;
             let index_ty = if let Some(i) = col.desc().primary() {
                 primary_keys.push((i, col_id));
                 continue;
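check_name_hash is what keeps the hashed prefixes sound: each CREATE TABLE/VIEW claims the hash key for its name, a collision is rejected with DuplicateSourceHash rather than silently sharing another table's key range, and drop_name_hash releases the claim on DROP. A rough model of that bookkeeping, with an in-memory map standing in for the encode_table_hash keys kept in the transaction:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the hash keys written via encode_table_hash.
struct NameHashRegistry(HashMap<[u8; 4], String>);

impl NameHashRegistry {
    /// Mirrors check_name_hash: claim the prefix or report a duplicate.
    fn check_name_hash(&mut self, prefix: [u8; 4], name: &str) -> Result<(), String> {
        if self.0.contains_key(&prefix) {
            return Err(format!("table or view: {name} hash already exists"));
        }
        self.0.insert(prefix, name.to_string());
        Ok(())
    }

    /// Mirrors drop_name_hash: DROP releases the claimed prefix.
    fn drop_name_hash(&mut self, prefix: [u8; 4]) {
        self.0.remove(&prefix);
    }
}

fn main() {
    let mut registry = NameHashRegistry(HashMap::new());
    assert!(registry.check_name_hash([0, 0, 0, 1], "t1").is_ok());
    // A name whose hash collides is refused instead of corrupting t1's range.
    assert!(registry.check_name_hash([0, 0, 0, 1], "t1_alias").is_err());
    registry.drop_name_hash([0, 0, 0, 1]);
    assert!(registry.check_name_hash([0, 0, 0, 1], "t2").is_ok());
}
```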
60 changes: 35 additions & 25 deletions src/storage/rocksdb.rs
@@ -1,7 +1,9 @@
 use crate::errors::DatabaseError;
 use crate::storage::table_codec::{BumpBytes, Bytes, TableCodec};
 use crate::storage::{InnerIter, Storage, Transaction};
-use rocksdb::{DBIteratorWithThreadMode, Direction, IteratorMode, OptimisticTransactionDB};
+use rocksdb::{
+    DBIteratorWithThreadMode, Direction, IteratorMode, OptimisticTransactionDB, SliceTransform,
+};
 use std::collections::Bound;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -15,10 +17,12 @@ impl RocksStorage {
     pub fn new(path: impl Into<PathBuf> + Send) -> Result<Self, DatabaseError> {
         let mut bb = rocksdb::BlockBasedOptions::default();
         bb.set_block_cache(&rocksdb::Cache::new_lru_cache(40 * 1_024 * 1_024));
+        bb.set_whole_key_filtering(false);
 
         let mut opts = rocksdb::Options::default();
         opts.set_block_based_table_factory(&bb);
         opts.create_if_missing(true);
+        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(4));
 
         let storage = OptimisticTransactionDB::open(&opts, path.into())?;

@@ -84,24 +88,38 @@ impl<'txn> Transaction for RocksTransaction<'txn> {
         min: Bound<BumpBytes<'a>>,
         max: Bound<BumpBytes<'a>>,
     ) -> Result<Self::IterType<'a>, DatabaseError> {
-        #[inline]
-        fn bound_to_include(bound: Bound<&[u8]>) -> Option<&[u8]> {
-            match bound {
-                Bound::Included(bytes) | Bound::Excluded(bytes) => Some(bytes),
-                Bound::Unbounded => None,
+        let min = match min {
+            Bound::Included(bytes) => Some(bytes),
+            Bound::Excluded(mut bytes) => {
+                // the prefix is the same, but the length is larger
+                bytes.push(0u8);
+                Some(bytes)
             }
-        }
-
-        let lower = bound_to_include(min.as_ref().map(BumpBytes::as_slice))
+            Bound::Unbounded => None,
+        };
+        let lower = min
+            .as_ref()
             .map(|bytes| IteratorMode::From(bytes, Direction::Forward))
             .unwrap_or(IteratorMode::Start);
 
+        if let (Some(min_bytes), Bound::Included(max_bytes) | Bound::Excluded(max_bytes)) =
+            (&min, &max)
+        {
+            let len = min_bytes
+                .iter()
+                .zip(max_bytes.iter())
+                .take_while(|(x, y)| x == y)
+                .count();
+
+            debug_assert!(len > 0);
+            let mut iter = self.tx.prefix_iterator(&min_bytes[..len]);
+            iter.set_mode(lower);
+
+            return Ok(RocksIter { upper: max, iter });
+        }
         let iter = self.tx.iterator(lower);
 
-        Ok(RocksIter {
-            lower: min,
-            upper: max,
-            iter,
-        })
+        Ok(RocksIter { upper: max, iter })
     }
 
     fn commit(self) -> Result<(), DatabaseError> {
@@ -111,15 +129,14 @@ impl<'txn> Transaction for RocksTransaction<'txn> {
     }
 }
 
 pub struct RocksIter<'txn, 'iter> {
-    lower: Bound<BumpBytes<'iter>>,
     upper: Bound<BumpBytes<'iter>>,
     iter: DBIteratorWithThreadMode<'iter, rocksdb::Transaction<'txn, OptimisticTransactionDB>>,
 }
 
 impl InnerIter for RocksIter<'_, '_> {
     #[inline]
     fn try_next(&mut self) -> Result<Option<(Bytes, Bytes)>, DatabaseError> {
-        for result in self.iter.by_ref() {
+        if let Some(result) = self.iter.by_ref().next() {
             let (key, value) = result?;
             let upper_bound_check = match &self.upper {
                 Bound::Included(ref upper) => {
@@ -129,16 +146,9 @@ impl InnerIter for RocksIter<'_, '_> {
                 Bound::Unbounded => true,
             };
             if !upper_bound_check {
-                break;
-            }
-            let lower_bound_check = match &self.lower {
-                Bound::Included(ref lower) => key.as_ref() >= lower.as_slice(),
-                Bound::Excluded(ref lower) => key.as_ref() > lower.as_slice(),
-                Bound::Unbounded => true,
-            };
-            if lower_bound_check {
-                return Ok(Some((Vec::from(key), Vec::from(value))));
+                return Ok(None);
             }
+            return Ok(Some((Vec::from(key), Vec::from(value))));
         }
         Ok(None)
     }
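In isolation, the RocksDB machinery this commit turns on works like this: a fixed-size prefix extractor (with whole-key filtering disabled, as in RocksStorage::new) lets prefix_iterator use prefix bloom filters and stay inside one prefix's key range. A standalone sketch against the rust-rocksdb crate; the database path and keys are made up, only the 4-byte width mirrors the diff:

```rust
use rocksdb::{Options, SliceTransform, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    // Same setting as RocksStorage::new: group keys by their first 4 bytes.
    opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(4));

    let db = DB::open(&opts, "/tmp/prefix_demo")?;
    db.put(b"\x00\x00\x00\x01row1", b"a")?;
    db.put(b"\x00\x00\x00\x01row2", b"b")?;
    db.put(b"\x00\x00\x00\x02row1", b"c")?; // a different "table" prefix

    // prefix_iterator seeks to the prefix and yields only keys that share it.
    for entry in db.prefix_iterator(b"\x00\x00\x00\x01") {
        let (key, value) = entry?;
        println!("{:?} => {:?}", key, value);
    }
    Ok(())
}
```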