Skip to content

Commit

Permalink
feat(cli): prevent exporting physical table data (GreptimeTeam#3978)
Browse files Browse the repository at this point in the history
* feat: prevent exporting physical table data

* chore: apply suggestions from CR
  • Loading branch information
WenyXu committed May 21, 2024
1 parent 7c56d24 commit 428e632
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,12 @@ impl Export {
}

/// Return a list of [`TableReference`] to be exported.
/// Includes all tables under the given `catalog` and `schema`
async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
/// Includes all tables under the given `catalog` and `schema`.
async fn get_table_list(
&self,
catalog: &str,
schema: &str,
) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
// Puts all metric tables first
let sql = format!(
"select table_catalog, table_schema, table_name from \
Expand Down Expand Up @@ -214,7 +218,7 @@ impl Export {
debug!("Fetched table list: {:?}", records);

if records.is_empty() {
return Ok(vec![]);
return Ok((vec![], vec![]));
}

let mut remaining_tables = Vec::with_capacity(records.len());
Expand All @@ -232,11 +236,11 @@ impl Export {
remaining_tables.push(table);
}
}
let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len());
tables.extend(metric_physical_tables.into_iter());
tables.extend(remaining_tables);

Ok(tables)
Ok((
metric_physical_tables.into_iter().collect(),
remaining_tables,
))
}

async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
Expand Down Expand Up @@ -265,15 +269,16 @@ impl Export {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let table_list = self.get_table_list(&catalog, &schema).await?;
let table_count = table_list.len();
let (metric_physical_tables, remaining_tables) =
self.get_table_list(&catalog, &schema).await?;
let table_count = metric_physical_tables.len() + remaining_tables.len();
tokio::fs::create_dir_all(&self.output_dir)
.await
.context(FileIoSnafu)?;
let output_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
for (c, s, t) in table_list {
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
match self.show_create_table(&c, &s, &t).await {
Err(e) => {
error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t)
Expand Down Expand Up @@ -322,15 +327,25 @@ impl Export {
.await
.context(FileIoSnafu)?;
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));

// copy database to
let sql = format!(
"copy database {} to '{}' with (format='parquet');",
schema,
output_dir.to_str().unwrap()
);
self.sql(&sql).await?;
info!("finished exporting {catalog}.{schema} data");
// Ignores metric physical tables
let (metrics_tables, table_list) = self.get_table_list(&catalog, &schema).await?;
for (_, _, table_name) in metrics_tables {
warn!("Ignores metric physical table: {table_name}");
}
for (catalog_name, schema_name, table_name) in table_list {
// Copy each table to its own parquet file under the output directory
let sql = format!(
r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#,
catalog_name,
schema_name,
table_name,
output_dir.to_str().unwrap(),
table_name,
);
info!("Executing sql: {sql}");
self.sql(&sql).await?;
}
info!("Finished exporting {catalog}.{schema} data");

// export copy from sql
let dir_filenames = match output_dir.read_dir() {
Expand Down

0 comments on commit 428e632

Please sign in to comment.