diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 4ec90132fe6b..88fef95fad5c 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -284,6 +284,56 @@ impl Export { Ok(()) } + + async fn export_table_data(&self) -> Result<()> { + let semaphore = Arc::new(Semaphore::new(self.parallelism)); + let db_names = self.iter_db_names().await?; + let db_count = db_names.len(); + let mut tasks = Vec::with_capacity(db_names.len()); + for (catalog, schema) in db_names { + let semaphore_moved = semaphore.clone(); + tasks.push(async move { + let _permit = semaphore_moved.acquire().await.unwrap(); + tokio::fs::create_dir_all(&self.output_dir) + .await + .context(FileIoSnafu)?; + let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/")); + + let mut client = self.client.clone(); + client.set_catalog(catalog.clone()); + client.set_schema(schema.clone()); + + let sql = format!( + "copy database {} to '{}' with (format='parquet');", + schema, + output_dir.to_str().unwrap() + ); + client + .sql(sql.clone()) + .await + .context(RequestDatabaseSnafu { sql })?; + + info!("finished exporting {catalog}.{schema} data"); + Ok::<(), Error>(()) + }); + } + + let success = futures::future::join_all(tasks) + .await + .into_iter() + .filter(|r| match r { + Ok(_) => true, + Err(e) => { + error!(e; "export job failed"); + false + } + }) + .count(); + + info!("success {success}/{db_count} jobs"); + + Ok(()) + } } #[async_trait] @@ -291,7 +341,7 @@ impl Tool for Export { async fn do_work(&self) -> Result<()> { match self.target { ExportTarget::CreateTable => self.export_create_table().await, - ExportTarget::TableData => unimplemented!("export table data"), + ExportTarget::TableData => self.export_table_data().await, } } }