Skip to content

Commit

Permalink
feat(nimtable): reuse existing env and add risedev nimtable (#18531)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 20, 2024
1 parent 0fc6c5a commit 5ebd4c5
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,34 @@ profile:
- use: kafka
persist-data: true

nimtable:
env:
NIMTABLE_ENABLE_CONFIG_LOAD: "false"
AWS_ENDPOINT_URL: "http://127.0.0.1:9301"
AWS_REGION: "ap-southeast-2"
AWS_S3_BUCKET: "hummock001"
RW_DATA_DIRECTORY: "data-dir"
AWS_ACCESS_KEY_ID: "hummockadmin"
AWS_SECRET_ACCESS_KEY: "hummockadmin"
RW_BACKEND: "sqlite"
RW_SQL_USERNAME: "xxx"
RW_SQL_PASSWORD: "xxx"
RW_SQL_ENDPOINT: "sqlite won't be used"
RW_SQL_DATABASE: "/tmp/sqlite/iceberg.db"
CONNECTOR_LIBS_PATH: ".risingwave/bin/connector-node/libs"
steps:
- use: minio
- use: sqlite
- use: meta-node
meta-backend: sqlite
- use: compute-node
- use: frontend
- use: compactor
- use: prometheus
- use: grafana
- use: kafka
persist-data: true

standalone-full-peripherals:
steps:
- use: minio
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
clap = { workspace = true }
clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "d38c8b6391af098b724c114e5a4746aedab6ab8e", features = [
"time",
] }
Expand Down
82 changes: 82 additions & 0 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ use std::sync::Arc;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use clap::ValueEnum;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent};
use icelake::catalog::CatalogRef;
use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter};
Expand All @@ -41,6 +45,7 @@ use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::config::MetaBackend;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::SinkMetadata;
Expand Down Expand Up @@ -104,6 +109,10 @@ pub struct IcebergConfig {

#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub create_table_if_not_exists: bool,

/// enable config load currently is used by nimtable, so it only support jdbc catalog.
#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub enable_config_load: bool,
}

impl IcebergConfig {
Expand Down Expand Up @@ -162,10 +171,82 @@ impl IcebergConfig {
"`commit_checkpoint_interval` must be greater than 0"
)));
}
config = config.fill_for_config_load()?;

Ok(config)
}

pub fn fill_for_config_load(mut self) -> Result<Self> {
if self.enable_config_load {
if self.catalog_type.as_deref() != Some("jdbc") {
return Err(SinkError::Config(anyhow!(
"enable_config_load only support jdbc catalog right now"
)));
}

let Ok(s3_region) = std::env::var("AWS_REGION") else {
bail!("To create an iceberg engine table, AWS_REGION needed to be set");
};
self.region = Some(s3_region);

let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else {
bail!("To create an iceberg engine table, AWS_S3_BUCKET needed to be set");
};

let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else {
bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needed to be set");
};
self.path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory);

let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else {
bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needed to be set");
};

let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else {
bail!("To create an iceberg engine table, RW_SQL_DATABASE needed to be set");
};

let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else {
bail!("To create an iceberg engine table, RW_BACKEND needed to be set");
};
let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
bail!("failed to parse meta backend: {}", meta_store_backend);
};

self.uri = match meta_backend {
MetaBackend::Postgres => Some(format!(
"jdbc:postgresql://{}/{}",
meta_store_endpoint.clone(),
meta_store_database.clone()
)),
MetaBackend::Mysql => Some(format!(
"jdbc:mysql://{}/{}",
meta_store_endpoint.clone(),
meta_store_database.clone()
)),
MetaBackend::Sqlite | MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => {
bail!(
"Unsupported meta backend for iceberg engine table: {}",
meta_store_backend
);
}
};

let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else {
bail!("To create an iceberg engine table, RW_SQL_USERNAME needed to be set");
};

let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else {
bail!("To create an iceberg engine table, RW_SQL_PASSWORD needed to be set");
};

let java_catalog_props = &mut self.java_catalog_props;
java_catalog_props.insert("jdbc.user".to_string(), meta_store_user);
java_catalog_props.insert("jdbc.password".to_string(), meta_store_password);
}
Ok(self)
}

pub fn catalog_type(&self) -> &str {
self.common.catalog_type()
}
Expand Down Expand Up @@ -968,6 +1049,7 @@ mod test {
.collect(),
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
create_table_if_not_exists: false,
enable_config_load: false,
};

assert_eq!(iceberg_config, expected_iceberg_config);
Expand Down
9 changes: 8 additions & 1 deletion src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::connector_common::IcebergCommon;
use crate::deserialize_optional_bool_from_string;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
Expand All @@ -50,6 +50,13 @@ pub struct IcebergProperties {
#[serde(rename = "catalog.jdbc.password")]
pub jdbc_password: Option<String>,

#[serde(
rename = "enable_config_load",
default,
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub enable_config_load: Option<bool>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
Expand Down
5 changes: 5 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ IcebergConfig:
field_type: bool
required: false
default: Default::default
- name: enable_config_load
field_type: bool
comments: enable config load currently is used by nimtable, so it only support jdbc catalog.
required: false
default: Default::default
KafkaConfig:
fields:
- name: properties.bootstrap.server
Expand Down
4 changes: 4 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ IcebergProperties:
- name: catalog.jdbc.password
field_type: String
required: false
- name: enable_config_load
field_type: bool
required: false
default: Default::default
KafkaProperties:
fields:
- name: bytes.per.second
Expand Down
Loading

0 comments on commit 5ebd4c5

Please sign in to comment.