diff --git a/Cargo.toml b/Cargo.toml
index bf937f55..7cd8c4ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ bytes = "1.5.0"
 kip_db = "0.1.2-alpha.15"
 async-recursion = "1.0.5"
 rust_decimal = "1"
+csv = "1"
 
 [dev-dependencies]
 tokio-test = "0.4.2"
diff --git a/src/binder/copy.rs b/src/binder/copy.rs
new file mode 100644
index 00000000..16bc58e0
--- /dev/null
+++ b/src/binder/copy.rs
@@ -0,0 +1,140 @@
+use std::path::PathBuf;
+use std::str::FromStr;
+
+use serde::{Deserialize, Serialize};
+use sqlparser::ast::{CopyOption, CopySource, CopyTarget};
+use crate::planner::operator::copy_from_file::CopyFromFileOperator;
+use crate::planner::operator::copy_to_file::CopyToFileOperator;
+use crate::planner::operator::Operator;
+
+use super::*;
+
+/// An external data source/target for `COPY` (currently only a file path + format).
+#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
+pub struct ExtSource {
+    pub path: PathBuf,
+    pub format: FileFormat,
+}
+
+/// File format.
+#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
+pub enum FileFormat {
+    Csv {
+        /// Delimiter to parse.
+        delimiter: char,
+        /// Quote to use.
+        quote: char,
+        /// Escape character to use.
+        escape: Option<char>,
+        /// Whether or not the file has a header line.
+        header: bool,
+    },
+}
+
+impl std::fmt::Display for ExtSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{self:?}")
+    }
+}
+
+impl std::fmt::Display for FileFormat {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{self:?}")
+    }
+}
+
+impl FromStr for ExtSource {
+    type Err = ();
+    fn from_str(_s: &str) -> std::result::Result<Self, Self::Err> {
+        Err(())
+    }
+}
+
+impl<S: Storage> Binder<S> {
+    /// Bind a `COPY <table> FROM/TO <target>` statement into a logical plan.
+    ///
+    /// Only table sources are supported; `COPY (<query>) ...` is rejected.
+    pub(super) async fn bind_copy(
+        &mut self,
+        source: CopySource,
+        to: bool,
+        target: CopyTarget,
+        options: &[CopyOption],
+    ) -> Result<LogicalPlan, BindError> {
+        let (table_name, ..) = match source {
+            CopySource::Table {
+                table_name,
+                columns,
+            } => (table_name, columns),
+            CopySource::Query(_) => {
+                return Err(BindError::UnsupportedCopySource(
+                    "bad copy source".to_string(),
+                ));
+            }
+        };
+
+        if let Some(table) = self.context.storage.table(&table_name.to_string()).await {
+            let cols = table.all_columns();
+            let ext_source = ExtSource {
+                path: match target {
+                    CopyTarget::File { filename } => filename.into(),
+                    t => todo!("unsupported copy target: {:?}", t),
+                },
+                format: FileFormat::from_options(options),
+            };
+            let types = cols.iter().map(|c| c.desc.column_datatype).collect();
+
+            let copy = if to {
+                // COPY TO
+                LogicalPlan {
+                    operator: Operator::CopyToFile(
+                        CopyToFileOperator {
+                            source: ext_source,
+                        }
+                    ),
+                    childrens: vec![],
+                }
+            } else {
+                // COPY FROM
+                LogicalPlan {
+                    operator: Operator::CopyFromFile(
+                        CopyFromFileOperator {
+                            source: ext_source,
+                            types,
+                            columns: cols,
+                            table: table_name.to_string()
+                        }
+                    ),
+                    childrens: vec![],
+                }
+            };
+            Ok(copy)
+        } else {
+            Err(BindError::InvalidTable(format!("not found table {}", table_name)))
+        }
+    }
+}
+
+impl FileFormat {
+    /// Create from copy options.
+    pub fn from_options(options: &[CopyOption]) -> Self {
+        let mut delimiter = ',';
+        let mut quote = '"';
+        let mut escape = None;
+        let mut header = false;
+        for opt in options {
+            match opt {
+                CopyOption::Format(fmt) => {
+                    assert_eq!(fmt.value.to_lowercase(), "csv", "only support CSV format")
+                }
+                CopyOption::Delimiter(c) => delimiter = *c,
+                CopyOption::Header(b) => header = *b,
+                CopyOption::Quote(c) => quote = *c,
+                CopyOption::Escape(c) => escape = Some(*c),
+                o => panic!("unsupported copy option: {:?}", o),
+            }
+        }
+        FileFormat::Csv {
+            delimiter,
+            quote,
+            escape,
+            header,
+        }
+    }
+}
diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs
index 6d3bb383..179ea77c 100644
--- a/src/binder/create_table.rs
+++ b/src/binder/create_table.rs
@@ -17,7 +17,7 @@ impl<S: Storage> Binder<S> {
         &mut self,
         name: &ObjectName,
         columns: &[ColumnDef],
-        constraints: &[TableConstraint]
+        _constraints: &[TableConstraint]
     ) -> Result<LogicalPlan, BindError> {
         let name = lower_case_name(&name);
         let (_, name) = split_name(&name)?;
diff --git a/src/binder/mod.rs b/src/binder/mod.rs
index fd2f6a87..45251524 100644
--- a/src/binder/mod.rs
+++ b/src/binder/mod.rs
@@ -9,6 +9,7 @@ mod drop_table;
 mod truncate;
 mod distinct;
 mod show;
+pub mod copy;
 
 use std::collections::BTreeMap;
 use sqlparser::ast::{Ident, ObjectName, ObjectType, SetExpr, Statement};
@@ -22,7 +23,7 @@ use crate::types::errors::TypeError;
 
 pub enum InputRefType {
     AggCall,
-    GroupBy
+    GroupBy,
 }
 
 #[derive(Clone)]
@@ -121,12 +122,15 @@
                     self.bind_delete(table, selection).await?
                 }
             }
-            Statement::Truncate { table_name, .. } => {
-                self.bind_truncate(table_name).await?
-            }
-            Statement::ShowTables { .. } => {
-                self.bind_show_tables()?
-            }
+            Statement::Truncate { table_name, .. } => self.bind_truncate(table_name).await?,
+            Statement::ShowTables { .. } => self.bind_show_tables()?,
+            Statement::Copy {
+                source,
+                to,
+                target,
+                options,
+                ..
+            } => self.bind_copy(source.clone(), *to, target.clone(), &options).await?,
             _ => return Err(BindError::UnsupportedStmt(stmt.to_string())),
         };
         Ok(plan)
@@ -173,7 +177,9 @@ pub enum BindError {
     #[error("catalog error: {0}")]
     CatalogError(#[from] CatalogError),
     #[error("type error: {0}")]
-    TypeError(#[from] TypeError)
+    TypeError(#[from] TypeError),
+    #[error("copy error: {0}")]
+    UnsupportedCopySource(String),
 }
 
 #[cfg(test)]
diff --git a/src/execution/executor/ddl/create_table.rs b/src/execution/executor/ddl/create_table.rs
index 47122f6e..89f3c6a4 100644
--- a/src/execution/executor/ddl/create_table.rs
+++ b/src/execution/executor/ddl/create_table.rs
@@ -4,6 +4,7 @@ use crate::execution::ExecutorError;
 use crate::planner::operator::create_table::CreateTableOperator;
 use crate::storage::Storage;
 use crate::types::tuple::Tuple;
+use crate::types::tuple_builder::TupleBuilder;
 
 pub struct CreateTable {
     op: CreateTableOperator
@@ -27,7 +28,9 @@ impl CreateTable {
     #[try_stream(boxed, ok = Tuple, error = ExecutorError)]
     pub async fn _execute<S: Storage>(self, storage: S) {
         let CreateTableOperator { table_name, columns } = self.op;
-
-        let _ = storage.create_table(table_name, columns).await?;
+        let _ = storage.create_table(table_name.clone(), columns).await?;
+        let tuple_builder = TupleBuilder::new_result();
+        let tuple = tuple_builder.push_result("CREATE TABLE SUCCESS", format!("{}", table_name).as_str())?;
+        yield tuple;
     }
 }
\ No newline at end of file
diff --git a/src/execution/executor/dml/copy_from_file.rs b/src/execution/executor/dml/copy_from_file.rs
new file mode 100644
index 00000000..4429a577
--- /dev/null
+++ b/src/execution/executor/dml/copy_from_file.rs
@@ -0,0 +1,200 @@
+use crate::binder::copy::FileFormat;
+use futures_async_stream::try_stream;
+use crate::execution::executor::{BoxedExecutor, Executor};
+use crate::execution::ExecutorError;
+use crate::planner::operator::copy_from_file::CopyFromFileOperator;
+use crate::storage::{Storage, Transaction};
+use crate::types::tuple::Tuple;
+use std::fs::File;
+use std::io::BufReader;
+use tokio::sync::mpsc::Sender;
+use crate::types::tuple_builder::TupleBuilder;
+
+/// Executor for `COPY <table> FROM <file>`: streams tuples parsed from the
+/// file into the table's transaction, then yields a single summary tuple.
+pub struct CopyFromFile {
+    op: CopyFromFileOperator,
+    size: usize,
+}
+
+impl From<CopyFromFileOperator> for CopyFromFile {
+    fn from(op: CopyFromFileOperator) -> Self {
+        CopyFromFile {
+            op,
+            size: 0,
+        }
+    }
+}
+
+impl<S: Storage> Executor<S> for CopyFromFile {
+    fn execute(self, storage: &S) -> BoxedExecutor {
+        self._execute(storage.clone())
+    }
+}
+
+impl CopyFromFile {
+    #[try_stream(boxed, ok = Tuple, error = ExecutorError)]
+    pub async fn _execute<S: Storage>(self, storage: S) {
+        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+        // # Cancellation
+        // When this stream is dropped, the `rx` is dropped, the spawned task will fail to send to
+        // `tx`, then the task will finish.
+        let table_name = self.op.table.clone();
+        let mut txn = storage.transaction(&table_name).await.unwrap();
+        let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx));
+        let mut size = 0usize;
+        while let Some(chunk) = rx.recv().await {
+            txn.append(chunk, false)?;
+            size += 1;
+        }
+        handle.await.unwrap()?;
+        txn.commit().await?;
+
+        // Report the imported row count through a second channel so the
+        // blocking-send stays off the async runtime, mirroring the read path.
+        let (tx1, mut rx1) = tokio::sync::mpsc::channel(1);
+        let handle = tokio::task::spawn_blocking(move || return_result(size, tx1));
+        while let Some(chunk) = rx1.recv().await {
+            yield chunk;
+        }
+        handle.await.unwrap()?;
+    }
+
+    /// Read records from file using blocking IO.
+    ///
+    /// The read data chunks will be sent through `tx`.
+    fn read_file_blocking(mut self, tx: Sender<Tuple>) -> Result<(), ExecutorError> {
+        let file = File::open(self.op.source.path)?;
+        let mut buf_reader = BufReader::new(file);
+        let mut reader = match self.op.source.format {
+            FileFormat::Csv {
+                delimiter,
+                quote,
+                escape,
+                header,
+            } => csv::ReaderBuilder::new()
+                .delimiter(delimiter as u8)
+                .quote(quote as u8)
+                .escape(escape.map(|c| c as u8))
+                .has_headers(header)
+                .from_reader(&mut buf_reader),
+        };
+
+        let column_count = self.op.types.len();
+        let mut size_count = 0;
+        for record in reader.records() {
+            let mut tuple_builder = TupleBuilder::new(self.op.types.clone(), self.op.columns.clone());
+            // read records and push raw str rows into data chunk builder
+            let record = record?;
+
+            // A trailing empty field (e.g. a dangling delimiter) is tolerated.
+            if !(record.len() == column_count
+                || record.len() == column_count + 1 && record.get(column_count) == Some(""))
+            {
+                return Err(ExecutorError::LengthMismatch {
+                    expected: column_count,
+                    actual: record.len(),
+                });
+            }
+
+            size_count += 1;
+
+            // push a raw str row and send it if necessary
+            if let Some(chunk) = tuple_builder.push_str_row(record.iter())? {
+                tx.blocking_send(chunk).map_err(|_| ExecutorError::Abort)?;
+            }
+        }
+        self.size = size_count;
+        Ok(())
+    }
+}
+
+/// Build the "import N rows" summary tuple and push it through `tx`.
+fn return_result(size: usize, tx: Sender<Tuple>) -> Result<(), ExecutorError> {
+    let tuple_builder = TupleBuilder::new_result();
+    let tuple = tuple_builder.push_result("COPY FROM SOURCE", format!("import {} rows", size).as_str())?;
+    tx.blocking_send(tuple).map_err(|_| ExecutorError::Abort)?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Write;
+    use std::sync::Arc;
+    use futures::StreamExt;
+    use tempfile::TempDir;
+    use crate::catalog::{ColumnCatalog, ColumnDesc};
+    use crate::db::Database;
+
+    use super::*;
+    use crate::types::LogicalType;
+    use crate::binder::copy::ExtSource;
+
+    #[tokio::test]
+    async fn read_csv() {
+        let csv = "1,1.5,one\n2,2.5,two\n";
+
+        let mut file = tempfile::NamedTempFile::new().expect("failed to create temp file");
+        write!(file, "{}", csv).expect("failed to write file");
+
+        let columns = vec![
+            Arc::new(ColumnCatalog {
+                id: Some(0),
+                name: "a".to_string(),
+                table_name: None,
+                nullable: false,
+                desc: ColumnDesc::new(LogicalType::Integer, true, false),
+                ref_expr: None,
+            }),
+            Arc::new(ColumnCatalog {
+                id: Some(1),
+                name: "b".to_string(),
+                table_name: None,
+                nullable: false,
+                desc: ColumnDesc::new(LogicalType::Float, false, false),
+                ref_expr: None,
+            }),
+            Arc::new(ColumnCatalog {
+                id: Some(2),
+                name: "c".to_string(),
+                table_name: None,
+                nullable: false,
+                desc: ColumnDesc::new(LogicalType::Varchar(Some(10)), false, false),
+                ref_expr: None,
+            }),
+        ];
+
+        let op = CopyFromFileOperator {
+            table: "test_copy".to_string(),
+            source: ExtSource {
+                path: file.path().into(),
+                format: FileFormat::Csv {
+                    delimiter: ',',
+                    quote: '"',
+                    escape: None,
+                    header: false,
+                },
+            },
+            types: vec![
+                LogicalType::Integer,
+                LogicalType::Float,
+                LogicalType::Varchar(Some(10)),
+            ],
+            columns: columns.clone(),
+        };
+        let executor = CopyFromFile {
+            op: op.clone(),
+            size: 0,
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let db = Database::with_kipdb(temp_dir.path()).await.unwrap();
+        let _ = db.run("create table test_copy (a int primary key, b float, c varchar(10))").await;
+        let actual = executor.execute(&db.storage).next().await.unwrap().unwrap();
+
+        let tuple_builder = TupleBuilder::new_result();
+        let expected = tuple_builder.push_result("COPY FROM SOURCE", format!("import {} rows", 2).as_str()).unwrap();
+        assert_eq!(actual, expected);
+    }
+}
diff --git a/src/execution/executor/dml/copy_to_file.rs b/src/execution/executor/dml/copy_to_file.rs
new file mode 100644
index 00000000..4ac7b3d8
--- /dev/null
+++ b/src/execution/executor/dml/copy_to_file.rs
@@ -0,0 +1,7 @@
+use crate::planner::operator::copy_to_file::CopyToFileOperator;
+
+// Placeholder: execution for `COPY ... TO` is not implemented yet.
+#[allow(dead_code)]
+pub struct CopyToFile {
+    op: CopyToFileOperator,
+}
\ No newline at end of file
diff --git a/src/execution/executor/dml/mod.rs b/src/execution/executor/dml/mod.rs
index c4bf8b48..2044b636 100644
--- a/src/execution/executor/dml/mod.rs
+++ b/src/execution/executor/dml/mod.rs
@@ -1,3 +1,5 @@
 pub(crate) mod insert;
 pub(crate) mod update;
 pub(crate) mod delete;
+pub(crate) mod copy_from_file;
+pub(crate) mod copy_to_file;
diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs
index ab5eab18..f586aa37 100644
--- a/src/execution/executor/mod.rs
+++ b/src/execution/executor/mod.rs
@@ -8,6 +8,7 @@ use futures::TryStreamExt;
 use crate::execution::executor::ddl::create_table::CreateTable;
 use crate::execution::executor::ddl::drop_table::DropTable;
 use crate::execution::executor::ddl::truncate::Truncate;
+use crate::execution::executor::dml::copy_from_file::CopyFromFile;
 use crate::execution::executor::dml::delete::Delete;
 use crate::execution::executor::dml::insert::Insert;
 use crate::execution::executor::dml::update::Update;
@@ -113,6 +114,12 @@
         Operator::Show(op) => {
            ShowTables::from(op).execute(storage)
        }
+        Operator::CopyFromFile(op) => {
+            CopyFromFile::from(op).execute(storage)
+        }
+        Operator::CopyToFile(_op) => {
+            todo!()
+        }
     }
 }
 
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index d9b1cae7..727276a5 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -40,4 +40,20 @@ pub enum ExecutorError {
     ),
     #[error("Internal error: {0}")]
     InternalError(String),
+    #[error("io error")]
+    Io(
+        #[from]
+        #[source]
+        std::io::Error,
+    ),
+    #[error("csv error")]
+    Csv(
+        #[from]
+        #[source]
+        csv::Error,
+    ),
+    #[error("tuple length mismatch: expected {expected} but got {actual}")]
+    LengthMismatch { expected: usize, actual: usize },
+    #[error("abort")]
+    Abort,
 }
diff --git a/src/planner/operator/copy_from_file.rs b/src/planner/operator/copy_from_file.rs
new file mode 100644
index 00000000..f1e32612
--- /dev/null
+++ b/src/planner/operator/copy_from_file.rs
@@ -0,0 +1,11 @@
+use crate::binder::copy::ExtSource;
+use crate::catalog::ColumnRef;
+use crate::types::LogicalType;
+
+#[derive(Debug, PartialEq, Clone)]
+pub struct CopyFromFileOperator {
+    pub table: String,
+    pub source: ExtSource,
+    pub types: Vec<LogicalType>,
+    pub columns: Vec<ColumnRef>,
+}
\ No newline at end of file
diff --git a/src/planner/operator/copy_to_file.rs b/src/planner/operator/copy_to_file.rs
new file mode 100644
index 00000000..72289a44
--- /dev/null
+++ b/src/planner/operator/copy_to_file.rs
@@ -0,0 +1,6 @@
+use crate::binder::copy::ExtSource;
+
+#[derive(Debug, PartialEq, Clone)]
+pub struct CopyToFileOperator {
+    pub source: ExtSource,
+}
\ No newline at end of file
diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs
index 92c47ab8..37f158f8 100644
--- a/src/planner/operator/mod.rs
+++ b/src/planner/operator/mod.rs
@@ -13,10 +13,14 @@ pub mod delete;
 pub mod drop_table;
 pub mod truncate;
 pub mod show;
+pub mod copy_from_file;
+pub mod copy_to_file;
 
 use itertools::Itertools;
 use crate::catalog::ColumnRef;
 use crate::expression::ScalarExpression;
+use crate::planner::operator::copy_from_file::CopyFromFileOperator;
+use crate::planner::operator::copy_to_file::CopyToFileOperator;
 use crate::planner::operator::create_table::CreateTableOperator;
 use crate::planner::operator::delete::DeleteOperator;
 use crate::planner::operator::drop_table::DropTableOperator;
@@ -54,6 +58,9 @@ pub enum Operator {
     Truncate(TruncateOperator),
     // Show
     Show(ShowTablesOperator),
+    // Copy
+    CopyFromFile(CopyFromFileOperator),
+    CopyToFile(CopyToFileOperator),
 }
 
 impl Operator {
diff --git a/src/types/mod.rs b/src/types/mod.rs
index b2fbd65e..b3f82ad4 100644
--- a/src/types/mod.rs
+++ b/src/types/mod.rs
@@ -2,6 +2,7 @@ pub mod errors;
 pub mod value;
 pub mod tuple;
 pub mod index;
+pub mod tuple_builder;
 
 use serde::{Deserialize, Serialize};
 
diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs
new file mode 100644
index 00000000..40288ca0
--- /dev/null
+++ b/src/types/tuple_builder.rs
@@ -0,0 +1,87 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use crate::catalog::{ColumnCatalog, ColumnRef};
+use crate::types::errors::TypeError;
+use crate::types::LogicalType;
+use crate::types::tuple::Tuple;
+use crate::types::value::{DataValue, ValueRef};
+
+/// Incrementally builds a `Tuple` from raw string fields, casting each
+/// field to its declared logical type.
+pub struct TupleBuilder {
+    data_types: Vec<LogicalType>,
+    data_values: Vec<ValueRef>,
+    columns: Vec<ColumnRef>,
+}
+
+impl TupleBuilder {
+    pub fn new(data_types: Vec<LogicalType>, columns: Vec<ColumnRef>) -> Self {
+        TupleBuilder {
+            data_types,
+            data_values: Vec::new(),
+            columns,
+        }
+    }
+
+    /// Builder for single-column "result message" tuples (DDL/COPY feedback).
+    pub fn new_result() -> Self {
+        TupleBuilder {
+            data_types: Vec::new(),
+            data_values: Vec::new(),
+            columns: Vec::new(),
+        }
+    }
+
+    pub fn push_result(self, header: &str, message: &str) -> Result<Tuple, TypeError> {
+        let columns: Vec<ColumnRef> = vec![
+            Arc::new(ColumnCatalog::new_dummy(header.to_string())),
+        ];
+        let values: Vec<ValueRef> = vec![
+            Arc::new(DataValue::Utf8(Some(String::from(message)))),
+        ];
+        let t = Tuple {
+            id: None,
+            columns,
+            values,
+        };
+        Ok(t)
+    }
+
+    /// Push one row of raw string fields; returns `Some(tuple)` once a full
+    /// row has been accumulated. The tuple id is taken from the primary-key
+    /// column's casted value.
+    pub fn push_str_row<'a>(
+        &mut self,
+        row: impl IntoIterator<Item = &'a str>,
+    ) -> Result<Option<Tuple>, TypeError> {
+        let mut primary_key_index = None;
+        let columns = self.columns.clone();
+        let mut tuple_map = HashMap::new();
+        for (i, value) in row.into_iter().enumerate() {
+            let data_value = DataValue::Utf8(Some(value.to_string()));
+            // Propagate cast failures as TypeError instead of panicking.
+            let cast_data_value = data_value.cast(&self.data_types[i])?;
+            self.data_values.push(Arc::new(cast_data_value.clone()));
+            let col = &columns[i];
+            if let Some(col_id) = col.id {
+                tuple_map.insert(col_id, Arc::new(cast_data_value));
+            }
+        }
+
+        let primary_col_id = primary_key_index.get_or_insert_with(|| {
+            self.columns.iter()
+                .find(|col| col.desc.is_primary)
+                .map(|col| col.id.unwrap())
+                .unwrap()
+        });
+
+        let tuple_id = tuple_map.get(primary_col_id)
+            .cloned()
+            .unwrap();
+
+        let tuple = if self.data_values.len() == self.data_types.len() {
+            Some(Tuple {
+                id: Some(tuple_id),
+                columns: self.columns.clone(),
+                values: self.data_values.clone(),
+            })
+        } else {
+            None
+        };
+        Ok(tuple)
+    }
+}
\ No newline at end of file
diff --git a/tests/data/copy.tbl b/tests/data/copy.tbl
new file mode 100644
index 00000000..cb636302
--- /dev/null
+++ b/tests/data/copy.tbl
@@ -0,0 +1,2 @@
+0|1.5|one
+1|2.5|two
\ No newline at end of file
diff --git a/tests/slt/copy.slt b/tests/slt/copy.slt
new file mode 100644
index 00000000..77f78384
--- /dev/null
+++ b/tests/slt/copy.slt
@@ -0,0 +1,17 @@
+statement ok
+create table test_copy (a int primary key, b float, c varchar(10))
+
+
+
+# copy data from tbl file
+query I
+COPY test_copy FROM 'tests/data/copy.tbl' ( DELIMITER '|' );
+----
+import 2 rows
+
+
+query I
+SELECT * FROM test_copy
+----
+0 1.5 one
+1 2.5 two
\ No newline at end of file