feat(sink): support deltalake sink with rust sdk #13600

Merged · 35 commits · Dec 14, 2023
Commits (35 total; the diff shown below is from 20 commits)
e6bc0ee  add delta lake sink (xxhZs, Nov 21, 2023)
013eede  fix (xxhZs, Nov 21, 2023)
f0047ba  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (xxhZs, Nov 23, 2023)
2ff021f  fmt (xxhZs, Nov 23, 2023)
7b85ea2  save (xxhZs, Nov 23, 2023)
6fbbac3  fix cargo lock (xxhZs, Nov 24, 2023)
c6a709c  fmt (xxhZs, Nov 28, 2023)
1ddf1f9  fix (xxhZs, Nov 28, 2023)
cdecda5  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (xxhZs, Nov 28, 2023)
f521372  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (xxhZs, Nov 28, 2023)
c890167  use separate arrow version for deltalake (wenym1, Dec 1, 2023)
2a35a5e  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (wenym1, Dec 1, 2023)
0aa6785  fix timestamptz (xxhZs, Dec 1, 2023)
9b7b543  use mod path to avoid macro (wenym1, Dec 1, 2023)
e460901  add license and comment (wenym1, Dec 1, 2023)
13ff6ed  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (wenym1, Dec 4, 2023)
c187f72  update test (wenym1, Dec 5, 2023)
2217f61  add comment (wenym1, Dec 5, 2023)
4c503fa  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (wenym1, Dec 5, 2023)
1aaf516  use expect (wenym1, Dec 5, 2023)
e1a3814  save (xxhZs, Dec 5, 2023)
76547e4  fix fmt (xxhZs, Dec 7, 2023)
b0b1b87  add ci (xxhZs, Dec 7, 2023)
50899c0  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (xxhZs, Dec 7, 2023)
10b1ba6  Empty commit (xxhZs, Dec 7, 2023)
5a492a6  fix ci (xxhZs, Dec 7, 2023)
435eb73  fix (xxhZs, Dec 8, 2023)
cf96d64  ut timeout 20 -> 22 (xxhZs, Dec 12, 2023)
4a83b8f  fix region (xxhZs, Dec 12, 2023)
814dcc9  reduce compile time and binary size (wenym1, Dec 12, 2023)
f9605ad  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (wenym1, Dec 12, 2023)
899070a  temp add hakari third party (wenym1, Dec 13, 2023)
86625ab  use new delta-rs commit (wenym1, Dec 13, 2023)
5c365a9  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk (xxhZs, Dec 14, 2023)
a4023e5  fix ci (xxhZs, Dec 14, 2023)
1,011 changes: 873 additions & 138 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -129,6 +129,12 @@ arrow-flight = "49"
arrow-select = "49"
arrow-ord = "49"
arrow-row = "49"
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3"] }
Contributor:

Is there a particular reason for downgrading arrow?

Contributor (author):

Because both Delta.rs and IceLake need to use Arrow, and 48.0.1 is the highest version that both of them support.

parquet = "49"
thiserror-ext = "0.0.10"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
4 changes: 4 additions & 0 deletions src/common/Cargo.toml
@@ -17,9 +17,13 @@ normal = ["workspace-hack"]
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
arrow-array-deltalake = { workspace = true }
arrow-buffer = { workspace = true }
arrow-buffer-deltalake = { workspace = true }
arrow-cast = { workspace = true }
arrow-cast-deltalake = { workspace = true }
arrow-schema = { workspace = true }
arrow-schema-deltalake = { workspace = true }
async-trait = "0.1"
auto_enums = "0.8"
auto_impl = "1"
26 changes: 26 additions & 0 deletions src/common/src/array/arrow/arrow_default.rs
@@ -0,0 +1,26 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This is for arrow dependency named `arrow-xxx` such as `arrow-array` in the cargo workspace.
//!
//! This should be the default arrow version used in our system.
//!
//! The corresponding version of arrow is currently used by `udf` and `iceberg` sink.

pub use arrow_impl::to_record_batch_with_schema;
use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};

#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;
28 changes: 28 additions & 0 deletions src/common/src/array/arrow/arrow_deltalake.rs
@@ -0,0 +1,28 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This is for arrow dependency named `arrow-xxx-deltalake` such as `arrow-array-deltalake`
//! in the cargo workspace.
//!
//! The corresponding version of arrow is currently used by `deltalake` sink.

pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
use {
arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};

#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;
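The two wrapper mods above instantiate one template file (`arrow_impl.rs`) against different arrow crate versions: each wrapper renames its dependencies with `use ... as` and then re-includes the shared implementation via `#[path]`. A minimal self-contained sketch of the aliasing idea, with inline modules and made-up version numbers standing in for the real crates (the actual pattern spans multiple files and two arrow releases):

```rust
// Two "versions" of a library, simulated as inline modules.
mod arrow_v48 {
    pub fn version() -> u32 {
        48
    }
}
mod arrow_v49 {
    pub fn version() -> u32 {
        49
    }
}

// Each wrapper selects a version purely through a `use ... as` alias;
// the body of the wrapper is written once against the alias name,
// mirroring how arrow_impl.rs is written against `arrow_array` etc.
mod default_impl {
    use super::arrow_v49 as arrow;
    pub fn report() -> u32 {
        arrow::version()
    }
}
mod deltalake_impl {
    use super::arrow_v48 as arrow;
    pub fn report() -> u32 {
        arrow::version()
    }
}

fn main() {
    // Identical code, different arrow version, no macros required.
    assert_eq!(default_impl::report(), 49);
    assert_eq!(deltalake_impl::report(), 48);
    println!("ok");
}
```

In the real crate the renaming happens in Cargo.toml (`arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }`), and `#[path = "./arrow_impl.rs"]` plays the role of the shared module body here.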
src/common/src/array/arrow/arrow_impl.rs
@@ -13,23 +13,47 @@
// limitations under the License.

//! Converts between arrays and Apache Arrow arrays.
//!
//! This file acts as a template file for conversion code between
//! arrays and different version of Apache Arrow.
//!
//! The conversion logic will be implemented for the arrow version specified in the outer mod by
//! `super::arrow_xxx`, such as `super::arrow_array`.
//!
//! When we want to implement the conversion logic for an arrow version, we first
//! create a new mod file, and rename the corresponding arrow package name to `arrow_xxx`
//! using the `use` clause, and then declare a sub-mod and set its file path with attribute
//! `#[path = "./arrow_impl.rs"]` so that the code in this template file can be embedded to
//! the new mod file, and the conversion logic can be implemented for the corresponding arrow
//! version.
//!
//! An example can be seen in `arrow_default.rs`, which is also shown below:
//! ```ignore
//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
//!
//! #[allow(clippy::duplicate_mod)]
//! #[path = "./arrow_impl.rs"]
//! mod arrow_impl;
//! ```

use std::fmt::Write;
use std::sync::Arc;

use arrow_array::Array as ArrowArray;
use arrow_cast::cast;
use arrow_schema::{Field, Schema, SchemaRef, DECIMAL256_MAX_PRECISION};
use chrono::{NaiveDateTime, NaiveTime};
use itertools::Itertools;

use super::*;
use crate::types::{Int256, StructType};
// This is important because we want to use the arrow version specified by the outer mod.
use super::{arrow_array, arrow_buffer, arrow_cast, arrow_schema};
// Other import should always use the absolute path.
use crate::array::*;
use crate::buffer::Bitmap;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;

/// Converts RisingWave array to Arrow array with the schema.
/// This function will try to convert the array if the type is not same with the schema.
pub fn to_record_batch_with_schema(
schema: SchemaRef,
schema: arrow_schema::SchemaRef,
chunk: &DataChunk,
) -> Result<arrow_array::RecordBatch, ArrayError> {
if !chunk.is_compacted() {
@@ -45,7 +69,7 @@ pub fn to_record_batch_with_schema(
if column.data_type() == field.data_type() {
Ok(column)
} else {
cast(&column, field.data_type()).map_err(ArrayError::from_arrow)
arrow_cast::cast(&column, field.data_type()).map_err(ArrayError::from_arrow)
}
})
.try_collect::<_, _, ArrayError>()?;
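The column loop above forwards a column unchanged when its Arrow type already matches the target field, and casts it otherwise. A toy sketch of that cast-if-needed shape, using hypothetical stand-in types rather than the real `arrow_array`/`arrow_cast` API:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Ty {
    I32,
    I64,
}

#[derive(Clone, Debug, PartialEq)]
enum Col {
    I32(Vec<i32>),
    I64(Vec<i64>),
}

impl Col {
    fn ty(&self) -> Ty {
        match self {
            Col::I32(_) => Ty::I32,
            Col::I64(_) => Ty::I64,
        }
    }
}

// Stand-in for arrow_cast::cast: widen i32 to i64, pass through on match.
fn cast(col: &Col, target: Ty) -> Col {
    match (col, target) {
        (Col::I32(v), Ty::I64) => Col::I64(v.iter().map(|&x| x as i64).collect()),
        (c, t) if c.ty() == t => c.clone(),
        _ => unimplemented!("other casts omitted in this sketch"),
    }
}

// Stand-in for the column loop in to_record_batch_with_schema.
fn to_batch(cols: &[Col], schema: &[Ty]) -> Vec<Col> {
    cols.iter()
        .zip(schema)
        .map(|(c, &t)| if c.ty() == t { c.clone() } else { cast(c, t) })
        .collect()
}

fn main() {
    let cols = vec![Col::I32(vec![1, 2]), Col::I64(vec![3])];
    let out = to_batch(&cols, &[Ty::I64, Ty::I64]);
    assert_eq!(out[0], Col::I64(vec![1, 2]));
    assert_eq!(out[1], Col::I64(vec![3]));
    println!("ok");
}
```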
@@ -72,14 +96,14 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch {

let fields: Vec<_> = columns
.iter()
.map(|array: &Arc<dyn ArrowArray>| {
.map(|array: &Arc<dyn arrow_array::Array>| {
let nullable = array.null_count() > 0;
let data_type = array.data_type().clone();
Field::new("", data_type, nullable)
arrow_schema::Field::new("", data_type, nullable)
})
.collect();

let schema = Arc::new(Schema::new(fields));
let schema = Arc::new(arrow_schema::Schema::new(fields));
let opts =
arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
@@ -205,7 +229,7 @@ impl TryFrom<&StructType> for arrow_schema::Fields {
fn try_from(struct_type: &StructType) -> Result<Self, Self::Error> {
struct_type
.iter()
.map(|(name, ty)| Ok(Field::new(name, ty.try_into()?, true)))
.map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true)))
.try_collect()
}
}
@@ -225,7 +249,7 @@ impl TryFrom<&DataType> for arrow_schema::DataType {
DataType::Int16 => Ok(Self::Int16),
DataType::Int32 => Ok(Self::Int32),
DataType::Int64 => Ok(Self::Int64),
DataType::Int256 => Ok(Self::Decimal256(DECIMAL256_MAX_PRECISION, 0)),
DataType::Int256 => Ok(Self::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)),
DataType::Float32 => Ok(Self::Float32),
DataType::Float64 => Ok(Self::Float64),
DataType::Date => Ok(Self::Date32),
@@ -243,10 +267,10 @@ impl TryFrom<&DataType> for arrow_schema::DataType {
DataType::Struct(struct_type) => Ok(Self::Struct(
struct_type
.iter()
.map(|(name, ty)| Ok(Field::new(name, ty.try_into()?, true)))
.map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true)))
.try_collect::<_, _, ArrayError>()?,
)),
DataType::List(datatype) => Ok(Self::List(Arc::new(Field::new(
DataType::List(datatype) => Ok(Self::List(Arc::new(arrow_schema::Field::new(
"item",
datatype.as_ref().try_into()?,
true,
@@ -528,6 +552,20 @@ impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
}
}

impl From<arrow_buffer::i256> for Int256 {
fn from(value: arrow_buffer::i256) -> Self {
let buffer = value.to_be_bytes();
Int256::from_be_bytes(buffer)
}
}

impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
fn from(val: Int256Ref<'a>) -> Self {
let buffer = val.to_be_bytes();
arrow_buffer::i256::from_be_bytes(buffer)
}
}
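These two impls (moved here from `num256.rs`) bridge `arrow_buffer::i256` and RisingWave's `Int256` through a shared 32-byte big-endian encoding, so each direction is a lossless byte-level round trip. The idea sketched with `i128` (16 bytes), since `arrow_buffer::i256` is not available in a standalone snippet:

```rust
fn main() {
    // Encode to big-endian bytes, then rebuild the value from them.
    // The round trip is lossless, which is exactly the property the
    // i256 <-> Int256 conversions above rely on (with 32 bytes).
    let v: i128 = -1_234_567_890_123_456_789;
    let be: [u8; 16] = v.to_be_bytes();
    let back = i128::from_be_bytes(be);
    assert_eq!(v, back);
    println!("ok");
}
```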

impl From<&Int256Array> for arrow_array::Decimal256Array {
fn from(array: &Int256Array) -> Self {
array
@@ -604,7 +642,7 @@ impl TryFrom<&ListArray> for arrow_array::ListArray {
array,
a,
Decimal256Builder::with_capacity(a.len()).with_data_type(
arrow_schema::DataType::Decimal256(DECIMAL256_MAX_PRECISION, 0),
arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0),
),
|b, v| b.append_option(v.map(Into::into)),
),
@@ -656,7 +694,11 @@ impl TryFrom<&ListArray> for arrow_array::ListArray {
ArrayImpl::Struct(a) => {
let values = Arc::new(arrow_array::StructArray::try_from(a)?);
arrow_array::ListArray::new(
Arc::new(Field::new("item", a.data_type().try_into()?, true)),
Arc::new(arrow_schema::Field::new(
"item",
a.data_type().try_into()?,
true,
)),
arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(
array
.offsets()
@@ -683,6 +725,7 @@ impl TryFrom<&arrow_array::ListArray> for ListArray {
type Error = ArrayError;

fn try_from(array: &arrow_array::ListArray) -> Result<Self, Self::Error> {
use arrow_array::Array;
Ok(ListArray {
value: Box::new(ArrayImpl::try_from(array.values())?),
bitmap: match array.nulls() {
@@ -731,6 +774,7 @@ impl TryFrom<&arrow_array::StructArray> for StructArray {

#[cfg(test)]
mod tests {
use super::arrow_array::Array as _;
use super::*;

#[test]
@@ -860,8 +904,6 @@ mod tests {

#[test]
fn struct_array() {
use arrow_array::Array as _;

// Empty array - risingwave to arrow conversion.
let test_arr = StructArray::new(StructType::empty(), vec![], Bitmap::ones(0));
assert_eq!(
19 changes: 19 additions & 0 deletions src/common/src/array/arrow/mod.rs
@@ -0,0 +1,19 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod arrow_default;
mod arrow_deltalake;

pub use arrow_default::to_record_batch_with_schema;
pub use arrow_deltalake::to_deltalake_record_batch_with_schema;
2 changes: 1 addition & 1 deletion src/common/src/array/mod.rs
@@ -15,7 +15,7 @@
//! `Array` defines all in-memory representations of vectorized execution framework.

mod arrow;
pub use arrow::to_record_batch_with_schema;
pub use arrow::{to_deltalake_record_batch_with_schema, to_record_batch_with_schema};
mod bool_array;
pub mod bytes_array;
mod chrono_array;
14 changes: 0 additions & 14 deletions src/common/src/types/num256.rs
@@ -326,20 +326,6 @@ impl Num for Int256 {
}
}

impl From<arrow_buffer::i256> for Int256 {
fn from(value: arrow_buffer::i256) -> Self {
let buffer = value.to_be_bytes();
Int256::from_be_bytes(buffer)
}
}

impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
fn from(val: Int256Ref<'a>) -> Self {
let buffer = val.to_be_bytes();
arrow_buffer::i256::from_be_bytes(buffer)
}
}

impl EstimateSize for Int256 {
fn estimated_heap_size(&self) -> usize {
mem::size_of::<i128>() * 2
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6
"time",
] }
csv = "1.3"
deltalake = { workspace = true }
duration-str = "0.7.0"
easy-ext = "1"
enum-as-inner = "0.6"