diff --git a/README.md b/README.md
index 4f7577fe..2822a862 100755
--- a/README.md
+++ b/README.md
@@ -192,7 +192,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
 - Drop
   - [x] Table
   - [ ] Index
-  - [ ] View
+  - [x] View
 - Alert
   - [x] Add Column
   - [x] Drop Column
diff --git a/src/binder/copy.rs b/src/binder/copy.rs
index 89573935..eae6f3c7 100644
--- a/src/binder/copy.rs
+++ b/src/binder/copy.rs
@@ -93,7 +93,11 @@ impl<T: Transaction> Binder<'_, '_, T> {
         if to {
             // COPY TO
             Ok(LogicalPlan::new(
-                Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
+                Operator::CopyToFile(CopyToFileOperator {
+                    table: table.name.to_string(),
+                    target: ext_source,
+                    schema_ref,
+                }),
                 vec![],
             ))
         } else {
diff --git a/src/execution/dml/copy_to_file.rs b/src/execution/dml/copy_to_file.rs
index d2600a4e..84d8ce57 100644
--- a/src/execution/dml/copy_to_file.rs
+++ b/src/execution/dml/copy_to_file.rs
@@ -1,6 +1,203 @@
+use crate::binder::copy::FileFormat;
+use crate::errors::DatabaseError;
+use crate::execution::{Executor, ReadExecutor};
 use crate::planner::operator::copy_to_file::CopyToFileOperator;
+use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction, ViewCache};
+use crate::throw;
+use crate::types::tuple::{column_names, full_columns};
+use crate::types::tuple_builder::TupleBuilder;
+use std::sync::Arc;
 
-#[allow(dead_code)]
 pub struct CopyToFile {
-    op: CopyToFileOperator,
+    pub op: CopyToFileOperator,
+}
+
+impl From<CopyToFileOperator> for CopyToFile {
+    fn from(op: CopyToFileOperator) -> Self {
+        CopyToFile { op }
+    }
+}
+
+impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for CopyToFile {
+    fn execute(
+        self,
+        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
+        transaction: &'a T,
+    ) -> Executor<'a> {
+        Box::new(
+            #[coroutine]
+            move || {
+                let table_name = Arc::new(self.op.table.clone());
+
+                let mut writer = throw!(self.create_writer());
+
+                let mut iter = throw!(transaction.read(
+                    cache.0,
+                    table_name.clone(),
+                    (None, None),
+                    full_columns(&*self.op.schema_ref)
+                ));
+
+                while let Some(tuple) = throw!(iter.next_tuple()) {
+                    throw!(writer
+                        .write_record(tuple.to_str_vec())
+                        .map_err(|e| DatabaseError::from(e)));
+                }
+
+                throw!(writer.flush().map_err(|e| DatabaseError::from(e)));
+
+                let tuple = TupleBuilder::build_result(format!("{}", self.op));
+                yield Ok(tuple)
+            },
+        )
+    }
+}
+
+impl CopyToFile {
+    fn create_writer(&self) -> Result<csv::Writer<std::fs::File>, DatabaseError> {
+        let mut writer = match self.op.target.format {
+            FileFormat::Csv {
+                delimiter,
+                quote,
+                header,
+                ..
+            } => csv::WriterBuilder::new()
+                .delimiter(delimiter as u8)
+                .quote(quote as u8)
+                .has_headers(header)
+                .from_path(self.op.target.path.clone())?,
+        };
+
+        if let FileFormat::Csv { header: true, .. } = self.op.target.format {
+            let headers = column_names(&*self.op.schema_ref);
+            writer.write_record(headers)?;
+        }
+
+        Ok(writer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::binder::copy::ExtSource;
+    use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnRelation, ColumnSummary};
+    use crate::db::DataBaseBuilder;
+    use crate::errors::DatabaseError;
+    use crate::storage::Storage;
+    use crate::types::LogicalType;
+    use sqlparser::ast::CharLengthUnits;
+    use std::ops::{Coroutine, CoroutineState};
+    use std::pin::Pin;
+    use std::sync::Arc;
+    use tempfile::TempDir;
+    use ulid::Ulid;
+
+    #[test]
+    fn read_csv() -> Result<(), DatabaseError> {
+        let columns = vec![
+            ColumnRef::from(ColumnCatalog::direct_new(
+                ColumnSummary {
+                    name: "a".to_string(),
+                    relation: ColumnRelation::Table {
+                        column_id: Ulid::new(),
+                        table_name: Arc::new("t1".to_string()),
+                        is_temp: false,
+                    },
+                },
+                false,
+                ColumnDesc::new(LogicalType::Integer, Some(0), false, None)?,
+                false,
+            )),
+            ColumnRef::from(ColumnCatalog::direct_new(
+                ColumnSummary {
+                    name: "b".to_string(),
+                    relation: ColumnRelation::Table {
+                        column_id: Ulid::new(),
+                        table_name: Arc::new("t1".to_string()),
+                        is_temp: false,
+                    },
+                },
+                false,
+                ColumnDesc::new(LogicalType::Float, None, false, None)?,
+                false,
+            )),
+            ColumnRef::from(ColumnCatalog::direct_new(
+                ColumnSummary {
+                    name: "c".to_string(),
+                    relation: ColumnRelation::Table {
+                        column_id: Ulid::new(),
+                        table_name: Arc::new("t1".to_string()),
+                        is_temp: false,
+                    },
+                },
+                false,
+                ColumnDesc::new(
+                    LogicalType::Varchar(Some(10), CharLengthUnits::Characters),
+                    None,
+                    false,
+                    None,
+                )?,
+                false,
+            )),
+        ];
+
+        let tmp_dir = TempDir::new()?;
+        let file_path = tmp_dir.path().join("test.csv");
+
+        let op = CopyToFileOperator {
+            table: "t1".to_string(),
+            target: ExtSource {
+                path: file_path.clone(),
+                format: FileFormat::Csv {
+                    delimiter: ',',
+                    quote: '"',
+                    escape: None,
+                    header: true,
+                },
+            },
+            schema_ref: Arc::new(columns),
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let db = DataBaseBuilder::path(temp_dir.path()).build()?;
+        let _ = db.run("create table t1 (a int primary key, b float, c varchar(10))");
+        let _ = db.run("insert into t1 values (1, 1.1, 'foo')");
+        let _ = db.run("insert into t1 values (2, 2.0, 'fooo')");
+        let _ = db.run("insert into t1 values (3, 2.1, 'fnck')");
+
+        let storage = db.storage;
+        let mut transaction = storage.transaction()?;
+
+        let executor = CopyToFile { op: op.clone() };
+        let mut coroutine = executor.execute(
+            (&db.table_cache, &db.view_cache, &db.meta_cache),
+            &mut transaction,
+        );
+
+        let tuple = match Pin::new(&mut coroutine).resume(()) {
+            CoroutineState::Yielded(tuple) => tuple,
+            CoroutineState::Complete(()) => unreachable!(),
+        }?;
+
+        let mut rdr = csv::Reader::from_path(file_path)?;
+        let headers = rdr.headers()?.clone();
+        assert_eq!(headers, vec!["a", "b", "c"]);
+
+        let mut records = rdr.records();
+        let record1 = records.next().unwrap()?;
+        assert_eq!(record1, vec!["1", "1.1", "foo"]);
+
+        let record2 = records.next().unwrap()?;
+        assert_eq!(record2, vec!["2", "2.0", "fooo"]);
+
+        let record3 = records.next().unwrap()?;
+        assert_eq!(record3, vec!["3", "2.1", "fnck"]);
+
+        assert!(records.next().is_none());
+
+        assert_eq!(tuple, TupleBuilder::build_result(format!("{}", op)));
+
+        Ok(())
+    }
 }
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index a6182f0c..505953fa 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -15,6 +15,7 @@ use crate::execution::ddl::drop_view::DropView;
 use crate::execution::ddl::truncate::Truncate;
 use crate::execution::dml::analyze::Analyze;
 use crate::execution::dml::copy_from_file::CopyFromFile;
+use crate::execution::dml::copy_to_file::CopyToFile;
 use crate::execution::dml::delete::Delete;
 use crate::execution::dml::insert::Insert;
 use crate::execution::dml::update::Update;
@@ -196,10 +197,8 @@ pub fn build_write<'a, T: Transaction + 'a>(
         Operator::DropView(op) => DropView::from(op).execute_mut(cache, transaction),
         Operator::Truncate(op) => Truncate::from(op).execute_mut(cache, transaction),
         Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(cache, transaction),
-        #[warn(unused_assignments)]
-        Operator::CopyToFile(_op) => {
-            todo!()
-        }
+        Operator::CopyToFile(op) => CopyToFile::from(op).execute(cache, transaction),
+
         Operator::Analyze(op) => {
             let input = childrens.pop().unwrap();
diff --git a/src/planner/mod.rs b/src/planner/mod.rs
index 8d2ca2dd..94a4bb96 100644
--- a/src/planner/mod.rs
+++ b/src/planner/mod.rs
@@ -165,7 +165,9 @@ impl LogicalPlan {
             Operator::CopyFromFile(_) => SchemaOutput::Schema(vec![ColumnRef::from(
                 ColumnCatalog::new_dummy("COPY FROM SOURCE".to_string()),
             )]),
-            Operator::CopyToFile(_) => todo!(),
+            Operator::CopyToFile(_) => SchemaOutput::Schema(vec![ColumnRef::from(
+                ColumnCatalog::new_dummy("COPY TO TARGET".to_string()),
+            )]),
         }
     }
diff --git a/src/planner/operator/copy_to_file.rs b/src/planner/operator/copy_to_file.rs
index 25b295bc..268c9628 100644
--- a/src/planner/operator/copy_to_file.rs
+++ b/src/planner/operator/copy_to_file.rs
@@ -1,7 +1,32 @@
 use crate::binder::copy::ExtSource;
+use crate::types::tuple::SchemaRef;
 use fnck_sql_serde_macros::ReferenceSerialization;
+use itertools::Itertools;
+use std::fmt;
+use std::fmt::Formatter;
 
 #[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)]
 pub struct CopyToFileOperator {
-    pub source: ExtSource,
+    pub table: String,
+    pub target: ExtSource,
+    pub schema_ref: SchemaRef,
+}
+
+impl fmt::Display for CopyToFileOperator {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        let columns = self
+            .schema_ref
+            .iter()
+            .map(|column| column.name().to_string())
+            .join(", ");
+        write!(
+            f,
+            "Copy {} -> {} [{}]",
+            self.table,
+            self.target.path.display(),
+            columns
+        )?;
+
+        Ok(())
+    }
 }
diff --git a/src/types/tuple.rs b/src/types/tuple.rs
index 3acecb86..dc281a97 100644
--- a/src/types/tuple.rs
+++ b/src/types/tuple.rs
@@ -25,6 +25,16 @@ pub fn types(schema: &Schema) -> Vec<LogicalType> {
         .map(|column| column.datatype().clone())
         .collect_vec()
 }
+pub fn full_columns(schema: &Schema) -> Vec<(usize, ColumnRef)> {
+    schema
+        .iter()
+        .enumerate()
+        .map(|(index, column_ref)| (index, column_ref.clone()))
+        .collect()
+}
+pub fn column_names(schema: &Schema) -> Vec<&str> {
+    schema.iter().map(|c| c.name()).collect_vec()
+}
 
 #[derive(Clone, Debug, PartialEq)]
 pub struct Tuple {
@@ -33,6 +43,9 @@ pub struct Tuple {
 }
 
 impl Tuple {
+    pub fn to_str_vec(&self) -> Vec<String> {
+        self.values.iter().map(|v| v.to_string()).collect()
+    }
     pub fn deserialize_from(
         table_types: &[LogicalType],
         id_builder: &mut TupleIdBuilder,
diff --git a/tests/slt/copy.slt b/tests/slt/copy.slt
index ac334f31..87889fbb 100644
--- a/tests/slt/copy.slt
+++ b/tests/slt/copy.slt
@@ -11,4 +11,9 @@ query I
 SELECT * FROM test_copy
 ----
 0 1.5 one
-1 2.5 two
\ No newline at end of file
+1 2.5 two
+
+query I
+COPY test_copy TO '/tmp/copy.csv' ( DELIMITER ',' );
+----
+Copy test_copy -> /tmp/copy.csv [a, b, c]
\ No newline at end of file