diff --git a/Cargo.lock b/Cargo.lock index 5dda11e2a8215..841ed332603ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,26 +618,6 @@ dependencies = [ "num", ] -[[package]] -name = "arrow-flight" -version = "50.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d7f215461ad6346f2e4cc853e377d4e076d533e1ed78d327debe83023e3601f" -dependencies = [ - "arrow-array 50.0.0", - "arrow-buffer 50.0.0", - "arrow-cast 50.0.0", - "arrow-ipc 50.0.0", - "arrow-schema 50.0.0", - "base64 0.21.7", - "bytes", - "futures", - "paste", - "prost 0.12.6", - "tokio", - "tonic 0.10.2", -] - [[package]] name = "arrow-flight" version = "52.0.0" @@ -925,11 +905,12 @@ dependencies = [ [[package]] name = "arrow-udf-flight" -version = "0.1.0" -source = "git+https://github.com/risingwavelabs/arrow-udf?rev=54f6d69#54f6d69c618bfb749abddc507e46ee06497c71b2" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a419f7d50bfd3d80f06c138f7e7e56526177eb2bedf2029f20543373299cb57c" dependencies = [ "arrow-array 52.0.0", - "arrow-flight 52.0.0", + "arrow-flight", "arrow-schema 52.0.0", "arrow-select 52.0.0", "futures-util", @@ -946,9 +927,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76cb6d108605c5489fff1ef9c520656946ad05ed0de3ea6d26d56bcb34bdb8c5" dependencies = [ "anyhow", - "arrow-array 50.0.0", - "arrow-buffer 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-schema 52.0.0", "atomic-time", "rquickjs", ] @@ -959,10 +940,10 @@ version = "0.0.1" source = "git+https://github.com/risingwavelabs/arrow-udf.git?rev=fa36365#fa3636559de986aa592da6e8b3fbfac7bdd4bb78" dependencies = [ "anyhow", - "arrow-array 50.0.0", - "arrow-buffer 50.0.0", - "arrow-data 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", "arrow-udf-js-deno-runtime", "async-trait", "deno_core", @@ -1010,10 +991,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4506efc6fbc200c083add2a7ed4e3616a859941a745e922320ae7051d90d12ec" dependencies = [ "anyhow", - "arrow-array 50.0.0", - "arrow-buffer 50.0.0", - "arrow-ipc 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-ipc 52.0.0", + "arrow-schema 52.0.0", "lazy_static", "pyo3", "pyo3-build-config", @@ -1026,9 +1007,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb829e25925161d93617d4b053bae03fe51e708f2cce088d85df856011d4f369" dependencies = [ "anyhow", - "arrow-array 50.0.0", - "arrow-ipc 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 52.0.0", + "arrow-ipc 52.0.0", + "arrow-schema 52.0.0", "async-trait", "base64 0.22.0", "genawaiter", @@ -5943,7 +5924,7 @@ dependencies = [ "http 0.2.9", "thiserror", "tokio", - "tonic 0.10.2", + "tonic 0.11.0", "tower", "tracing", "trust-dns-resolver 0.23.2", @@ -8935,38 +8916,6 @@ dependencies = [ "zstd 0.13.0", ] -[[package]] -name = "parquet" -version = "50.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" -dependencies = [ - "ahash 0.8.11", - "arrow-array 50.0.0", - "arrow-buffer 50.0.0", - "arrow-cast 50.0.0", - "arrow-data 50.0.0", - "arrow-ipc 50.0.0", - "arrow-schema 50.0.0", - "arrow-select 50.0.0", - "base64 0.21.7", - "brotli 3.5.0", - "bytes", - "chrono", - "flate2", - "half 2.3.1", - "hashbrown 0.14.3", - "lz4_flex", - "num", - "num-bigint", - "paste", - "seq-macro", - "snap", - "thrift", - "twox-hash", - "zstd 0.13.0", -] - [[package]] name = "parquet" version = "52.0.0" @@ -9816,7 +9765,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.10.5", + "itertools 0.11.0", "log", "multimap 0.8.3", "once_cell", @@ -9850,7 +9799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.66", @@ -10715,9 +10664,8 @@ name = "risingwave_batch" version = "1.11.0-alpha" dependencies = [ "anyhow", - "arrow-array 50.0.0", "arrow-array 52.0.0", - "arrow-schema 50.0.0", + "arrow-schema 52.0.0", "assert_matches", "async-recursion", "async-trait", @@ -10869,16 +10817,12 @@ dependencies = [ "anyhow", "arc-swap", "arrow-array 48.0.1", - "arrow-array 50.0.0", "arrow-array 52.0.0", "arrow-buffer 48.0.1", - "arrow-buffer 50.0.0", "arrow-buffer 52.0.0", "arrow-cast 48.0.1", - "arrow-cast 50.0.0", "arrow-cast 52.0.0", "arrow-schema 48.0.1", - "arrow-schema 50.0.0", "arrow-schema 52.0.0", "async-trait", "auto_enums", @@ -11177,12 +11121,10 @@ version = "1.11.0-alpha" dependencies = [ "anyhow", "apache-avro 0.16.0", - "arrow-array 50.0.0", "arrow-array 52.0.0", - "arrow-row 50.0.0", - "arrow-schema 50.0.0", + "arrow-row 52.0.0", "arrow-schema 52.0.0", - "arrow-select 50.0.0", + "arrow-select 52.0.0", "assert_matches", "async-nats", "async-trait", @@ -11243,7 +11185,7 @@ dependencies = [ "opendal 0.47.0", "openssl", "parking_lot 0.12.1", - "parquet 50.0.0", + "parquet 52.0.0", "paste", "pg_bigdecimal", "postgres-openssl", @@ -11432,8 +11374,8 @@ name = "risingwave_expr" version = "1.11.0-alpha" dependencies = [ "anyhow", - "arrow-array 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 52.0.0", + "arrow-schema 52.0.0", "async-trait", "auto_impl", "await-tree", @@ -11474,9 +11416,8 @@ version = "1.11.0-alpha" dependencies = [ "aho-corasick", "anyhow", - "arrow-array 50.0.0", - "arrow-flight 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 52.0.0", + "arrow-flight", "arrow-schema 52.0.0", "arrow-udf-flight", "arrow-udf-js", @@ -11542,7 +11483,6 @@ version = "1.11.0-alpha" dependencies = [ "anyhow", "arc-swap", - "arrow-schema 50.0.0", "arrow-schema 52.0.0", "assert_matches", "async-recursion", @@ -15490,7 +15430,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.6", + "rustls-pemfile 2.1.1", + "rustls-pki-types", "tokio", + "tokio-rustls 0.25.0", "tokio-stream", "tower", "tower-layer", diff --git a/Cargo.toml b/Cargo.toml index 03b187c60ea7a..5109b5206607a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,28 +135,25 @@ prost = { version = "0.12" } icelake = { git = "https://github.com/icelake-io/icelake", rev = "07d53893d7788b4e41fc11efad8a6be828405c31", features = [ "prometheus", ] } -arrow-array-iceberg = { package = "arrow-array", version = "52" } -arrow-schema-iceberg = { package = "arrow-schema", version = "52" } -arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } -arrow-cast-iceberg = { package = "arrow-cast", version = "52" } + # TODO # After apache/iceberg-rust#411 is merged, we move to the upstream version. iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" } iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" } -arrow-array = "50" -arrow-arith = "50" -arrow-cast = "50" -arrow-schema = "50" -arrow-buffer = "50" -arrow-flight = "50" -arrow-select = "50" -arrow-ord = "50" -arrow-row = "50" +arrow-array = "52" +arrow-arith = "52" +arrow-cast = "52" +arrow-schema = "52" +arrow-buffer = "52" +arrow-flight = "52" +arrow-select = "52" +arrow-ord = "52" +arrow-row = "52" arrow-udf-js = "0.3.1" arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "fa36365" } arrow-udf-wasm = { version = "0.2.2", features = ["build"] } arrow-udf-python = "0.2" -arrow-udf-flight = "0.1" +arrow-udf-flight = "0.2" arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" } arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" } arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" } @@ -171,7 +168,7 @@ deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd itertools = "0.12.0" jsonbb = "0.1.4" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } -parquet = "50" +parquet = "52" thiserror-ext = "0.1.2" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ @@ -346,7 +343,6 @@ deno_web = { git = "https://github.com/bakjos/deno", rev = "787a232" } deno_websocket = { git = "https://github.com/bakjos/deno", rev = "787a232" } # patch to remove preserve_order from serde_json bson = { git = "https://github.com/risingwavelabs/bson-rust", rev = "e5175ec" } -arrow-udf-flight = { git = "https://github.com/risingwavelabs/arrow-udf", rev = "54f6d69" } [workspace.metadata.dylint] libraries = [{ path = "./lints" }] diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index ef4badfe07666..2ca8ed1be4e77 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -16,7 +16,6 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" arrow-array = { workspace = true } -arrow-array-iceberg = { workspace = true } arrow-schema = { workspace = true } assert_matches = "1" async-recursion = "1" diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 2025c20752887..0c7503f61b293 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -15,7 +15,7 @@ use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::anyhow; -use arrow_array_iceberg::RecordBatch; +use arrow_array::RecordBatch; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; use icelake::io::{FileScan, TableScan}; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index ae6f67faf3aac..34507dd87cd75 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -19,16 +19,12 @@ anyhow = "1" arc-swap = "1" arrow-array = { workspace = true } arrow-array-deltalake = { workspace = true } -arrow-array-iceberg = { workspace = true } arrow-buffer = { workspace = true } arrow-buffer-deltalake = { workspace = true } -arrow-buffer-iceberg = { workspace = true } arrow-cast = { workspace = true } arrow-cast-deltalake = { workspace = true } -arrow-cast-iceberg = { workspace = true } arrow-schema = { workspace = true } arrow-schema-deltalake = { workspace = true } -arrow-schema-iceberg = { workspace = true } async-trait = "0.1" auto_enums = { workspace = true } auto_impl = "1" diff --git a/src/common/src/array/arrow/arrow_iceberg.rs b/src/common/src/array/arrow/arrow_iceberg.rs index ff23bc102ee6b..f6ac7dda50e00 100644 --- a/src/common/src/array/arrow/arrow_iceberg.rs +++ b/src/common/src/array/arrow/arrow_iceberg.rs @@ -15,13 +15,10 @@ use std::ops::{Div, Mul}; use std::sync::Arc; -use arrow_array_iceberg::{self as arrow_array, ArrayRef}; -use arrow_buffer_iceberg::IntervalMonthDayNano as ArrowIntervalType; +use arrow_array::{self, ArrayRef}; +use arrow_buffer::IntervalMonthDayNano as ArrowIntervalType; use num_traits::abs; -use { - arrow_buffer_iceberg as arrow_buffer, arrow_cast_iceberg as arrow_cast, - arrow_schema_iceberg as arrow_schema, -}; +use {arrow_buffer, arrow_cast, arrow_schema}; use crate::array::{Array, ArrayError, ArrayImpl, DataChunk, DataType, DecimalArray}; use crate::types::{Interval, StructType}; @@ -49,11 +46,7 @@ impl ArrowIntervalTypeTrait for ArrowIntervalType { } } -#[path = "./arrow_impl.rs"] -mod arrow_impl; - -use arrow_impl::{FromArrow, ToArrow}; - +use super::arrow_udf::arrow_impl::{FromArrow, ToArrow}; use crate::array::arrow::ArrowIntervalTypeTrait; pub struct IcebergArrowConvert; @@ -175,11 +168,11 @@ impl FromArrow for IcebergArrowConvert {} mod test { use std::sync::Arc; - use arrow_array_iceberg::{ArrayRef, Decimal128Array}; - use arrow_schema_iceberg::DataType; + use arrow_array::{ArrayRef, Decimal128Array}; + use arrow_schema::DataType; - use super::arrow_impl::ToArrow; use super::IcebergArrowConvert; + use crate::array::arrow::ToArrow; use crate::array::{Decimal, DecimalArray}; #[test] diff --git a/src/common/src/array/arrow/arrow_udf.rs b/src/common/src/array/arrow/arrow_udf.rs index e461f49e576a6..4c47eee231e79 100644 --- a/src/common/src/array/arrow/arrow_udf.rs +++ b/src/common/src/array/arrow/arrow_udf.rs @@ -28,7 +28,7 @@ use crate::array::{ArrayError, ArrayImpl, DataType, DecimalArray, JsonbArray}; #[expect(clippy::duplicate_mod)] #[path = "./arrow_impl.rs"] -mod arrow_impl; +pub mod arrow_impl; /// Arrow conversion for UDF. #[derive(Default, Debug)] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 3d8bf618eca58..df1813832f066 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -17,10 +17,8 @@ normal = ["workspace-hack"] anyhow = "1" apache-avro = { workspace = true } arrow-array = { workspace = true } -arrow-array-iceberg = { workspace = true } arrow-row = { workspace = true } arrow-schema = { workspace = true } -arrow-schema-iceberg = { workspace = true } arrow-select = { workspace = true } assert_matches = "1" async-nats = "0.35" diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 3a86062d18a07..dda7235e4d955 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -60,7 +60,7 @@ def_anyhow_newtype! { iceberg::Error => "IcebergV2 error", redis::RedisError => "Redis error", arrow_schema::ArrowError => "Arrow error", - arrow_schema_iceberg::ArrowError => "Arrow error", + arrow_schema::ArrowError => "Arrow error", google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error", rumqttc::tokio_rustls::rustls::Error => "TLS error", rumqttc::v5::ClientError => "MQTT error", diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index a46597456678e..1bee9622e7f21 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -24,7 +24,7 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::{anyhow, Context}; -use arrow_schema_iceberg::{ +use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, }; use async_trait::async_trait; @@ -1162,7 +1162,7 @@ mod test { #[test] fn test_compatible_arrow_schema() { - use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField}; + use arrow_schema::{DataType as ArrowDataType, Field as ArrowField}; use super::*; let risingwave_schema = Schema::new(vec![ diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs index 3c205fd3b104e..b7c04289b7590 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_array_iceberg::RecordBatch; -use arrow_schema_iceberg::SchemaRef; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; use icelake::io_v2::{ BaseFileWriter, BaseFileWriterBuilder, BaseFileWriterMetrics, CurrentFileStatus, FileWriter, FileWriterBuilder, diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs index d85d712c41ac3..c2134d1974551 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_schema_iceberg::SchemaRef; +use arrow_schema::SchemaRef; use icelake::io_v2::{ FanoutPartitionedWriter, FanoutPartitionedWriterBuilder, FanoutPartitionedWriterMetrics, IcebergWriter, IcebergWriterBuilder, @@ -75,7 +75,7 @@ impl MonitoredFanoutPartitionedWriter { impl IcebergWriter for MonitoredFanoutPartitionedWriter { type R = as IcebergWriter>::R; - async fn write(&mut self, batch: arrow_array_iceberg::RecordBatch) -> Result<()> { + async fn write(&mut self, batch: arrow_array::RecordBatch) -> Result<()> { self.inner.write(batch).await?; self.update_metrics()?; Ok(()) diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs index 43314c3fae384..e4fdc75f30e8b 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs @@ -43,7 +43,7 @@ impl IcebergWriterBuilder { type R = MonitoredPositionDeleteWriter; - async fn build(self, schema: &arrow_schema_iceberg::SchemaRef) -> Result { + async fn build(self, schema: &arrow_schema::SchemaRef) -> Result { let writer = self.inner.build(schema).await?; Ok(MonitoredPositionDeleteWriter { writer, diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs index dc44434e5d9c2..be9f35aae2a51 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_array_iceberg::RecordBatch; -use arrow_schema_iceberg::SchemaRef; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; use async_trait::async_trait; use icelake::io_v2::{IcebergWriter, IcebergWriterBuilder}; use icelake::Result; diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 212c459388b25..33a3930198e15 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use anyhow::Context; -use arrow_array_iceberg::{Int32Array, Int64Array, RecordBatch}; +use arrow_array::{Int32Array, Int64Array, RecordBatch}; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 0f69c91e34162..6dfbcc5905750 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -28,7 +28,6 @@ anyhow = "1" arrow-array = { workspace = true } arrow-flight = { workspace = true, optional = true } arrow-schema = { workspace = true } -arrow-schema-iceberg = { workspace = true } arrow-udf-flight = { workspace = true, optional = true } arrow-udf-js = { workspace = true, optional = true } arrow-udf-js-deno = { workspace = true, optional = true } diff --git a/src/expr/impl/src/scalar/external/iceberg.rs b/src/expr/impl/src/scalar/external/iceberg.rs index 902545d01c25d..8d242d3cf3852 100644 --- a/src/expr/impl/src/scalar/external/iceberg.rs +++ b/src/expr/impl/src/scalar/external/iceberg.rs @@ -34,8 +34,8 @@ use thiserror_ext::AsReport; pub struct IcebergTransform { child: BoxedExpression, transform: BoxedTransformFunction, - input_arrow_type: arrow_schema_iceberg::DataType, - output_arrow_field: arrow_schema_iceberg::Field, + input_arrow_type: arrow_schema::DataType, + output_arrow_field: arrow_schema::Field, return_type: DataType, } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a59ce2e55f678..feb4aa596c55f 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -18,7 +18,6 @@ normal = ["workspace-hack"] anyhow = "1" arc-swap = "1" arrow-schema = { workspace = true } -arrow-schema-iceberg = { workspace = true } async-recursion = "1.1.0" async-trait = "0.1" auto_enums = { workspace = true } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index a9e8dde489b21..46d7a71a2b1c8 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -17,7 +17,7 @@ use std::rc::Rc; use std::sync::{Arc, LazyLock}; use anyhow::Context; -use arrow_schema_iceberg::DataType as ArrowDataType; +use arrow_schema::DataType as ArrowDataType; use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 89c5afca08a4c..1f03338bb01b2 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1222,7 +1222,7 @@ pub async fn extract_iceberg_columns( if let ConnectorProperties::Iceberg(properties) = props { let iceberg_config: IcebergConfig = properties.to_iceberg_config(); let table = iceberg_config.load_table().await?; - let iceberg_schema: arrow_schema_iceberg::Schema = table + let iceberg_schema: arrow_schema::Schema = table .current_table_metadata() .current_schema()? .clone() @@ -1278,7 +1278,7 @@ pub async fn check_iceberg_source( let table = iceberg_config.load_table().await?; - let iceberg_schema: arrow_schema_iceberg::Schema = table + let iceberg_schema: arrow_schema::Schema = table .current_table_metadata() .current_schema()? .clone() @@ -1299,7 +1299,7 @@ pub async fn check_iceberg_source( .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name)) .cloned() .collect::>(); - let new_iceberg_schema = arrow_schema_iceberg::Schema::new(new_iceberg_field); + let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field); risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;