From 194b6ae5a398c17efecd2d896ff95f637aa8f110 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 17 Oct 2023 10:26:20 +0800 Subject: [PATCH 1/2] chore: add export data to migrate tool --- src/cmd/src/cli/export.rs | 52 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 4ec90132fe6b..88fef95fad5c 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -284,6 +284,56 @@ impl Export { Ok(()) } + + async fn export_table_data(&self) -> Result<()> { + let semaphore = Arc::new(Semaphore::new(self.parallelism)); + let db_names = self.iter_db_names().await?; + let db_count = db_names.len(); + let mut tasks = Vec::with_capacity(db_names.len()); + for (catalog, schema) in db_names { + let semaphore_moved = semaphore.clone(); + tasks.push(async move { + let _permit = semaphore_moved.acquire().await.unwrap(); + tokio::fs::create_dir_all(&self.output_dir) + .await + .context(FileIoSnafu)?; + let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/")); + + let mut client = self.client.clone(); + client.set_catalog(catalog.clone()); + client.set_schema(schema.clone()); + + let sql = format!( + "copy database {} to '{}' with (format='parquet');", + schema, + output_dir.to_str().unwrap() + ); + client + .sql(sql.clone()) + .await + .context(RequestDatabaseSnafu { sql })?; + + info!("finished exporting {catalog}.{schema} data"); + Ok::<(), Error>(()) + }); + } + + let success = futures::future::join_all(tasks) + .await + .into_iter() + .filter(|r| match r { + Ok(_) => true, + Err(e) => { + error!(e; "export job failed"); + false + } + }) + .count(); + + info!("success {success}/{db_count} jobs"); + + Ok(()) + } } #[async_trait] @@ -291,7 +341,7 @@ impl Tool for Export { async fn do_work(&self) -> Result<()> { match self.target { ExportTarget::CreateTable => self.export_create_table().await, - ExportTarget::TableData => unimplemented!("export table data"), + ExportTarget::TableData => self.export_table_data().await, } } } From 2b139cf1928903827b3bab457c66c0c1b745f3fd Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 17 Oct 2023 12:13:31 +0800 Subject: [PATCH 2/2] chore: export copy from sql too --- src/cmd/src/cli/export.rs | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 88fef95fad5c..e0b9cd6be5b3 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -20,7 +20,7 @@ use clap::{Parser, ValueEnum}; use client::{Client, Database, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util::collect; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use datatypes::scalars::ScalarVector; use datatypes::vectors::{StringVector, Vector}; use snafu::{OptionExt, ResultExt}; @@ -303,6 +303,7 @@ impl Export { client.set_catalog(catalog.clone()); client.set_schema(schema.clone()); + // copy database to let sql = format!( "copy database {} to '{}' with (format='parquet');", schema, @@ -312,8 +313,41 @@ impl Export { .sql(sql.clone()) .await .context(RequestDatabaseSnafu { sql })?; - info!("finished exporting {catalog}.{schema} data"); + + // export copy from sql + let dir_filenames = match output_dir.read_dir() { + Ok(dir) => dir, + Err(_) => { + warn!("empty database {catalog}.{schema}"); + return Ok(()); + } + }; + + let copy_from_file = + Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql")); + let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?; + + let copy_from_sql = dir_filenames + .into_iter() + .map(|file| { + let file = file.unwrap(); + let filename = file.file_name().into_string().unwrap(); + + format!( + "copy {} from '{}' with (format='parquet');\n", + filename.replace(".parquet", ""), + file.path().to_str().unwrap() + ) + }) + .collect::>() + .join(""); + file.write_all(copy_from_sql.as_bytes()) + .await + .context(FileIoSnafu)?; + + info!("finished exporting {catalog}.{schema} copy_from.sql"); + Ok::<(), Error>(()) }); }