diff --git a/Cargo.lock b/Cargo.lock index 5bdd4f586dd25..ec0cf9aa7cbae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10618,6 +10618,7 @@ dependencies = [ "bytes", "cfg-or-panic", "chrono", + "clap", "clickhouse", "criterion", "csv", diff --git a/risedev.yml b/risedev.yml index 96443a7c0d6e7..312a11f2795f8 100644 --- a/risedev.yml +++ b/risedev.yml @@ -133,6 +133,34 @@ profile: - use: kafka persist-data: true + nimtable: + env: + NIMTABLE_ENABLE_CONFIG_LOAD: "false" + AWS_ENDPOINT_URL: "http://127.0.0.1:9301" + AWS_REGION: "ap-southeast-2" + AWS_S3_BUCKET: "hummock001" + RW_DATA_DIRECTORY: "data-dir" + AWS_ACCESS_KEY_ID: "hummockadmin" + AWS_SECRET_ACCESS_KEY: "hummockadmin" + RW_BACKEND: "sqlite" + RW_SQL_USERNAME: "xxx" + RW_SQL_PASSWORD: "xxx" + RW_SQL_ENDPOINT: "sqlite won't be used" + RW_SQL_DATABASE: "/tmp/sqlite/iceberg.db" + CONNECTOR_LIBS_PATH: ".risingwave/bin/connector-node/libs" + steps: + - use: minio + - use: sqlite + - use: meta-node + meta-backend: sqlite + - use: compute-node + - use: frontend + - use: compactor + - use: prometheus + - use: grafana + - use: kafka + persist-data: true + standalone-full-peripherals: steps: - use: minio diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 14eed053b25cf..0d6062b9eb178 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -43,6 +43,7 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } +clap = { workspace = true } clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "d38c8b6391af098b724c114e5a4746aedab6ab8e", features = [ "time", ] } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 53fe7fae6fab0..60838222febcf 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -22,6 +22,10 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; use async_trait::async_trait; +use clap::ValueEnum; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::TableMetadata; +use iceberg::table::Table as TableV2; use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent}; use icelake::catalog::CatalogRef; use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter}; @@ -41,6 +45,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::config::MetaBackend; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; @@ -104,6 +109,10 @@ pub struct IcebergConfig { #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub create_table_if_not_exists: bool, + + /// enable config load currently is used by nimtable, so it only support jdbc catalog. + #[serde(default, deserialize_with = "deserialize_bool_from_string")] + pub enable_config_load: bool, } impl IcebergConfig { @@ -162,10 +171,82 @@ impl IcebergConfig { "`commit_checkpoint_interval` must be greater than 0" ))); } + config = config.fill_for_config_load()?; Ok(config) } + pub fn fill_for_config_load(mut self) -> Result { + if self.enable_config_load { + if self.catalog_type.as_deref() != Some("jdbc") { + return Err(SinkError::Config(anyhow!( + "enable_config_load only support jdbc catalog right now" + ))); + } + + let Ok(s3_region) = std::env::var("AWS_REGION") else { + bail!("To create an iceberg engine table, AWS_REGION needed to be set"); + }; + self.region = Some(s3_region); + + let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else { + bail!("To create an iceberg engine table, AWS_S3_BUCKET needed to be set"); + }; + + let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else { + bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needed to be set"); + }; + self.path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory); + + let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else { + bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needed to be set"); + }; + + let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else { + bail!("To create an iceberg engine table, RW_SQL_DATABASE needed to be set"); + }; + + let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else { + bail!("To create an iceberg engine table, RW_BACKEND needed to be set"); + }; + let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else { + bail!("failed to parse meta backend: {}", meta_store_backend); + }; + + self.uri = match meta_backend { + MetaBackend::Postgres => Some(format!( + "jdbc:postgresql://{}/{}", + meta_store_endpoint.clone(), + meta_store_database.clone() + )), + MetaBackend::Mysql => Some(format!( + "jdbc:mysql://{}/{}", + meta_store_endpoint.clone(), + meta_store_database.clone() + )), + MetaBackend::Sqlite | MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => { + bail!( + "Unsupported meta backend for iceberg engine table: {}", + meta_store_backend + ); + } + }; + + let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else { + bail!("To create an iceberg engine table, RW_SQL_USERNAME needed to be set"); + }; + + let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else { + bail!("To create an iceberg engine table, RW_SQL_PASSWORD needed to be set"); + }; + + let java_catalog_props = &mut self.java_catalog_props; + java_catalog_props.insert("jdbc.user".to_string(), meta_store_user); + java_catalog_props.insert("jdbc.password".to_string(), meta_store_password); + } + Ok(self) + } + pub fn catalog_type(&self) -> &str { self.common.catalog_type() } @@ -968,6 +1049,7 @@ mod test { .collect(), commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE, create_table_if_not_exists: false, + enable_config_load: false, }; assert_eq!(iceberg_config, expected_iceberg_config); diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index cdeab00649187..2385b3c4c11b4 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -30,13 +30,13 @@ use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; use crate::connector_common::IcebergCommon; +use crate::deserialize_optional_bool_from_string; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields, }; - pub const ICEBERG_CONNECTOR: &str = "iceberg"; #[derive(Clone, Debug, Deserialize, with_options::WithOptions)] @@ -50,6 +50,13 @@ pub struct IcebergProperties { #[serde(rename = "catalog.jdbc.password")] pub jdbc_password: Option, + #[serde( + rename = "enable_config_load", + default, + deserialize_with = "deserialize_optional_bool_from_string" + )] + pub enable_config_load: Option, + #[serde(flatten)] pub unknown_fields: HashMap, } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 347fa10f5ac92..200af203a7421 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -347,6 +347,11 @@ IcebergConfig: field_type: bool required: false default: Default::default + - name: enable_config_load + field_type: bool + comments: enable config load currently is used by nimtable, so it only support jdbc catalog. + required: false + default: Default::default KafkaConfig: fields: - name: properties.bootstrap.server diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 50222280e8b1b..c6a8fd0f8a857 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -123,6 +123,10 @@ IcebergProperties: - name: catalog.jdbc.password field_type: String required: false + - name: enable_config_load + field_type: bool + required: false + default: Default::default KafkaProperties: fields: - name: bytes.per.second diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0fcea3680c350..50bb0e1fd4ab3 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -14,9 +14,11 @@ use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; +use std::str::ParseBoolError; use std::sync::Arc; use anyhow::{anyhow, Context}; +use clap::ValueEnum; use either::Either; use fixedbitset::FixedBitSet; use itertools::Itertools; @@ -25,6 +27,7 @@ use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, ROWID_PREFIX, }; +use risingwave_common::config::MetaBackend; use risingwave_common::license::Feature; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; @@ -1308,6 +1311,21 @@ pub async fn handle_create_table( .await?; } Engine::Iceberg => { + let nimtable_enable_config_load = if let Ok(nimtable_enable_config_load) = + std::env::var("NIMTABLE_ENABLE_CONFIG_LOAD") + { + nimtable_enable_config_load + .parse() + .map_err(|e: ParseBoolError| { + RwError::from(ErrorCode::InvalidParameterValue(format!( + "NIMTABLE_ENABLE_CONFIG_LOAD must be a boolean value, got {}", + e.as_report() + ))) + })? + } else { + true + }; + let s3_endpoint = if let Ok(s3_endpoint) = std::env::var("AWS_ENDPOINT_URL") { Some(s3_endpoint) } else { @@ -1318,28 +1336,46 @@ pub async fn handle_create_table( bail!("To create an iceberg engine table, AWS_REGION needed to be set"); }; - let Ok(s3_bucket) = std::env::var("AWS_BUCKET") else { + let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else { bail!("To create an iceberg engine table, AWS_BUCKET needed to be set"); }; - let Ok(s3_ak) = std::env::var("AWS_ACCESS_KEY_ID") else { - bail!("To create an iceberg engine table, AWS_ACCESS_KEY_ID needed to be set"); + let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else { + bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needed to be set"); }; - let Ok(s3_sk) = std::env::var("AWS_SECRET_ACCESS_KEY") else { - bail!("To create an iceberg engine table, AWS_SECRET_ACCESS_KEY needed to be set"); + let s3_ak = if let Ok(s3_ak) = std::env::var("AWS_ACCESS_KEY_ID") { + s3_ak + } else if nimtable_enable_config_load { + // Since config load is enabled, we will use a placeholder for iceberg sink and source + "xxx".to_string() + } else { + bail!("To create an iceberg engine table in dev mode, AWS_ACCESS_KEY_ID needed to be set") }; - let Ok(meta_store_uri) = std::env::var("META_STORE_URI") else { - bail!("To create an iceberg engine table, META_STORE_URI needed to be set"); + let s3_sk = if let Ok(s3_sk) = std::env::var("AWS_SECRET_ACCESS_KEY") { + s3_sk + } else if nimtable_enable_config_load { + // Since config load is enabled, we will use a placeholder for iceberg sink and source + "xxx".to_string() + } else { + bail!("To create an iceberg engine table in dev mode, AWS_SECRET_ACCESS_KEY needed to be set") }; - let Ok(meta_store_user) = std::env::var("META_STORE_USER") else { - bail!("To create an iceberg engine table, META_STORE_USER needed to be set"); + let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else { + bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needed to be set"); }; - let Ok(meta_store_password) = std::env::var("META_STORE_PASSWORD") else { - bail!("To create an iceberg engine table, META_STORE_PASSWORD needed to be set"); + let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else { + bail!("To create an iceberg engine table, RW_SQL_DATABASE needed to be set"); + }; + + let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else { + bail!("To create an iceberg engine table, RW_SQL_USERNAME needed to be set"); + }; + + let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else { + bail!("To create an iceberg engine table, RW_SQL_PASSWORD needed to be set"); }; let rw_db_name = session @@ -1429,6 +1465,41 @@ pub async fn handle_create_table( into_table_name: None, }; + let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else { + bail!("To create an iceberg engine table, RW_BACKEND needed to be set"); + }; + let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else { + bail!("failed to parse meta backend: {}", meta_store_backend); + }; + + let catalog_uri = match meta_backend { + MetaBackend::Postgres => { + format!( + "jdbc:postgresql://{}/{}", + meta_store_endpoint.clone(), + meta_store_database.clone() + ) + } + MetaBackend::Mysql => { + format!( + "jdbc:mysql://{}/{}", + meta_store_endpoint.clone(), + meta_store_database.clone() + ) + } + MetaBackend::Sqlite => { + format!("jdbc:sqlite:{}", meta_store_database.clone()) + } + MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => { + bail!( + "Unsupported meta backend for iceberg engine table: {}", + meta_store_backend + ); + } + }; + + let warehouse_path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory); + let mut sink_handler_args = handler_args.clone(); let mut with = BTreeMap::new(); with.insert("connector".to_string(), "iceberg".to_string()); @@ -1436,14 +1507,14 @@ pub async fn handle_create_table( with.insert("primary_key".to_string(), pks.join(",")); with.insert("type".to_string(), "upsert".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string()); - with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket)); + with.insert("warehouse.path".to_string(), warehouse_path.clone()); if let Some(s3_endpoint) = s3_endpoint.clone() { with.insert("s3.endpoint".to_string(), s3_endpoint); } with.insert("s3.access.key".to_string(), s3_ak.clone()); with.insert("s3.secret.key".to_string(), s3_sk.clone()); with.insert("s3.region".to_string(), s3_region.clone()); - with.insert("catalog.uri".to_string(), meta_store_uri.clone()); + with.insert("catalog.uri".to_string(), catalog_uri.clone()); with.insert("catalog.jdbc.user".to_string(), meta_store_user.clone()); with.insert( "catalog.jdbc.password".to_string(), @@ -1454,6 +1525,9 @@ pub async fn handle_create_table( with.insert("table.name".to_string(), iceberg_table_name.to_string()); with.insert("commit_checkpoint_interval".to_string(), "1".to_string()); with.insert("create_table_if_not_exists".to_string(), "true".to_string()); + if nimtable_enable_config_load { + with.insert("enable_config_load".to_string(), "true".to_string()); + } sink_handler_args.with_options = WithOptions::new_with_options(with); let mut source_name = table_name.clone(); @@ -1478,14 +1552,14 @@ pub async fn handle_create_table( let mut with = BTreeMap::new(); with.insert("connector".to_string(), "iceberg".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string()); - with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket)); + with.insert("warehouse.path".to_string(), warehouse_path.clone()); if let Some(s3_endpoint) = s3_endpoint { with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); } with.insert("s3.access.key".to_string(), s3_ak.clone()); with.insert("s3.secret.key".to_string(), s3_sk.clone()); with.insert("s3.region".to_string(), s3_region.clone()); - with.insert("catalog.uri".to_string(), meta_store_uri.clone()); + with.insert("catalog.uri".to_string(), catalog_uri.clone()); with.insert("catalog.jdbc.user".to_string(), meta_store_user.clone()); with.insert( "catalog.jdbc.password".to_string(), @@ -1494,6 +1568,9 @@ pub async fn handle_create_table( with.insert("catalog.name".to_string(), iceberg_catalog_name.clone()); with.insert("database.name".to_string(), iceberg_database_name.clone()); with.insert("table.name".to_string(), iceberg_table_name.to_string()); + if nimtable_enable_config_load { + with.insert("enable_config_load".to_string(), "true".to_string()); + } source_handler_args.with_options = WithOptions::new_with_options(with); catalog_writer