From 85f2bc9967193ae53ba3e9acdd476a3e6e1a183f Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 5 Sep 2024 16:18:31 +0800 Subject: [PATCH 1/2] make drop table more robust --- src/frontend/src/handler/create_table.rs | 27 +++++---- src/frontend/src/handler/drop_table.rs | 73 +++++++++++++++++------- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index d526295c830fe..a6815befc5818 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1312,24 +1312,22 @@ pub async fn handle_create_table( .await?; } Engine::Iceberg => { - // export AWS_REGION=your_region - // export AWS_ACCESS_KEY_ID=your_access_key - // export AWS_SECRET_ACCESS_KEY=your_secret_key - // export RW_S3_ENDPOINT=your_endpoint - // export META_STORE_URI=your_meta_store_uri - // export META_STORE_USER=your_meta_store_user - // export META_STORE_PASSWORD=your_meta_store_password - - let s3_endpoint = if let Ok(endpoint) = std::env::var("RW_S3_ENDPOINT") { + let s3_region = if let Ok(region) = std::env::var("AWS_REGION") { + region + } else { + "us-east-1".to_string() + }; + + let s3_endpoint = if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") { endpoint } else { "http://127.0.0.1:9301".to_string() }; - let s3_region = if let Ok(region) = std::env::var("AWS_REGION") { - region + let s3_bucket = if let Ok(bucket) = std::env::var("AWS_BUCKET") { + bucket } else { - "us-east-1".to_string() + "hummock001".to_string() }; let s3_ak = if let Ok(ak) = std::env::var("AWS_ACCESS_KEY_ID") { @@ -1438,7 +1436,7 @@ pub async fn handle_create_table( with.insert("primary_key".to_string(), pks.join(",")); with.insert("type".to_string(), "upsert".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string()); - with.insert("warehouse.path".to_string(), "s3://hummock001".to_string()); + with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket)); with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); with.insert("s3.access.key".to_string(), s3_ak.clone()); with.insert("s3.secret.key".to_string(), s3_sk.clone()); @@ -1452,6 +1450,7 @@ pub async fn handle_create_table( with.insert("catalog.name".to_string(), catalog_name.clone()); with.insert("database.name".to_string(), database_name.clone()); with.insert("table.name".to_string(), table_name.to_string()); + with.insert("commit_checkpoint_interval".to_string(), "1".to_string()); with.insert("create_table_if_not_exists".to_string(), "true".to_string()); sink_handler_args.with_options = WithOptions::new_with_options(with); @@ -1474,7 +1473,7 @@ pub async fn handle_create_table( let mut with = BTreeMap::new(); with.insert("connector".to_string(), "iceberg".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string()); - with.insert("warehouse.path".to_string(), "s3://hummock001".to_string()); + with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket)); with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); with.insert("s3.access.key".to_string(), s3_ak.clone()); with.insert("s3.secret.key".to_string(), s3_sk.clone()); diff --git a/src/frontend/src/handler/drop_table.rs b/src/frontend/src/handler/drop_table.rs index dc471414f7e4c..936f5e9def317 100644 --- a/src/frontend/src/handler/drop_table.rs +++ b/src/frontend/src/handler/drop_table.rs @@ -18,6 +18,7 @@ use risingwave_common::catalog::Engine; use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::ConnectorProperties; use risingwave_sqlparser::ast::{Ident, ObjectName}; +use tracing::warn; use super::RwPgResponse; use crate::binder::Binder; @@ -66,7 +67,7 @@ pub async fn handle_drop_table( match engine { Engine::Iceberg => { - let source = session + let iceberg_config = if let Ok(source) = session .env() .catalog_reader() .read_guard() @@ -75,7 +76,31 @@ pub async fn handle_drop_table( schema_path, &(ICEBERG_SOURCE_PREFIX.to_string() + &table_name), ) - .map(|(source, _)| source.clone())?; + .map(|(source, _)| source.clone()) + { + let config = ConnectorProperties::extract(source.with_properties.clone(), false)?; + if let ConnectorProperties::Iceberg(iceberg_properties) = config { + Some(iceberg_properties.to_iceberg_config()) + } else { + unreachable!("must be iceberg source"); + } + } else if let Ok(sink) = session + .env() + .catalog_reader() + .read_guard() + .get_sink_by_name( + db_name, + schema_path, + &(ICEBERG_SINK_PREFIX.to_string() + &table_name), + ) + .map(|(sink, _)| sink.clone()) + { + // If iceberg source does not exist, use iceberg sink to load iceberg table + Some(IcebergConfig::from_btreemap(sink.properties.clone())?) + } else { + None + }; + // TODO(nimtable): handle drop table failures in the middle. // Drop sink // Drop iceberg table @@ -92,9 +117,7 @@ pub async fn handle_drop_table( ) .await?; - let config = ConnectorProperties::extract(source.with_properties.clone(), false)?; - if let ConnectorProperties::Iceberg(iceberg_properties) = config { - let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); + if let Some(iceberg_config) = iceberg_config { let iceberg_catalog = iceberg_config .create_catalog_v2() .await @@ -102,30 +125,36 @@ pub async fn handle_drop_table( let table_id = iceberg_config .full_table_name_v2() .context("Unable to parse table name")?; - let table = iceberg_catalog + if let Ok(table) = iceberg_catalog .load_table(&table_id) .await - .context("failed to load iceberg table")?; - table - .file_io() - .remove_all(table.metadata().location()) - .await - .context("failed to purge iceberg table")?; + .context("failed to load iceberg table") + { + table + .file_io() + .remove_all(table.metadata().location()) + .await + .context("failed to purge iceberg table")?; + } else { + warn!("Table {} with iceberg engine, but failed to load iceberg table. It might be the warehouse path has been cleared but fail before drop iceberg source", table_name); + } iceberg_catalog .drop_table(&table_id) .await .context("failed to drop iceberg table")?; - } - crate::handler::drop_source::handle_drop_source( - handler_args.clone(), - ObjectName::from(vec![Ident::from( - (ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(), - )]), - true, - false, - ) - .await?; + crate::handler::drop_source::handle_drop_source( + handler_args.clone(), + ObjectName::from(vec![Ident::from( + (ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(), + )]), + true, + false, + ) + .await?; + } else { + warn!("Table {} with iceberg engine but with no source and sink. It might be created partially. Please check it with iceberg catalog", table_name); + } } Engine::Hummock => {} } From c7a45f69bd92e2edf982d4463206d4df285b23ae Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Sun, 8 Sep 2024 00:44:00 +0800 Subject: [PATCH 2/2] refine --- src/frontend/src/handler/create_table.rs | 109 +++++++++++++---------- 1 file changed, 60 insertions(+), 49 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a6815befc5818..c38e49ebb9bd1 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -21,7 +21,6 @@ use either::Either; use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, ROWID_PREFIX, @@ -30,6 +29,7 @@ use risingwave_common::license::Feature; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_common::{bail, bail_not_implemented}; use risingwave_connector::source::cdc::build_cdc_table_id; use risingwave_connector::source::cdc::external::{ ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, @@ -1312,54 +1312,58 @@ pub async fn handle_create_table( .await?; } Engine::Iceberg => { - let s3_region = if let Ok(region) = std::env::var("AWS_REGION") { - region + let s3_endpoint = if let Ok(s3_endpoint) = std::env::var("AWS_ENDPOINT_URL") { + Some(s3_endpoint) } else { - "us-east-1".to_string() + None }; - let s3_endpoint = if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") { - endpoint - } else { - "http://127.0.0.1:9301".to_string() + let Ok(s3_region) = std::env::var("AWS_REGION") else { + bail!("To create an iceberg engine table, AWS_REGION needed to be set"); }; - let s3_bucket = if let Ok(bucket) = std::env::var("AWS_BUCKET") { - bucket - } else { - "hummock001".to_string() + let Ok(s3_bucket) = std::env::var("AWS_BUCKET") else { + bail!("To create an iceberg engine table, AWS_BUCKET needed to be set"); }; - let s3_ak = if let Ok(ak) = std::env::var("AWS_ACCESS_KEY_ID") { - ak - } else { - "hummockadmin".to_string() + let Ok(s3_ak) = std::env::var("AWS_ACCESS_KEY_ID") else { + bail!("To create an iceberg engine table, AWS_ACCESS_KEY_ID needed to be set"); }; - let s3_sk = if let Ok(sk) = std::env::var("AWS_SECRET_ACCESS_KEY") { - sk - } else { - "hummockadmin".to_string() + let Ok(s3_sk) = std::env::var("AWS_SECRET_ACCESS_KEY") else { + bail!("To create an iceberg engine table, AWS_SECRET_ACCESS_KEY needed to be set"); }; - let meta_store_uri = if let Ok(uri) = std::env::var("META_STORE_URI") { - uri - } else { - "jdbc:sqlite:/tmp/sqlite/iceberg.db".to_string() + let Ok(meta_store_uri) = std::env::var("META_STORE_URI") else { + bail!("To create an iceberg engine table, META_STORE_URI needed to be set"); }; - let meta_store_user = if let Ok(user) = std::env::var("META_STORE_USER") { - user - } else { - "xxx".to_string() + let Ok(meta_store_user) = std::env::var("META_STORE_USER") else { + bail!("To create an iceberg engine table, META_STORE_USER needed to be set"); }; - let meta_store_password = if let Ok(password) = std::env::var("META_STORE_PASSWORD") { - password - } else { - "xxx".to_string() + let Ok(meta_store_password) = std::env::var("META_STORE_PASSWORD") else { + bail!("To create an iceberg engine table, META_STORE_PASSWORD needed to be set"); }; + let rw_db_name = session + .env() + .catalog_reader() + .read_guard() + .get_database_by_id(&table.database_id)? + .name() + .to_string(); + let rw_schema_name = session + .env() + .catalog_reader() + .read_guard() + .get_schema_by_id(&table.database_id, &table.schema_id)? + .name() + .to_string(); + let iceberg_catalog_name = rw_db_name.to_string(); + let iceberg_database_name = rw_schema_name.to_string(); + let iceberg_table_name = table_name.0.last().unwrap().real_value(); + // Iceberg sinks require a primary key, if none is provided, we will use the _row_id column // Fetch primary key from columns let mut pks = column_defs @@ -1413,11 +1417,14 @@ pub async fn handle_create_table( }; let with_properties = WithProperties(vec![]); + let mut sink_name = table_name.clone(); + *sink_name.0.last_mut().unwrap() = Ident::from( + (ICEBERG_SINK_PREFIX.to_string() + &sink_name.0.last().unwrap().real_value()) + .as_str(), + ); let create_sink_stmt = CreateSinkStatement { if_not_exists: false, - sink_name: ObjectName::from(vec![Ident::from( - (ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(), - )]), + sink_name, with_properties, sink_from, columns: vec![], @@ -1426,9 +1433,6 @@ pub async fn handle_create_table( into_table_name: None, }; - let catalog_name = "nimtable".to_string(); - let database_name = "nimtable_db".to_string(); - let mut sink_handler_args = handler_args.clone(); let mut with = BTreeMap::new(); with.insert("connector".to_string(), "iceberg".to_string()); @@ -1437,7 +1441,9 @@ pub async fn handle_create_table( with.insert("type".to_string(), "upsert".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string()); with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket)); - with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); + if let Some(s3_endpoint) = s3_endpoint.clone() { + with.insert("s3.endpoint".to_string(), s3_endpoint); + } with.insert("s3.access.key".to_string(), s3_ak.clone()); with.insert("s3.secret.key".to_string(), s3_sk.clone()); with.insert("s3.region".to_string(), s3_region.clone()); @@ -1447,20 +1453,23 @@ pub async fn handle_create_table( "catalog.jdbc.password".to_string(), meta_store_password.clone(), ); - with.insert("catalog.name".to_string(), catalog_name.clone()); - with.insert("database.name".to_string(), database_name.clone()); - with.insert("table.name".to_string(), table_name.to_string()); + with.insert("catalog.name".to_string(), iceberg_catalog_name.clone()); + with.insert("database.name".to_string(), iceberg_database_name.clone()); + with.insert("table.name".to_string(), iceberg_table_name.to_string()); with.insert("commit_checkpoint_interval".to_string(), "1".to_string()); with.insert("create_table_if_not_exists".to_string(), "true".to_string()); sink_handler_args.with_options = WithOptions::new_with_options(with); + let mut source_name = table_name.clone(); + *source_name.0.last_mut().unwrap() = Ident::from( + (ICEBERG_SOURCE_PREFIX.to_string() + &source_name.0.last().unwrap().real_value()) + .as_str(), + ); let create_source_stmt = CreateSourceStatement { temporary: false, if_not_exists: false, columns: vec![], - source_name: ObjectName::from(vec![Ident::from( - (ICEBERG_SOURCE_PREFIX.to_string() + &table_name.real_value()).as_str(), - )]), + source_name, wildcard_idx: None, constraints: vec![], with_properties: WithProperties(vec![]), @@ -1474,7 +1483,9 @@ pub async fn handle_create_table( with.insert("connector".to_string(), "iceberg".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string()); with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket)); - with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); + if let Some(s3_endpoint) = s3_endpoint { + with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); + } with.insert("s3.access.key".to_string(), s3_ak.clone()); with.insert("s3.secret.key".to_string(), s3_sk.clone()); with.insert("s3.region".to_string(), s3_region.clone()); @@ -1484,9 +1495,9 @@ pub async fn handle_create_table( "catalog.jdbc.password".to_string(), meta_store_password.clone(), ); - with.insert("catalog.name".to_string(), catalog_name.clone()); - with.insert("database.name".to_string(), database_name.clone()); - with.insert("table.name".to_string(), table_name.to_string()); + with.insert("catalog.name".to_string(), iceberg_catalog_name.clone()); + with.insert("database.name".to_string(), iceberg_database_name.clone()); + with.insert("table.name".to_string(), iceberg_table_name.to_string()); source_handler_args.with_options = WithOptions::new_with_options(with); catalog_writer