diff --git a/Cargo.lock b/Cargo.lock
index fcce5f2c9db4b..bd98f519bc586 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10104,9 +10104,6 @@ name = "risingwave_batch"
 version = "2.1.0-alpha"
 dependencies = [
  "anyhow",
- "arrow-array 50.0.0",
- "arrow-array 52.0.0",
- "arrow-schema 50.0.0",
  "assert_matches",
  "async-recursion",
  "async-trait",
@@ -10596,12 +10593,6 @@ version = "2.1.0-alpha"
 dependencies = [
  "anyhow",
  "apache-avro 0.16.0",
- "arrow-array 50.0.0",
- "arrow-array 52.0.0",
- "arrow-row 50.0.0",
- "arrow-schema 50.0.0",
- "arrow-schema 52.0.0",
- "arrow-select 50.0.0",
  "assert_matches",
  "async-compression",
  "async-nats",
@@ -10862,8 +10853,6 @@ name = "risingwave_expr"
 version = "2.1.0-alpha"
 dependencies = [
  "anyhow",
- "arrow-array 50.0.0",
- "arrow-schema 50.0.0",
  "async-trait",
  "auto_impl",
  "await-tree",
@@ -10904,10 +10893,7 @@ version = "2.1.0-alpha"
 dependencies = [
  "aho-corasick",
  "anyhow",
- "arrow-array 50.0.0",
  "arrow-flight",
- "arrow-schema 50.0.0",
- "arrow-schema 52.0.0",
  "arrow-udf-flight",
  "arrow-udf-js",
  "arrow-udf-python",
@@ -10972,8 +10958,6 @@ version = "2.1.0-alpha"
 dependencies = [
  "anyhow",
  "arc-swap",
- "arrow-schema 50.0.0",
- "arrow-schema 52.0.0",
  "assert_matches",
  "async-recursion",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index c34b414ad0a7b..01123152d56da 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -140,32 +140,18 @@ prost-build = { version = "0.13" }
 icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "3f4724158acee37a4785f56670a1427993a58739", features = [
   "prometheus",
 ] }
-arrow-array-iceberg = { package = "arrow-array", version = "52" }
-arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
-arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
-arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
+
 # branch dev
 iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
 iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
 iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
 opendal = "0.47"
-arrow-array = "50"
-arrow-arith = "50"
-arrow-cast = "50"
-arrow-schema = "50"
-arrow-buffer = "50"
+# used only by arrow-udf-flight
 arrow-flight = "50"
-arrow-select = "50"
-arrow-ord = "50"
-arrow-row = "50"
 arrow-udf-js = "0.3.1"
 arrow-udf-wasm = { version = "0.2.2", features = ["build"] }
 arrow-udf-python = "0.2"
 arrow-udf-flight = "0.1"
-arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
-arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
-arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
-arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
 clap = { version = "4", features = ["cargo", "derive", "env"] }
 # Use a forked version which removes the dependencies on dynamo db to reduce
 # compile time and binary size.
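Note: with the version-pinned arrow crates dropped from the workspace manifest above, downstream crates no longer name any `arrow-*` package directly (arrow-flight aside). Instead they import the re-exports that `risingwave_common` sets up in the hunks below. A minimal sketch of the resulting import style; the module paths are taken from this diff, while the particular items are illustrative:

    // Illustrative only: which items a crate pulls in will vary.
    use risingwave_common::array::arrow::arrow_array_iceberg::{Int64Array, RecordBatch};
    use risingwave_common::array::arrow::arrow_schema_udf::{DataType, Field};
    use risingwave_common::array::arrow::{IcebergArrowConvert, UdfArrowConvert};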
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index 403eb864229d3..3970f4eab4324 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -15,9 +15,6 @@ normal = ["workspace-hack"]
 
 [dependencies]
 anyhow = "1"
-arrow-array = { workspace = true }
-arrow-array-iceberg = { workspace = true }
-arrow-schema = { workspace = true }
 assert_matches = "1"
 async-recursion = "1"
 async-trait = "0.1"
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index cab8ef2129f0f..58a4b9870dfaf 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -17,18 +17,18 @@ normal = ["workspace-hack"]
 ahash = "0.8"
 anyhow = "1"
 arc-swap = "1"
-arrow-array = { workspace = true }
-arrow-array-deltalake = { workspace = true }
-arrow-array-iceberg = { workspace = true }
-arrow-buffer = { workspace = true }
-arrow-buffer-deltalake = { workspace = true }
-arrow-buffer-iceberg = { workspace = true }
-arrow-cast = { workspace = true }
-arrow-cast-deltalake = { workspace = true }
-arrow-cast-iceberg = { workspace = true }
-arrow-schema = { workspace = true }
-arrow-schema-deltalake = { workspace = true }
-arrow-schema-iceberg = { workspace = true }
+arrow-48-array = { package = "arrow-array", version = "48" }
+arrow-48-buffer = { package = "arrow-buffer", version = "48" }
+arrow-48-cast = { package = "arrow-cast", version = "48" }
+arrow-48-schema = { package = "arrow-schema", version = "48" }
+arrow-50-array = { package = "arrow-array", version = "50" }
+arrow-50-buffer = { package = "arrow-buffer", version = "50" }
+arrow-50-cast = { package = "arrow-cast", version = "50" }
+arrow-50-schema = { package = "arrow-schema", version = "50" }
+arrow-52-array = { package = "arrow-array", version = "52" }
+arrow-52-buffer = { package = "arrow-buffer", version = "52" }
+arrow-52-cast = { package = "arrow-cast", version = "52" }
+arrow-52-schema = { package = "arrow-schema", version = "52" }
 async-trait = "0.1"
 auto_enums = { workspace = true }
 auto_impl = "1"
diff --git a/src/common/src/array/arrow/arrow_48.rs b/src/common/src/array/arrow/arrow_48.rs
new file mode 100644
index 0000000000000..6f7333b0e70c1
--- /dev/null
+++ b/src/common/src/array/arrow/arrow_48.rs
@@ -0,0 +1,23 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[allow(clippy::duplicate_mod)]
+#[path = "./arrow_impl.rs"]
+mod arrow_impl;
+type ArrowIntervalType = i128;
+pub use arrow_impl::{FromArrow, ToArrow};
+pub use {
+    arrow_48_array as arrow_array, arrow_48_buffer as arrow_buffer, arrow_48_cast as arrow_cast,
+    arrow_48_schema as arrow_schema,
+};
diff --git a/src/common/src/array/arrow/arrow_50.rs b/src/common/src/array/arrow/arrow_50.rs
new file mode 100644
index 0000000000000..17b5328806ce6
--- /dev/null
+++ b/src/common/src/array/arrow/arrow_50.rs
@@ -0,0 +1,23 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[allow(clippy::duplicate_mod)]
+#[path = "./arrow_impl.rs"]
+mod arrow_impl;
+type ArrowIntervalType = i128;
+pub use arrow_impl::{FromArrow, ToArrow};
+pub use {
+    arrow_50_array as arrow_array, arrow_50_buffer as arrow_buffer, arrow_50_cast as arrow_cast,
+    arrow_50_schema as arrow_schema,
+};
diff --git a/src/common/src/array/arrow/arrow_52.rs b/src/common/src/array/arrow/arrow_52.rs
new file mode 100644
index 0000000000000..1590366e1281b
--- /dev/null
+++ b/src/common/src/array/arrow/arrow_52.rs
@@ -0,0 +1,48 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[allow(clippy::duplicate_mod)]
+#[path = "./arrow_impl.rs"]
+mod arrow_impl;
+type ArrowIntervalType = arrow_buffer::IntervalMonthDayNano;
+pub use arrow_impl::{FromArrow, ToArrow};
+pub use {
+    arrow_52_array as arrow_array, arrow_52_buffer as arrow_buffer, arrow_52_cast as arrow_cast,
+    arrow_52_schema as arrow_schema,
+};
+
+use crate::array::Interval;
+
+impl super::ArrowIntervalTypeTrait for ArrowIntervalType {
+    fn to_interval(self) -> Interval {
+        // XXX: the arrow-rs decoding is incorrect
+        // let (months, days, ns) = arrow_array::types::IntervalMonthDayNanoType::to_parts(value);
+        Interval::from_month_day_usec(self.months, self.days, self.nanoseconds / 1000)
+    }
+
+    fn from_interval(value: Interval) -> Self {
+        // XXX: the arrow-rs encoding is incorrect
+        // arrow_array::types::IntervalMonthDayNanoType::make_value(
+        //     self.months(),
+        //     self.days(),
+        //     // TODO: this may overflow and we need `try_into`
+        //     self.usecs() * 1000,
+        // )
+        Self {
+            months: value.months(),
+            days: value.days(),
+            nanoseconds: value.usecs() * 1000,
+        }
+    }
+}
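The three modules above compile the shared `arrow_impl.rs` once per arrow major version via the `#[path]` attribute; the only per-version seam is `ArrowIntervalType` (`i128` before arrow 52, a real `IntervalMonthDayNano` struct from 52 on). A hypothetical sanity test for the unit math in the `impl` above, written as if it lived inside `arrow_52.rs` (RisingWave intervals carry microseconds, arrow nanoseconds, hence the `* 1000` / `/ 1000`):

    #[cfg(test)]
    mod tests {
        use crate::array::arrow::ArrowIntervalTypeTrait;
        use crate::array::Interval;

        #[test]
        fn interval_roundtrip() {
            // 1 month, 2 days, 3_000 us == 3_000_000 ns.
            let iv = Interval::from_month_day_usec(1, 2, 3_000);
            let arrow = super::ArrowIntervalType::from_interval(iv);
            assert_eq!(arrow.nanoseconds, 3_000_000); // usecs * 1000
            assert_eq!(arrow.to_interval(), iv);
        }
    }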
diff --git a/src/common/src/array/arrow/arrow_deltalake.rs b/src/common/src/array/arrow/arrow_deltalake.rs
index 7338532e082d6..7e3684233745a 100644
--- a/src/common/src/array/arrow/arrow_deltalake.rs
+++ b/src/common/src/array/arrow/arrow_deltalake.rs
@@ -22,17 +22,11 @@ use std::sync::Arc;
 
 use arrow_array::ArrayRef;
 use num_traits::abs;
-use {
-    arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
-    arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
-};
-type ArrowIntervalType = i128;
-use self::arrow_impl::ToArrow;
+pub use super::arrow_48::{
+    arrow_array, arrow_buffer, arrow_cast, arrow_schema, FromArrow, ToArrow,
+};
 use crate::array::{Array, ArrayError, DataChunk, Decimal, DecimalArray};
 
-#[expect(clippy::duplicate_mod)]
-#[path = "./arrow_impl.rs"]
-mod arrow_impl;
-
 pub struct DeltaLakeConvert;
@@ -103,8 +97,8 @@ mod test {
     use arrow_array::cast::AsArray;
     use arrow_array::ArrayRef;
     use arrow_schema::Field;
-    use {arrow_array_deltalake as arrow_array, arrow_schema_deltalake as arrow_schema};
 
+    use super::*;
     use crate::array::arrow::arrow_deltalake::DeltaLakeConvert;
     use crate::array::{ArrayImpl, Decimal, DecimalArray, ListArray, ListValue};
     use crate::bitmap::Bitmap;
diff --git a/src/common/src/array/arrow/arrow_iceberg.rs b/src/common/src/array/arrow/arrow_iceberg.rs
index 80c0a3dab1667..845dcc8a0f6f9 100644
--- a/src/common/src/array/arrow/arrow_iceberg.rs
+++ b/src/common/src/array/arrow/arrow_iceberg.rs
@@ -17,46 +17,14 @@ use std::collections::HashMap;
 use std::ops::{Div, Mul};
 use std::sync::Arc;
 
-use arrow_array_iceberg::{self as arrow_array, ArrayRef};
-use arrow_buffer_iceberg::IntervalMonthDayNano as ArrowIntervalType;
+use arrow_array::ArrayRef;
 use num_traits::abs;
-use {
-    arrow_buffer_iceberg as arrow_buffer, arrow_cast_iceberg as arrow_cast,
-    arrow_schema_iceberg as arrow_schema,
-};
+pub use super::arrow_52::{
+    arrow_array, arrow_buffer, arrow_cast, arrow_schema, FromArrow, ToArrow,
+};
 use crate::array::{Array, ArrayError, ArrayImpl, DataChunk, DataType, DecimalArray};
-use crate::types::{Interval, StructType};
-
-impl ArrowIntervalTypeTrait for ArrowIntervalType {
-    fn to_interval(self) -> Interval {
-        // XXX: the arrow-rs decoding is incorrect
-        // let (months, days, ns) = arrow_array::types::IntervalMonthDayNanoType::to_parts(value);
-        Interval::from_month_day_usec(self.months, self.days, self.nanoseconds / 1000)
-    }
-
-    fn from_interval(value: Interval) -> Self {
-        // XXX: the arrow-rs encoding is incorrect
-        // arrow_array::types::IntervalMonthDayNanoType::make_value(
-        //     self.months(),
-        //     self.days(),
-        //     // TODO: this may overflow and we need `try_into`
-        //     self.usecs() * 1000,
-        // )
-        Self {
-            months: value.months(),
-            days: value.days(),
-            nanoseconds: value.usecs() * 1000,
-        }
-    }
-}
-
-#[path = "./arrow_impl.rs"]
-mod arrow_impl;
-
-use arrow_impl::{FromArrow, ToArrow};
-
-use crate::array::arrow::ArrowIntervalTypeTrait;
+use crate::types::StructType;
 
 pub struct IcebergArrowConvert;
@@ -261,11 +229,9 @@ mod test {
     use std::sync::Arc;
 
-    use arrow_array_iceberg::{ArrayRef, Decimal128Array};
-    use arrow_schema_iceberg::DataType;
-
-    use super::arrow_impl::ToArrow;
-    use super::IcebergArrowConvert;
+    use super::arrow_array::{ArrayRef, Decimal128Array};
+    use super::arrow_schema::DataType;
+    use super::*;
     use crate::array::{Decimal, DecimalArray};
 
     #[test]
diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs
index 8fa3e2abb6b5f..3095461a2ebc5 100644
--- a/src/common/src/array/arrow/arrow_impl.rs
+++ b/src/common/src/array/arrow/arrow_impl.rs
@@ -42,8 +42,8 @@ use std::fmt::Write;
 
+use arrow_array::array;
 use arrow_array::cast::AsArray;
-use arrow_array_iceberg::array;
 use arrow_buffer::OffsetBuffer;
 use chrono::{DateTime, NaiveDateTime, NaiveTime};
 use itertools::Itertools;
diff --git a/src/common/src/array/arrow/arrow_udf.rs b/src/common/src/array/arrow/arrow_udf.rs
index a5296ca21cab8..0a2e85785c772 100644
--- a/src/common/src/array/arrow/arrow_udf.rs
+++ b/src/common/src/array/arrow/arrow_udf.rs
@@ -20,16 +20,11 @@ use std::sync::Arc;
 
-pub use arrow_impl::{FromArrow, ToArrow};
-use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
-type ArrowIntervalType = i128;
-
+pub use super::arrow_50::{
+    arrow_array, arrow_buffer, arrow_cast, arrow_schema, FromArrow, ToArrow,
+};
 use crate::array::{ArrayError, ArrayImpl, DataType, DecimalArray, JsonbArray};
 
-#[expect(clippy::duplicate_mod)]
-#[path = "./arrow_impl.rs"]
-mod arrow_impl;
-
 /// Arrow conversion for UDF.
 #[derive(Default, Debug)]
 pub struct UdfArrowConvert {
diff --git a/src/common/src/array/arrow/mod.rs b/src/common/src/array/arrow/mod.rs
index d519d62f9935a..ade14a5948d2f 100644
--- a/src/common/src/array/arrow/mod.rs
+++ b/src/common/src/array/arrow/mod.rs
@@ -12,16 +12,44 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// These mods import arrow_impl.rs to provide the FromArrow and ToArrow traits for the
+// corresponding arrow versions, along with the default From/To implementations.
+mod arrow_48;
+mod arrow_50;
+mod arrow_52;
+// These mods import the mods above and may override some methods.
 mod arrow_deltalake;
 mod arrow_iceberg;
 mod arrow_udf;
 
 pub use arrow_deltalake::DeltaLakeConvert;
 pub use arrow_iceberg::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
-pub use arrow_udf::{FromArrow, ToArrow, UdfArrowConvert};
-
+pub use arrow_udf::UdfArrowConvert;
+pub use reexport::*;
+/// Other RisingWave crates can directly use the arrow crates re-exported here, without adding
+/// `arrow` dependencies to their `Cargo.toml` and without caring about the version.
+mod reexport {
+    pub use super::arrow_deltalake::{
+        arrow_array as arrow_array_deltalake, arrow_buffer as arrow_buffer_deltalake,
+        arrow_cast as arrow_cast_deltalake, arrow_schema as arrow_schema_deltalake,
+        FromArrow as DeltaLakeFromArrow, ToArrow as DeltaLakeToArrow,
+    };
+    pub use super::arrow_iceberg::{
+        arrow_array as arrow_array_iceberg, arrow_buffer as arrow_buffer_iceberg,
+        arrow_cast as arrow_cast_iceberg, arrow_schema as arrow_schema_iceberg,
+        FromArrow as IcebergFromArrow, ToArrow as IcebergToArrow,
+    };
+    pub use super::arrow_udf::{
+        arrow_array as arrow_array_udf, arrow_buffer as arrow_buffer_udf,
+        arrow_cast as arrow_cast_udf, arrow_schema as arrow_schema_udf, FromArrow as UdfFromArrow,
+        ToArrow as UdfToArrow,
+    };
+}
 use crate::types::Interval;
 
+/// Arrow 52 changed the interval type from `i128` to `arrow_buffer::IntervalMonthDayNano`, so
+/// we introduced this trait to customize the conversion in `arrow_impl.rs`.
+/// We may delete this after all arrow versions are upgraded.
 trait ArrowIntervalTypeTrait {
     fn to_interval(self) -> Interval;
     fn from_interval(value: Interval) -> Self;
 }
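Each integration now re-exports its own copy of the conversion traits under aliased names (`UdfFromArrow`, `IcebergToArrow`, and so on), so a caller picks the flavor matching the arrow version of the system it talks to. A sketch of a round trip through the UDF flavor; the import paths come from this diff, while `to_record_batch`/`from_record_batch` are assumed here to be the trait entry points:

    use std::sync::Arc;

    use risingwave_common::array::arrow::arrow_schema_udf::Schema;
    use risingwave_common::array::arrow::{UdfArrowConvert, UdfFromArrow, UdfToArrow};
    use risingwave_common::array::DataChunk;

    // Hypothetical: convert a chunk to an arrow-50 RecordBatch and back.
    fn round_trip(chunk: &DataChunk, schema: Schema) -> anyhow::Result<DataChunk> {
        let convert = UdfArrowConvert::default();
        let batch = convert.to_record_batch(Arc::new(schema), chunk)?;
        Ok(convert.from_record_batch(&batch)?)
    }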
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 2535847c98fe4..14eed053b25cf 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -16,12 +16,6 @@ normal = ["workspace-hack"]
 [dependencies]
 anyhow = "1"
 apache-avro = { workspace = true }
-arrow-array = { workspace = true }
-arrow-array-iceberg = { workspace = true }
-arrow-row = { workspace = true }
-arrow-schema = { workspace = true }
-arrow-schema-iceberg = { workspace = true }
-arrow-select = { workspace = true }
 assert_matches = "1"
 async-compression = { version = "0.4.5", features = ["gzip", "tokio"] }
 async-nats = "0.35"
diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs
index 9dbd17ae58a1d..e8b7a134b60f1 100644
--- a/src/connector/src/error.rs
+++ b/src/connector/src/error.rs
@@ -64,8 +64,7 @@ def_anyhow_newtype! {
     icelake::Error => "Iceberg error",
     iceberg::Error => "IcebergV2 error",
     redis::RedisError => "Redis error",
-    arrow_schema::ArrowError => "Arrow error",
-    arrow_schema_iceberg::ArrowError => "Arrow error",
+    risingwave_common::array::arrow::arrow_schema_iceberg::ArrowError => "Arrow error",
     google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
     rumqttc::tokio_rustls::rustls::Error => "TLS error",
     rumqttc::v5::ClientError => "MQTT error",
diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs
index 4f1e720bc47fb..db2ace3d2b6dd 100644
--- a/src/connector/src/parser/parquet_parser.rs
+++ b/src/connector/src/parser/parquet_parser.rs
@@ -14,10 +14,10 @@
 use std::future::IntoFuture;
 use std::sync::Arc;
 
-use arrow_array_iceberg::RecordBatch;
 use deltalake::parquet::arrow::async_reader::AsyncFileReader;
 use futures_async_stream::try_stream;
-use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
+use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
 use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
 use risingwave_common::bail;
 use risingwave_common::types::{Datum, ScalarImpl};
diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs
index 65ec46f494345..d3771d9122d6e 100644
--- a/src/connector/src/sink/file_sink/opendal_sink.rs
+++ b/src/connector/src/sink/file_sink/opendal_sink.rs
@@ -17,11 +17,11 @@ use std::marker::PhantomData;
 use std::sync::Arc;
 
 use anyhow::anyhow;
-use arrow_schema_iceberg::SchemaRef;
 use async_trait::async_trait;
 use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
 use parquet::arrow::AsyncArrowWriter;
 use parquet::file::properties::WriterProperties;
+use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::Schema;
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 9e87694539f0c..6609f63578e0b 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -24,9 +24,6 @@ use std::ops::Deref;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Context};
-use arrow_schema_iceberg::{
-    DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
-};
 use async_trait::async_trait;
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
 use iceberg::spec::TableMetadata;
@@ -46,6 +43,9 @@ use icelake::transaction::Transaction;
 use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
 use icelake::{Table, TableIdentifier};
 use itertools::Itertools;
+use risingwave_common::array::arrow::arrow_schema_iceberg::{
+    self, DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
+};
 use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::bail;
diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs
index c29608f0fd63b..a45eaf5a401fd 100644
--- a/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs
+++ b/src/connector/src/sink/iceberg/prometheus/monitored_base_file_writer.rs
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow_array_iceberg::RecordBatch;
-use arrow_schema_iceberg::SchemaRef;
 use icelake::io_v2::{
     BaseFileWriter, BaseFileWriterBuilder, BaseFileWriterMetrics, CurrentFileStatus, FileWriter,
     FileWriterBuilder,
 };
 use icelake::Result;
+use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
+use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
 use risingwave_common::metrics::LabelGuardedIntGauge;
 
 #[derive(Clone)]
diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs
index 463b1f3c9dbd4..c5fb3bcc906b5 100644
--- a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs
+++ b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs
@@ -12,12 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow_schema_iceberg::SchemaRef;
 use icelake::io_v2::{
     FanoutPartitionedWriter, FanoutPartitionedWriterBuilder, FanoutPartitionedWriterMetrics,
     IcebergWriter, IcebergWriterBuilder,
 };
 use icelake::Result;
+use risingwave_common::array::arrow::arrow_array_iceberg;
+use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
 use risingwave_common::metrics::LabelGuardedIntGauge;
 
 #[derive(Clone)]
diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs
index 8be6eb7018b13..f68a1b6032135 100644
--- a/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs
+++ b/src/connector/src/sink/iceberg/prometheus/monitored_position_delete_writer.rs
@@ -17,6 +17,7 @@ use icelake::io_v2::{
     PositionDeleteMetrics, PositionDeleteWriter, PositionDeleteWriterBuilder,
 };
 use icelake::Result;
+use risingwave_common::array::arrow::arrow_schema_iceberg;
 use risingwave_common::metrics::LabelGuardedIntGauge;
 
 #[derive(Clone)]
diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs
index aebb5939ff143..634e9ac968f89 100644
--- a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs
+++ b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs
@@ -12,11 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow_array_iceberg::RecordBatch;
-use arrow_schema_iceberg::SchemaRef;
 use async_trait::async_trait;
 use icelake::io_v2::{IcebergWriter, IcebergWriterBuilder};
 use icelake::Result;
+use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
+use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
 use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
 
 #[derive(Clone)]
diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs
index 20f6872474e88..e377970e38214 100644
--- a/src/connector/src/source/pulsar/source/reader.rs
+++ b/src/connector/src/source/pulsar/source/reader.rs
@@ -15,7 +15,6 @@ use std::collections::HashMap;
 
 use anyhow::Context;
-use arrow_array_iceberg::{Int32Array, Int64Array, RecordBatch};
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
@@ -27,6 +26,7 @@ use itertools::Itertools;
 use pulsar::consumer::InitialPosition;
 use pulsar::message::proto::MessageIdData;
 use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor};
+use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, Int64Array, RecordBatch};
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::ROWID_PREFIX;
diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml
index cbff3a5ff2e28..538192523b046 100644
--- a/src/expr/core/Cargo.toml
+++ b/src/expr/core/Cargo.toml
@@ -17,8 +17,6 @@ normal = ["workspace-hack", "ctor"]
 
 [dependencies]
 anyhow = "1"
-arrow-array = { workspace = true }
-arrow-schema = { workspace = true }
 async-trait = "0.1"
 auto_impl = "1"
 await-tree = { workspace = true }
diff --git a/src/expr/core/src/aggregate/user_defined.rs b/src/expr/core/src/aggregate/user_defined.rs
index a3897896a3a67..2f4fdc5f9f9c5 100644
--- a/src/expr/core/src/aggregate/user_defined.rs
+++ b/src/expr/core/src/aggregate/user_defined.rs
@@ -15,9 +15,9 @@ use std::sync::Arc;
 
 use anyhow::Context;
-use arrow_array::ArrayRef;
-use arrow_schema::{Field, Fields, Schema, SchemaRef};
-use risingwave_common::array::arrow::{FromArrow, ToArrow, UdfArrowConvert};
+use risingwave_common::array::arrow::arrow_array_udf::ArrayRef;
+use risingwave_common::array::arrow::arrow_schema_udf::{Field, Fields, Schema, SchemaRef};
+use risingwave_common::array::arrow::{UdfArrowConvert, UdfFromArrow, UdfToArrow};
 use risingwave_common::array::Op;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_pb::expr::PbUserDefinedFunctionMetadata;
@@ -154,7 +154,11 @@
     Ok(Box::new(UserDefinedAggregateFunction {
         return_field: arrow_convert.to_arrow_field("", return_type)?,
-        state_field: Field::new("state", arrow_schema::DataType::Binary, true),
+        state_field: Field::new(
+            "state",
+            risingwave_common::array::arrow::arrow_schema_udf::DataType::Binary,
+            true,
+        ),
         return_type: return_type.clone(),
         arg_schema,
         runtime,
diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs
index c5bce6a2944df..6ae27dabb2458 100644
--- a/src/expr/core/src/expr/expr_udf.rs
+++ b/src/expr/core/src/expr/expr_udf.rs
@@ -16,10 +16,10 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, LazyLock};
 
 use anyhow::Context;
-use arrow_schema::{Fields, Schema, SchemaRef};
 use await_tree::InstrumentAwait;
 use prometheus::{exponential_buckets, Registry};
-use risingwave_common::array::arrow::{FromArrow, ToArrow, UdfArrowConvert};
+use risingwave_common::array::arrow::arrow_schema_udf::{Fields, Schema, SchemaRef};
+use risingwave_common::array::arrow::{UdfArrowConvert, UdfFromArrow, UdfToArrow};
 use risingwave_common::array::{Array, ArrayRef, DataChunk};
 use risingwave_common::metrics::*;
 use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
diff --git a/src/expr/core/src/sig/udf.rs b/src/expr/core/src/sig/udf.rs
index 9a253a78051e9..047879b9192b8 100644
--- a/src/expr/core/src/sig/udf.rs
+++ b/src/expr/core/src/sig/udf.rs
@@ -19,9 +19,9 @@
 //! See expr/impl/src/udf for the implementations.
 
 use anyhow::{bail, Context, Result};
-use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use enum_as_inner::EnumAsInner;
 use futures::stream::BoxStream;
+use risingwave_common::array::arrow::arrow_array_udf::{ArrayRef, BooleanArray, RecordBatch};
 use risingwave_common::types::DataType;
 
 /// The global registry of UDF implementations.
diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs
index 919c258299cde..b490e9b023af1 100644
--- a/src/expr/core/src/table_function/user_defined.rs
+++ b/src/expr/core/src/table_function/user_defined.rs
@@ -15,8 +15,8 @@ use std::sync::Arc;
 
 use anyhow::Context;
-use arrow_schema::{Fields, Schema, SchemaRef};
-use risingwave_common::array::arrow::{FromArrow, ToArrow, UdfArrowConvert};
+use risingwave_common::array::arrow::arrow_schema_udf::{Fields, Schema, SchemaRef};
+use risingwave_common::array::arrow::{UdfArrowConvert, UdfFromArrow, UdfToArrow};
 use risingwave_common::array::I32Array;
 use risingwave_common::bail;
diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml
index c0e506889ef77..f65556f8a4711 100644
--- a/src/expr/impl/Cargo.toml
+++ b/src/expr/impl/Cargo.toml
@@ -24,10 +24,7 @@ wasm-udf = ["arrow-udf-wasm", "zstd"]
 
 [dependencies]
 aho-corasick = "1"
 anyhow = "1"
-arrow-array = { workspace = true }
 arrow-flight = { workspace = true, optional = true }
-arrow-schema = { workspace = true }
-arrow-schema-iceberg = { workspace = true }
 arrow-udf-flight = { workspace = true, optional = true }
 arrow-udf-js = { workspace = true, optional = true }
 arrow-udf-python = { workspace = true, optional = true }
diff --git a/src/expr/impl/src/scalar/external/iceberg.rs b/src/expr/impl/src/scalar/external/iceberg.rs
index 5fbc9b003305a..c6881c5228e43 100644
--- a/src/expr/impl/src/scalar/external/iceberg.rs
+++ b/src/expr/impl/src/scalar/external/iceberg.rs
@@ -22,7 +22,7 @@ use anyhow::anyhow;
 use icelake::types::{
     create_transform_function, Any as IcelakeDataType, BoxedTransformFunction, Transform,
 };
-use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
 use risingwave_common::array::{ArrayRef, DataChunk};
 use risingwave_common::ensure;
 use risingwave_common::row::OwnedRow;
diff --git a/src/expr/impl/src/udf/external.rs b/src/expr/impl/src/udf/external.rs
index 0d6ba0e409386..f3055954c3a7e 100644
--- a/src/expr/impl/src/udf/external.rs
+++ b/src/expr/impl/src/udf/external.rs
@@ -18,11 +18,11 @@ use std::sync::{Arc, LazyLock, Weak};
 use std::time::Duration;
 
 use anyhow::bail;
-use arrow_schema::Fields;
 use arrow_udf_flight::Client;
 use futures_util::{StreamExt, TryStreamExt};
 use ginepro::{LoadBalancedChannel, ResolutionStrategy};
-use risingwave_common::array::arrow::{ToArrow, UdfArrowConvert};
+use risingwave_common::array::arrow::arrow_schema_udf::{self, Fields};
+use risingwave_common::array::arrow::{UdfArrowConvert, UdfToArrow};
 use risingwave_common::util::addr::HostAddr;
 use thiserror_ext::AsReport;
 use tokio::runtime::Runtime;
@@ -45,15 +45,15 @@ static EXTERNAL: UdfImplDescriptor = UdfImplDescriptor {
         };
         // A helper function to create a unnamed field from data type.
         let to_field = |data_type| convert.to_arrow_field("", data_type);
-        let args = arrow_schema::Schema::new(
+        let args = arrow_schema_udf::Schema::new(
             opts.arg_types
                 .iter()
                 .map(to_field)
                 .try_collect::<Fields>()?,
        );
-        let returns = arrow_schema::Schema::new(if opts.kind.is_table() {
+        let returns = arrow_schema_udf::Schema::new(if opts.kind.is_table() {
             vec![
-                arrow_schema::Field::new("row", arrow_schema::DataType::Int32, true),
+                arrow_schema_udf::Field::new("row", arrow_schema_udf::DataType::Int32, true),
                 to_field(opts.return_type)?,
             ]
         } else {
@@ -285,7 +285,7 @@ fn is_tonic_error(err: &arrow_udf_flight::Error) -> bool {
 }
 
 /// Check if two list of data types match, ignoring field names.
-fn data_types_match(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool {
+fn data_types_match(a: &arrow_schema_udf::Schema, b: &arrow_schema_udf::Schema) -> bool {
     if a.fields().len() != b.fields().len() {
         return false;
     }
diff --git a/src/expr/impl/src/udf/mod.rs b/src/expr/impl/src/udf/mod.rs
index 599fe2cb5198f..1977b937b1b4f 100644
--- a/src/expr/impl/src/udf/mod.rs
+++ b/src/expr/impl/src/udf/mod.rs
@@ -16,8 +16,8 @@
 
 // common imports for submodules
 use anyhow::{Context as _, Result};
-use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use futures_util::stream::BoxStream;
+use risingwave_common::array::arrow::arrow_array_udf::{ArrayRef, BooleanArray, RecordBatch};
 use risingwave_expr::sig::{
     CreateFunctionOptions, CreateFunctionOutput, UdfImpl, UdfImplDescriptor, UDF_IMPLS,
 };
diff --git a/src/expr/impl/src/udf/python.rs b/src/expr/impl/src/udf/python.rs
index 9ca4b2a5b2a24..b60faced6f41c 100644
--- a/src/expr/impl/src/udf/python.rs
+++ b/src/expr/impl/src/udf/python.rs
@@ -12,10 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow_schema::{DataType, Field};
+use arrow_schema_udf::{DataType, Field};
 use arrow_udf_python::{CallMode, Runtime};
 use futures_util::StreamExt;
-use risingwave_common::array::arrow::{ToArrow, UdfArrowConvert};
+use risingwave_common::array::arrow::{arrow_schema_udf, UdfArrowConvert, UdfToArrow};
 
 use super::*;
diff --git a/src/expr/impl/src/udf/quickjs.rs b/src/expr/impl/src/udf/quickjs.rs
index 7faa4dec8ae9f..fb13f3f374b84 100644
--- a/src/expr/impl/src/udf/quickjs.rs
+++ b/src/expr/impl/src/udf/quickjs.rs
@@ -12,10 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow_schema::{DataType, Field};
 use arrow_udf_js::{CallMode, Runtime};
 use futures_util::StreamExt;
-use risingwave_common::array::arrow::{ToArrow, UdfArrowConvert};
+use risingwave_common::array::arrow::arrow_schema_udf::{DataType, Field};
+use risingwave_common::array::arrow::{UdfArrowConvert, UdfToArrow};
 
 use super::*;
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 3a95eab660b09..8116dc3369ace 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -17,8 +17,6 @@ normal = ["workspace-hack"]
 [dependencies]
 anyhow = "1"
 arc-swap = "1"
-arrow-schema = { workspace = true }
-arrow-schema-iceberg = { workspace = true }
 async-recursion = "1.1.0"
 async-trait = "0.1"
 auto_enums = { workspace = true }
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 9f4f2f63975f1..f87fced6b02a2 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -16,11 +16,11 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::sync::{Arc, LazyLock};
 
 use anyhow::Context;
-use arrow_schema_iceberg::DataType as ArrowDataType;
 use either::Either;
 use itertools::Itertools;
 use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
+use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::catalog::{
     ColumnCatalog, ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId,
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index f1535fa769b28..71a3b67cd8dd6 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -21,7 +21,7 @@ use either::Either;
 use itertools::Itertools;
 use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
 use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::{
     debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
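Taken together, the frontend and connector hunks follow one pattern: arrow types are reached through the flavored re-exports, and schema pieces are derived from RisingWave types via the convert structs. A closing sketch of that pattern, using `to_arrow_field`, which this diff's expr code calls; its use here for a frontend-style type mapping is illustrative:

    use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
    use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergToArrow};
    use risingwave_common::types::DataType;

    // Map a RisingWave type to the arrow-52 type the iceberg sink would use.
    fn arrow_type_for(rw_type: &DataType) -> anyhow::Result<ArrowDataType> {
        let field = IcebergArrowConvert.to_arrow_field("", rw_type)?;
        Ok(field.data_type().clone())
    }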