From 9056c3a6aa63383a5c626d49114e009f653e5c13 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Tue, 17 Oct 2023 09:56:52 +0800
Subject: [PATCH] feat: implement greptime cli export (#2535)

* feat: implement greptime cli export

Signed-off-by: Ruihang Xia

* fix clippy

Signed-off-by: Ruihang Xia

* read information schema

Signed-off-by: Ruihang Xia

* parse database name from cli params

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 src/client/src/database.rs                |   7 +-
 src/cmd/src/cli.rs                        |   4 +
 src/cmd/src/cli/export.rs                 | 311 ++++++++++++++++++++++
 src/cmd/src/error.rs                      |  35 ++-
 src/common/recordbatch/src/recordbatch.rs |   9 +
 src/table/src/engine.rs                   |   2 +
 6 files changed, 364 insertions(+), 4 deletions(-)
 create mode 100644 src/cmd/src/cli/export.rs

diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 7e4c33a07417..f3e6738b1d4b 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -167,11 +167,14 @@ impl Database {
         }
     }
 
-    pub async fn sql(&self, sql: &str) -> Result<Output> {
+    pub async fn sql<S>(&self, sql: S) -> Result<Output>
+    where
+        S: AsRef<str>,
+    {
         let _timer = timer!(metrics::METRIC_GRPC_SQL);
         self.do_get(
             Request::Query(QueryRequest {
-                query: Some(Query::Sql(sql.to_string())),
+                query: Some(Query::Sql(sql.as_ref().to_string())),
             }),
             0,
         )
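The `AsRef<str>` relaxation above lets callers hand `sql` either a borrowed or an owned string, which the export tool below relies on when building queries with `format!`. A minimal caller sketch, assuming a connected `db` and a hypothetical `table` name (neither is part of this patch):

    use client::Database;

    // Both call sites compile against the new `sql<S: AsRef<str>>` bound.
    async fn demo(db: &Database, table: &str) -> Result<(), Box<dyn std::error::Error>> {
        db.sql("show databases").await?;                      // &str still works
        db.sql(format!("show create table {table}")).await?;  // String now works too
        Ok(())
    }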
diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs
index d80bd15b8695..ca76e6591ae8 100644
--- a/src/cmd/src/cli.rs
+++ b/src/cmd/src/cli.rs
@@ -14,6 +14,7 @@
 
 mod bench;
 mod cmd;
+mod export;
 mod helper;
 mod repl;
 // TODO(weny): Removes it
@@ -27,6 +28,7 @@ use common_telemetry::logging::LoggingOptions;
 pub use repl::Repl;
 use upgrade::UpgradeCommand;
 
+use self::export::ExportCommand;
 use crate::error::Result;
 use crate::options::{Options, TopLevelOptions};
 
@@ -81,6 +83,7 @@ enum SubCommand {
     // Attach(AttachCommand),
     Upgrade(UpgradeCommand),
     Bench(BenchTableMetadataCommand),
+    Export(ExportCommand),
 }
 
 impl SubCommand {
@@ -89,6 +92,7 @@ impl SubCommand {
             // SubCommand::Attach(cmd) => cmd.build().await,
             SubCommand::Upgrade(cmd) => cmd.build().await,
             SubCommand::Bench(cmd) => cmd.build().await,
+            SubCommand::Export(cmd) => cmd.build().await,
         }
     }
 }

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
new file mode 100644
index 000000000000..4ec90132fe6b
--- /dev/null
+++ b/src/cmd/src/cli/export.rs
@@ -0,0 +1,311 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::path::Path;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use clap::{Parser, ValueEnum};
+use client::{Client, Database, DEFAULT_SCHEMA_NAME};
+use common_query::Output;
+use common_recordbatch::util::collect;
+use common_telemetry::{debug, error, info};
+use datatypes::scalars::ScalarVector;
+use datatypes::vectors::{StringVector, Vector};
+use snafu::{OptionExt, ResultExt};
+use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
+use tokio::sync::Semaphore;
+
+use crate::cli::{Instance, Tool};
+use crate::error::{
+    CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu,
+    InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, Result,
+};
+
+type TableReference = (String, String, String);
+
+#[derive(Debug, Default, Clone, ValueEnum)]
+enum ExportTarget {
+    /// Corresponding to `SHOW CREATE TABLE`
+    #[default]
+    CreateTable,
+    /// Corresponding to `EXPORT TABLE`
+    TableData,
+}
+
+#[derive(Debug, Default, Parser)]
+pub struct ExportCommand {
+    /// Server address to connect to
+    #[clap(long)]
+    addr: String,
+
+    /// Directory to put the exported data. E.g.: /tmp/greptimedb-export
+    #[clap(long)]
+    output_dir: String,
+
+    /// The name of the catalog to export. Defaults to "greptime-*".
+    #[clap(long, default_value = "")]
+    database: String,
+
+    /// Parallelism of the export.
+    #[clap(long, short = 'j', default_value = "1")]
+    export_jobs: usize,
+
+    /// Max retry times for each job.
+    #[clap(long, default_value = "3")]
+    max_retry: usize,
+
+    /// Things to export
+    #[clap(long, short = 't', value_enum)]
+    target: ExportTarget,
+}
+
+impl ExportCommand {
+    pub async fn build(&self) -> Result<Instance> {
+        let client = Client::with_urls([self.addr.clone()]);
+        client
+            .health_check()
+            .await
+            .with_context(|_| ConnectServerSnafu {
+                addr: self.addr.clone(),
+            })?;
+        let (catalog, schema) = split_database(&self.database)?;
+        let database_client = Database::new(
+            catalog.clone(),
+            schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
+            client,
+        );
+
+        Ok(Instance::Tool(Box::new(Export {
+            client: database_client,
+            catalog,
+            schema,
+            output_dir: self.output_dir.clone(),
+            parallelism: self.export_jobs,
+            target: self.target.clone(),
+        })))
+    }
+}
+
+pub struct Export {
+    client: Database,
+    catalog: String,
+    schema: Option<String>,
+    output_dir: String,
+    parallelism: usize,
+    target: ExportTarget,
+}
+
+impl Export {
+    /// Iterate over all db names.
+    ///
+    /// Note for newcomers: a `db_name` is catalog + schema.
+    async fn iter_db_names(&self) -> Result<Vec<(String, String)>> {
+        if let Some(schema) = &self.schema {
+            Ok(vec![(self.catalog.clone(), schema.clone())])
+        } else {
+            let mut client = self.client.clone();
+            client.set_catalog(self.catalog.clone());
+            let result =
+                client
+                    .sql("show databases")
+                    .await
+                    .with_context(|_| RequestDatabaseSnafu {
+                        sql: "show databases".to_string(),
+                    })?;
+            let Output::Stream(stream) = result else {
+                NotDataFromOutputSnafu.fail()?
+            };
+            let record_batch = collect(stream)
+                .await
+                .context(CollectRecordBatchesSnafu)?
+                .pop()
+                .context(EmptyResultSnafu)?;
+            let schemas = record_batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringVector>()
+                .unwrap();
+            let mut result = Vec::with_capacity(schemas.len());
+            for i in 0..schemas.len() {
+                let schema = schemas.get_data(i).unwrap().to_owned();
+                result.push((self.catalog.clone(), schema));
+            }
+            Ok(result)
+        }
+    }
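Every query in this file funnels through the recipe just shown: take the `Output::Stream`, `collect` it, pop the first `RecordBatch`, then downcast a column to `StringVector` and copy the values out. The same idea as a standalone helper (hypothetical, not part of the patch; it keeps the `unwrap` style of the surrounding code):

    use common_recordbatch::RecordBatch;
    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::{StringVector, Vector};

    /// Read string column `idx` of a record batch as owned `String`s.
    /// Panics if the column is not a string column, like the call sites here.
    fn column_as_strings(batch: &RecordBatch, idx: usize) -> Vec<String> {
        let col = batch
            .column(idx)
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        (0..col.len())
            .map(|i| col.get_data(i).unwrap().to_owned())
            .collect()
    }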
+
+    /// Return a list of [`TableReference`] to be exported.
+    /// Includes all tables under the given `catalog` and `schema`.
+    async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
+        // TODO: SQL injection hurts
+        let sql = format!(
+            "select table_catalog, table_schema, table_name from \
+            information_schema.tables where table_type = \'BASE TABLE\' \
+            and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'",
+        );
+        let mut client = self.client.clone();
+        client.set_catalog(catalog);
+        client.set_schema(schema);
+        let result = client
+            .sql(&sql)
+            .await
+            .with_context(|_| RequestDatabaseSnafu { sql })?;
+        let Output::Stream(stream) = result else {
+            NotDataFromOutputSnafu.fail()?
+        };
+        let Some(record_batch) = collect(stream)
+            .await
+            .context(CollectRecordBatchesSnafu)?
+            .pop()
+        else {
+            return Ok(vec![]);
+        };
+
+        debug!("Fetched table list: {}", record_batch.pretty_print());
+
+        if record_batch.num_rows() == 0 {
+            return Ok(vec![]);
+        }
+
+        let mut result = Vec::with_capacity(record_batch.num_rows());
+        let catalog_column = record_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringVector>()
+            .unwrap();
+        let schema_column = record_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringVector>()
+            .unwrap();
+        let table_column = record_batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<StringVector>()
+            .unwrap();
+        for i in 0..record_batch.num_rows() {
+            let catalog = catalog_column.get_data(i).unwrap().to_owned();
+            let schema = schema_column.get_data(i).unwrap().to_owned();
+            let table = table_column.get_data(i).unwrap().to_owned();
+            result.push((catalog, schema, table));
+        }
+
+        Ok(result)
+    }
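About the `TODO: SQL injection hurts` in `get_table_list`: the catalog and schema names are spliced into the query text verbatim. One low-tech mitigation sketch (an assumption about how it might be patched later, not what this change does) is to double embedded quotes before interpolating:

    /// Standard SQL literal escaping: double any embedded single quote.
    fn escape_sql_literal(s: &str) -> String {
        s.replace('\'', "''")
    }

    // e.g. format!("... where table_catalog = '{}'", escape_sql_literal(catalog))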
+
+    async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
+        let sql = format!("show create table {}.{}.{}", catalog, schema, table);
+        let mut client = self.client.clone();
+        client.set_catalog(catalog);
+        client.set_schema(schema);
+        let result = client
+            .sql(&sql)
+            .await
+            .with_context(|_| RequestDatabaseSnafu { sql })?;
+        let Output::Stream(stream) = result else {
+            NotDataFromOutputSnafu.fail()?
+        };
+        let record_batch = collect(stream)
+            .await
+            .context(CollectRecordBatchesSnafu)?
+            .pop()
+            .context(EmptyResultSnafu)?;
+        let create_table = record_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringVector>()
+            .unwrap()
+            .get_data(0)
+            .unwrap();
+
+        Ok(format!("{create_table};\n"))
+    }
+
+    async fn export_create_table(&self) -> Result<()> {
+        let semaphore = Arc::new(Semaphore::new(self.parallelism));
+        let db_names = self.iter_db_names().await?;
+        let db_count = db_names.len();
+        let mut tasks = Vec::with_capacity(db_names.len());
+        for (catalog, schema) in db_names {
+            let semaphore_moved = semaphore.clone();
+            tasks.push(async move {
+                let _permit = semaphore_moved.acquire().await.unwrap();
+                let table_list = self.get_table_list(&catalog, &schema).await?;
+                let table_count = table_list.len();
+                tokio::fs::create_dir_all(&self.output_dir)
+                    .await
+                    .context(FileIoSnafu)?;
+                let output_file =
+                    Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
+                let mut file = File::create(output_file).await.context(FileIoSnafu)?;
+                for (c, s, t) in table_list {
+                    match self.show_create_table(&c, &s, &t).await {
+                        Err(e) => {
+                            error!(e; "Failed to export table {}.{}.{}", c, s, t)
+                        }
+                        Ok(create_table) => {
+                            file.write_all(create_table.as_bytes())
+                                .await
+                                .context(FileIoSnafu)?;
+                        }
+                    }
+                }
+                info!("finished exporting {catalog}.{schema} with {table_count} tables");
+                Ok::<(), Error>(())
+            });
+        }
+
+        let success = futures::future::join_all(tasks)
+            .await
+            .into_iter()
+            .filter(|r| match r {
+                Ok(_) => true,
+                Err(e) => {
+                    error!(e; "export job failed");
+                    false
+                }
+            })
+            .count();
+
+        info!("success {success}/{db_count} jobs");
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Tool for Export {
+    async fn do_work(&self) -> Result<()> {
+        match self.target {
+            ExportTarget::CreateTable => self.export_create_table().await,
+            ExportTarget::TableData => unimplemented!("export table data"),
+        }
+    }
+}
+
+/// Split a database param into catalog and schema at the `-`.
+fn split_database(database: &str) -> Result<(String, Option<String>)> {
+    let (catalog, schema) = database
+        .split_once('-')
+        .with_context(|| InvalidDatabaseNameSnafu {
+            database: database.to_string(),
+        })?;
+    if schema == "*" {
+        Ok((catalog.to_string(), None))
+    } else {
+        Ok((catalog.to_string(), Some(schema.to_string())))
+    }
+}
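For concreteness, `split_database` behaves like this on the two supported shapes of `--database` (assert-style sketch, not test code from the patch):

    assert_eq!(
        split_database("greptime-public").unwrap(),
        ("greptime".to_string(), Some("public".to_string()))
    );
    // `*` as the schema part means "export every schema in the catalog".
    assert_eq!(
        split_database("greptime-*").unwrap(),
        ("greptime".to_string(), None)
    );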
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index d8b748029213..ae8c363fcd92 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -174,12 +174,39 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to connect server at {addr}"))]
+    ConnectServer {
+        addr: String,
+        source: client::error::Error,
+        location: Location,
+    },
+
     #[snafu(display("Failed to serde json"))]
     SerdeJson {
         #[snafu(source)]
         error: serde_json::error::Error,
         location: Location,
     },
+
+    #[snafu(display("Expect data from output, but got another thing"))]
+    NotDataFromOutput { location: Location },
+
+    #[snafu(display("Empty result from output"))]
+    EmptyResult { location: Location },
+
+    #[snafu(display("Failed to manipulate file"))]
+    FileIo {
+        location: Location,
+        #[snafu(source)]
+        error: std::io::Error,
+    },
+
+    #[snafu(display("Invalid database name: {}", database))]
+    InvalidDatabaseName {
+        location: Location,
+        database: String,
+    },
 
     #[snafu(display("Failed to create directory {}", dir))]
     CreateDir {
         dir: String,
@@ -204,12 +231,16 @@ impl ErrorExt for Error {
             Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => {
                 source.status_code()
             }
+            Error::ConnectServer { source, .. } => source.status_code(),
 
             Error::MissingConfig { .. }
             | Error::LoadLayeredConfig { .. }
             | Error::IllegalConfig { .. }
             | Error::InvalidReplCommand { .. }
+            | Error::ConnectEtcd { .. }
+            | Error::NotDataFromOutput { .. }
             | Error::CreateDir { .. }
-            | Error::ConnectEtcd { .. } => StatusCode::InvalidArguments,
+            | Error::EmptyResult { .. }
+            | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments,
 
             Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
             Error::RequestDatabase { source, .. } => source.status_code(),
@@ -222,7 +253,7 @@ impl ErrorExt for Error {
             Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
             Error::StartCatalogManager { source, .. } => source.status_code(),
 
-            Error::SerdeJson { .. } => StatusCode::Unexpected,
+            Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,
         }
     }
 
diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs
index 979bc5fd0e3c..5de969dbbdb7 100644
--- a/src/common/recordbatch/src/recordbatch.rs
+++ b/src/common/recordbatch/src/recordbatch.rs
@@ -13,8 +13,10 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::slice;
 use std::sync::Arc;
 
+use datafusion::arrow::util::pretty::pretty_format_batches;
 use datatypes::schema::SchemaRef;
 use datatypes::value::Value;
 use datatypes::vectors::{Helper, VectorRef};
@@ -169,6 +171,13 @@ impl RecordBatch {
 
         Ok(vectors)
     }
+
+    /// Pretty display this record batch like a table
+    pub fn pretty_print(&self) -> String {
+        pretty_format_batches(slice::from_ref(&self.df_record_batch))
+            .map(|t| t.to_string())
+            .unwrap_or("failed to pretty display a record batch".to_string())
+    }
 }
 
 impl Serialize for RecordBatch {
diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs
index 222640ced4e2..ccb56d874d28 100644
--- a/src/table/src/engine.rs
+++ b/src/table/src/engine.rs
@@ -35,6 +35,8 @@ pub struct TableReference<'a> {
     pub table: &'a str,
 }
 
+pub type OwnedTableReference = TableReference<'static>;
+
 // TODO(LFC): Find a better place for `TableReference`,
 // so that we can reuse the default catalog and schema consts.
 // Could be done together with issue #559.
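One design note on `export_create_table` above: each per-database future acquires a semaphore permit before doing any work, so `futures::future::join_all` can poll all of them while at most `--export-jobs` exports actually run at once. The pattern in isolation, with made-up job names (a sketch, not code from this patch):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    async fn run_bounded(jobs: Vec<String>, parallelism: usize) {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let tasks: Vec<_> = jobs
            .into_iter()
            .map(|job| {
                let semaphore = semaphore.clone();
                async move {
                    // The permit is held for the whole body, bounding concurrency.
                    let _permit = semaphore.acquire().await.unwrap();
                    println!("exporting {job}");
                }
            })
            .collect();
        // join_all polls every task; only permit holders make progress.
        futures::future::join_all(tasks).await;
    }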