From 8b094c992fe5dfe0731c7be9bcf041003f5da4b6 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 17 May 2024 14:14:04 +0000 Subject: [PATCH] feat: prevent exporting physical table data --- src/cmd/src/cli/export.rs | 42 +++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index d653889dae68..2b299288ae6f 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -176,8 +176,12 @@ impl Export { } /// Return a list of [`TableReference`] to be exported. - /// Includes all tables under the given `catalog` and `schema` - async fn get_table_list(&self, catalog: &str, schema: &str) -> Result> { + /// Includes all tables under the given `catalog` and `schema`. + async fn get_table_list( + &self, + catalog: &str, + schema: &str, + ) -> Result<(Vec, Vec)> { // Puts all metric table first let sql = format!( "select table_catalog, table_schema, table_name from \ @@ -232,11 +236,11 @@ impl Export { remaining_tables.push(table); } } - let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len()); - tables.extend(metric_physical_tables.into_iter()); - tables.extend(remaining_tables); - Ok(tables) + Ok(( + metric_physical_tables.into_iter().collect(), + remaining_tables, + )) } async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result { @@ -265,7 +269,8 @@ impl Export { let semaphore_moved = semaphore.clone(); tasks.push(async move { let _permit = semaphore_moved.acquire().await.unwrap(); - let table_list = self.get_table_list(&catalog, &schema).await?; + let (metric_physical_tables, remaining_tables) = + self.get_table_list(&catalog, &schema).await?; let table_count = table_list.len(); tokio::fs::create_dir_all(&self.output_dir) .await @@ -273,7 +278,7 @@ impl Export { let output_file = Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql")); let mut file = File::create(output_file).await.context(FileIoSnafu)?; - for (c, s, t) in table_list { + for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) { match self.show_create_table(&c, &s, &t).await { Err(e) => { error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t) @@ -322,15 +327,18 @@ impl Export { .await .context(FileIoSnafu)?; let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/")); - - // copy database to - let sql = format!( - "copy database {} to '{}' with (format='parquet');", - schema, - output_dir.to_str().unwrap() - ); - self.sql(&sql).await?; - info!("finished exporting {catalog}.{schema} data"); + // Ignores metric physical tables + let (_, table_lists) = self.get_table_list(&catalog, &schema).await?; + for (_, _, table_name) in table_list { + // copy table to + let sql = format!( + "copy table {} to '{}' with (format='parquet');", + schema, + format!("{}{}.parquet",output_dir.to_str().unwrap(),); + ); + self.sql(&sql).await?; + info!("finished exporting {catalog}.{schema} data"); + } // export copy from sql let dir_filenames = match output_dir.read_dir() {