Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: supplement decimal’s memcomparable encode #256

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ run `cargo run -p tpcc --release` to run tpcc

- i9-13900HX
- 32.0 GB
- YMTC PC411-1024GB-B
- KIOXIA-EXCERIA PLUS G3 SSD
- Tips: TPC-C currently only supports single thread
```shell
<90th Percentile RT (MaxRT)>
Expand Down
36 changes: 22 additions & 14 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,25 @@ impl SimpleQueryHandler for SessionBackend {
_ => {
let mut guard = self.tx.lock();

let iter = if let Some(transaction) = guard.as_mut() {
unsafe { transaction.as_mut().run(query) }.map(Box::new)
as Result<Box<dyn ResultIter>, _>
} else {
self.inner.run(query).map(Box::new)
}
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

let mut tuples = Vec::new();
for tuple in iter {
tuples.push(tuple.map_err(|e| PgWireError::ApiError(Box::new(e)))?);
}
Ok(vec![Response::Query(encode_tuples(iter.schema(), tuples)?)])
let response = if let Some(transaction) = guard.as_mut() {
let mut iter = unsafe { transaction.as_mut().run(query) }
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
for tuple in iter.by_ref() {
tuples.push(tuple.map_err(|e| PgWireError::ApiError(Box::new(e)))?);
}
encode_tuples(iter.schema(), tuples)?
} else {
let mut iter = self
.inner
.run(query)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
for tuple in iter.by_ref() {
tuples.push(tuple.map_err(|e| PgWireError::ApiError(Box::new(e)))?);
}
encode_tuples(iter.schema(), tuples)?
};
Ok(vec![Response::Query(response)])
}
}
}
Expand Down Expand Up @@ -235,7 +241,9 @@ fn encode_tuples<'a>(schema: &Schema, tuples: Vec<Tuple>) -> PgWireResult<QueryR
LogicalType::Date => encoder.encode_field(&value.date()),
LogicalType::DateTime => encoder.encode_field(&value.datetime()),
LogicalType::Time => encoder.encode_field(&value.time()),
LogicalType::Decimal(_, _) => todo!(),
LogicalType::Decimal(_, _) => {
encoder.encode_field(&value.decimal().map(|decimal| decimal.to_string()))
}
_ => unreachable!(),
}?;
}
Expand All @@ -260,7 +268,7 @@ fn into_pg_type(data_type: &LogicalType) -> PgWireResult<Type> {
LogicalType::Date | LogicalType::DateTime => Type::DATE,
LogicalType::Char(..) => Type::CHAR,
LogicalType::Time => Type::TIME,
LogicalType::Decimal(_, _) => todo!(),
LogicalType::Decimal(_, _) => Type::FLOAT8,
_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
Expand Down
23 changes: 6 additions & 17 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
Childrens::Only(plan),
)
}
AlterTableOperation::DropPrimaryKey => todo!(),
AlterTableOperation::RenameColumn {
old_column_name: _,
new_column_name: _,
} => todo!(),
AlterTableOperation::RenameTable { table_name: _ } => todo!(),
AlterTableOperation::ChangeColumn {
old_name: _,
new_name: _,
data_type: _,
options: _,
} => todo!(),
AlterTableOperation::AlterColumn {
column_name: _,
op: _,
} => todo!(),
_ => todo!(),
op => {
return Err(DatabaseError::UnsupportedStmt(format!(
"AlertOperation: {:?}",
op
)))
}
};

Ok(plan)
Expand Down
7 changes: 6 additions & 1 deletion src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
let ext_source = ExtSource {
path: match target {
CopyTarget::File { filename } => filename.into(),
t => todo!("unsupported copy target: {:?}", t),
t => {
return Err(DatabaseError::UnsupportedStmt(format!(
"copy target: {:?}",
t
)))
}
},
format: FileFormat::from_options(options),
};
Expand Down
23 changes: 16 additions & 7 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use itertools::Itertools;
use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, TableConstraint};
use std::collections::HashSet;
use std::sync::Arc;

use super::{is_valid_identifier, Binder};
use crate::binder::lower_case_name;
use crate::catalog::{ColumnCatalog, ColumnDesc};
Expand All @@ -14,6 +9,10 @@ use crate::planner::{Childrens, LogicalPlan};
use crate::storage::Transaction;
use crate::types::value::DataValue;
use crate::types::LogicalType;
use itertools::Itertools;
use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, TableConstraint};
use std::collections::HashSet;
use std::sync::Arc;

impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A> {
// TODO: TableConstraint
Expand Down Expand Up @@ -75,7 +74,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
}
}
}
_ => todo!(),
constraint => {
return Err(DatabaseError::UnsupportedStmt(format!(
"`CreateTable` does not currently support this constraint: {:?}",
constraint
)))?
}
}
}

Expand Down Expand Up @@ -140,7 +144,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
}
column_desc.default = Some(expr);
}
_ => todo!(),
option => {
return Err(DatabaseError::UnsupportedStmt(format!(
"`Column` does not currently support this option: {:?}",
option
)))
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
.find_map(|(key, value)| (key == name).then(|| value.clone()))
.ok_or_else(|| DatabaseError::ParametersNotFound(name.to_string()))?
} else {
v.into()
v.try_into()?
};
Ok(ScalarExpression::Constant(value))
}
Expand Down Expand Up @@ -473,7 +473,12 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
match arg_expr {
FunctionArgExpr::Expr(expr) => args.push(self.bind_expr(expr)?),
FunctionArgExpr::Wildcard => args.push(Self::wildcard_expr()),
_ => todo!(),
expr => {
return Err(DatabaseError::UnsupportedStmt(format!(
"function arg: {:#?}",
expr
)))
}
}
}
let function_name = func.name.to_string().to_lowercase();
Expand Down
34 changes: 25 additions & 9 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,24 +350,37 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
names,
if_exists,
..
} => match object_type {
// todo handle all names
ObjectType::Table => self.bind_drop_table(&names[0], if_exists)?,
// todo handle all names
ObjectType::View => self.bind_drop_view(&names[0], if_exists)?,
_ => todo!(),
},
} => {
if names.len() > 1 {
return Err(DatabaseError::UnsupportedStmt(
"only Drop a single `Table` or `View` is allowed".to_string(),
));
}
match object_type {
ObjectType::Table => self.bind_drop_table(&names[0], if_exists)?,
ObjectType::View => self.bind_drop_view(&names[0], if_exists)?,
_ => {
return Err(DatabaseError::UnsupportedStmt(
"only `Table` and `View` are allowed to be Dropped".to_string(),
))
}
}
}
Statement::Insert {
table_name,
columns,
source,
overwrite,
..
} => {
// TODO: support body on Insert
if let SetExpr::Values(values) = source.body.as_ref() {
self.bind_insert(table_name, columns, &values.rows, *overwrite, false)?
} else {
todo!()
return Err(DatabaseError::UnsupportedStmt(format!(
"insert body: {:#?}",
source.body
)));
}
}
Statement::Update {
Expand Down Expand Up @@ -442,7 +455,10 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
left,
right,
} => self.bind_set_operation(op, set_quantifier, left, right),
_ => todo!(),
expr => Err(DatabaseError::UnsupportedStmt(format!(
"set expression: {:?}",
expr
))),
}
}

Expand Down
38 changes: 14 additions & 24 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
left,
right,
} => self.bind_set_operation(op, set_quantifier, left, right),
_ => unimplemented!(),
expr => {
return Err(DatabaseError::UnsupportedStmt(format!(
"query body: {:?}",
expr
)))
}
}?;

let limit = &query.limit;
Expand Down Expand Up @@ -136,16 +141,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
plan = self.bind_project(plan, select_list)?;
}

if let Some(SelectInto {
name,
unlogged,
temporary,
..
}) = &select.into
{
if *unlogged || *temporary {
todo!()
}
if let Some(SelectInto { name, .. }) = &select.into {
plan = LogicalPlan::new(
Operator::Insert(InsertOperator {
table_name: Arc::new(lower_case_name(name)?),
Expand Down Expand Up @@ -234,18 +230,10 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
distinct_exprs,
))
}
(SetOperator::Intersect, true) => {
todo!()
}
(SetOperator::Intersect, false) => {
todo!()
}
(SetOperator::Except, true) => {
todo!()
}
(SetOperator::Except, false) => {
todo!()
}
(set_operator, _) => Err(DatabaseError::UnsupportedStmt(format!(
"set operator: {:?}",
set_operator
))),
}
}

Expand Down Expand Up @@ -287,7 +275,9 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
}) = alias
{
if tables.len() > 1 {
todo!("Implement virtual tables for multiple table aliases");
return Err(DatabaseError::UnsupportedStmt(
"Implement virtual tables for multiple table aliases".to_string(),
));
}
let table_alias = Arc::new(name.value.to_lowercase());

Expand Down
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl<S: Storage> Database<S> {
}
}

pub trait ResultIter: Iterator<Item = Result<Tuple, DatabaseError>> + Sized {
pub trait ResultIter: Iterator<Item = Result<Tuple, DatabaseError>> {
fn schema(&self) -> &SchemaRef;

fn done(self) -> Result<(), DatabaseError>;
Expand Down
2 changes: 2 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub enum DatabaseError {
InvalidTable(String),
#[error("invalid type")]
InvalidType,
#[error("invalid value: {0}")]
InvalidValue(String),
#[error("io: {0}")]
IO(
#[source]
Expand Down
2 changes: 1 addition & 1 deletion src/planner/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl fmt::Display for Operator {
Operator::DropView(op) => write!(f, "{}", op),
Operator::Truncate(op) => write!(f, "{}", op),
Operator::CopyFromFile(op) => write!(f, "{}", op),
Operator::CopyToFile(_) => todo!(),
Operator::CopyToFile(op) => write!(f, "{}", op),
Operator::Union(op) => write!(f, "{}", op),
}
}
Expand Down
Loading
Loading