Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/tpcc #254

Merged
merged 2 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.7"
version = "0.0.8"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ run `cargo run -p tpcc --release` to run tpcc
- Tips: TPC-C currently only supports single thread
```shell
<90th Percentile RT (MaxRT)>
New-Order : 0.003 (0.006)
New-Order : 0.003 (0.012)
Payment : 0.001 (0.003)
Order-Status : 0.062 (0.188)
Delivery : 0.022 (0.052)
Order-Status : 0.054 (0.188)
Delivery : 0.021 (0.049)
Stock-Level : 0.004 (0.006)
<TpmC>
6669 Tpmc
7345 Tpmc
```
#### 👉[check more](tpcc/README.md)

Expand Down
5 changes: 3 additions & 2 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ impl<T: Transaction> Binder<'_, '_, T> {
return Err(DatabaseError::UnsupportedStmt("'COPY SOURCE'".to_string()));
}
};
let table_name = Arc::new(lower_case_name(&table_name)?);

if let Some(table) = self.context.table(Arc::new(table_name.to_string()))? {
if let Some(table) = self.context.table(table_name.clone())? {
let schema_ref = table.schema_ref().clone();
let ext_source = ExtSource {
path: match target {
Expand All @@ -107,7 +108,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
Operator::CopyFromFile(CopyFromFileOperator {
source: ext_source,
schema_ref,
table: table_name.to_string(),
table: table_name,
}),
Childrens::None,
))
Expand Down
37 changes: 31 additions & 6 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{slice, vec};
use ulid::Generator;

pub type TableName = Arc<String>;
pub type PrimaryKeyIndices = Arc<Vec<usize>>;

#[derive(Debug, Clone, PartialEq)]
pub struct TableCatalog {
Expand All @@ -22,6 +23,7 @@ pub struct TableCatalog {

schema_ref: SchemaRef,
primary_keys: Vec<(usize, ColumnRef)>,
primary_key_indices: PrimaryKeyIndices,
primary_key_type: Option<LogicalType>,
}

Expand All @@ -32,6 +34,10 @@ pub struct TableMeta {
}

impl TableCatalog {
pub(crate) fn name(&self) -> &TableName {
&self.name
}

pub(crate) fn get_unique_index(&self, col_id: &ColumnId) -> Option<&IndexMetaRef> {
self.indexes
.iter()
Expand Down Expand Up @@ -79,6 +85,10 @@ impl TableCatalog {
&self.primary_keys
}

pub(crate) fn primary_keys_indices(&self) -> &PrimaryKeyIndices {
&self.primary_key_indices
}

pub(crate) fn types(&self) -> Vec<LogicalType> {
self.columns()
.map(|column| column.datatype().clone())
Expand Down Expand Up @@ -186,6 +196,7 @@ impl TableCatalog {
indexes: vec![],
schema_ref: Arc::new(vec![]),
primary_keys: vec![],
primary_key_indices: Default::default(),
primary_key_type: None,
};
let mut generator = Generator::new();
Expand All @@ -194,7 +205,11 @@ impl TableCatalog {
.add_column(col_catalog, &mut generator)
.unwrap();
}
table_catalog.primary_keys = Self::build_primary_keys(&table_catalog.schema_ref);
let (primary_keys, primary_key_indices) =
Self::build_primary_keys(&table_catalog.schema_ref);

table_catalog.primary_keys = primary_keys;
table_catalog.primary_key_indices = primary_key_indices;

Ok(table_catalog)
}
Expand All @@ -216,7 +231,7 @@ impl TableCatalog {
columns.insert(column_id, i);
}
let schema_ref = Arc::new(column_refs.clone());
let primary_keys = Self::build_primary_keys(&schema_ref);
let (primary_keys, primary_key_indices) = Self::build_primary_keys(&schema_ref);

Ok(TableCatalog {
name,
Expand All @@ -225,12 +240,18 @@ impl TableCatalog {
indexes,
schema_ref,
primary_keys,
primary_key_indices,
primary_key_type: None,
})
}

fn build_primary_keys(schema_ref: &Arc<Vec<ColumnRef>>) -> Vec<(usize, ColumnRef)> {
schema_ref
fn build_primary_keys(
schema_ref: &Arc<Vec<ColumnRef>>,
) -> (Vec<(usize, ColumnRef)>, PrimaryKeyIndices) {
let mut primary_keys = Vec::new();
let mut primary_key_indices = Vec::new();

for (_, (i, column)) in schema_ref
.iter()
.enumerate()
.filter_map(|(i, column)| {
Expand All @@ -240,8 +261,12 @@ impl TableCatalog {
.map(|p_i| (p_i, (i, column.clone())))
})
.sorted_by_key(|(p_i, _)| *p_i)
.map(|(_, entry)| entry)
.collect_vec()
{
primary_key_indices.push(i);
primary_keys.push((i, column));
}

(primary_keys, Arc::new(primary_key_indices))
}
}

Expand Down
18 changes: 6 additions & 12 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,10 @@ pub(crate) mod test {
);
assert_eq!(
iter.next().unwrap()?,
Tuple {
id: None,
values: vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))],
}
Tuple::new(
None,
vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))]
)
);
assert!(iter.next().is_none());

Expand All @@ -562,17 +562,11 @@ pub(crate) mod test {
assert_eq!(iter.schema(), &Arc::new(vec![ColumnRef::from(column)]));
assert_eq!(
iter.next().unwrap()?,
Tuple {
id: None,
values: vec![DataValue::Int32(Some(3))],
}
Tuple::new(None, vec![DataValue::Int32(Some(3))])
);
assert_eq!(
iter.next().unwrap()?,
Tuple {
id: None,
values: vec![DataValue::Int32(Some(4))],
}
Tuple::new(None, vec![DataValue::Int32(Some(4))])
);
Ok(())
}
Expand Down
21 changes: 14 additions & 7 deletions src/execution/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
fn execute_mut(
mut self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand Down Expand Up @@ -55,7 +55,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {

if let Some(value) = throw!(column.default_value()) {
if let Some(unique_values) = &mut unique_values {
unique_values.push((tuple.id.clone().unwrap(), value.clone()));
unique_values.push((tuple.id().unwrap().clone(), value.clone()));
}
tuple.values.push(value);
} else {
Expand All @@ -66,21 +66,28 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
drop(coroutine);

for tuple in tuples {
throw!(transaction.append_tuple(table_name, tuple, &types, true));
throw!(unsafe { &mut (*transaction) }
.append_tuple(table_name, tuple, &types, true));
}
let col_id =
throw!(transaction.add_column(cache.0, table_name, column, *if_not_exists));
let col_id = throw!(unsafe { &mut (*transaction) }.add_column(
cache.0,
table_name,
column,
*if_not_exists
));

// Unique Index
if let (Some(unique_values), Some(unique_meta)) = (
unique_values,
throw!(transaction.table(cache.0, table_name.clone()))
throw!(unsafe { &mut (*transaction) }.table(cache.0, table_name.clone()))
.and_then(|table| table.get_unique_index(&col_id))
.cloned(),
) {
for (tuple_id, value) in unique_values {
let index = Index::new(unique_meta.id, &value, IndexType::Unique);
throw!(transaction.add_index(table_name, index, &tuple_id));
throw!(
unsafe { &mut (*transaction) }.add_index(table_name, index, &tuple_id)
);
}
}

Expand Down
30 changes: 15 additions & 15 deletions src/execution/ddl/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
fn execute_mut(
mut self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand All @@ -52,7 +52,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
})
.unzip();
let schema = self.input.output_schema().clone();
let index_id = match transaction.add_index_meta(
let index_id = match unsafe { &mut (*transaction) }.add_index_meta(
cache.0,
&table_name,
index_name,
Expand All @@ -69,29 +69,29 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
}
err => throw!(err),
};
let mut index_values = Vec::new();
let mut coroutine = build_read(self.input, cache, transaction);

while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let mut tuple: Tuple = throw!(tuple);

let tuple_id = if let Some(tuple_id) = tuple.id.take() {
tuple_id
} else {
let Some(value) = DataValue::values_to_tuple(throw!(Projection::projection(
&tuple,
&column_exprs,
&schema
))) else {
continue;
};
index_values.push((
tuple_id,
throw!(Projection::projection(&tuple, &column_exprs, &schema)),
));
}
drop(coroutine);
for (tuple_id, values) in index_values {
let Some(value) = DataValue::values_to_tuple(values) else {
let tuple_id = if let Some(tuple_id) = tuple.id().take() {
tuple_id
} else {
continue;
};
let index = Index::new(index_id, &value, ty);
throw!(transaction.add_index(table_name.as_str(), index, &tuple_id));
throw!(unsafe { &mut (*transaction) }.add_index(
table_name.as_str(),
index,
tuple_id
));
}
yield Ok(TupleBuilder::build_result("1".to_string()));
},
Expand Down
4 changes: 2 additions & 2 deletions src/execution/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable {
fn execute_mut(
self,
(table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand All @@ -29,7 +29,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable {
if_not_exists,
} = self.op;

let _ = throw!(transaction.create_table(
let _ = throw!(unsafe { &mut (*transaction) }.create_table(
table_cache,
table_name.clone(),
columns,
Expand Down
4 changes: 2 additions & 2 deletions src/execution/ddl/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateView {
fn execute_mut(
self,
(_, view_cache, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
move || {
let CreateViewOperator { view, or_replace } = self.op;

let result_tuple = TupleBuilder::build_result(format!("{}", view.name));
throw!(transaction.create_view(view_cache, view, or_replace));
throw!(unsafe { &mut (*transaction) }.create_view(view_cache, view, or_replace));

yield Ok(result_tuple);
},
Expand Down
16 changes: 13 additions & 3 deletions src/execution/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
fn execute_mut(
mut self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand Down Expand Up @@ -67,9 +67,19 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
}
drop(coroutine);
for tuple in tuples {
throw!(transaction.append_tuple(&table_name, tuple, &types, true));
throw!(unsafe { &mut (*transaction) }.append_tuple(
&table_name,
tuple,
&types,
true
));
}
throw!(transaction.drop_column(cache.0, cache.2, &table_name, &column_name));
throw!(unsafe { &mut (*transaction) }.drop_column(
cache.0,
cache.2,
&table_name,
&column_name
));

yield Ok(TupleBuilder::build_result("1".to_string()));
} else if if_exists {
Expand Down
8 changes: 6 additions & 2 deletions src/execution/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable {
fn execute_mut(
self,
(table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: &'a mut T,
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
Expand All @@ -28,7 +28,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable {
if_exists,
} = self.op;

throw!(transaction.drop_table(table_cache, table_name.clone(), if_exists));
throw!(unsafe { &mut (*transaction) }.drop_table(
table_cache,
table_name.clone(),
if_exists
));

yield Ok(TupleBuilder::build_result(format!("{}", table_name)));
},
Expand Down
Loading
Loading