diff --git a/Cargo.lock b/Cargo.lock index 713d8553679e..83b5ddcfee80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5768,6 +5768,7 @@ dependencies = [ "common-recordbatch", "common-runtime", "common-telemetry", + "common-test-util", "common-time", "datafusion", "datafusion-common", diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 72b79947548a..f73d490de74f 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -71,7 +71,7 @@ use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; use sql::parser::{ParseOptions, ParserContext}; -use sql::statements::copy::CopyTable; +use sql::statements::copy::{CopyDatabase, CopyTable}; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; @@ -487,8 +487,11 @@ pub fn check_permission( validate_param(©_table_from.table_name, query_ctx)? } }, - Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => { - validate_param(&stmt.database_name, query_ctx)? + Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => { + match copy_database { + CopyDatabase::To(stmt) => validate_param(&stmt.database_name, query_ctx)?, + CopyDatabase::From(stmt) => validate_param(&stmt.database_name, query_ctx)?, + } } Statement::TruncateTable(stmt) => { validate_param(stmt.table_name(), query_ctx)?; diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index a2de8e0102ae..29d6dc215833 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -54,3 +54,6 @@ substrait.workspace = true table.workspace = true tokio.workspace = true tonic.workspace = true + +[dev-dependencies] +common-test-util.workspace = true diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 0492be12db75..e9b8501b5631 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -418,6 +418,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid COPY DATABASE location, must end with '/': {}", value))] + InvalidCopyDatabasePath { value: String, location: Location }, + #[snafu(display("Table metadata manager error"))] TableMetadataManager { source: common_meta::error::Error, @@ -596,7 +599,9 @@ impl ErrorExt for Error { | Error::BuildBackend { source, .. } => source.status_code(), Error::ExecuteDdl { source, .. } => source.status_code(), - Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, + Error::InvalidCopyParameter { .. } | Error::InvalidCopyDatabasePath { .. } => { + StatusCode::InvalidArguments + } Error::ReadRecordBatch { source, .. } | Error::BuildColumnVectors { source, .. } => { source.status_code() diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 84efcd13addd..93106b8587a1 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod backup; +mod copy_database; mod copy_table_from; mod copy_table_to; mod ddl; @@ -41,7 +41,7 @@ use query::plan::LogicalPlan; use query::QueryEngineRef; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; -use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; +use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; use sql::statements::OptionMap; use sql::util::format_raw_object_name; @@ -55,7 +55,7 @@ use crate::error::{ PlanStatementSnafu, Result, TableNotFoundSnafu, }; use crate::insert::InserterRef; -use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; +use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; use crate::table::table_idents_to_full_name; #[derive(Clone)] @@ -131,9 +131,23 @@ impl StatementExecutor { } } - Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => { - self.copy_database(to_copy_database_request(arg, &query_ctx)?) - .await + Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => { + match copy_database { + CopyDatabase::To(arg) => { + self.copy_database_to( + to_copy_database_request(arg, &query_ctx)?, + query_ctx.clone(), + ) + .await + } + CopyDatabase::From(arg) => { + self.copy_database_from( + to_copy_database_request(arg, &query_ctx)?, + query_ctx, + ) + .await + } + } } Statement::CreateTable(stmt) => { diff --git a/src/operator/src/statement/backup.rs b/src/operator/src/statement/backup.rs deleted file mode 100644 index d970e28bd740..000000000000 --- a/src/operator/src/statement/backup.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_datasource::file_format::Format; -use common_query::Output; -use common_telemetry::{info, tracing}; -use session::context::QueryContextBuilder; -use snafu::{ensure, ResultExt}; -use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; - -use crate::error; -use crate::error::{CatalogSnafu, InvalidCopyParameterSnafu}; -use crate::statement::StatementExecutor; - -pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; -pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; - -impl StatementExecutor { - #[tracing::instrument(skip_all)] - pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result { - // location must end with / so that every table is exported to a file. - ensure!( - req.location.ends_with('/'), - InvalidCopyParameterSnafu { - key: "location", - value: req.location, - } - ); - - info!( - "Copy database {}.{}, dir: {},. time: {:?}", - req.catalog_name, req.schema_name, req.location, req.time_range - ); - let table_names = self - .catalog_manager - .table_names(&req.catalog_name, &req.schema_name) - .await - .context(CatalogSnafu)?; - - let suffix = Format::try_from(&req.with) - .context(error::ParseFileFormatSnafu)? - .suffix(); - - let mut exported_rows = 0; - for table_name in table_names { - // TODO(hl): remove this hardcode once we've removed numbers table. - if table_name == "numbers" { - continue; - } - let mut table_file = req.location.clone(); - table_file.push_str(&table_name); - table_file.push_str(suffix); - info!( - "Copy table: {}.{}.{} to {}", - req.catalog_name, req.schema_name, table_name, table_file - ); - - let exported = self - .copy_table_to( - CopyTableRequest { - catalog_name: req.catalog_name.clone(), - schema_name: req.schema_name.clone(), - table_name, - location: table_file, - with: req.with.clone(), - connection: req.connection.clone(), - pattern: None, - direction: CopyDirection::Export, - timestamp_range: req.time_range, - }, - QueryContextBuilder::default().build(), - ) - .await?; - exported_rows += exported; - } - Ok(Output::AffectedRows(exported_rows)) - } -} diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs new file mode 100644 index 000000000000..0ab4d09cf752 --- /dev/null +++ b/src/operator/src/statement/copy_database.rs @@ -0,0 +1,250 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::Path; +use std::str::FromStr; + +use common_datasource::file_format::Format; +use common_datasource::lister::{Lister, Source}; +use common_datasource::object_store::build_backend; +use common_query::Output; +use common_telemetry::{debug, error, info, tracing}; +use object_store::Entry; +use regex::Regex; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; +use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; + +use crate::error; +use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu}; +use crate::statement::StatementExecutor; + +pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; +pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; +pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error"; + +impl StatementExecutor { + #[tracing::instrument(skip_all)] + pub(crate) async fn copy_database_to( + &self, + req: CopyDatabaseRequest, + ctx: QueryContextRef, + ) -> error::Result { + // location must end with / so that every table is exported to a file. + ensure!( + req.location.ends_with('/'), + InvalidCopyDatabasePathSnafu { + value: req.location, + } + ); + + info!( + "Copy database {}.{} to dir: {}, time: {:?}", + req.catalog_name, req.schema_name, req.location, req.time_range + ); + let table_names = self + .catalog_manager + .table_names(&req.catalog_name, &req.schema_name) + .await + .context(CatalogSnafu)?; + + let suffix = Format::try_from(&req.with) + .context(error::ParseFileFormatSnafu)? + .suffix(); + + let mut exported_rows = 0; + for table_name in table_names { + // TODO(hl): also handles tables with metric engine. + // TODO(hl): remove this hardcode once we've removed numbers table. + if table_name == "numbers" { + continue; + } + let mut table_file = req.location.clone(); + table_file.push_str(&table_name); + table_file.push_str(suffix); + info!( + "Copy table: {}.{}.{} to {}", + req.catalog_name, req.schema_name, table_name, table_file + ); + + let exported = self + .copy_table_to( + CopyTableRequest { + catalog_name: req.catalog_name.clone(), + schema_name: req.schema_name.clone(), + table_name, + location: table_file, + with: req.with.clone(), + connection: req.connection.clone(), + pattern: None, + direction: CopyDirection::Export, + timestamp_range: req.time_range, + }, + ctx.clone(), + ) + .await?; + exported_rows += exported; + } + Ok(Output::AffectedRows(exported_rows)) + } + + /// Imports data to database from a given location and returns total rows imported. + #[tracing::instrument(skip_all)] + pub(crate) async fn copy_database_from( + &self, + req: CopyDatabaseRequest, + ctx: QueryContextRef, + ) -> error::Result { + // location must end with / + ensure!( + req.location.ends_with('/'), + InvalidCopyDatabasePathSnafu { + value: req.location, + } + ); + + info!( + "Copy database {}.{} from dir: {}, time: {:?}", + req.catalog_name, req.schema_name, req.location, req.time_range + ); + let suffix = Format::try_from(&req.with) + .context(error::ParseFileFormatSnafu)? + .suffix(); + + let entries = list_files_to_copy(&req, suffix).await?; + + let continue_on_error = req + .with + .get(CONTINUE_ON_ERROR_KEY) + .and_then(|v| bool::from_str(v).ok()) + .unwrap_or(false); + let mut rows_inserted = 0; + + for e in entries { + let table_name = match parse_file_name_to_copy(&e) { + Ok(table_name) => table_name, + Err(err) => { + if continue_on_error { + error!(err; "Failed to import table from file: {:?}", e); + continue; + } else { + return Err(err); + } + } + }; + let req = CopyTableRequest { + catalog_name: req.catalog_name.clone(), + schema_name: req.schema_name.clone(), + table_name: table_name.clone(), + location: format!("{}/{}", req.location, e.path()), + with: req.with.clone(), + connection: req.connection.clone(), + pattern: None, + direction: CopyDirection::Import, + timestamp_range: None, + }; + debug!("Copy table, arg: {:?}", req); + match self.copy_table_from(req, ctx.clone()).await { + Ok(rows) => { + rows_inserted += rows; + } + Err(err) => { + if continue_on_error { + error!(err; "Failed to import file to table: {}", table_name); + continue; + } else { + return Err(err); + } + } + } + } + Ok(Output::AffectedRows(rows_inserted)) + } +} + +/// Parses table names from files' names. +fn parse_file_name_to_copy(e: &Entry) -> error::Result { + Path::new(e.name()) + .file_stem() + .and_then(|os_str| os_str.to_str()) + .map(|s| s.to_string()) + .context(error::InvalidTableNameSnafu { + table_name: e.name().to_string(), + }) +} + +/// Lists all files with expected suffix that can be imported to database. +async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::Result> { + let object_store = + build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?; + + let pattern = Regex::try_from(format!(".*{}", suffix)).context(error::BuildRegexSnafu)?; + let lister = Lister::new( + object_store.clone(), + Source::Dir, + "/".to_string(), + Some(pattern), + ); + lister.list().await.context(error::ListObjectsSnafu) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use object_store::services::Fs; + use object_store::util::normalize_dir; + use object_store::ObjectStore; + use table::requests::CopyDatabaseRequest; + + use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy}; + + #[tokio::test] + async fn test_list_files_and_parse_table_name() { + let dir = common_test_util::temp_dir::create_temp_dir("test_list_files_to_copy"); + let store_dir = normalize_dir(dir.path().to_str().unwrap()); + let mut builder = Fs::default(); + let _ = builder.root(&store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); + object_store.write("a.parquet", "").await.unwrap(); + object_store.write("b.parquet", "").await.unwrap(); + object_store.write("c.csv", "").await.unwrap(); + object_store.write("d", "").await.unwrap(); + object_store.write("e.f.parquet", "").await.unwrap(); + + let request = CopyDatabaseRequest { + catalog_name: "catalog_0".to_string(), + schema_name: "schema_0".to_string(), + location: store_dir, + with: [("FORMAT".to_string(), "parquet".to_string())] + .into_iter() + .collect(), + connection: Default::default(), + time_range: None, + }; + let listed = list_files_to_copy(&request, ".parquet") + .await + .unwrap() + .into_iter() + .map(|e| parse_file_name_to_copy(&e).unwrap()) + .collect::>(); + + assert_eq!( + ["a".to_string(), "b".to_string(), "e.f".to_string()] + .into_iter() + .collect::>(), + listed + ); + } +} diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index d0a015e0a765..24390e64270e 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -111,15 +111,17 @@ impl StatementExecutor { let table_provider = Arc::new(DfTableProviderAdapter::new(table)); let table_source = Arc::new(DefaultTableSource::new(table_provider)); - let plan = LogicalPlanBuilder::scan_with_filters( + let mut builder = LogicalPlanBuilder::scan_with_filters( df_table_ref.to_owned_reference(), table_source, None, - filters, + filters.clone(), ) - .context(BuildDfLogicalPlanSnafu)? - .build() .context(BuildDfLogicalPlanSnafu)?; + for f in filters { + builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?; + } + let plan = builder.build().context(BuildDfLogicalPlanSnafu)?; let output = self .query_engine diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index eee50e8c940e..bd365e51ef3a 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -15,13 +15,12 @@ use std::collections::HashMap; use snafu::ResultExt; -use sqlparser::ast::ObjectName; use sqlparser::keywords::Keyword; use sqlparser::tokenizer::Token::Word; use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; +use crate::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument}; use crate::statements::statement::Statement; use crate::util::parse_option_string; @@ -47,7 +46,7 @@ impl<'a> ParserContext<'a> { Ok(Statement::Copy(copy)) } - fn parser_copy_database(&mut self) -> Result { + fn parser_copy_database(&mut self) -> Result { let database_name = self.parser .parse_object_name() @@ -57,17 +56,29 @@ impl<'a> ParserContext<'a> { actual: self.peek_token_as_string(), })?; - self.parser - .expect_keyword(Keyword::TO) - .context(error::SyntaxSnafu)?; - - let (with, connection, location) = self.parse_copy_to()?; - Ok(CopyDatabaseArgument { - database_name, - with: with.into(), - connection: connection.into(), - location, - }) + let req = if self.parser.parse_keyword(Keyword::TO) { + let (with, connection, location) = self.parse_copy_parameters()?; + let argument = CopyDatabaseArgument { + database_name, + with: with.into(), + connection: connection.into(), + location, + }; + CopyDatabase::To(argument) + } else { + self.parser + .expect_keyword(Keyword::FROM) + .context(error::SyntaxSnafu)?; + let (with, connection, location) = self.parse_copy_parameters()?; + let argument = CopyDatabaseArgument { + database_name, + with: with.into(), + connection: connection.into(), + location, + }; + CopyDatabase::From(argument) + }; + Ok(req) } fn parse_copy_table(&mut self) -> Result { @@ -82,7 +93,7 @@ impl<'a> ParserContext<'a> { let table_name = Self::canonicalize_object_name(raw_table_name); if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location) = self.parse_copy_to()?; + let (with, connection, location) = self.parse_copy_parameters()?; Ok(CopyTable::To(CopyTableArgument { table_name, with: with.into(), @@ -93,52 +104,17 @@ impl<'a> ParserContext<'a> { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - Ok(CopyTable::From(self.parse_copy_table_from(table_name)?)) + let (with, connection, location) = self.parse_copy_parameters()?; + Ok(CopyTable::From(CopyTableArgument { + table_name, + with: with.into(), + connection: connection.into(), + location, + })) } } - fn parse_copy_table_from(&mut self, table_name: ObjectName) -> Result { - let location = - self.parser - .parse_literal_string() - .with_context(|_| error::UnexpectedSnafu { - sql: self.sql, - expected: "a uri", - actual: self.peek_token_as_string(), - })?; - - let options = self - .parser - .parse_options(Keyword::WITH) - .context(error::SyntaxSnafu)?; - - let with = options - .into_iter() - .filter_map(|option| { - parse_option_string(option.value).map(|v| (option.name.value.to_lowercase(), v)) - }) - .collect(); - - let connection_options = self - .parser - .parse_options(Keyword::CONNECTION) - .context(error::SyntaxSnafu)?; - - let connection = connection_options - .into_iter() - .filter_map(|option| { - parse_option_string(option.value).map(|v| (option.name.value.to_lowercase(), v)) - }) - .collect(); - Ok(CopyTableArgument { - table_name, - with, - connection, - location, - }) - } - - fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> { + fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> { let location = self.parser .parse_literal_string() @@ -181,7 +157,7 @@ mod tests { use std::assert_matches::assert_matches; use std::collections::HashMap; - use sqlparser::ast::Ident; + use sqlparser::ast::{Ident, ObjectName}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -398,6 +374,50 @@ mod tests { let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() }; + + let CopyDatabase::To(stmt) = stmt else { + unreachable!() + }; + + assert_eq!( + ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]), + stmt.database_name + ); + assert_eq!( + [("format".to_string(), "parquet".to_string())] + .into_iter() + .collect::>(), + stmt.with.map + ); + + assert_eq!( + [ + ("foo".to_string(), "Bar".to_string()), + ("one".to_string(), "two".to_string()) + ] + .into_iter() + .collect::>(), + stmt.connection.map + ); + } + + #[test] + fn test_copy_database_from() { + let sql = "COPY DATABASE catalog0.schema0 FROM '/a/b/c/' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { + unreachable!() + }; + + let CopyDatabase::From(stmt) = stmt else { + unreachable!() + }; + assert_eq!( ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]), stmt.database_name diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index cd3893300ba8..8d3104f29e69 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -20,7 +20,7 @@ use crate::statements::OptionMap; #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] pub enum Copy { CopyTable(CopyTable), - CopyDatabase(CopyDatabaseArgument), + CopyDatabase(CopyDatabase), } #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] @@ -29,6 +29,12 @@ pub enum CopyTable { From(CopyTableArgument), } +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] +pub enum CopyDatabase { + To(CopyDatabaseArgument), + From(CopyDatabaseArgument), +} + #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] pub struct CopyDatabaseArgument { pub database_name: ObjectName, diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result new file mode 100644 index 000000000000..8d0ef6e834a3 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -0,0 +1,58 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +Affected Rows: 0 + +INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +Affected Rows: 2 + +COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet"); + +Affected Rows: 2 + +COPY DATABASE public TO '/tmp/demo/export/parquet_range/' WITH (FORMAT="parquet", start_time='2022-06-15 07:02:37.000Z', end_time='2022-06-15 07:02:37.1Z'); + +Affected Rows: 1 + +DELETE FROM demo; + +Affected Rows: 2 + +SELECT * FROM demo ORDER BY ts; + +++ +++ + +COPY DATABASE public FROM '/tmp/demo/export/parquet/'; + +Affected Rows: 2 + +SELECT * FROM demo ORDER BY ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +DELETE FROM demo; + +Affected Rows: 2 + +COPY DATABASE public FROM '/tmp/demo/export/parquet_range/'; + +Affected Rows: 1 + +SELECT * FROM demo ORDER BY ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | ++-------+------+--------+---------------------+ + +DROP TABLE demo; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql new file mode 100644 index 000000000000..80fee15303d3 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql @@ -0,0 +1,23 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet"); + +COPY DATABASE public TO '/tmp/demo/export/parquet_range/' WITH (FORMAT="parquet", start_time='2022-06-15 07:02:37.000Z', end_time='2022-06-15 07:02:37.1Z'); + +DELETE FROM demo; + +SELECT * FROM demo ORDER BY ts; + +COPY DATABASE public FROM '/tmp/demo/export/parquet/'; + +SELECT * FROM demo ORDER BY ts; + +DELETE FROM demo; + +COPY DATABASE public FROM '/tmp/demo/export/parquet_range/'; + +SELECT * FROM demo ORDER BY ts; + +DROP TABLE demo;