Skip to content

Commit

Permalink
feat(copy): add support for file copying operations in database (#75)
Browse files Browse the repository at this point in the history
* feat(copy): add support for file copying operations in database

Added files and functions necessary for implementing the COPY command in the database, allowing for both copying to and from files. This includes the creation of the CopyFromFile and CopyToFile operators and executors, modifications to the binder and executor mod files to support the new COPY command structure, and the creation of several associated test files. Additionally, made necessary changes to the CreateTable executor to yield tuples as the execution result, and introduced the 'csv' dependency in Cargo.toml for handling CSV file reading and writing. This feature allows for efficient data importation and exportation between database tables and external files.

* remove the unwrap

* remove duplicate module import in operator

* cargo fmt

* style: added PrimaryKeyNotFound error

* fix: handle error process

---------

Co-authored-by: Kould <[email protected]>
  • Loading branch information
loloxwg and KKould authored Oct 4, 2023
1 parent 4bf7385 commit 0ad82e7
Show file tree
Hide file tree
Showing 18 changed files with 523 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bytes = "1.5.0"
kip_db = "0.1.2-alpha.16"
async-recursion = "1.0.5"
rust_decimal = "1"
csv = "1"

[dev-dependencies]
tokio-test = "0.4.2"
Expand Down
137 changes: 137 additions & 0 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::path::PathBuf;
use std::str::FromStr;

use crate::planner::operator::copy_from_file::CopyFromFileOperator;
use crate::planner::operator::copy_to_file::CopyToFileOperator;
use crate::planner::operator::Operator;
use serde::{Deserialize, Serialize};
use sqlparser::ast::{CopyOption, CopySource, CopyTarget};

use super::*;

/// External file referenced by a `COPY` statement, paired with the format
/// used to read or write it.
///
/// The binder fills this in from the statement's copy target and options, and
/// it is carried by both the copy-to-file and copy-from-file operators.
#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
pub struct ExtSource {
/// Path of the external file.
pub path: PathBuf,
/// Format of the external file's contents.
pub format: FileFormat,
}

/// Format of an external file used by `COPY`.
///
/// CSV is currently the only variant.
#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
pub enum FileFormat {
/// Character-separated values.
Csv {
/// Character that separates fields within a record.
delimiter: char,
/// Character used to quote field values.
quote: char,
/// Optional escape character; `None` when no escape option was given.
escape: Option<char>,
/// Whether or not the file has a header line.
header: bool,
},
}

/// `Display` for [`ExtSource`] emits exactly the same text as its derived
/// `Debug` representation.
impl std::fmt::Display for ExtSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A fresh `format_args!` is used (rather than delegating to
        // `Debug::fmt`) so that width/alternate flags given to the outer
        // formatter are not forwarded to the debug output.
        f.write_fmt(format_args!("{:?}", self))
    }
}

/// `Display` for [`FileFormat`] emits exactly the same text as its derived
/// `Debug` representation.
impl std::fmt::Display for FileFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A fresh `format_args!` is used (rather than delegating to
        // `Debug::fmt`) so that width/alternate flags given to the outer
        // formatter are not forwarded to the debug output.
        f.write_fmt(format_args!("{:?}", self))
    }
}

/// Placeholder `FromStr` implementation: parsing an [`ExtSource`] from text is
/// not supported, so every input is rejected.
// NOTE(review): presumably this impl only exists to satisfy a trait bound
// required elsewhere in the crate — confirm before relying on it.
impl FromStr for ExtSource {
type Err = ();
/// Always returns `Err(())`; the input string is ignored.
fn from_str(_s: &str) -> std::result::Result<Self, Self::Err> {
Err(())
}
}

impl<S: Storage> Binder<S> {
pub(super) async fn bind_copy(
&mut self,
source: CopySource,
to: bool,
target: CopyTarget,
options: &[CopyOption],
) -> Result<LogicalPlan, BindError> {
let (table_name, ..) = match source {
CopySource::Table {
table_name,
columns,
} => (table_name, columns),
CopySource::Query(_) => {
return Err(BindError::UnsupportedCopySource(
"bad copy source".to_string(),
));
}
};

if let Some(table) = self.context.storage.table(&table_name.to_string()).await {
let cols = table.all_columns();
let ext_source = ExtSource {
path: match target {
CopyTarget::File { filename } => filename.into(),
t => todo!("unsupported copy target: {:?}", t),
},
format: FileFormat::from_options(options),
};
let types = cols.iter().map(|c| c.desc.column_datatype).collect();

let copy = if to {
// COPY <source_table> TO <dest_file>
LogicalPlan {
operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
childrens: vec![],
}
} else {
// COPY <dest_table> FROM <source_file>
LogicalPlan {
operator: Operator::CopyFromFile(CopyFromFileOperator {
source: ext_source,
types,
columns: cols,
table: table_name.to_string(),
}),
childrens: vec![],
}
};
Ok(copy)
} else {
Err(BindError::InvalidTable(format!(
"not found table {}",
table_name
)))
}
}
}

impl FileFormat {
    /// Builds a [`FileFormat`] from the options of a `COPY` statement.
    ///
    /// Options that are not supplied fall back to: delimiter `,`, quote `"`,
    /// no escape character, and no header line. When an option is repeated,
    /// the last occurrence wins.
    ///
    /// # Panics
    ///
    /// Panics if a `FORMAT` option names anything other than CSV
    /// (case-insensitive), or if any option other than format, delimiter,
    /// header, quote or escape is present.
    pub fn from_options(options: &[CopyOption]) -> Self {
        let (mut delimiter, mut quote) = (',', '"');
        let (mut escape, mut header) = (None, false);

        options.iter().for_each(|option| match option {
            CopyOption::Format(ident) => {
                assert_eq!(ident.value.to_lowercase(), "csv", "only support CSV format")
            }
            CopyOption::Delimiter(ch) => delimiter = *ch,
            CopyOption::Header(flag) => header = *flag,
            CopyOption::Quote(ch) => quote = *ch,
            CopyOption::Escape(ch) => escape = Some(*ch),
            unsupported => panic!("unsupported copy option: {:?}", unsupported),
        });

        Self::Csv {
            delimiter,
            quote,
            escape,
            header,
        }
    }
}
2 changes: 1 addition & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl<S: Storage> Binder<S> {
&mut self,
name: &ObjectName,
columns: &[ColumnDef],
constraints: &[TableConstraint],
_constraints: &[TableConstraint],
) -> Result<LogicalPlan, BindError> {
let name = lower_case_name(&name);
let (_, name) = split_name(&name)?;
Expand Down
13 changes: 13 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod aggregate;
pub mod copy;
mod create_table;
mod delete;
mod distinct;
Expand Down Expand Up @@ -129,6 +130,16 @@ impl<S: Storage> Binder<S> {
}
Statement::Truncate { table_name, .. } => self.bind_truncate(table_name).await?,
Statement::ShowTables { .. } => self.bind_show_tables()?,
Statement::Copy {
source,
to,
target,
options,
..
} => {
self.bind_copy(source.clone(), *to, target.clone(), &options)
.await?
}
_ => return Err(BindError::UnsupportedStmt(stmt.to_string())),
};
Ok(plan)
Expand Down Expand Up @@ -176,6 +187,8 @@ pub enum BindError {
CatalogError(#[from] CatalogError),
#[error("type error: {0}")]
TypeError(#[from] TypeError),
#[error("copy error: {0}")]
UnsupportedCopySource(String),
}

#[cfg(test)]
Expand Down
8 changes: 6 additions & 2 deletions src/execution/executor/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::execution::ExecutorError;
use crate::planner::operator::create_table::CreateTableOperator;
use crate::storage::Storage;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use futures_async_stream::try_stream;

pub struct CreateTable {
Expand All @@ -28,7 +29,10 @@ impl CreateTable {
table_name,
columns,
} = self.op;

let _ = storage.create_table(table_name, columns).await?;
let _ = storage.create_table(table_name.clone(), columns).await?;
let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder
.push_result("CREATE TABLE SUCCESS", format!("{}", table_name).as_str())?;
yield tuple;
}
}
Loading

0 comments on commit 0ad82e7

Please sign in to comment.