diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index d653889dae68..24b608689d0e 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -176,8 +176,12 @@ impl Export { } /// Return a list of [`TableReference`] to be exported. - /// Includes all tables under the given `catalog` and `schema` - async fn get_table_list(&self, catalog: &str, schema: &str) -> Result> { + /// Includes all tables under the given `catalog` and `schema`. + async fn get_table_list( + &self, + catalog: &str, + schema: &str, + ) -> Result<(Vec, Vec)> { // Puts all metric table first let sql = format!( "select table_catalog, table_schema, table_name from \ @@ -214,7 +218,7 @@ impl Export { debug!("Fetched table list: {:?}", records); if records.is_empty() { - return Ok(vec![]); + return Ok((vec![], vec![])); } let mut remaining_tables = Vec::with_capacity(records.len()); @@ -232,11 +236,11 @@ impl Export { remaining_tables.push(table); } } - let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len()); - tables.extend(metric_physical_tables.into_iter()); - tables.extend(remaining_tables); - Ok(tables) + Ok(( + metric_physical_tables.into_iter().collect(), + remaining_tables, + )) } async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result { @@ -265,15 +269,16 @@ impl Export { let semaphore_moved = semaphore.clone(); tasks.push(async move { let _permit = semaphore_moved.acquire().await.unwrap(); - let table_list = self.get_table_list(&catalog, &schema).await?; - let table_count = table_list.len(); + let (metric_physical_tables, remaining_tables) = + self.get_table_list(&catalog, &schema).await?; + let table_count = metric_physical_tables.len() + remaining_tables.len(); tokio::fs::create_dir_all(&self.output_dir) .await .context(FileIoSnafu)?; let output_file = Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql")); let mut file = File::create(output_file).await.context(FileIoSnafu)?; - for (c, s, t) in table_list { + for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) { match self.show_create_table(&c, &s, &t).await { Err(e) => { error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t) @@ -322,15 +327,25 @@ impl Export { .await .context(FileIoSnafu)?; let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/")); - - // copy database to - let sql = format!( - "copy database {} to '{}' with (format='parquet');", - schema, - output_dir.to_str().unwrap() - ); - self.sql(&sql).await?; - info!("finished exporting {catalog}.{schema} data"); + // Ignores metric physical tables + let (metrics_tables, table_list) = self.get_table_list(&catalog, &schema).await?; + for (_, _, table_name) in metrics_tables { + warn!("Ignores metric physical table: {table_name}"); + } + for (catalog_name, schema_name, table_name) in table_list { + // copy table to + let sql = format!( + r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#, + catalog_name, + schema_name, + table_name, + output_dir.to_str().unwrap(), + table_name, + ); + info!("Executing sql: {sql}"); + self.sql(&sql).await?; + } + info!("Finished exporting {catalog}.{schema} data"); // export copy from sql let dir_filenames = match output_dir.read_dir() {