From 594ef891d30067f99d48ce5e515a3134a1df7f9a Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 16 Dec 2024 12:39:05 +0800 Subject: [PATCH] feat: replace icelake Transform with iceberg (#18625) Signed-off-by: xxchan --- Cargo.lock | 9 +- Cargo.toml | 6 +- .../connector_common/iceberg/mock_catalog.rs | 250 ++++++++++++++++++ .../src/connector_common/iceberg/mod.rs | 1 + src/connector/src/sink/iceberg/mod.rs | 8 + src/expr/impl/Cargo.toml | 2 +- src/expr/impl/src/scalar/external/iceberg.rs | 27 +- src/frontend/Cargo.toml | 1 - src/frontend/src/handler/create_sink.rs | 62 ++--- .../src/optimizer/plan_node/stream_sink.rs | 5 +- 10 files changed, 314 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69cb010feae83..b242d74d2e06f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6305,7 +6305,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=e28726443a57028f7c7df11d6d385470dc484d46#e28726443a57028f7c7df11d6d385470dc484d46" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6352,7 +6352,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=e28726443a57028f7c7df11d6d385470dc484d46#e28726443a57028f7c7df11d6d385470dc484d46" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" dependencies = [ "anyhow", "async-trait", @@ -6369,7 +6369,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=e28726443a57028f7c7df11d6d385470dc484d46#e28726443a57028f7c7df11d6d385470dc484d46" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" dependencies = [ "async-trait", "chrono", @@ -11358,7 +11358,7 @@ dependencies = [ "ginepro", "hex", "hmac", - "icelake", + "iceberg", "itertools 0.13.0", "jsonbb", "linkme", @@ -11429,7 +11429,6 @@ dependencies = [ "futures-async-stream", "iana-time-zone", "iceberg", - "icelake", "itertools 0.13.0", "jsonbb", "linkme", diff --git a/Cargo.toml b/Cargo.toml index e1956166592c7..0644def23d5c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,9 +143,9 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f "prometheus", ] } # branch dev-rebase-main-20241030 -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e28726443a57028f7c7df11d6d385470dc484d46" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e28726443a57028f7c7df11d6d385470dc484d46" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e28726443a57028f7c7df11d6d385470dc484d46" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" } opendal = "0.49" # used only by arrow-udf-flight arrow-flight = "53" diff --git a/src/connector/src/connector_common/iceberg/mock_catalog.rs b/src/connector/src/connector_common/iceberg/mock_catalog.rs index 1e4910d9ff5c6..b2723cf40dfbb 100644 --- a/src/connector/src/connector_common/iceberg/mock_catalog.rs +++ b/src/connector/src/connector_common/iceberg/mock_catalog.rs @@ -23,6 +23,7 @@ use opendal::services::Memory; use opendal::Operator; /// A mock catalog for iceberg used for plan test. +#[derive(Debug)] pub struct MockCatalog; impl MockCatalog { @@ -233,3 +234,252 @@ impl Catalog for MockCatalog { unimplemented!() } } + +mod v2 { + use std::collections::HashMap; + + use async_trait::async_trait; + use iceberg::io::FileIO; + use iceberg::spec::{ + NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type, + UnboundPartitionField, UnboundPartitionSpec, + }; + use iceberg::table::Table as TableV2; + use iceberg::{ + Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, + }; + + use super::MockCatalog; + + impl MockCatalog { + fn build_table_v2( + name: &str, + schema: Schema, + partition_spec: UnboundPartitionSpec, + ) -> TableV2 { + let file_io = FileIO::from_path("memory://").unwrap().build().unwrap(); + let table_creation = TableCreation { + name: "ignore".to_owned(), + location: Some("1".to_owned()), + schema, + partition_spec: Some(partition_spec), + sort_order: None, + properties: HashMap::new(), + }; + TableV2::builder() + .identifier(TableIdent::new( + NamespaceIdent::new("mock_namespace".to_owned()), + name.to_owned(), + )) + .file_io(file_io) + .metadata( + TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap(), + ) + .build() + .unwrap() + } + + fn sparse_table_v2() -> TableV2 { + Self::build_table_v2( + Self::SPARSE_TABLE, + Schema::builder() + .with_fields(vec![ + NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Int), true).into(), + NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Long), true) + .into(), + NestedField::new(3, "v3", Type::Primitive(PrimitiveType::String), true) + .into(), + NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Time), true) + .into(), + ]) + .build() + .unwrap(), + UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_fields(vec![ + UnboundPartitionField { + source_id: 1, + field_id: Some(5), + name: "f1".to_owned(), + transform: Transform::Identity, + }, + UnboundPartitionField { + source_id: 2, + field_id: Some(6), + name: "f2".to_owned(), + transform: Transform::Bucket(1), + }, + UnboundPartitionField { + source_id: 3, + field_id: Some(7), + name: "f3".to_owned(), + transform: Transform::Truncate(1), + }, + UnboundPartitionField { + source_id: 4, + field_id: Some(8), + name: "f4".to_owned(), + transform: Transform::Void, + }, + ]) + .unwrap() + .build(), + ) + } + + fn range_table_v2() -> TableV2 { + Self::build_table_v2( + Self::RANGE_TABLE, + Schema::builder() + .with_fields(vec![ + NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Date), true) + .into(), + NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Timestamp), true) + .into(), + NestedField::new( + 3, + "v3", + Type::Primitive(PrimitiveType::Timestamptz), + true, + ) + .into(), + NestedField::new( + 4, + "v4", + Type::Primitive(PrimitiveType::Timestamptz), + true, + ) + .into(), + ]) + .build() + .unwrap(), + UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_fields(vec![ + UnboundPartitionField { + source_id: 1, + field_id: Some(5), + name: "f1".to_owned(), + transform: Transform::Year, + }, + UnboundPartitionField { + source_id: 2, + field_id: Some(6), + name: "f2".to_owned(), + transform: Transform::Month, + }, + UnboundPartitionField { + source_id: 3, + field_id: Some(7), + name: "f3".to_owned(), + transform: Transform::Day, + }, + UnboundPartitionField { + source_id: 4, + field_id: Some(8), + name: "f4".to_owned(), + transform: Transform::Hour, + }, + ]) + .unwrap() + .build(), + ) + } + } + + #[async_trait] + impl CatalogV2 for MockCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> iceberg::Result> { + todo!() + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + _namespace: &iceberg::NamespaceIdent, + _properties: HashMap, + ) -> iceberg::Result { + todo!() + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result { + todo!() + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result { + todo!() + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> { + todo!() + } + + /// List tables from namespace. + async fn list_tables( + &self, + _namespace: &NamespaceIdent, + ) -> iceberg::Result> { + todo!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> iceberg::Result<()> { + todo!() + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> iceberg::Result { + todo!() + } + + /// Load table from the catalog. + async fn load_table(&self, table: &TableIdent) -> iceberg::Result { + match table.name.as_ref() { + Self::SPARSE_TABLE => Ok(Self::sparse_table_v2()), + Self::RANGE_TABLE => Ok(Self::range_table_v2()), + _ => unimplemented!("table {} not found", table.name()), + } + } + + /// Drop a table from the catalog. + async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> { + todo!() + } + + /// Check if a table exists in the catalog. + async fn table_exists(&self, table: &TableIdent) -> iceberg::Result { + match table.name.as_ref() { + Self::SPARSE_TABLE => Ok(true), + Self::RANGE_TABLE => Ok(true), + _ => Ok(false), + } + } + + /// Rename a table in the catalog. + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> { + todo!() + } + + /// Update a table to the catalog. + async fn update_table(&self, _commit: TableCommit) -> iceberg::Result { + todo!() + } + } +} diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index d945e6449958b..c1a80a4d4fcb5 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -630,6 +630,7 @@ mod v2 { java_catalog_props, ) } + "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})), _ => { bail!( "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`", diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 3679cd5ab298f..8242144918930 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; use async_trait::async_trait; +use iceberg::table::Table as TableV2; use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent}; use icelake::catalog::CatalogRef; use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter}; @@ -185,6 +186,13 @@ impl IcebergConfig { .map_err(Into::into) } + pub async fn load_table_v2(&self) -> Result { + self.common + .load_table_v2(&self.java_catalog_props) + .await + .map_err(Into::into) + } + pub fn full_table_name_v2(&self) -> Result { self.common.full_table_name_v2().map_err(Into::into) } diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 87b27a9e670de..c7b1974d8699a 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -45,7 +45,7 @@ futures-util = "0.3" ginepro = "0.8" hex = "0.4" hmac = "0.12" -icelake = { workspace = true } +iceberg = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } linkme = { workspace = true } diff --git a/src/expr/impl/src/scalar/external/iceberg.rs b/src/expr/impl/src/scalar/external/iceberg.rs index c6881c5228e43..797a87510623e 100644 --- a/src/expr/impl/src/scalar/external/iceberg.rs +++ b/src/expr/impl/src/scalar/external/iceberg.rs @@ -19,9 +19,8 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; -use icelake::types::{ - create_transform_function, Any as IcelakeDataType, BoxedTransformFunction, Transform, -}; +use iceberg::spec::{PrimitiveType, Transform, Type as IcebergType}; +use iceberg::transform::{create_transform_function, BoxedTransformFunction}; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::ensure; @@ -81,7 +80,7 @@ fn build(return_type: DataType, mut children: Vec) -> Result) -> Result) -> Result) -> Result + (matches!(transform_type, Transform::Day) && matches!(actual_res_type, IcebergType::Primitive(PrimitiveType::Int))), ExprError::InvalidParam { name: "return type in iceberg_transform", reason: format!( - "Expect return type {:?} but got {:?}", - expect_res_type, actual_res_type + "Expect return type {:?} but got {:?}, RisingWave return type is {:?}, input type is {:?}, transform type is {:?}", + expect_res_type, + actual_res_type, + return_type, + (input_type, input_arrow_type), + transform_type ) .into() } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 9dfb31a141cc2..34964e4c3438f 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -39,7 +39,6 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" iceberg = { workspace = true } -icelake = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } linkme = { workspace = true } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index c4d6793444104..c89050b331d3f 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -379,62 +379,56 @@ async fn get_partition_compute_info_for_iceberg( if _iceberg_config.create_table_if_not_exists { return Ok(None); } - let table = _iceberg_config.load_table().await?; - let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() - else { - return Ok(None); - }; + let table = _iceberg_config.load_table_v2().await?; + let partition_spec = table.metadata().default_partition_spec(); if partition_spec.is_unpartitioned() { return Ok(None); } + use iceberg::spec::Transform; // Separate the partition spec into two parts: sparse partition and range partition. // Sparse partition means that the data distribution is more sparse at a given time. // Range partition means that the data distribution is likely same at a given time. // Only compute the partition and shuffle by them for the sparse partition. - let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform { + let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform { // Sparse partition - icelake::types::Transform::Identity - | icelake::types::Transform::Truncate(_) - | icelake::types::Transform::Bucket(_) => true, + Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true, // Range partition - icelake::types::Transform::Year - | icelake::types::Transform::Month - | icelake::types::Transform::Day - | icelake::types::Transform::Hour - | icelake::types::Transform::Void => false, + Transform::Year + | Transform::Month + | Transform::Day + | Transform::Hour + | Transform::Void => false, + // unknown + Transform::Unknown => false, }); if !has_sparse_partition { return Ok(None); } - let arrow_type: ArrowDataType = table - .current_partition_type() - .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))? - .try_into() + let schema = table.metadata().current_schema(); + let partition_type = partition_spec + .partition_type(schema) + .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))?; + let arrow_type: ArrowDataType = iceberg::arrow::type_to_arrow_type(&partition_type.into()) .map_err(|_| { RwError::from(ErrorCode::SinkError( "Fail to convert iceberg partition type to arrow type".into(), )) })?; - let Some(schema) = table.current_table_metadata().current_schema().ok() else { - return Ok(None); - }; - let partition_fields = partition_spec - .fields - .iter() - .map(|f| { - let source_f = - schema - .look_up_field_by_id(f.source_column_id) - .ok_or(RwError::from(ErrorCode::SinkError( - "Fail to look up iceberg partition field".into(), - )))?; - Ok((source_f.name.clone(), f.transform)) - }) - .collect::>>()?; + let partition_fields = + partition_spec + .fields() + .iter() + .map(|f| { + let source_f = schema.field_by_id(f.source_id).ok_or(RwError::from( + ErrorCode::SinkError("Fail to look up iceberg partition field".into()), + ))?; + Ok((source_f.name.clone(), f.transform)) + }) + .collect::>>()?; let ArrowDataType::Struct(partition_type) = arrow_type else { return Err(RwError::from(ErrorCode::SinkError( diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 2023b7936439a..61572fe179005 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use fixedbitset::FixedBitSet; -use icelake::types::Transform; +use iceberg::spec::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, CreateType}; @@ -612,13 +612,12 @@ impl ExprVisitable for StreamSink {} #[cfg(test)] mod test { - use icelake::types::Transform; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_pb::expr::expr_node::Type; - use super::IcebergPartitionInfo; + use super::{IcebergPartitionInfo, *}; use crate::expr::{Expr, ExprImpl}; fn create_column_catalog() -> Vec {