From 304b697f02b959f645daadedd562523d3ed65f55 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 14 Jan 2024 14:22:50 +0000 Subject: [PATCH 01/10] wip: impl COPY DATABASE FROM parser --- src/frontend/src/instance.rs | 11 ++- src/operator/src/statement.rs | 15 +++- src/sql/src/parsers/copy_parser.rs | 137 ++++++++++++++++------------- src/sql/src/statements/copy.rs | 8 +- 4 files changed, 103 insertions(+), 68 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 72b79947548a..bd21a3b4bf8e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -70,8 +70,8 @@ use servers::server::{start_server, ServerHandlers}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; -use sql::parser::{ParseOptions, ParserContext}; -use sql::statements::copy::CopyTable; +use sql::parser::{ParseOptions, ParserContext, ParserContext}; +use sql::statements::copy::{CopyDatabase, CopyTable}; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; @@ -487,8 +487,11 @@ pub fn check_permission( validate_param(©_table_from.table_name, query_ctx)? } }, - Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => { - validate_param(&stmt.database_name, query_ctx)? 
+ Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_databse)) => { + match copy_databse { + CopyDatabase::To(stmt) => validate_param(&stmt.database_name, query_ctx)?, + CopyDatabase::From(stmt) => validate_param(&stmt.database_name, query_ctx)?, + } } Statement::TruncateTable(stmt) => { validate_param(stmt.table_name(), query_ctx)?; diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 84efcd13addd..244f4a77055f 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -41,7 +41,7 @@ use query::plan::LogicalPlan; use query::QueryEngineRef; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; -use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; +use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; use sql::statements::OptionMap; use sql::util::format_raw_object_name; @@ -131,9 +131,16 @@ impl StatementExecutor { } } - Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => { - self.copy_database(to_copy_database_request(arg, &query_ctx)?) - .await + Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => { + match copy_database { + CopyDatabase::To(arg) => { + self.copy_database(to_copy_database_request(arg, &query_ctx)?) 
+ .await + } + CopyDatabase::From(_) => { + todo!() + } + } } Statement::CreateTable(stmt) => { diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index eee50e8c940e..aecdf3341e96 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -15,13 +15,12 @@ use std::collections::HashMap; use snafu::ResultExt; -use sqlparser::ast::ObjectName; use sqlparser::keywords::Keyword; use sqlparser::tokenizer::Token::Word; use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; +use crate::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument}; use crate::statements::statement::Statement; use crate::util::parse_option_string; @@ -47,7 +46,7 @@ impl<'a> ParserContext<'a> { Ok(Statement::Copy(copy)) } - fn parser_copy_database(&mut self) -> Result { + fn parser_copy_database(&mut self) -> Result { let database_name = self.parser .parse_object_name() @@ -57,17 +56,29 @@ impl<'a> ParserContext<'a> { actual: self.peek_token_as_string(), })?; - self.parser - .expect_keyword(Keyword::TO) - .context(error::SyntaxSnafu)?; - - let (with, connection, location) = self.parse_copy_to()?; - Ok(CopyDatabaseArgument { - database_name, - with: with.into(), - connection: connection.into(), - location, - }) + let req = if self.parser.parse_keyword(Keyword::TO) { + let (with, connection, location) = self.parse_copy_parameters()?; + let argument = CopyDatabaseArgument { + database_name, + with: with.into(), + connection: connection.into(), + location, + }; + CopyDatabase::To(argument) + } else { + self.parser + .expect_keyword(Keyword::FROM) + .context(error::SyntaxSnafu)?; + let (with, connection, location) = self.parse_copy_parameters()?; + let argument = CopyDatabaseArgument { + database_name, + with: with.into(), + connection: connection.into(), + location, + }; + CopyDatabase::From(argument) + }; + 
Ok(req) } fn parse_copy_table(&mut self) -> Result { @@ -82,7 +93,7 @@ impl<'a> ParserContext<'a> { let table_name = Self::canonicalize_object_name(raw_table_name); if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location) = self.parse_copy_to()?; + let (with, connection, location) = self.parse_copy_parameters()?; Ok(CopyTable::To(CopyTableArgument { table_name, with: with.into(), @@ -93,52 +104,17 @@ impl<'a> ParserContext<'a> { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - Ok(CopyTable::From(self.parse_copy_table_from(table_name)?)) + let (with, connection, location) = self.parse_copy_parameters()?; + Ok(CopyTable::From(CopyTableArgument { + table_name, + with: with.into(), + connection: connection.into(), + location, + })) } } - fn parse_copy_table_from(&mut self, table_name: ObjectName) -> Result { - let location = - self.parser - .parse_literal_string() - .with_context(|_| error::UnexpectedSnafu { - sql: self.sql, - expected: "a uri", - actual: self.peek_token_as_string(), - })?; - - let options = self - .parser - .parse_options(Keyword::WITH) - .context(error::SyntaxSnafu)?; - - let with = options - .into_iter() - .filter_map(|option| { - parse_option_string(option.value).map(|v| (option.name.value.to_lowercase(), v)) - }) - .collect(); - - let connection_options = self - .parser - .parse_options(Keyword::CONNECTION) - .context(error::SyntaxSnafu)?; - - let connection = connection_options - .into_iter() - .filter_map(|option| { - parse_option_string(option.value).map(|v| (option.name.value.to_lowercase(), v)) - }) - .collect(); - Ok(CopyTableArgument { - table_name, - with, - connection, - location, - }) - } - - fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> { + fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> { let location = self.parser .parse_literal_string() @@ -181,7 +157,7 @@ mod tests { use std::assert_matches::assert_matches; use std::collections::HashMap; - 
use sqlparser::ast::Ident; + use sqlparser::ast::{Ident, ObjectName}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -398,6 +374,49 @@ mod tests { let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() }; + + let CopyDatabase::To(stmt) = stmt else { + unreachable!() + }; + + assert_eq!( + ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]), + stmt.database_name + ); + assert_eq!( + [("format".to_string(), "parquet".to_string())] + .into_iter() + .collect::>(), + stmt.with.map + ); + + assert_eq!( + [ + ("foo".to_string(), "Bar".to_string()), + ("one".to_string(), "two".to_string()) + ] + .into_iter() + .collect::>(), + stmt.connection.map + ); + } + + #[test] + fn test_copy_database_from() { + let sql = "COPY DATABASE catalog0.schema0 FROM '/a/b/c/' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')"; + let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}) + .unwrap() + .pop() + .unwrap(); + + let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { + unreachable!() + }; + + let CopyDatabase::From(stmt) = stmt else { + unreachable!() + }; + assert_eq!( ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]), stmt.database_name diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index cd3893300ba8..8d3104f29e69 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -20,7 +20,7 @@ use crate::statements::OptionMap; #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] pub enum Copy { CopyTable(CopyTable), - CopyDatabase(CopyDatabaseArgument), + CopyDatabase(CopyDatabase), } #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] @@ -29,6 +29,12 @@ pub enum CopyTable { From(CopyTableArgument), } +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] +pub enum CopyDatabase { + To(CopyDatabaseArgument), + From(CopyDatabaseArgument), +} + #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] pub struct 
CopyDatabaseArgument { pub database_name: ObjectName, From 0fd838f9fd48b46872aa2c2fc0c3aad9393a8981 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 14 Jan 2024 15:19:58 +0000 Subject: [PATCH 02/10] wip: impl copy database from --- src/operator/src/statement.rs | 14 +-- .../statement/{backup.rs => copy_database.rs} | 89 ++++++++++++++++++- 2 files changed, 94 insertions(+), 9 deletions(-) rename src/operator/src/statement/{backup.rs => copy_database.rs} (50%) diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 244f4a77055f..127539afa447 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod backup; +mod copy_database; mod copy_table_from; mod copy_table_to; mod ddl; @@ -55,7 +55,7 @@ use crate::error::{ PlanStatementSnafu, Result, TableNotFoundSnafu, }; use crate::insert::InserterRef; -use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; +use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; use crate::table::table_idents_to_full_name; #[derive(Clone)] @@ -134,11 +134,15 @@ impl StatementExecutor { Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => { match copy_database { CopyDatabase::To(arg) => { - self.copy_database(to_copy_database_request(arg, &query_ctx)?) + self.copy_database_to(to_copy_database_request(arg, &query_ctx)?) 
.await } - CopyDatabase::From(_) => { - todo!() + CopyDatabase::From(arg) => { + self.copy_database_from( + to_copy_database_request(arg, &query_ctx)?, + query_ctx, + ) + .await } } } diff --git a/src/operator/src/statement/backup.rs b/src/operator/src/statement/copy_database.rs similarity index 50% rename from src/operator/src/statement/backup.rs rename to src/operator/src/statement/copy_database.rs index d970e28bd740..7e2d314dd118 100644 --- a/src/operator/src/statement/backup.rs +++ b/src/operator/src/statement/copy_database.rs @@ -12,10 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::path::Path; + use common_datasource::file_format::Format; +use common_datasource::lister::{Lister, Source}; +use common_datasource::object_store::build_backend; use common_query::Output; -use common_telemetry::{info, tracing}; -use session::context::QueryContextBuilder; +use common_telemetry::{debug, info, tracing}; +use object_store::Entry; +use regex::Regex; +use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{ensure, ResultExt}; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; @@ -28,7 +34,7 @@ pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; impl StatementExecutor { #[tracing::instrument(skip_all)] - pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result { + pub(crate) async fn copy_database_to(&self, req: CopyDatabaseRequest) -> error::Result { // location must end with / so that every table is exported to a file. ensure!( req.location.ends_with('/'), @@ -39,7 +45,7 @@ impl StatementExecutor { ); info!( - "Copy database {}.{}, dir: {},. time: {:?}", + "Copy database {}.{} to dir: {},. 
time: {:?}", req.catalog_name, req.schema_name, req.location, req.time_range ); let table_names = self @@ -86,4 +92,79 @@ impl StatementExecutor { } Ok(Output::AffectedRows(exported_rows)) } + + /// Copies database from a given directory. + #[tracing::instrument(skip_all)] + pub(crate) async fn copy_database_from( + &self, + req: CopyDatabaseRequest, + ctx: QueryContextRef, + ) -> error::Result { + // location must end with / + ensure!( + req.location.ends_with('/'), + InvalidCopyParameterSnafu { + key: "location", + value: req.location, + } + ); + + info!( + "Copy database {}.{} from dir: {}, time: {:?}", + req.catalog_name, req.schema_name, req.location, req.time_range + ); + let suffix = Format::try_from(&req.with) + .context(error::ParseFileFormatSnafu)? + .suffix(); + + let entries = list_files_to_copy(&req, suffix).await?; + + let mut rows_inserted = 0; + for e in entries { + let table_name = parse_file_name_to_copy(&e)?; + let req = CopyTableRequest { + catalog_name: req.catalog_name.clone(), + schema_name: req.schema_name.clone(), + table_name, + location: format!("{}/{}", req.location, e.path()), + with: req.with.clone(), + connection: req.connection.clone(), + pattern: None, + direction: CopyDirection::Import, + timestamp_range: None, + }; + debug!("Copy table, arg: {:?}", req); + rows_inserted += self.copy_table_from(req, ctx.clone()).await?; + } + Ok(Output::AffectedRows(rows_inserted)) + } +} + +/// Parses table names from files' names. +fn parse_file_name_to_copy(e: &Entry) -> error::Result { + Path::new(e.name()) + .file_stem() + .and_then(|os_str| os_str.to_str()) + .map(|s| s.to_string()) + .ok_or_else(|| { + error::InvalidTableNameSnafu { + table_name: e.name().to_string(), + } + .build() + }) +} + +/// Lists all files with expected suffix that can be imported to database. 
+async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::Result> { + let object_store = + build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?; + + let pattern = Regex::try_from(format!(".*{}", suffix)).context(error::BuildRegexSnafu)?; + let lister = Lister::new( + object_store.clone(), + Source::Dir, + "/".to_string(), + Some(pattern), + ); + lister.list().await.context(error::ListObjectsSnafu) } From 5843158da7a97f81189d36e0fdd022a189542b40 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 15 Jan 2024 03:32:33 +0000 Subject: [PATCH 03/10] wip: add some ut --- Cargo.lock | 1 + src/operator/Cargo.toml | 3 ++ src/operator/src/statement/copy_database.rs | 52 ++++++++++++++++++++- 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 713d8553679e..83b5ddcfee80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5768,6 +5768,7 @@ dependencies = [ "common-recordbatch", "common-runtime", "common-telemetry", + "common-test-util", "common-time", "datafusion", "datafusion-common", diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index a2de8e0102ae..9cd42d2d3af8 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -54,3 +54,6 @@ substrait.workspace = true table.workspace = true tokio.workspace = true tonic.workspace = true + +[dev-dependencies] +common-test-util.workspace = true \ No newline at end of file diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index 7e2d314dd118..870ece1ce6a1 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -93,7 +93,7 @@ impl StatementExecutor { Ok(Output::AffectedRows(exported_rows)) } - /// Copies database from a given directory. + /// Imports data to database from a given location and returns total rows imported. 
#[tracing::instrument(skip_all)] pub(crate) async fn copy_database_from( &self, @@ -168,3 +168,53 @@ async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::R ); lister.list().await.context(error::ListObjectsSnafu) } + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use object_store::services::Fs; + use object_store::util::normalize_dir; + use object_store::ObjectStore; + use table::requests::CopyDatabaseRequest; + + use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy}; + + #[tokio::test] + async fn test_list_files_and_parse_table_name() { + let dir = common_test_util::temp_dir::create_temp_dir("test_list_files_to_copy"); + let store_dir = normalize_dir(dir.path().to_str().unwrap()); + let mut builder = Fs::default(); + let _ = builder.root(&store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); + object_store.write("a.parquet", "").await.unwrap(); + object_store.write("b.parquet", "").await.unwrap(); + object_store.write("c.csv", "").await.unwrap(); + object_store.write("d", "").await.unwrap(); + object_store.write("e.f.parquet", "").await.unwrap(); + + let request = CopyDatabaseRequest { + catalog_name: "catalog_0".to_string(), + schema_name: "schema_0".to_string(), + location: store_dir, + with: [("FORMAT".to_string(), "parquet".to_string())] + .into_iter() + .collect(), + connection: Default::default(), + time_range: None, + }; + let listed = list_files_to_copy(&request, ".parquet") + .await + .unwrap() + .into_iter() + .map(|e| parse_file_name_to_copy(&e).unwrap()) + .collect::>(); + + assert_eq!( + ["a".to_string(), "b".to_string(), "e.f".to_string()] + .into_iter() + .collect::>(), + listed + ); + } +} From 1f85bb1a00171e32326132aa3901dacb167460b3 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 15 Jan 2024 03:46:03 +0000 Subject: [PATCH 04/10] wip: add continue_on_error option --- src/operator/src/statement/copy_database.rs | 43 ++++++++++++++++++--- 1 file 
changed, 37 insertions(+), 6 deletions(-) diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index 870ece1ce6a1..d355de166702 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -13,12 +13,14 @@ // limitations under the License. use std::path::Path; +use std::str::FromStr; use common_datasource::file_format::Format; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; use common_query::Output; -use common_telemetry::{debug, info, tracing}; +use common_telemetry::{debug, error, info, tracing}; +use object_store::util::join_path; use object_store::Entry; use regex::Regex; use session::context::{QueryContextBuilder, QueryContextRef}; @@ -31,6 +33,7 @@ use crate::statement::StatementExecutor; pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; +pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error"; impl StatementExecutor { #[tracing::instrument(skip_all)] @@ -45,7 +48,7 @@ impl StatementExecutor { ); info!( - "Copy database {}.{} to dir: {},. 
time: {:?}", + "Copy database {}.{} to dir: {}, time: {:?}", req.catalog_name, req.schema_name, req.location, req.time_range ); let table_names = self @@ -119,14 +122,30 @@ impl StatementExecutor { let entries = list_files_to_copy(&req, suffix).await?; + let continue_on_error = req + .with + .get(CONTINUE_ON_ERROR_KEY) + .and_then(|v| bool::from_str(v).ok()) + .unwrap_or(false); let mut rows_inserted = 0; + for e in entries { - let table_name = parse_file_name_to_copy(&e)?; + let table_name = match parse_file_name_to_copy(&e) { + Ok(table_name) => table_name, + Err(err) => { + if continue_on_error { + error!(err; "Failed to import table from file: {:?}", e); + continue; + } else { + return Err(err); + } + } + }; let req = CopyTableRequest { catalog_name: req.catalog_name.clone(), schema_name: req.schema_name.clone(), - table_name, - location: format!("{}/{}", req.location, e.path()), + table_name: table_name.clone(), + location: join_path(&req.location, e.path()), with: req.with.clone(), connection: req.connection.clone(), pattern: None, @@ -134,7 +153,19 @@ impl StatementExecutor { timestamp_range: None, }; debug!("Copy table, arg: {:?}", req); - rows_inserted += self.copy_table_from(req, ctx.clone()).await?; + match self.copy_table_from(req, ctx.clone()).await { + Ok(rows) => { + rows_inserted += rows; + } + Err(err) => { + if continue_on_error { + error!(err; "Failed to import file to table: {}", table_name); + continue; + } else { + return Err(err); + } + } + } } Ok(Output::AffectedRows(rows_inserted)) } From 5c4b6a3dfcebb49bb687efec5fcdc22a076aed48 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 15 Jan 2024 06:32:00 +0000 Subject: [PATCH 05/10] test: add sqlness cases for copy database --- src/operator/src/statement/copy_database.rs | 3 +- .../copy/copy_database_from_fs_parquet.result | 38 +++++++++++++++++++ .../copy/copy_database_from_fs_parquet.sql | 15 ++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 
tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result create mode 100644 tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index d355de166702..b15ff230d21b 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -20,7 +20,6 @@ use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; use common_query::Output; use common_telemetry::{debug, error, info, tracing}; -use object_store::util::join_path; use object_store::Entry; use regex::Regex; use session::context::{QueryContextBuilder, QueryContextRef}; @@ -145,7 +144,7 @@ impl StatementExecutor { catalog_name: req.catalog_name.clone(), schema_name: req.schema_name.clone(), table_name: table_name.clone(), - location: join_path(&req.location, e.path()), + location: format!("{}/{}", req.location, e.path()), with: req.with.clone(), connection: req.connection.clone(), pattern: None, diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result new file mode 100644 index 000000000000..b02a1e50d53c --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -0,0 +1,38 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +Affected Rows: 0 + +INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +Affected Rows: 2 + +COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet"); + +Affected Rows: 2 + +DELETE FROM demo; + +Affected Rows: 2 + +SELECT * FROM demo ORDER BY ts; + +++ +++ + +COPY DATABASE public FROM '/tmp/demo/export/parquet/'; + +Affected Rows: 2 + +SELECT * FROM demo ORDER BY ts; + ++-------+------+--------+---------------------+ 
+| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +DROP TABLE demo; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql new file mode 100644 index 000000000000..05635506074f --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql @@ -0,0 +1,15 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet"); + +DELETE FROM demo; + +SELECT * FROM demo ORDER BY ts; + +COPY DATABASE public FROM '/tmp/demo/export/parquet/'; + +SELECT * FROM demo ORDER BY ts; + +DROP TABLE demo; From 9657f03b95e9fef54f9aa91ea0eb3695404a5630 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 15 Jan 2024 06:35:26 +0000 Subject: [PATCH 06/10] fix: trailing newline --- src/operator/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 9cd42d2d3af8..29d6dc215833 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -56,4 +56,4 @@ tokio.workspace = true tonic.workspace = true [dev-dependencies] -common-test-util.workspace = true \ No newline at end of file +common-test-util.workspace = true From 58f8644c99fc0491ba2620ba0aeea1ee0a8908f7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 15 Jan 2024 06:37:25 +0000 Subject: [PATCH 07/10] fix: typo --- src/frontend/src/instance.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index bd21a3b4bf8e..f09d9d86a4e6 
100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -487,8 +487,8 @@ pub fn check_permission( validate_param(&copy_table_from.table_name, query_ctx)? } }, - Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_databse)) => { - match copy_databse { + Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => { + match copy_database { CopyDatabase::To(stmt) => validate_param(&stmt.database_name, query_ctx)?, CopyDatabase::From(stmt) => validate_param(&stmt.database_name, query_ctx)?, } From 949fc1b5fe0ab4618dd13eb2b9220e83fed67b60 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 21 Jan 2024 09:35:29 +0000 Subject: [PATCH 08/10] fix: some cr comments --- src/operator/src/error.rs | 7 ++++++- src/operator/src/statement.rs | 7 +++++-- src/operator/src/statement/copy_database.rs | 19 ++++++++++-------- src/operator/src/statement/copy_table_to.rs | 10 ++++++---- .../copy/copy_database_from_fs_parquet.result | 20 +++++++++++++++++++ .../copy/copy_database_from_fs_parquet.sql | 8 ++++++++ 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 0492be12db75..e9b8501b5631 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -418,6 +418,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid COPY DATABASE location, must end with '/': {}", value))] + InvalidCopyDatabasePath { value: String, location: Location }, + #[snafu(display("Table metadata manager error"))] TableMetadataManager { source: common_meta::error::Error, @@ -596,7 +599,9 @@ impl ErrorExt for Error { | Error::BuildBackend { source, .. } => source.status_code(), Error::ExecuteDdl { source, .. } => source.status_code(), - Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, + Error::InvalidCopyParameter { .. } | Error::InvalidCopyDatabasePath { .. } => { + StatusCode::InvalidArguments + } Error::ReadRecordBatch { source, .. 
} | Error::BuildColumnVectors { source, .. } => { source.status_code() diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 127539afa447..93106b8587a1 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -134,8 +134,11 @@ impl StatementExecutor { Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => { match copy_database { CopyDatabase::To(arg) => { - self.copy_database_to(to_copy_database_request(arg, &query_ctx)?) - .await + self.copy_database_to( + to_copy_database_request(arg, &query_ctx)?, + query_ctx.clone(), + ) + .await } CopyDatabase::From(arg) => { self.copy_database_from( diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index b15ff230d21b..29abec49e291 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -22,12 +22,12 @@ use common_query::Output; use common_telemetry::{debug, error, info, tracing}; use object_store::Entry; use regex::Regex; -use session::context::{QueryContextBuilder, QueryContextRef}; +use session::context::QueryContextRef; use snafu::{ensure, ResultExt}; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; use crate::error; -use crate::error::{CatalogSnafu, InvalidCopyParameterSnafu}; +use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu}; use crate::statement::StatementExecutor; pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; @@ -36,12 +36,15 @@ pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error"; impl StatementExecutor { #[tracing::instrument(skip_all)] - pub(crate) async fn copy_database_to(&self, req: CopyDatabaseRequest) -> error::Result { + pub(crate) async fn copy_database_to( + &self, + req: CopyDatabaseRequest, + ctx: QueryContextRef, + ) -> error::Result { // location must end with / so that every table is exported to a file. 
ensure!( req.location.ends_with('/'), - InvalidCopyParameterSnafu { - key: "location", + InvalidCopyDatabasePathSnafu { value: req.location, } ); @@ -62,6 +65,7 @@ impl StatementExecutor { let mut exported_rows = 0; for table_name in table_names { + // TODO(hl): also handles tables with metric engine. // TODO(hl): remove this hardcode once we've removed numbers table. if table_name == "numbers" { continue; @@ -87,7 +91,7 @@ impl StatementExecutor { direction: CopyDirection::Export, timestamp_range: req.time_range, }, - QueryContextBuilder::default().build(), + ctx.clone(), ) .await?; exported_rows += exported; @@ -105,8 +109,7 @@ impl StatementExecutor { // location must end with / ensure!( req.location.ends_with('/'), - InvalidCopyParameterSnafu { - key: "location", + InvalidCopyDatabasePathSnafu { value: req.location, } ); diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index d0a015e0a765..24390e64270e 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -111,15 +111,17 @@ impl StatementExecutor { let table_provider = Arc::new(DfTableProviderAdapter::new(table)); let table_source = Arc::new(DefaultTableSource::new(table_provider)); - let plan = LogicalPlanBuilder::scan_with_filters( + let mut builder = LogicalPlanBuilder::scan_with_filters( df_table_ref.to_owned_reference(), table_source, None, - filters, + filters.clone(), ) - .context(BuildDfLogicalPlanSnafu)? 
- .build() .context(BuildDfLogicalPlanSnafu)?; + for f in filters { + builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?; + } + let plan = builder.build().context(BuildDfLogicalPlanSnafu)?; let output = self .query_engine diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result index b02a1e50d53c..8d0ef6e834a3 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -10,6 +10,10 @@ COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet"); Affected Rows: 2 +COPY DATABASE public TO '/tmp/demo/export/parquet_range/' WITH (FORMAT="parquet", start_time='2022-06-15 07:02:37.000Z', end_time='2022-06-15 07:02:37.1Z'); + +Affected Rows: 1 + DELETE FROM demo; Affected Rows: 2 @@ -32,6 +36,22 @@ SELECT * FROM demo ORDER BY ts; | host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | +-------+------+--------+---------------------+ +DELETE FROM demo; + +Affected Rows: 2 + +COPY DATABASE public FROM '/tmp/demo/export/parquet_range/'; + +Affected Rows: 1 + +SELECT * FROM demo ORDER BY ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | ++-------+------+--------+---------------------+ + DROP TABLE demo; Affected Rows: 0 diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql index 05635506074f..80fee15303d3 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql @@ -4,6 +4,8 @@ INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570 COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH 
(FORMAT="parquet"); +COPY DATABASE public TO '/tmp/demo/export/parquet_range/' WITH (FORMAT="parquet", start_time='2022-06-15 07:02:37.000Z', end_time='2022-06-15 07:02:37.1Z'); + DELETE FROM demo; SELECT * FROM demo ORDER BY ts; @@ -12,4 +14,10 @@ COPY DATABASE public FROM '/tmp/demo/export/parquet/'; SELECT * FROM demo ORDER BY ts; +DELETE FROM demo; + +COPY DATABASE public FROM '/tmp/demo/export/parquet_range/'; + +SELECT * FROM demo ORDER BY ts; + DROP TABLE demo; From eb05233b3a7900ba5d75c4917417682261c1cc50 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 21 Jan 2024 11:12:22 +0000 Subject: [PATCH 09/10] chore: resolve conflicts --- src/frontend/src/instance.rs | 2 +- src/sql/src/parsers/copy_parser.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f09d9d86a4e6..f73d490de74f 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -70,7 +70,7 @@ use servers::server::{start_server, ServerHandlers}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; -use sql::parser::{ParseOptions, ParserContext, ParserContext}; +use sql::parser::{ParseOptions, ParserContext}; use sql::statements::copy::{CopyDatabase, CopyTable}; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index aecdf3341e96..bd365e51ef3a 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -404,10 +404,11 @@ mod tests { #[test] fn test_copy_database_from() { let sql = "COPY DATABASE catalog0.schema0 FROM '/a/b/c/' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')"; - let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}) - .unwrap() - .pop() - .unwrap(); + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + 
.pop() + .unwrap(); let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() From 132407f50a205619202cc1073cba3955a1a2cdb8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 22 Jan 2024 06:18:40 +0000 Subject: [PATCH 10/10] fix: some cr comments --- src/operator/src/statement/copy_database.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index 29abec49e291..0ab4d09cf752 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -23,7 +23,7 @@ use common_telemetry::{debug, error, info, tracing}; use object_store::Entry; use regex::Regex; use session::context::QueryContextRef; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; use crate::error; @@ -179,11 +179,8 @@ fn parse_file_name_to_copy(e: &Entry) -> error::Result { .file_stem() .and_then(|os_str| os_str.to_str()) .map(|s| s.to_string()) - .ok_or_else(|| { - error::InvalidTableNameSnafu { - table_name: e.name().to_string(), - } - .build() + .context(error::InvalidTableNameSnafu { + table_name: e.name().to_string(), }) }