From 9c842871d42f889cd9ab34098d44aaf571075205 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 17 Oct 2023 15:00:11 +0800 Subject: [PATCH 1/8] chore: cr issue --- src/cmd/src/cli/export.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index e0b9cd6be5b3..68bc48fa03d9 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -332,16 +332,20 @@ impl Export { .into_iter() .map(|file| { let file = file.unwrap(); - let filename = file.file_name().into_string().unwrap(); + let table_name = file + .file_name() + .into_string() + .unwrap() + .replace(".parquet", ""); format!( "copy {} from '{}' with (format='parquet');\n", - filename.replace(".parquet", ""), + table_name, file.path().to_str().unwrap() ) }) - .collect::>() - .join(""); + .collect::(); + file.write_all(copy_from_sql.as_bytes()) .await .context(FileIoSnafu)?; From 93ff6086373fc8470de753c03c14bd6e1fef1106 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 17 Oct 2023 15:25:28 +0800 Subject: [PATCH 2/8] chore: cr issue --- src/cmd/src/cli/export.rs | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 68bc48fa03d9..cf5f442ec456 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Write; use std::path::Path; use std::sync::Arc; @@ -328,23 +329,24 @@ impl Export { Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql")); let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?; - let copy_from_sql = dir_filenames - .into_iter() - .map(|file| { - let file = file.unwrap(); - let table_name = file - .file_name() - .into_string() - .unwrap() - .replace(".parquet", ""); - - format!( - "copy {} from '{}' with (format='parquet');\n", - table_name, - file.path().to_str().unwrap() - ) - }) - .collect::(); + let copy_from_sql = + dir_filenames + .into_iter() + .fold(String::new(), |mut acc, file| { + let file = file.unwrap(); + let table_name = file + .file_name() + .into_string() + .unwrap() + .replace(".parquet", ""); + let _ = write!( + acc, + "copy {} from '{}' with (format='parquet');\n", + table_name, + file.path().to_str().unwrap() + ); + acc + }); file.write_all(copy_from_sql.as_bytes()) .await From ceb9d6a83a854f6c8e244ed095d7f0ff594e5b3e Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 17 Oct 2023 15:54:09 +0800 Subject: [PATCH 3/8] chore: skip information schema --- src/cmd/src/cli/export.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index cf5f442ec456..7424e9e49306 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -142,6 +142,9 @@ impl Export { let mut result = Vec::with_capacity(schemas.len()); for i in 0..schemas.len() { let schema = schemas.get_data(i).unwrap().to_owned(); + if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME { + continue; + } result.push((self.catalog.clone(), schema)); } Ok(result) From ca7a4af6b5379166551af676e06c6db4359cf371 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 17 Oct 2023 16:23:26 +0800 Subject: [PATCH 4/8] chore: fix clippy --- src/cmd/src/cli/export.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 7424e9e49306..5f3ab79f1072 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -342,9 +342,9 @@ impl Export { .into_string() .unwrap() .replace(".parquet", ""); - let _ = write!( + let _ = writeln!( acc, - "copy {} from '{}' with (format='parquet');\n", + "copy {} from '{}' with (format='parquet');", table_name, file.path().to_str().unwrap() ); From 4765673138e3db98829ebe33e8d166899dddd9d6 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 18 Oct 2023 18:20:11 +0800 Subject: [PATCH 5/8] chore: add basic auth support to cli export --- src/cmd/src/cli/export.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 5f3ab79f1072..e9d67875cd21 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use async_trait::async_trait; use clap::{Parser, ValueEnum}; +use client::api::v1::auth_header::AuthScheme; +use client::api::v1::Basic; use client::{Client, Database, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util::collect; @@ -32,7 +34,8 @@ use tokio::sync::Semaphore; use crate::cli::{Instance, Tool}; use crate::error::{ CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu, - InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, Result, + IllegalConfigSnafu, InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, + Result, }; type TableReference = (String, String, String); @@ -71,6 +74,10 @@ pub struct ExportCommand { /// Things to export #[clap(long, short = 't', value_enum)] target: ExportTarget, + + /// basic authentication for connecting to the server + #[clap(long)] + auth_basic: Option, } impl ExportCommand { @@ -83,12 +90,22 @@ impl ExportCommand { addr: self.addr.clone(), })?; let (catalog, schema) = split_database(&self.database)?; - let database_client = Database::new( + let mut database_client = Database::new( catalog.clone(), schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()), client, ); + if let Some(auth_basic) = &self.auth_basic { + let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu { + msg: "auth_basic cannot be split by ':'".to_string(), + })?; + database_client.set_auth(AuthScheme::Basic(Basic { + username: username.to_string(), + password: password.to_string(), + })); + } + Ok(Instance::Tool(Box::new(Export { client: database_client, catalog, From 49d54be06e717b56a07e692d0ebc1f3a88258f57 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Mon, 6 Nov 2023 11:13:32 +0800 Subject: [PATCH 6/8] chore: fix cr issue --- src/cmd/src/cli/export.rs | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index e9d67875cd21..fabebe8ebdeb 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Write; use std::path::Path; use std::sync::Arc; @@ -349,28 +348,26 @@ impl Export { Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql")); let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?; - let copy_from_sql = - dir_filenames - .into_iter() - .fold(String::new(), |mut acc, file| { - let file = file.unwrap(); - let table_name = file - .file_name() - .into_string() - .unwrap() - .replace(".parquet", ""); - let _ = writeln!( - acc, - "copy {} from '{}' with (format='parquet');", - table_name, - file.path().to_str().unwrap() - ); - acc - }); - - file.write_all(copy_from_sql.as_bytes()) + for table_file in dir_filenames { + let table_file = table_file.unwrap(); + let table_name = table_file + .file_name() + .into_string() + .unwrap() + .replace(".parquet", ""); + + file.write( + format!( + "copy {} from '{}' with (format='parquet');\n", + table_name, + table_file.path().to_str().unwrap() + ) + .as_bytes(), + ) .await .context(FileIoSnafu)?; + file.flush().await.context(FileIoSnafu)?; + } info!("finished exporting {catalog}.{schema} copy_from.sql"); From 6986f5723aa37aca29ecd93b033c1f5d0f15995a Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 7 Nov 2023 00:26:25 +0800 Subject: [PATCH 7/8] chore: reduce `flush` invocation Co-authored-by: Yingwen --- src/cmd/src/cli/export.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index fabebe8ebdeb..0ecca39dbe13 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -366,8 +366,8 @@ impl Export { ) .await .context(FileIoSnafu)?; - file.flush().await.context(FileIoSnafu)?; } + file.flush().await.context(FileIoSnafu)?; info!("finished exporting {catalog}.{schema} copy_from.sql"); From 0cd5392d14788b2297e04bd98f5d6cee52d1888c Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 7 Nov 2023 00:37:23 +0800 Subject: [PATCH 8/8] chore: use BufWriter --- src/cmd/src/cli/export.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 0ecca39dbe13..5932df399cfc 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -27,7 +27,7 @@ use datatypes::scalars::ScalarVector; use datatypes::vectors::{StringVector, Vector}; use snafu::{OptionExt, ResultExt}; use tokio::fs::File; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; use crate::cli::{Instance, Tool}; @@ -346,7 +346,8 @@ impl Export { let copy_from_file = Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql")); - let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?; + let mut writer = + BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?); for table_file in dir_filenames { let table_file = table_file.unwrap(); @@ -356,18 +357,19 @@ impl Export { .unwrap() .replace(".parquet", ""); - file.write( - format!( - "copy {} from '{}' with (format='parquet');\n", - table_name, - table_file.path().to_str().unwrap() + writer + .write( + format!( + "copy {} from '{}' with (format='parquet');\n", + table_name, + table_file.path().to_str().unwrap() + ) + .as_bytes(), ) - .as_bytes(), - ) - .await - .context(FileIoSnafu)?; + .await + .context(FileIoSnafu)?; } - file.flush().await.context(FileIoSnafu)?; + writer.flush().await.context(FileIoSnafu)?; info!("finished exporting {catalog}.{schema} copy_from.sql");