diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index e0b9cd6be5b3..5932df399cfc 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -17,6 +17,8 @@ use std::sync::Arc; use async_trait::async_trait; use clap::{Parser, ValueEnum}; +use client::api::v1::auth_header::AuthScheme; +use client::api::v1::Basic; use client::{Client, Database, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util::collect; @@ -25,13 +27,14 @@ use datatypes::scalars::ScalarVector; use datatypes::vectors::{StringVector, Vector}; use snafu::{OptionExt, ResultExt}; use tokio::fs::File; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; use crate::cli::{Instance, Tool}; use crate::error::{ CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu, - InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, Result, + IllegalConfigSnafu, InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, + Result, }; type TableReference = (String, String, String); @@ -70,6 +73,10 @@ pub struct ExportCommand { /// Things to export #[clap(long, short = 't', value_enum)] target: ExportTarget, + + /// basic authentication for connecting to the server + #[clap(long)] + auth_basic: Option, } impl ExportCommand { @@ -82,12 +89,22 @@ impl ExportCommand { addr: self.addr.clone(), })?; let (catalog, schema) = split_database(&self.database)?; - let database_client = Database::new( + let mut database_client = Database::new( catalog.clone(), schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()), client, ); + if let Some(auth_basic) = &self.auth_basic { + let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu { + msg: "auth_basic cannot be split by ':'".to_string(), + })?; + database_client.set_auth(AuthScheme::Basic(Basic { + username: username.to_string(), + password: password.to_string(), + })); + } + Ok(Instance::Tool(Box::new(Export { client: database_client, catalog, @@ -141,6 +158,9 @@ impl Export { let mut result = Vec::with_capacity(schemas.len()); for i in 0..schemas.len() { let schema = schemas.get_data(i).unwrap().to_owned(); + if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME { + continue; + } result.push((self.catalog.clone(), schema)); } Ok(result) @@ -326,25 +346,30 @@ impl Export { let copy_from_file = Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql")); - let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?; - - let copy_from_sql = dir_filenames - .into_iter() - .map(|file| { - let file = file.unwrap(); - let filename = file.file_name().into_string().unwrap(); - - format!( - "copy {} from '{}' with (format='parquet');\n", - filename.replace(".parquet", ""), - file.path().to_str().unwrap() + let mut writer = + BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?); + + for table_file in dir_filenames { + let table_file = table_file.unwrap(); + let table_name = table_file + .file_name() + .into_string() + .unwrap() + .replace(".parquet", ""); + + writer + .write( + format!( + "copy {} from '{}' with (format='parquet');\n", + table_name, + table_file.path().to_str().unwrap() + ) + .as_bytes(), ) - }) - .collect::>() - .join(""); - file.write_all(copy_from_sql.as_bytes()) - .await - .context(FileIoSnafu)?; + .await + .context(FileIoSnafu)?; + } + writer.flush().await.context(FileIoSnafu)?; info!("finished exporting {catalog}.{schema} copy_from.sql");