Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: export tool minor refactor #2612

Merged
merged 8 commits into from
Nov 6, 2023
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::sync::Arc;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use client::api::v1::auth_header::AuthScheme;
use client::api::v1::Basic;
use client::{Client, Database, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util::collect;
Expand All @@ -31,7 +33,8 @@ use tokio::sync::Semaphore;
use crate::cli::{Instance, Tool};
use crate::error::{
CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu,
InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, Result,
IllegalConfigSnafu, InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu,
Result,
};

type TableReference = (String, String, String);
Expand Down Expand Up @@ -70,6 +73,10 @@ pub struct ExportCommand {
/// Things to export
#[clap(long, short = 't', value_enum)]
target: ExportTarget,

/// basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
}

impl ExportCommand {
Expand All @@ -82,12 +89,22 @@ impl ExportCommand {
addr: self.addr.clone(),
})?;
let (catalog, schema) = split_database(&self.database)?;
let database_client = Database::new(
let mut database_client = Database::new(
catalog.clone(),
schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
client,
);

if let Some(auth_basic) = &self.auth_basic {
let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu {
msg: "auth_basic cannot be split by ':'".to_string(),
})?;
database_client.set_auth(AuthScheme::Basic(Basic {
username: username.to_string(),
password: password.to_string(),
}));
}

Ok(Instance::Tool(Box::new(Export {
client: database_client,
catalog,
Expand Down Expand Up @@ -141,6 +158,9 @@ impl Export {
let mut result = Vec::with_capacity(schemas.len());
for i in 0..schemas.len() {
let schema = schemas.get_data(i).unwrap().to_owned();
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
continue;
}
result.push((self.catalog.clone(), schema));
}
Ok(result)
Expand Down Expand Up @@ -328,23 +348,26 @@ impl Export {
Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql"));
let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?;
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved

let copy_from_sql = dir_filenames
.into_iter()
.map(|file| {
let file = file.unwrap();
let filename = file.file_name().into_string().unwrap();
for table_file in dir_filenames {
let table_file = table_file.unwrap();
let table_name = table_file
.file_name()
.into_string()
.unwrap()
.replace(".parquet", "");

file.write(
format!(
"copy {} from '{}' with (format='parquet');\n",
filename.replace(".parquet", ""),
file.path().to_str().unwrap()
table_name,
table_file.path().to_str().unwrap()
)
})
.collect::<Vec<_>>()
.join("");
file.write_all(copy_from_sql.as_bytes())
.as_bytes(),
)
.await
.context(FileIoSnafu)?;
file.flush().await.context(FileIoSnafu)?;
}
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved

info!("finished exporting {catalog}.{schema} copy_from.sql");

Expand Down
Loading