Skip to content

Commit

Permalink
feat(copy): add support for file copying operations in database
Browse files Browse the repository at this point in the history
Adds the files and functions needed to implement the COPY command in the database, supporting copying both to and from files. This includes the new CopyFromFile and CopyToFile operators and executors, changes to the binder and executor mod files to support the new COPY command structure, and several associated test files. In addition, the CreateTable executor now yields a tuple as its execution result, and the 'csv' dependency is added to Cargo.toml for reading and writing CSV files. Together, these changes allow data to be imported and exported efficiently between database tables and external files.
  • Loading branch information
loloxwg committed Oct 3, 2023
1 parent 3c5603d commit 5e2f548
Show file tree
Hide file tree
Showing 17 changed files with 525 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bytes = "1.5.0"
kip_db = "0.1.2-alpha.15"
async-recursion = "1.0.5"
rust_decimal = "1"
csv = "1"

[dev-dependencies]
tokio-test = "0.4.2"
Expand Down
140 changes: 140 additions & 0 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::path::PathBuf;
use std::str::FromStr;

use serde::{Deserialize, Serialize};
use sqlparser::ast::{CopyOption, CopySource, CopyTarget};
use crate::planner::operator::copy_from_file::CopyFromFileOperator;
use crate::planner::operator::copy_to_file::CopyToFileOperator;
use crate::planner::operator::Operator;

use super::*;

/// An external file referenced by a `COPY` statement.
///
/// The binder builds one of these for both directions of the command
/// (`COPY ... TO <file>` and `COPY ... FROM <file>`).
#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
pub struct ExtSource {
/// Path of the external file, taken from the statement's file name.
pub path: PathBuf,
/// Format of the file's contents, derived from the statement's options.
pub format: FileFormat,
}

/// File format of an external `COPY` file.
///
/// CSV is the only variant; `FileFormat::from_options` rejects any other
/// requested format.
#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
pub enum FileFormat {
/// Delimited text (CSV) file.
Csv {
/// Delimiter to parse (`from_options` defaults this to `,`).
delimiter: char,
/// Quote to use (`from_options` defaults this to `"`).
quote: char,
/// Escape character to use; `None` unless an escape option was given.
escape: Option<char>,
/// Whether or not the file has a header line (`from_options` defaults this to `false`).
header: bool,
},
}

impl std::fmt::Display for ExtSource {
    /// Renders the value using its `Debug` representation.
    ///
    /// A fresh set of format arguments is used, so flags given to the outer
    /// formatter (width, `#`, ...) are not forwarded to the `Debug` output.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!("{:?}", self))
    }
}

impl std::fmt::Display for FileFormat {
    /// Renders the format using its `Debug` representation.
    ///
    /// A fresh set of format arguments is used, so flags given to the outer
    /// formatter (width, `#`, ...) are not forwarded to the `Debug` output.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!("{:?}", self))
    }
}

impl FromStr for ExtSource {
    type Err = ();

    /// Parsing an `ExtSource` from text is not supported: the input is
    /// ignored and every call yields `Err(())`.
    fn from_str(_text: &str) -> std::result::Result<Self, Self::Err> {
        Err(())
    }
}

impl<S: Storage> Binder<S> {
pub(super) async fn bind_copy(
&mut self,
source: CopySource,
to: bool,
target: CopyTarget,
options: &[CopyOption],
) -> Result<LogicalPlan, BindError> {
let (table_name,..) = match source {
CopySource::Table {
table_name,
columns,
} => (table_name, columns),
CopySource::Query(_) => {
return Err(BindError::UnsupportedCopySource(
"bad copy source".to_string(),
));
}
};

if let Some(table) = self.context.storage.table(&table_name.to_string()).await {
let cols = table.all_columns();
let ext_source = ExtSource {
path: match target {
CopyTarget::File { filename } => filename.into(),
t => todo!("unsupported copy target: {:?}", t),
},
format: FileFormat::from_options(options),
};
let types = cols.iter().map(|c| c.desc.column_datatype).collect();

let copy = if to {
// COPY <source_table> TO <dest_file>
LogicalPlan {
operator: Operator::CopyToFile(
CopyToFileOperator {
source: ext_source,
}
),
childrens: vec![],
}
} else {
// COPY <dest_table> FROM <source_file>
LogicalPlan {
operator: Operator::CopyFromFile(
CopyFromFileOperator {
source: ext_source,
types,
columns: cols,
table: table_name.to_string()
}
),
childrens: vec![],
}
};
Ok(copy)
} else {
Err(BindError::InvalidTable(format!("not found table {}", table_name)))
}
}
}

impl FileFormat {
/// Create from copy options.
pub fn from_options(options: &[CopyOption]) -> Self {
let mut delimiter = ',';
let mut quote = '"';
let mut escape = None;
let mut header = false;
for opt in options {
match opt {
CopyOption::Format(fmt) => {
assert_eq!(fmt.value.to_lowercase(), "csv", "only support CSV format")
}
CopyOption::Delimiter(c) => delimiter = *c,
CopyOption::Header(b) => header = *b,
CopyOption::Quote(c) => quote = *c,
CopyOption::Escape(c) => escape = Some(*c),
o => panic!("unsupported copy option: {:?}", o),
}
}
FileFormat::Csv {
delimiter,
quote,
escape,
header,
}
}
}
2 changes: 1 addition & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl<S: Storage> Binder<S> {
&mut self,
name: &ObjectName,
columns: &[ColumnDef],
constraints: &[TableConstraint]
_constraints: &[TableConstraint]
) -> Result<LogicalPlan, BindError> {
let name = lower_case_name(&name);
let (_, name) = split_name(&name)?;
Expand Down
22 changes: 14 additions & 8 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod drop_table;
mod truncate;
mod distinct;
mod show;
pub mod copy;

use std::collections::BTreeMap;
use sqlparser::ast::{Ident, ObjectName, ObjectType, SetExpr, Statement};
Expand All @@ -22,7 +23,7 @@ use crate::types::errors::TypeError;

pub enum InputRefType {
AggCall,
GroupBy
GroupBy,
}

#[derive(Clone)]
Expand Down Expand Up @@ -121,12 +122,15 @@ impl<S: Storage> Binder<S> {
self.bind_delete(table, selection).await?
}
}
Statement::Truncate { table_name, .. } => {
self.bind_truncate(table_name).await?
}
Statement::ShowTables { .. } => {
self.bind_show_tables()?
}
Statement::Truncate { table_name, .. } => self.bind_truncate(table_name).await?,
Statement::ShowTables { .. } => self.bind_show_tables()?,
Statement::Copy {
source,
to,
target,
options,
..
} => self.bind_copy(source.clone(), *to, target.clone(), &options).await?,
_ => return Err(BindError::UnsupportedStmt(stmt.to_string())),
};
Ok(plan)
Expand Down Expand Up @@ -173,7 +177,9 @@ pub enum BindError {
#[error("catalog error: {0}")]
CatalogError(#[from] CatalogError),
#[error("type error: {0}")]
TypeError(#[from] TypeError)
TypeError(#[from] TypeError),
#[error("copy error: {0}")]
UnsupportedCopySource(String),
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions src/execution/executor/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::execution::ExecutorError;
use crate::planner::operator::create_table::CreateTableOperator;
use crate::storage::Storage;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;

pub struct CreateTable {
op: CreateTableOperator
Expand All @@ -27,7 +28,9 @@ impl CreateTable {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute<S: Storage>(self, storage: S) {
let CreateTableOperator { table_name, columns } = self.op;

let _ = storage.create_table(table_name, columns).await?;
let _ = storage.create_table(table_name.clone(), columns).await?;
let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder.push_result("CREATE TABLE SUCCESS", format!("{}", table_name).as_str())?;
yield tuple;
}
}
Loading

0 comments on commit 5e2f548

Please sign in to comment.