Skip to content

Commit

Permalink
chore: add export data to migrate tool
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Oct 17, 2023
1 parent 9056c3a commit 194b6ae
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,64 @@ impl Export {

Ok(())
}

async fn export_table_data(&self) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for (catalog, schema) in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
tokio::fs::create_dir_all(&self.output_dir)
.await
.context(FileIoSnafu)?;
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));

let mut client = self.client.clone();
client.set_catalog(catalog.clone());
client.set_schema(schema.clone());

let sql = format!(
"copy database {} to '{}' with (format='parquet');",
schema,
output_dir.to_str().unwrap()
);
client
.sql(sql.clone())
.await
.context(RequestDatabaseSnafu { sql })?;

info!("finished exporting {catalog}.{schema} data");
Ok::<(), Error>(())
});
}

let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export job failed");
false
}
})
.count();

info!("success {success}/{db_count} jobs");

Ok(())
}
}

#[async_trait]
impl Tool for Export {
async fn do_work(&self) -> Result<()> {
match self.target {
ExportTarget::CreateTable => self.export_create_table().await,
ExportTarget::TableData => unimplemented!("export table data"),
ExportTarget::TableData => self.export_table_data().await,
}
}
}
Expand Down

0 comments on commit 194b6ae

Please sign in to comment.