From 34a2ad42da6a6dcb45b1260b6cecfce89d1c360c Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 12 Sep 2024 15:48:24 +0800 Subject: [PATCH] feat(iceberg): support create iceberg table w/ one layer nested type & scan table w/ nested type (#18498) Co-authored-by: xxchan --- Cargo.lock | 9 +- Cargo.toml | 6 +- ...rg_sink_no_partition_append_only_table.slt | 28 ++++++ src/common/src/array/arrow/arrow_iceberg.rs | 90 +++++++++++++++++++ src/common/src/array/arrow/mod.rs | 2 +- src/connector/src/sink/iceberg/mod.rs | 21 ++--- 6 files changed, 130 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98d01dff60589..85fe3c1c4955e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6004,8 +6004,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "651dfca7c429918e164607a549287cfdd1e7814d2e4cb577d0d6dc57fe19b785" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=df076a9949e9ed6431a6dd3998ac0c49152dda9c#df076a9949e9ed6431a6dd3998ac0c49152dda9c" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6047,8 +6046,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ef7c992442a80c46975e08f3862140ca3e1c1c772aa68baaf65bb08f97ff07" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=df076a9949e9ed6431a6dd3998ac0c49152dda9c#df076a9949e9ed6431a6dd3998ac0c49152dda9c" dependencies = [ "anyhow", "async-trait", @@ -6065,8 +6063,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f351c7b964fa6f3b4f976f8de3f16f1bf84eea8478606aaebdfd6a871d6b082c" +source = 
"git+https://github.com/risingwavelabs/iceberg-rust.git?rev=df076a9949e9ed6431a6dd3998ac0c49152dda9c#df076a9949e9ed6431a6dd3998ac0c49152dda9c" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 46ab2695a4ebb..a6fc35d11c908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,9 +143,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = "0.3.0" -iceberg-catalog-rest = "0.3.0" -iceberg-catalog-glue = "0.3.0" +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt index d57c3096cc1ee..49c4cf3fb1145 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt @@ -39,6 +39,7 @@ CREATE SINK s6 AS select * from mv6 WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', + create_table_if_not_exists = 'true' ); statement ok @@ -62,6 +63,30 @@ FLUSH; sleep 5s +statement ok +CREATE Source iceberg_s WITH ( + connector = 'iceberg', + database.name = 'demo_db', + table.name = 'no_partition_append_only_table', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 
'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' +); + +query ?????????????? rowsort +select * from iceberg_s +---- +1 1 1000 1.1 1.11 1-1 t 2022-03-11 2022-03-11 01:00:00+00:00 2022-03-11 01:00:00 1.11000 {1:100,2:200} {1,2,3} (1,2) +2 2 2000 2.2 2.22 2-2 f 2022-03-12 2022-03-12 02:00:00+00:00 2022-03-12 02:00:00 2.22000 {3:300} {1,NULL,3} (3,) +3 3 3000 3.3 3.33 3-3 t 2022-03-13 2022-03-13 03:00:00+00:00 2022-03-13 03:00:00 99999.99999 NULL NULL NULL +4 4 4000 4.4 4.44 4-4 f 2022-03-14 2022-03-14 04:00:00+00:00 2022-03-14 04:00:00 -99999.99999 NULL NULL NULL +5 5 5000 5.5 5.55 5-5 t 2022-03-15 2022-03-15 05:00:00+00:00 2022-03-15 05:00:00 NULL NULL NULL NULL + + statement ok DROP SINK s6; @@ -70,3 +95,6 @@ DROP MATERIALIZED VIEW mv6; statement ok DROP TABLE t6; + +statement ok +DROP SOURCE iceberg_s; diff --git a/src/common/src/array/arrow/arrow_iceberg.rs b/src/common/src/array/arrow/arrow_iceberg.rs index ff23bc102ee6b..25a03c8d7f6d4 100644 --- a/src/common/src/array/arrow/arrow_iceberg.rs +++ b/src/common/src/array/arrow/arrow_iceberg.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; +use std::collections::HashMap; use std::ops::{Div, Mul}; use std::sync::Arc; @@ -171,6 +173,94 @@ impl ToArrow for IcebergArrowConvert { impl FromArrow for IcebergArrowConvert {} +/// Iceberg sink with `create_table_if_not_exists` option will use this struct to convert the +/// iceberg data type to arrow data type. Specifically, it will add the field id to the +/// arrow field metadata, because iceberg-rust and icelake need the field id to be set. +/// +/// Note: this is different from [`IcebergArrowConvert`], which is used to read from/write to +/// an _existing_ iceberg table. In that case, we just need to make sure the data is compatible to the existing schema. 
+/// But to _create a new table_, we need to meet more requirements of iceberg. +#[derive(Default)] +pub struct IcebergCreateTableArrowConvert { + next_field_id: RefCell, +} + +impl IcebergCreateTableArrowConvert { + pub fn to_arrow_field( + &self, + name: &str, + data_type: &DataType, + ) -> Result { + ToArrow::to_arrow_field(self, name, data_type) + } + + fn add_field_id(&self, arrow_field: &mut arrow_schema::Field) { + *self.next_field_id.borrow_mut() += 1; + let field_id = *self.next_field_id.borrow(); + + let mut metadata = HashMap::new(); + // for iceberg-rust + metadata.insert("PARQUET:field_id".to_string(), field_id.to_string()); + // for icelake + metadata.insert("column_id".to_string(), field_id.to_string()); + arrow_field.set_metadata(metadata); + } +} + +impl ToArrow for IcebergCreateTableArrowConvert { + #[inline] + fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field { + // To create an iceberg table, we need a decimal type with precision and scale to be set + // We choose precision 28 and scale 10 here + // The decimal type will finally be converted to an iceberg decimal type. + // Iceberg decimal(P,S) + // Fixed-point decimal; precision P, scale S. Scale is fixed, precision must be 38 or less. + let data_type = arrow_schema::DataType::Decimal128(28, 10); + + let mut arrow_field = arrow_schema::Field::new(name, data_type, true); + self.add_field_id(&mut arrow_field); + arrow_field + } + + /// Convert RisingWave data type to Arrow data type. + /// + /// This function returns a `Field` instead of `DataType` because some may be converted to + /// extension types which require additional metadata in the field.
+ fn to_arrow_field( + &self, + name: &str, + value: &DataType, + ) -> Result { + let data_type = match value { + // using the inline function + DataType::Boolean => self.bool_type_to_arrow(), + DataType::Int16 => self.int16_type_to_arrow(), + DataType::Int32 => self.int32_type_to_arrow(), + DataType::Int64 => self.int64_type_to_arrow(), + DataType::Int256 => self.int256_type_to_arrow(), + DataType::Float32 => self.float32_type_to_arrow(), + DataType::Float64 => self.float64_type_to_arrow(), + DataType::Date => self.date_type_to_arrow(), + DataType::Time => self.time_type_to_arrow(), + DataType::Timestamp => self.timestamp_type_to_arrow(), + DataType::Timestamptz => self.timestamptz_type_to_arrow(), + DataType::Interval => self.interval_type_to_arrow(), + DataType::Varchar => self.varchar_type_to_arrow(), + DataType::Bytea => self.bytea_type_to_arrow(), + DataType::Serial => self.serial_type_to_arrow(), + DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)), + DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)), + DataType::Struct(fields) => self.struct_type_to_arrow(fields)?, + DataType::List(datatype) => self.list_type_to_arrow(datatype)?, + DataType::Map(datatype) => self.map_type_to_arrow(datatype)?, + }; + + let mut arrow_field = arrow_schema::Field::new(name, data_type, true); + self.add_field_id(&mut arrow_field); + Ok(arrow_field) + } +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/src/common/src/array/arrow/mod.rs b/src/common/src/array/arrow/mod.rs index fd9f55ee09f7e..d519d62f9935a 100644 --- a/src/common/src/array/arrow/mod.rs +++ b/src/common/src/array/arrow/mod.rs @@ -17,7 +17,7 @@ mod arrow_iceberg; mod arrow_udf; pub use arrow_deltalake::DeltaLakeConvert; -pub use arrow_iceberg::IcebergArrowConvert; +pub use arrow_iceberg::{IcebergArrowConvert, IcebergCreateTableArrowConvert}; pub use arrow_udf::{FromArrow, ToArrow, UdfArrowConvert}; use crate::types::Interval; diff --git a/src/connector/src/sink/iceberg/mod.rs 
b/src/connector/src/sink/iceberg/mod.rs index e295938a45a61..977d165171881 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -43,11 +43,10 @@ use icelake::io_v2::{ DataFileWriterBuilder, EqualityDeltaWriterBuilder, IcebergWriterBuilder, DELETE_OP, INSERT_OP, }; use icelake::transaction::Transaction; -use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile, COLUMN_ID_META_KEY}; +use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile}; use icelake::{Table, TableIdentifier}; use itertools::Itertools; -use parquet::arrow::PARQUET_FIELD_ID_META_KEY; -use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; @@ -745,30 +744,20 @@ impl IcebergSink { bail!("database name must be set if you want to create table") }; + let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default(); // convert risingwave schema -> arrow schema -> iceberg schema let arrow_fields = self .param .columns .iter() .map(|column| { - let mut arrow_field = IcebergArrowConvert + Ok(iceberg_create_table_arrow_convert .to_arrow_field(&column.name, &column.data_type) .map_err(|e| SinkError::Iceberg(anyhow!(e))) .context(format!( "failed to convert {}: {} to arrow type", &column.name, &column.data_type - ))?; - let mut metadata = HashMap::new(); - metadata.insert( - PARQUET_FIELD_ID_META_KEY.to_string(), - column.column_id.get_id().to_string(), - ); - metadata.insert( - COLUMN_ID_META_KEY.to_string(), - column.column_id.get_id().to_string(), - ); - arrow_field.set_metadata(metadata); - Ok(arrow_field) + ))?) }) .collect::>>()?; let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);