From 0ad82e7ab85f6a0b61ce18e20e910fe51c984011 Mon Sep 17 00:00:00 2001 From: Xwg Date: Thu, 5 Oct 2023 03:26:26 +0800 Subject: [PATCH] feat(copy): add support for file copying operations in database (#75) * feat(copy): add support for file copying operations in database Added files and functions necessary for implementing the COPY command in the database, allowing for both copying to and from files. This includes the creation of the CopyFromFile and CopyToFile operators and executors, modifications to the binder and executor mod files to support the new COPY command structure, and the creation of several associated test files. Additionally, made necessary changes to the CreateTable executor to yield tuples as the execution result, and introduced the 'csv' dependency in Cargo.toml for handling CSV file reading and writing. This feature allows for efficient data importation and exportation between database tables and external files. * remove the unwrap * remove duplicate module import in operator * cargo fmt * style: added PrimaryKeyNotFound error * fix: handle error processing --------- Co-authored-by: Kould <2435992353@qq.com> --- Cargo.toml | 1 + src/binder/copy.rs | 137 +++++++++++++ src/binder/create_table.rs | 2 +- src/binder/mod.rs | 13 ++ src/execution/executor/ddl/create_table.rs | 8 +- src/execution/executor/dml/copy_from_file.rs | 198 +++++++++++++++++++ src/execution/executor/dml/copy_to_file.rs | 6 + src/execution/executor/dml/mod.rs | 2 + src/execution/executor/mod.rs | 6 + src/execution/mod.rs | 24 +++ src/planner/operator/copy_from_file.rs | 11 ++ src/planner/operator/copy_to_file.rs | 6 + src/planner/operator/mod.rs | 7 + src/types/errors.rs | 2 + src/types/mod.rs | 1 + src/types/tuple_builder.rs | 83 ++++++++ tests/data/copy.tbl | 2 + tests/slt/copy.slt | 17 ++ 18 files changed, 523 insertions(+), 3 deletions(-) create mode 100644 src/binder/copy.rs create mode 100644 src/execution/executor/dml/copy_from_file.rs create mode 100644 
src/execution/executor/dml/copy_to_file.rs create mode 100644 src/planner/operator/copy_from_file.rs create mode 100644 src/planner/operator/copy_to_file.rs create mode 100644 src/types/tuple_builder.rs create mode 100644 tests/data/copy.tbl create mode 100644 tests/slt/copy.slt diff --git a/Cargo.toml b/Cargo.toml index 5f5cd914..223881a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ bytes = "1.5.0" kip_db = "0.1.2-alpha.16" async-recursion = "1.0.5" rust_decimal = "1" +csv = "1" [dev-dependencies] tokio-test = "0.4.2" diff --git a/src/binder/copy.rs b/src/binder/copy.rs new file mode 100644 index 00000000..ad671d72 --- /dev/null +++ b/src/binder/copy.rs @@ -0,0 +1,137 @@ +use std::path::PathBuf; +use std::str::FromStr; + +use crate::planner::operator::copy_from_file::CopyFromFileOperator; +use crate::planner::operator::copy_to_file::CopyToFileOperator; +use crate::planner::operator::Operator; +use serde::{Deserialize, Serialize}; +use sqlparser::ast::{CopyOption, CopySource, CopyTarget}; + +use super::*; + +#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)] +pub struct ExtSource { + pub path: PathBuf, + pub format: FileFormat, +} + +/// File format. +#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)] +pub enum FileFormat { + Csv { + /// Delimiter to parse. + delimiter: char, + /// Quote to use. + quote: char, + /// Escape character to use. + escape: Option, + /// Whether or not the file has a header line. 
+ header: bool, + }, +} + +impl std::fmt::Display for ExtSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + +impl std::fmt::Display for FileFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + +impl FromStr for ExtSource { + type Err = (); + fn from_str(_s: &str) -> std::result::Result { + Err(()) + } +} + +impl Binder { + pub(super) async fn bind_copy( + &mut self, + source: CopySource, + to: bool, + target: CopyTarget, + options: &[CopyOption], + ) -> Result { + let (table_name, ..) = match source { + CopySource::Table { + table_name, + columns, + } => (table_name, columns), + CopySource::Query(_) => { + return Err(BindError::UnsupportedCopySource( + "bad copy source".to_string(), + )); + } + }; + + if let Some(table) = self.context.storage.table(&table_name.to_string()).await { + let cols = table.all_columns(); + let ext_source = ExtSource { + path: match target { + CopyTarget::File { filename } => filename.into(), + t => todo!("unsupported copy target: {:?}", t), + }, + format: FileFormat::from_options(options), + }; + let types = cols.iter().map(|c| c.desc.column_datatype).collect(); + + let copy = if to { + // COPY TO + LogicalPlan { + operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }), + childrens: vec![], + } + } else { + // COPY FROM + LogicalPlan { + operator: Operator::CopyFromFile(CopyFromFileOperator { + source: ext_source, + types, + columns: cols, + table: table_name.to_string(), + }), + childrens: vec![], + } + }; + Ok(copy) + } else { + Err(BindError::InvalidTable(format!( + "not found table {}", + table_name + ))) + } + } +} + +impl FileFormat { + /// Create from copy options. 
+ pub fn from_options(options: &[CopyOption]) -> Self { + let mut delimiter = ','; + let mut quote = '"'; + let mut escape = None; + let mut header = false; + for opt in options { + match opt { + CopyOption::Format(fmt) => { + assert_eq!(fmt.value.to_lowercase(), "csv", "only support CSV format") + } + CopyOption::Delimiter(c) => delimiter = *c, + CopyOption::Header(b) => header = *b, + CopyOption::Quote(c) => quote = *c, + CopyOption::Escape(c) => escape = Some(*c), + o => panic!("unsupported copy option: {:?}", o), + } + } + FileFormat::Csv { + delimiter, + quote, + escape, + header, + } + } +} diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index aac141d4..80227721 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -17,7 +17,7 @@ impl Binder { &mut self, name: &ObjectName, columns: &[ColumnDef], - constraints: &[TableConstraint], + _constraints: &[TableConstraint], ) -> Result { let name = lower_case_name(&name); let (_, name) = split_name(&name)?; diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 61d13a89..134b6fa9 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -1,4 +1,5 @@ pub mod aggregate; +pub mod copy; mod create_table; mod delete; mod distinct; @@ -129,6 +130,16 @@ impl Binder { } Statement::Truncate { table_name, .. } => self.bind_truncate(table_name).await?, Statement::ShowTables { .. } => self.bind_show_tables()?, + Statement::Copy { + source, + to, + target, + options, + .. + } => { + self.bind_copy(source.clone(), *to, target.clone(), &options) + .await? 
+ } _ => return Err(BindError::UnsupportedStmt(stmt.to_string())), }; Ok(plan) @@ -176,6 +187,8 @@ pub enum BindError { CatalogError(#[from] CatalogError), #[error("type error: {0}")] TypeError(#[from] TypeError), + #[error("copy error: {0}")] + UnsupportedCopySource(String), } #[cfg(test)] diff --git a/src/execution/executor/ddl/create_table.rs b/src/execution/executor/ddl/create_table.rs index 08a3592f..253c6074 100644 --- a/src/execution/executor/ddl/create_table.rs +++ b/src/execution/executor/ddl/create_table.rs @@ -3,6 +3,7 @@ use crate::execution::ExecutorError; use crate::planner::operator::create_table::CreateTableOperator; use crate::storage::Storage; use crate::types::tuple::Tuple; +use crate::types::tuple_builder::TupleBuilder; use futures_async_stream::try_stream; pub struct CreateTable { @@ -28,7 +29,10 @@ impl CreateTable { table_name, columns, } = self.op; - - let _ = storage.create_table(table_name, columns).await?; + let _ = storage.create_table(table_name.clone(), columns).await?; + let tuple_builder = TupleBuilder::new_result(); + let tuple = tuple_builder + .push_result("CREATE TABLE SUCCESS", format!("{}", table_name).as_str())?; + yield tuple; } } diff --git a/src/execution/executor/dml/copy_from_file.rs b/src/execution/executor/dml/copy_from_file.rs new file mode 100644 index 00000000..79885fdf --- /dev/null +++ b/src/execution/executor/dml/copy_from_file.rs @@ -0,0 +1,198 @@ +use crate::binder::copy::FileFormat; +use crate::execution::executor::{BoxedExecutor, Executor}; +use crate::execution::ExecutorError; +use crate::planner::operator::copy_from_file::CopyFromFileOperator; +use crate::storage::{Storage, Transaction}; +use crate::types::tuple::Tuple; +use crate::types::tuple_builder::TupleBuilder; +use futures_async_stream::try_stream; +use std::fs::File; +use std::io::BufReader; +use tokio::sync::mpsc::Sender; + +pub struct CopyFromFile { + op: CopyFromFileOperator, + size: usize, +} + +impl From for CopyFromFile { + fn from(op: 
CopyFromFileOperator) -> Self { + CopyFromFile { op, size: 0 } + } +} + +impl Executor for CopyFromFile { + fn execute(self, storage: &S) -> BoxedExecutor { + self._execute(storage.clone()) + } +} + +impl CopyFromFile { + #[try_stream(boxed, ok = Tuple, error = ExecutorError)] + pub async fn _execute(self, storage: S) { + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let (tx1, mut rx1) = tokio::sync::mpsc::channel(1); + // # Cancellation + // When this stream is dropped, the `rx` is dropped, the spawned task will fail to send to + // `tx`, then the task will finish. + let table_name = self.op.table.clone(); + if let Some(mut txn) = storage.transaction(&table_name).await { + let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx)); + let mut size = 0 as usize; + while let Some(chunk) = rx.recv().await { + txn.append(chunk, false)?; + size += 1; + } + handle.await??; + txn.commit().await?; + + let handle = tokio::task::spawn_blocking(move || return_result(size.clone(), tx1)); + while let Some(chunk) = rx1.recv().await { + yield chunk; + } + handle.await??; + } + } + /// Read records from file using blocking IO. + /// + /// The read data chunks will be sent through `tx`. 
+ fn read_file_blocking(mut self, tx: Sender) -> Result<(), ExecutorError> { + let file = File::open(self.op.source.path)?; + let mut buf_reader = BufReader::new(file); + let mut reader = match self.op.source.format { + FileFormat::Csv { + delimiter, + quote, + escape, + header, + } => csv::ReaderBuilder::new() + .delimiter(delimiter as u8) + .quote(quote as u8) + .escape(escape.map(|c| c as u8)) + .has_headers(header) + .from_reader(&mut buf_reader), + }; + + let column_count = self.op.types.len(); + let mut size_count = 0; + + for record in reader.records() { + let mut tuple_builder = + TupleBuilder::new(self.op.types.clone(), self.op.columns.clone()); + // read records and push raw str rows into data chunk builder + let record = record?; + + if !(record.len() == column_count + || record.len() == column_count + 1 && record.get(column_count) == Some("")) + { + return Err(ExecutorError::LengthMismatch { + expected: column_count, + actual: record.len(), + }); + } + + size_count += 1; + + // push a raw str row and send it if necessary + if let Some(chunk) = tuple_builder.push_str_row(record.iter())? 
{ + tx.blocking_send(chunk).map_err(|_| ExecutorError::Abort)?; + } + } + self.size = size_count; + Ok(()) + } +} + +fn return_result(size: usize, tx: Sender) -> Result<(), ExecutorError> { + let tuple_builder = TupleBuilder::new_result(); + let tuple = + tuple_builder.push_result("COPY FROM SOURCE", format!("import {} rows", size).as_str())?; + tx.blocking_send(tuple).map_err(|_| ExecutorError::Abort)?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::catalog::{ColumnCatalog, ColumnDesc}; + use crate::db::Database; + use futures::StreamExt; + use std::io::Write; + use std::sync::Arc; + use tempfile::TempDir; + + use super::*; + use crate::binder::copy::ExtSource; + use crate::types::LogicalType; + + #[tokio::test] + async fn read_csv() { + let csv = "1,1.5,one\n2,2.5,two\n"; + + let mut file = tempfile::NamedTempFile::new().expect("failed to create temp file"); + write!(file, "{}", csv).expect("failed to write file"); + + let columns = vec![ + Arc::new(ColumnCatalog { + id: Some(0), + name: "a".to_string(), + table_name: None, + nullable: false, + desc: ColumnDesc::new(LogicalType::Integer, true, false), + ref_expr: None, + }), + Arc::new(ColumnCatalog { + id: Some(1), + name: "b".to_string(), + table_name: None, + nullable: false, + desc: ColumnDesc::new(LogicalType::Float, false, false), + ref_expr: None, + }), + Arc::new(ColumnCatalog { + id: Some(1), + name: "c".to_string(), + table_name: None, + nullable: false, + desc: ColumnDesc::new(LogicalType::Varchar(Some(10)), false, false), + ref_expr: None, + }), + ]; + + let op = CopyFromFileOperator { + table: "test_copy".to_string(), + source: ExtSource { + path: file.path().into(), + format: FileFormat::Csv { + delimiter: ',', + quote: '"', + escape: None, + header: false, + }, + }, + + types: vec![ + LogicalType::Integer, + LogicalType::Float, + LogicalType::Varchar(Some(10)), + ], + columns: columns.clone(), + }; + let executor = CopyFromFile { + op: op.clone(), + size: 0, + }; + + let temp_dir = 
TempDir::new().unwrap(); + let db = Database::with_kipdb(temp_dir.path()).await.unwrap(); + let _ = db + .run("create table test_copy (a int primary key, b float, c varchar(10))") + .await; + let actual = executor.execute(&db.storage).next().await.unwrap().unwrap(); + + let tuple_builder = TupleBuilder::new_result(); + let expected = tuple_builder + .push_result("COPY FROM SOURCE", format!("import {} rows", 2).as_str()) + .unwrap(); + assert_eq!(actual, expected); + } +} diff --git a/src/execution/executor/dml/copy_to_file.rs b/src/execution/executor/dml/copy_to_file.rs new file mode 100644 index 00000000..f1970de8 --- /dev/null +++ b/src/execution/executor/dml/copy_to_file.rs @@ -0,0 +1,6 @@ +use crate::planner::operator::copy_to_file::CopyToFileOperator; + +#[warn(dead_code)] +pub struct CopyToFile { + op: CopyToFileOperator, +} diff --git a/src/execution/executor/dml/mod.rs b/src/execution/executor/dml/mod.rs index d36b39d8..fac541fe 100644 --- a/src/execution/executor/dml/mod.rs +++ b/src/execution/executor/dml/mod.rs @@ -1,3 +1,5 @@ +pub(crate) mod copy_from_file; +pub(crate) mod copy_to_file; pub(crate) mod delete; pub(crate) mod insert; pub(crate) mod update; diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs index d3f44f4c..93d1a193 100644 --- a/src/execution/executor/mod.rs +++ b/src/execution/executor/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod show; use crate::execution::executor::ddl::create_table::CreateTable; use crate::execution::executor::ddl::drop_table::DropTable; use crate::execution::executor::ddl::truncate::Truncate; +use crate::execution::executor::dml::copy_from_file::CopyFromFile; use crate::execution::executor::dml::delete::Delete; use crate::execution::executor::dml::insert::Insert; use crate::execution::executor::dml::update::Update; @@ -106,6 +107,11 @@ pub fn build(plan: LogicalPlan, storage: &S) -> BoxedExecutor { Operator::DropTable(op) => DropTable::from(op).execute(storage), Operator::Truncate(op) => 
Truncate::from(op).execute(storage), Operator::Show(op) => ShowTables::from(op).execute(storage), + Operator::CopyFromFile(op) => CopyFromFile::from(op).execute(storage), + #[warn(unused_assignments)] + Operator::CopyToFile(_op) => { + todo!() + } } } diff --git a/src/execution/mod.rs b/src/execution/mod.rs index b1f8256f..77764311 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -40,4 +40,28 @@ pub enum ExecutorError { ), #[error("Internal error: {0}")] InternalError(String), + #[error("io error")] + Io( + #[from] + #[source] + std::io::Error, + ), + #[error("csv error")] + Csv( + #[from] + #[source] + csv::Error, + ), + #[error("tuple length mismatch: expected {expected} but got {actual}")] + LengthMismatch { expected: usize, actual: usize }, + #[error("abort")] + Abort, + #[error("unknown error")] + Unknown, + #[error("join error")] + JoinError( + #[from] + #[source] + tokio::task::JoinError, + ), } diff --git a/src/planner/operator/copy_from_file.rs b/src/planner/operator/copy_from_file.rs new file mode 100644 index 00000000..70d8caa5 --- /dev/null +++ b/src/planner/operator/copy_from_file.rs @@ -0,0 +1,11 @@ +use crate::binder::copy::ExtSource; +use crate::catalog::ColumnRef; +use crate::types::LogicalType; + +#[derive(Debug, PartialEq, Clone)] +pub struct CopyFromFileOperator { + pub table: String, + pub source: ExtSource, + pub types: Vec, + pub columns: Vec, +} diff --git a/src/planner/operator/copy_to_file.rs b/src/planner/operator/copy_to_file.rs new file mode 100644 index 00000000..1eb65ed8 --- /dev/null +++ b/src/planner/operator/copy_to_file.rs @@ -0,0 +1,6 @@ +use crate::binder::copy::ExtSource; + +#[derive(Debug, PartialEq, Clone)] +pub struct CopyToFileOperator { + pub source: ExtSource, +} diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 10500025..4be5b0af 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -1,4 +1,6 @@ pub mod aggregate; +pub mod copy_from_file; +pub mod 
copy_to_file; pub mod create_table; pub mod delete; pub mod drop_table; @@ -16,6 +18,8 @@ pub mod values; use crate::catalog::ColumnRef; use crate::expression::ScalarExpression; +use crate::planner::operator::copy_from_file::CopyFromFileOperator; +use crate::planner::operator::copy_to_file::CopyToFileOperator; use crate::planner::operator::create_table::CreateTableOperator; use crate::planner::operator::delete::DeleteOperator; use crate::planner::operator::drop_table::DropTableOperator; @@ -54,6 +58,9 @@ pub enum Operator { Truncate(TruncateOperator), // Show Show(ShowTablesOperator), + // Copy + CopyFromFile(CopyFromFileOperator), + CopyToFile(CopyToFileOperator), } impl Operator { diff --git a/src/types/errors.rs b/src/types/errors.rs index b897138d..6aae5dc1 100644 --- a/src/types/errors.rs +++ b/src/types/errors.rs @@ -7,6 +7,8 @@ use std::string::FromUtf8Error; pub enum TypeError { #[error("invalid type")] InvalidType, + #[error("Must contain PrimaryKey!")] + PrimaryKeyNotFound, #[error("not implemented sqlparser datatype: {0}")] NotImplementedSqlparserDataType(String), #[error("internal error: {0}")] diff --git a/src/types/mod.rs b/src/types/mod.rs index 7d83edff..de2d29f4 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,6 +1,7 @@ pub mod errors; pub mod index; pub mod tuple; +pub mod tuple_builder; pub mod value; use chrono::{NaiveDate, NaiveDateTime}; diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs new file mode 100644 index 00000000..aa6f0778 --- /dev/null +++ b/src/types/tuple_builder.rs @@ -0,0 +1,83 @@ +use crate::catalog::{ColumnCatalog, ColumnRef}; +use crate::types::errors::TypeError; +use crate::types::tuple::Tuple; +use crate::types::value::{DataValue, ValueRef}; +use crate::types::LogicalType; +use std::collections::HashMap; +use std::sync::Arc; + +pub struct TupleBuilder { + data_types: Vec, + data_values: Vec, + columns: Vec, +} + +impl TupleBuilder { + pub fn new(data_types: Vec, columns: Vec) -> Self { + 
TupleBuilder { + data_types, + data_values: Vec::new(), + columns, + } + } + + pub fn new_result() -> Self { + TupleBuilder { + data_types: Vec::new(), + data_values: Vec::new(), + columns: Vec::new(), + } + } + + pub fn push_result(self, header: &str, message: &str) -> Result { + let columns: Vec = vec![Arc::new(ColumnCatalog::new_dummy(header.to_string()))]; + let values: Vec = vec![Arc::new(DataValue::Utf8(Some(String::from(message))))]; + let t = Tuple { + id: None, + columns, + values, + }; + Ok(t) + } + + pub fn push_str_row<'a>( + &mut self, + row: impl IntoIterator, + ) -> Result, TypeError> { + let mut primary_key_index = None; + let columns = self.columns.clone(); + let mut tuple_map = HashMap::new(); + + for (i, value) in row.into_iter().enumerate() { + let data_value = DataValue::Utf8(Some(value.to_string())); + let cast_data_value = data_value.cast(&self.data_types[i])?; + self.data_values.push(Arc::new(cast_data_value.clone())); + let col = &columns[i]; + col.id + .map(|col_id| tuple_map.insert(col_id, Arc::new(cast_data_value.clone()))); + if primary_key_index.is_none() && col.desc.is_primary { + primary_key_index = Some(i); + } + } + + let primary_col_id = primary_key_index + .map(|i| columns[i].id.unwrap()) + .ok_or_else(|| TypeError::PrimaryKeyNotFound)?; + + let tuple_id = tuple_map + .get(&primary_col_id) + .ok_or_else(|| TypeError::PrimaryKeyNotFound)? 
+ .clone(); + + let tuple = if self.data_values.len() == self.data_types.len() { + Some(Tuple { + id: Some(tuple_id), + columns: self.columns.clone(), + values: self.data_values.clone(), + }) + } else { + None + }; + Ok(tuple) + } +} diff --git a/tests/data/copy.tbl b/tests/data/copy.tbl new file mode 100644 index 00000000..cb636302 --- /dev/null +++ b/tests/data/copy.tbl @@ -0,0 +1,2 @@ +0|1.5|one +1|2.5|two \ No newline at end of file diff --git a/tests/slt/copy.slt b/tests/slt/copy.slt new file mode 100644 index 00000000..77f78384 --- /dev/null +++ b/tests/slt/copy.slt @@ -0,0 +1,17 @@ +statement ok +create table test_copy (a int primary key, b float, c varchar(10)) + + + +# copy data from tbl file +query I +COPY test_copy FROM 'tests/data/copy.tbl' ( DELIMITER '|' ); +---- +import 2 rows + + +query I +SELECT * FROM test_copy +---- +0 1.5 one +1 2.5 two \ No newline at end of file