Skip to content

Commit

Permalink
perf: tpcc
Browse files Browse the repository at this point in the history
- `Tuple` id to use mapping to get the primary key
- `Executor` uses `*mut Transaction` to avoid materialization problems caused by mutable references
  • Loading branch information
KKould committed Dec 8, 2024
1 parent cfab5b4 commit c66585a
Show file tree
Hide file tree
Showing 47 changed files with 730 additions and 632 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.7"
version = "0.0.8"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ run `cargo run -p tpcc --release` to run tpcc
- Tips: TPC-C currently only supports single thread
```shell
<90th Percentile RT (MaxRT)>
New-Order : 0.003 (0.006)
New-Order : 0.003 (0.012)
Payment : 0.001 (0.003)
Order-Status : 0.062 (0.188)
Delivery : 0.022 (0.052)
Order-Status : 0.054 (0.188)
Delivery : 0.021 (0.049)
Stock-Level : 0.004 (0.006)
<TpmC>
6669 Tpmc
7345 Tpmc
```
#### 👉[check more](tpcc/README.md)

Expand Down
5 changes: 3 additions & 2 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ impl<T: Transaction> Binder<'_, '_, T> {
return Err(DatabaseError::UnsupportedStmt("'COPY SOURCE'".to_string()));
}
};
let table_name = Arc::new(lower_case_name(&table_name)?);

if let Some(table) = self.context.table(Arc::new(table_name.to_string()))? {
if let Some(table) = self.context.table(table_name.clone())? {
let schema_ref = table.schema_ref().clone();
let ext_source = ExtSource {
path: match target {
Expand All @@ -107,7 +108,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
Operator::CopyFromFile(CopyFromFileOperator {
source: ext_source,
schema_ref,
table: table_name.to_string(),
table: table_name,
}),
Childrens::None,
))
Expand Down
37 changes: 31 additions & 6 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{slice, vec};
use ulid::Generator;

pub type TableName = Arc<String>;
pub type PrimaryKeyIndices = Arc<Vec<usize>>;

#[derive(Debug, Clone, PartialEq)]
pub struct TableCatalog {
Expand All @@ -22,6 +23,7 @@ pub struct TableCatalog {

schema_ref: SchemaRef,
primary_keys: Vec<(usize, ColumnRef)>,
primary_key_indices: PrimaryKeyIndices,
primary_key_type: Option<LogicalType>,
}

Expand All @@ -32,6 +34,10 @@ pub struct TableMeta {
}

impl TableCatalog {
pub(crate) fn name(&self) -> &TableName {
&self.name
}

pub(crate) fn get_unique_index(&self, col_id: &ColumnId) -> Option<&IndexMetaRef> {
self.indexes
.iter()
Expand Down Expand Up @@ -79,6 +85,10 @@ impl TableCatalog {
&self.primary_keys
}

pub(crate) fn primary_keys_indices(&self) -> &PrimaryKeyIndices {
&self.primary_key_indices
}

pub(crate) fn types(&self) -> Vec<LogicalType> {
self.columns()
.map(|column| column.datatype().clone())
Expand Down Expand Up @@ -186,6 +196,7 @@ impl TableCatalog {
indexes: vec![],
schema_ref: Arc::new(vec![]),
primary_keys: vec![],
primary_key_indices: Default::default(),
primary_key_type: None,
};
let mut generator = Generator::new();
Expand All @@ -194,7 +205,11 @@ impl TableCatalog {
.add_column(col_catalog, &mut generator)
.unwrap();
}
table_catalog.primary_keys = Self::build_primary_keys(&table_catalog.schema_ref);
let (primary_keys, primary_key_indices) =
Self::build_primary_keys(&table_catalog.schema_ref);

table_catalog.primary_keys = primary_keys;
table_catalog.primary_key_indices = primary_key_indices;

Ok(table_catalog)
}
Expand All @@ -216,7 +231,7 @@ impl TableCatalog {
columns.insert(column_id, i);
}
let schema_ref = Arc::new(column_refs.clone());
let primary_keys = Self::build_primary_keys(&schema_ref);
let (primary_keys, primary_key_indices) = Self::build_primary_keys(&schema_ref);

Ok(TableCatalog {
name,
Expand All @@ -225,12 +240,18 @@ impl TableCatalog {
indexes,
schema_ref,
primary_keys,
primary_key_indices,
primary_key_type: None,
})
}

fn build_primary_keys(schema_ref: &Arc<Vec<ColumnRef>>) -> Vec<(usize, ColumnRef)> {
schema_ref
fn build_primary_keys(
schema_ref: &Arc<Vec<ColumnRef>>,
) -> (Vec<(usize, ColumnRef)>, PrimaryKeyIndices) {
let mut primary_keys = Vec::new();
let mut primary_key_indices = Vec::new();

for (_, (i, column)) in schema_ref
.iter()
.enumerate()
.filter_map(|(i, column)| {
Expand All @@ -240,8 +261,12 @@ impl TableCatalog {
.map(|p_i| (p_i, (i, column.clone())))
})
.sorted_by_key(|(p_i, _)| *p_i)
.map(|(_, entry)| entry)
.collect_vec()
{
primary_key_indices.push(i);
primary_keys.push((i, column));
}

(primary_keys, Arc::new(primary_key_indices))
}
}

Expand Down
18 changes: 6 additions & 12 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,10 @@ pub(crate) mod test {
);
assert_eq!(
iter.next().unwrap()?,
Tuple {
id: None,
values: vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))],
}
Tuple::new(
None,
vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))]
)
);
assert!(iter.next().is_none());

Expand All @@ -562,17 +562,11 @@ pub(crate) mod test {
assert_eq!(iter.schema(), &Arc::new(vec![ColumnRef::from(column)]));
assert_eq!(
iter.next().unwrap()?,
Tuple {
id: None,
values: vec![DataValue::Int32(Some(3))],
}
Tuple::new(None, vec![DataValue::Int32(Some(3))])
);
assert_eq!(
iter.next().unwrap()?,
Tuple {
id: None,
values: vec![DataValue::Int32(Some(4))],
}
Tuple::new(None, vec![DataValue::Int32(Some(4))])
);
Ok(())
}
Expand Down
21 changes: 14 additions & 7 deletions src/execution/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
fn execute_mut(
mut self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand Down Expand Up @@ -55,7 +55,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {

if let Some(value) = throw!(column.default_value()) {
if let Some(unique_values) = &mut unique_values {
unique_values.push((tuple.id.clone().unwrap(), value.clone()));
unique_values.push((tuple.id().unwrap().clone(), value.clone()));
}
tuple.values.push(value);
} else {
Expand All @@ -66,21 +66,28 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
drop(coroutine);

for tuple in tuples {
throw!(transaction.append_tuple(table_name, tuple, &types, true));
throw!(unsafe { &mut (*transaction) }
.append_tuple(table_name, tuple, &types, true));
}
let col_id =
throw!(transaction.add_column(cache.0, table_name, column, *if_not_exists));
let col_id = throw!(unsafe { &mut (*transaction) }.add_column(
cache.0,
table_name,
column,
*if_not_exists
));

// Unique Index
if let (Some(unique_values), Some(unique_meta)) = (
unique_values,
throw!(transaction.table(cache.0, table_name.clone()))
throw!(unsafe { &mut (*transaction) }.table(cache.0, table_name.clone()))
.and_then(|table| table.get_unique_index(&col_id))
.cloned(),
) {
for (tuple_id, value) in unique_values {
let index = Index::new(unique_meta.id, &value, IndexType::Unique);
throw!(transaction.add_index(table_name, index, &tuple_id));
throw!(
unsafe { &mut (*transaction) }.add_index(table_name, index, &tuple_id)
);
}
}

Expand Down
30 changes: 15 additions & 15 deletions src/execution/ddl/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
fn execute_mut(
mut self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand All @@ -52,7 +52,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
})
.unzip();
let schema = self.input.output_schema().clone();
let index_id = match transaction.add_index_meta(
let index_id = match unsafe { &mut (*transaction) }.add_index_meta(
cache.0,
&table_name,
index_name,
Expand All @@ -69,29 +69,29 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
}
err => throw!(err),
};
let mut index_values = Vec::new();
let mut coroutine = build_read(self.input, cache, transaction);

while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let mut tuple: Tuple = throw!(tuple);

let tuple_id = if let Some(tuple_id) = tuple.id.take() {
tuple_id
} else {
let Some(value) = DataValue::values_to_tuple(throw!(Projection::projection(
&tuple,
&column_exprs,
&schema
))) else {
continue;
};
index_values.push((
tuple_id,
throw!(Projection::projection(&tuple, &column_exprs, &schema)),
));
}
drop(coroutine);
for (tuple_id, values) in index_values {
let Some(value) = DataValue::values_to_tuple(values) else {
let tuple_id = if let Some(tuple_id) = tuple.id().take() {
tuple_id
} else {
continue;
};
let index = Index::new(index_id, &value, ty);
throw!(transaction.add_index(table_name.as_str(), index, &tuple_id));
throw!(unsafe { &mut (*transaction) }.add_index(
table_name.as_str(),
index,
tuple_id
));
}
yield Ok(TupleBuilder::build_result("1".to_string()));
},
Expand Down
4 changes: 2 additions & 2 deletions src/execution/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable {
fn execute_mut(
self,
(table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand All @@ -29,7 +29,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable {
if_not_exists,
} = self.op;

let _ = throw!(transaction.create_table(
let _ = throw!(unsafe { &mut (*transaction) }.create_table(
table_cache,
table_name.clone(),
columns,
Expand Down
4 changes: 2 additions & 2 deletions src/execution/ddl/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateView {
fn execute_mut(
self,
(_, view_cache, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
move || {
let CreateViewOperator { view, or_replace } = self.op;

let result_tuple = TupleBuilder::build_result(format!("{}", view.name));
throw!(transaction.create_view(view_cache, view, or_replace));
throw!(unsafe { &mut (*transaction) }.create_view(view_cache, view, or_replace));

yield Ok(result_tuple);
},
Expand Down
16 changes: 13 additions & 3 deletions src/execution/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
fn execute_mut(
mut self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand Down Expand Up @@ -67,9 +67,19 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
}
drop(coroutine);
for tuple in tuples {
throw!(transaction.append_tuple(&table_name, tuple, &types, true));
throw!(unsafe { &mut (*transaction) }.append_tuple(
&table_name,
tuple,
&types,
true
));
}
throw!(transaction.drop_column(cache.0, cache.2, &table_name, &column_name));
throw!(unsafe { &mut (*transaction) }.drop_column(
cache.0,
cache.2,
&table_name,
&column_name
));

yield Ok(TupleBuilder::build_result("1".to_string()));
} else if if_exists {
Expand Down
8 changes: 6 additions & 2 deletions src/execution/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable {
fn execute_mut(
self,
(table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand All @@ -28,7 +28,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable {
if_exists,
} = self.op;

throw!(transaction.drop_table(table_cache, table_name.clone(), if_exists));
throw!(unsafe { &mut (*transaction) }.drop_table(
table_cache,
table_name.clone(),
if_exists
));

yield Ok(TupleBuilder::build_result(format!("{}", table_name)));
},
Expand Down
Loading

0 comments on commit c66585a

Please sign in to comment.