Skip to content

Commit

Permalink
feat: add DataType::Char and add LEN to Describe (#174)
Browse files Browse the repository at this point in the history
* feat: add `DataType::Char` and add `LEN` to `Describe`

* fix: `Tuple::serialize_to` & `Tuple::deserialize_from` for `LogicalType::Char`

* fix: `DataValue::cast`'s Decimal\Date32\Date64 to Char

* fix: replace fill symbol '\0' -> ' ' for `LogicalType::Char`

* fix: fill symbol move to `DataValue::to_raw`

* docs: add DataType::Char

* style: code simplification

* fix: `LogicalType::Char` on Server
  • Loading branch information
KKould authored Mar 21, 2024
1 parent e0de637 commit 253daa8
Show file tree
Hide file tree
Showing 23 changed files with 248 additions and 101 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
- UInteger
- Bigint
- UBigint
- Char
- Varchar
- DDL
- Begin (Server only)
Expand Down Expand Up @@ -177,6 +178,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
- UBigint
- Float
- Double
- Char
- Varchar
- Date
- DateTime
Expand Down
5 changes: 4 additions & 1 deletion src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ fn encode_tuples<'a>(schema: &Schema, tuples: Vec<Tuple>) -> PgWireResult<QueryR
LogicalType::UBigint => encoder.encode_field(&value.u64().map(|v| v as i64)),
LogicalType::Float => encoder.encode_field(&value.float()),
LogicalType::Double => encoder.encode_field(&value.double()),
LogicalType::Varchar(_) => encoder.encode_field(&value.utf8()),
LogicalType::Char(_) | LogicalType::Varchar(_) => {
encoder.encode_field(&value.utf8())
}
LogicalType::Date => encoder.encode_field(&value.date()),
LogicalType::DateTime => encoder.encode_field(&value.datetime()),
LogicalType::Decimal(_, _) => todo!(),
Expand All @@ -224,6 +226,7 @@ fn into_pg_type(data_type: &LogicalType) -> PgWireResult<Type> {
LogicalType::Double => Type::FLOAT8,
LogicalType::Varchar(_) => Type::VARCHAR,
LogicalType::Date | LogicalType::DateTime => Type::DATE,
LogicalType::Char(_) => Type::CHAR,
LogicalType::Decimal(_, _) => todo!(),
_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down
11 changes: 9 additions & 2 deletions src/execution/volcano/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@ impl<T: Transaction> WriteExecutor<T> for AddColumn {

impl AddColumn {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
async fn _execute<T: Transaction>(self, transaction: &mut T) {
async fn _execute<T: Transaction>(mut self, transaction: &mut T) {
let AddColumnOperator {
table_name,
column,
if_not_exists,
} = &self.op;
let mut unique_values = column.desc().is_unique.then(Vec::new);
let mut tuples = Vec::new();
let schema = self.input.output_schema();
let mut types = Vec::with_capacity(schema.len() + 1);

for column_ref in schema.iter() {
types.push(*column_ref.datatype());
}
types.push(*column.datatype());

#[for_await]
for tuple in build_read(self.input, transaction) {
Expand All @@ -54,7 +61,7 @@ impl AddColumn {
tuples.push(tuple);
}
for tuple in tuples {
transaction.append(table_name, tuple, true)?;
transaction.append(table_name, tuple, &types, true)?;
}
let col_id = transaction.add_column(table_name, column, *if_not_exists)?;

Expand Down
9 changes: 8 additions & 1 deletion src/execution/volcano/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ impl DropColumn {
))?;
}
let mut tuples = Vec::new();
let mut types = Vec::with_capacity(tuple_columns.len() - 1);

for (i, column_ref) in tuple_columns.iter().enumerate() {
if i == column_index {
continue;
}
types.push(*column_ref.datatype());
}
#[for_await]
for tuple in build_read(self.input, transaction) {
let mut tuple: Tuple = tuple?;
Expand All @@ -55,7 +62,7 @@ impl DropColumn {
tuples.push(tuple);
}
for tuple in tuples {
transaction.append(&table_name, tuple, true)?;
transaction.append(&table_name, tuple, &types, true)?;
}
transaction.drop_column(&table_name, &column_name)?;

Expand Down
5 changes: 3 additions & 2 deletions src/execution/volcano/dml/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::errors::DatabaseError;
use crate::execution::volcano::{BoxedExecutor, WriteExecutor};
use crate::planner::operator::copy_from_file::CopyFromFileOperator;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::tuple::{types, Tuple};
use crate::types::tuple_builder::TupleBuilder;
use futures_async_stream::try_stream;
use std::fs::File;
Expand All @@ -30,6 +30,7 @@ impl<T: Transaction> WriteExecutor<T> for CopyFromFile {
impl CopyFromFile {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute<T: Transaction>(self, transaction: &mut T) {
let types = types(&self.op.schema_ref);
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx1, mut rx1) = tokio::sync::mpsc::channel(1);
// # Cancellation
Expand All @@ -39,7 +40,7 @@ impl CopyFromFile {
let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx));
let mut size = 0_usize;
while let Some(chunk) = rx.recv().await {
transaction.append(&table_name, chunk, false)?;
transaction.append(&table_name, chunk, &types, false)?;
size += 1;
}
handle.await??;
Expand Down
3 changes: 2 additions & 1 deletion src/execution/volcano/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Insert {
.ok_or_else(|| DatabaseError::NotNull)?;

if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() {
let types = table_catalog.types();
#[for_await]
for tuple in build_read(input, transaction) {
let Tuple { values, .. } = tuple?;
Expand Down Expand Up @@ -99,7 +100,7 @@ impl Insert {
}
}
for tuple in tuples {
transaction.append(&table_name, tuple, is_overwrite)?;
transaction.append(&table_name, tuple, &types, is_overwrite)?;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/execution/volcano/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::planner::operator::update::UpdateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::index::Index;
use crate::types::tuple::types;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;
use std::collections::HashMap;
Expand Down Expand Up @@ -44,6 +45,7 @@ impl Update {
} = self;
let values_schema = values.output_schema().clone();
let input_schema = input.output_schema().clone();
let types = types(&input_schema);

if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() {
let mut value_map = HashMap::new();
Expand Down Expand Up @@ -94,7 +96,7 @@ impl Update {
transaction.add_index(&table_name, index, tuple.id.as_ref().unwrap())?;
}

transaction.append(&table_name, tuple, is_overwrite)?;
transaction.append(&table_name, tuple, &types, is_overwrite)?;
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/execution/volcano/dql/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,21 @@ impl Describe {
};

for column in table.columns() {
let datatype = column.datatype();
let values = vec![
Arc::new(DataValue::Utf8(Some(column.name().to_string()))),
Arc::new(DataValue::Utf8(Some(column.datatype().to_string()))),
Arc::new(DataValue::Utf8(Some(datatype.to_string()))),
Arc::new(DataValue::Utf8(Some(
datatype
.raw_len()
.map(|len| len.to_string())
.unwrap_or_else(|| "DYNAMIC".to_string()),
))),
Arc::new(DataValue::Utf8(Some(column.nullable.to_string()))),
key_fn(column),
column
.default_value()
.unwrap_or_else(|| Arc::new(DataValue::none(column.datatype()))),
.unwrap_or_else(|| Arc::new(DataValue::none(datatype))),
];
yield Tuple { id: None, values };
}
Expand Down
2 changes: 1 addition & 1 deletion src/expression/value_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl DataValue {
_ => return Err(DatabaseError::UnsupportedBinaryOperator(unified_type, *op)),
}
}
LogicalType::Varchar(_) => {
LogicalType::Varchar(_) | LogicalType::Char(_) => {
let left_value = unpack_utf8(self.clone().cast(&unified_type)?);
let right_value = unpack_utf8(right.clone().cast(&unified_type)?);

Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/core/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl Histogram {
) -> Result<bool, DatabaseError> {
let float_value = |value: &DataValue, prefix_len: usize| {
let value = match value.logical_type() {
LogicalType::Varchar(_) => match value {
LogicalType::Varchar(_) | LogicalType::Char(_) => match value {
DataValue::Utf8(value) => value.as_ref().map(|string| {
if prefix_len > string.len() {
return 0.0;
Expand Down
1 change: 1 addition & 0 deletions src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl LogicalPlan {
Operator::Describe(_) => Arc::new(vec![
Arc::new(ColumnCatalog::new_dummy("FIELD".to_string())),
Arc::new(ColumnCatalog::new_dummy("TYPE".to_string())),
Arc::new(ColumnCatalog::new_dummy("LEN".to_string())),
Arc::new(ColumnCatalog::new_dummy("NULL".to_string())),
Arc::new(ColumnCatalog::new_dummy("Key".to_string())),
Arc::new(ColumnCatalog::new_dummy("DEFAULT".to_string())),
Expand Down
5 changes: 4 additions & 1 deletion src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ impl Transaction for KipTransaction {
&mut self,
table_name: &str,
tuple: Tuple,
types: &[LogicalType],
is_overwrite: bool,
) -> Result<(), DatabaseError> {
let (key, value) = TableCodec::encode_tuple(table_name, &tuple)?;
let (key, value) = TableCodec::encode_tuple(table_name, &tuple, types)?;

if !is_overwrite && self.tx.get(&key)?.is_some() {
return Err(DatabaseError::DuplicatePrimaryKey);
Expand Down Expand Up @@ -611,6 +612,7 @@ mod test {
Arc::new(DataValue::Boolean(Some(true))),
],
},
&[LogicalType::Integer, LogicalType::Boolean],
false,
)?;
transaction.append(
Expand All @@ -622,6 +624,7 @@ mod test {
Arc::new(DataValue::Boolean(Some(false))),
],
},
&[LogicalType::Integer, LogicalType::Boolean],
false,
)?;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::storage::table_codec::TableCodec;
use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType};
use crate::types::tuple::{Tuple, TupleId};
use crate::types::value::{DataValue, ValueRef};
use crate::types::ColumnId;
use crate::types::{ColumnId, LogicalType};
use kip_db::kernel::lsm::iterator::Iter as DBIter;
use kip_db::kernel::lsm::mvcc;
use std::collections::{Bound, VecDeque};
Expand Down Expand Up @@ -75,6 +75,7 @@ pub trait Transaction: Sync + Send + 'static {
&mut self,
table_name: &str,
tuple: Tuple,
types: &[LogicalType],
is_overwrite: bool,
) -> Result<(), DatabaseError>;

Expand Down
18 changes: 13 additions & 5 deletions src/storage/table_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,15 @@ impl TableCodec {

/// Key: {TableName}{TUPLE_TAG}{BOUND_MIN_TAG}{RowID}(Sorted)
/// Value: Tuple
pub fn encode_tuple(table_name: &str, tuple: &Tuple) -> Result<(Bytes, Bytes), DatabaseError> {
pub fn encode_tuple(
table_name: &str,
tuple: &Tuple,
types: &[LogicalType],
) -> Result<(Bytes, Bytes), DatabaseError> {
let tuple_id = tuple.id.clone().ok_or(DatabaseError::PrimaryKeyNotFound)?;
let key = Self::encode_tuple_key(table_name, &tuple_id)?;

Ok((Bytes::from(key), Bytes::from(tuple.serialize_to())))
Ok((Bytes::from(key), Bytes::from(tuple.serialize_to(types))))
}

pub fn encode_tuple_key(
Expand Down Expand Up @@ -223,7 +227,7 @@ impl TableCodec {
) -> Result<(Bytes, Bytes), DatabaseError> {
let key = TableCodec::encode_index_key(name, index, Some(tuple_id))?;

Ok((Bytes::from(key), Bytes::from(tuple_id.to_raw())))
Ok((Bytes::from(key), Bytes::from(tuple_id.to_raw(None))))
}

fn _encode_index_key(name: &str, index: &Index) -> Result<Vec<u8>, DatabaseError> {
Expand Down Expand Up @@ -263,7 +267,7 @@ impl TableCodec {

if let Some(tuple_id) = tuple_id {
if matches!(index.ty, IndexType::Normal | IndexType::Composite) {
key_prefix.append(&mut tuple_id.to_raw());
key_prefix.append(&mut tuple_id.to_raw(None));
}
}
Ok(key_prefix)
Expand Down Expand Up @@ -381,7 +385,11 @@ mod tests {
Arc::new(DataValue::Decimal(Some(Decimal::new(1, 0)))),
],
};
let (_, bytes) = TableCodec::encode_tuple(&table_catalog.name, &tuple)?;
let (_, bytes) = TableCodec::encode_tuple(
&table_catalog.name,
&tuple,
&[LogicalType::Integer, LogicalType::Decimal(None, None)],
)?;
let schema = table_catalog.schema_ref();

assert_eq!(
Expand Down
49 changes: 42 additions & 7 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use chrono::{NaiveDate, NaiveDateTime};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::any::TypeId;
use std::cmp;

use crate::errors::DatabaseError;
use sqlparser::ast::ExactNumberInfo;
use sqlparser::ast::{CharLengthUnits, CharacterLength, ExactNumberInfo};
use strum_macros::AsRefStr;

pub type ColumnId = u32;
Expand All @@ -33,6 +34,7 @@ pub enum LogicalType {
UBigint,
Float,
Double,
Char(u32),
Varchar(Option<u32>),
Date,
DateTime,
Expand Down Expand Up @@ -80,7 +82,6 @@ impl LogicalType {

pub fn raw_len(&self) -> Option<usize> {
match self {
LogicalType::Invalid => Some(0),
LogicalType::SqlNull => Some(0),
LogicalType::Boolean => Some(1),
LogicalType::Tinyint => Some(1),
Expand All @@ -95,10 +96,11 @@ impl LogicalType {
LogicalType::Double => Some(8),
/// Note: The non-fixed length type's raw_len is None e.g. Varchar
LogicalType::Varchar(_) => None,
LogicalType::Char(len) => Some(*len as usize),
LogicalType::Decimal(_, _) => Some(16),
LogicalType::Date => Some(4),
LogicalType::DateTime => Some(8),
LogicalType::Tuple => unreachable!(),
LogicalType::Invalid | LogicalType::Tuple => unreachable!(),
}
}

Expand Down Expand Up @@ -287,9 +289,16 @@ impl LogicalType {
LogicalType::UBigint => matches!(to, LogicalType::Float | LogicalType::Double),
LogicalType::Float => matches!(to, LogicalType::Double),
LogicalType::Double => false,
LogicalType::Char(_) => false,
LogicalType::Varchar(_) => false,
LogicalType::Date => matches!(to, LogicalType::DateTime | LogicalType::Varchar(_)),
LogicalType::DateTime => matches!(to, LogicalType::Date | LogicalType::Varchar(_)),
LogicalType::Date => matches!(
to,
LogicalType::DateTime | LogicalType::Varchar(_) | LogicalType::Char(_)
),
LogicalType::DateTime => matches!(
to,
LogicalType::Date | LogicalType::Varchar(_) | LogicalType::Char(_)
),
LogicalType::Decimal(_, _) | LogicalType::Tuple => false,
}
}
Expand All @@ -301,8 +310,34 @@ impl TryFrom<sqlparser::ast::DataType> for LogicalType {

fn try_from(value: sqlparser::ast::DataType) -> Result<Self, Self::Error> {
match value {
sqlparser::ast::DataType::Char(len) | sqlparser::ast::DataType::Varchar(len) => {
Ok(LogicalType::Varchar(len.map(|len| len.length as u32)))
sqlparser::ast::DataType::Char(char_len)
| sqlparser::ast::DataType::Character(char_len) => {
let mut len = 1;
if let Some(CharacterLength { length, unit }) = char_len {
if matches!(unit, Some(CharLengthUnits::Octets)) {
return Err(DatabaseError::UnsupportedStmt(format!(
"char unit: {:?}",
unit
)));
}
len = cmp::max(len, length)
}
Ok(LogicalType::Char(len as u32))
}
sqlparser::ast::DataType::CharVarying(varchar_len)
| sqlparser::ast::DataType::CharacterVarying(varchar_len)
| sqlparser::ast::DataType::Varchar(varchar_len) => {
let mut len = None;
if let Some(CharacterLength { length, unit }) = varchar_len {
if matches!(unit, Some(CharLengthUnits::Octets)) {
return Err(DatabaseError::UnsupportedStmt(format!(
"char unit: {:?}",
unit
)));
}
len = Some(length as u32)
}
Ok(LogicalType::Varchar(len))
}
sqlparser::ast::DataType::Float(_) => Ok(LogicalType::Float),
sqlparser::ast::DataType::Double | sqlparser::ast::DataType::DoublePrecision => {
Expand Down
Loading

0 comments on commit 253daa8

Please sign in to comment.