Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iceberg): support create iceberg table w/ one layer nested type & scan table w/ nested type #18498

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
iceberg = "0.3.0"
iceberg-catalog-rest = "0.3.0"
iceberg-catalog-glue = "0.3.0"
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
opendal = "0.47"
arrow-array = "50"
arrow-arith = "50"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true'
);

statement ok
Expand All @@ -62,6 +63,30 @@ FLUSH;

sleep 5s

statement ok
CREATE Source iceberg_s WITH (
connector = 'iceberg',
database.name = 'demo_db',
table.name = 'no_partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
);

query ?????????????? rowsort
select * from iceberg_s
----
1 1 1000 1.1 1.11 1-1 t 2022-03-11 2022-03-11 01:00:00+00:00 2022-03-11 01:00:00 1.11000 {1:100,2:200} {1,2,3} (1,2)
2 2 2000 2.2 2.22 2-2 f 2022-03-12 2022-03-12 02:00:00+00:00 2022-03-12 02:00:00 2.22000 {3:300} {1,NULL,3} (3,)
3 3 3000 3.3 3.33 3-3 t 2022-03-13 2022-03-13 03:00:00+00:00 2022-03-13 03:00:00 99999.99999 NULL NULL NULL
4 4 4000 4.4 4.44 4-4 f 2022-03-14 2022-03-14 04:00:00+00:00 2022-03-14 04:00:00 -99999.99999 NULL NULL NULL
5 5 5000 5.5 5.55 5-5 t 2022-03-15 2022-03-15 05:00:00+00:00 2022-03-15 05:00:00 NULL NULL NULL NULL


statement ok
DROP SINK s6;

Expand All @@ -70,3 +95,6 @@ DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;

statement ok
DROP SOURCE iceberg_s;
90 changes: 90 additions & 0 deletions src/common/src/array/arrow/arrow_iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;
use std::collections::HashMap;
use std::ops::{Div, Mul};
use std::sync::Arc;

Expand Down Expand Up @@ -171,6 +173,94 @@ impl ToArrow for IcebergArrowConvert {

impl FromArrow for IcebergArrowConvert {}

/// Iceberg sink with `create_table_if_not_exists` option will use this struct to convert the
/// iceberg data type to arrow data type. Specifically, it will add the field id to the
/// arrow field metadata, because iceberg-rust and icelake need the field id to be set.
///
/// Note: this is different from [`IcebergArrowConvert`], which is used to read from/write to
/// an _existing_ iceberg table. In that case, we just need to make sure the data is compatible with the existing schema.
/// But to _create a new table_, we need to meet more requirements of iceberg.
#[derive(Default)]
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
pub struct IcebergCreateTableArrowConvert {
    // Next field id to hand out. It is incremented before use (see
    // `add_field_id`), so the first assigned id is 1. `RefCell` provides
    // interior mutability because the `ToArrow` trait methods take `&self`.
    next_field_id: RefCell<u32>,
}

impl IcebergCreateTableArrowConvert {
    /// Convert a RisingWave column (name + data type) into an Arrow field,
    /// stamping it with a freshly allocated iceberg field id.
    ///
    /// Thin inherent wrapper so callers don't need the `ToArrow` trait in scope.
    pub fn to_arrow_field(
        &self,
        name: &str,
        data_type: &DataType,
    ) -> Result<arrow_schema::Field, ArrayError> {
        ToArrow::to_arrow_field(self, name, data_type)
    }

    /// Attach the next sequential field id to `arrow_field`'s metadata.
    ///
    /// Ids start at 1 and increase by one per converted field. The same value
    /// is stored under two keys because the two downstream consumers read
    /// different ones: `PARQUET:field_id` (iceberg-rust) and `column_id`
    /// (icelake).
    fn add_field_id(&self, arrow_field: &mut arrow_schema::Field) {
        // Allocate the id with a single mutable borrow instead of a
        // `borrow_mut()` followed by a separate `borrow()` read-back.
        let field_id = {
            let mut next = self.next_field_id.borrow_mut();
            *next += 1;
            *next
        };

        let mut metadata = HashMap::new();
        // for iceberg-rust
        metadata.insert("PARQUET:field_id".to_string(), field_id.to_string());
        // for icelake
        metadata.insert("column_id".to_string(), field_id.to_string());
        arrow_field.set_metadata(metadata);
    }
}

impl ToArrow for IcebergCreateTableArrowConvert {
    #[inline]
    fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
        // To create an iceberg table, we need a decimal type with precision and
        // scale set. We choose precision 28 and scale 10 here.
        // The arrow decimal type will finally be converted to an iceberg
        // decimal type. Iceberg decimal(P, S) is a fixed-point decimal: scale S
        // is fixed, and precision P must be 38 or less.
        let data_type = arrow_schema::DataType::Decimal128(28, 10);

        let mut arrow_field = arrow_schema::Field::new(name, data_type, true);
        self.add_field_id(&mut arrow_field);
        arrow_field
    }

    /// Convert RisingWave data type to Arrow data type.
    ///
    /// This function returns a `Field` instead of `DataType` because some may be converted to
    /// extension types which require additional metadata in the field.
    fn to_arrow_field(
        &self,
        name: &str,
        value: &DataType,
    ) -> Result<arrow_schema::Field, ArrayError> {
        let data_type = match value {
            // using the inline function
            DataType::Boolean => self.bool_type_to_arrow(),
            DataType::Int16 => self.int16_type_to_arrow(),
            DataType::Int32 => self.int32_type_to_arrow(),
            DataType::Int64 => self.int64_type_to_arrow(),
            DataType::Int256 => self.int256_type_to_arrow(),
            DataType::Float32 => self.float32_type_to_arrow(),
            DataType::Float64 => self.float64_type_to_arrow(),
            DataType::Date => self.date_type_to_arrow(),
            DataType::Time => self.time_type_to_arrow(),
            DataType::Timestamp => self.timestamp_type_to_arrow(),
            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
            DataType::Interval => self.interval_type_to_arrow(),
            DataType::Varchar => self.varchar_type_to_arrow(),
            DataType::Bytea => self.bytea_type_to_arrow(),
            DataType::Serial => self.serial_type_to_arrow(),
            // These two produce their own `Field` (with metadata already set),
            // so return early instead of falling through to the common path.
            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
            DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
            DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
            DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
        };

        // Stamp the field id required for iceberg table creation onto the
        // freshly built (nullable) field.
        let mut arrow_field = arrow_schema::Field::new(name, data_type, true);
        self.add_field_id(&mut arrow_field);
        Ok(arrow_field)
    }
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod arrow_iceberg;
mod arrow_udf;

pub use arrow_deltalake::DeltaLakeConvert;
pub use arrow_iceberg::IcebergArrowConvert;
pub use arrow_iceberg::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
pub use arrow_udf::{FromArrow, ToArrow, UdfArrowConvert};

use crate::types::Interval;
Expand Down
21 changes: 5 additions & 16 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ use icelake::io_v2::{
DataFileWriterBuilder, EqualityDeltaWriterBuilder, IcebergWriterBuilder, DELETE_OP, INSERT_OP,
};
use icelake::transaction::Transaction;
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile, COLUMN_ID_META_KEY};
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
Expand Down Expand Up @@ -745,30 +744,20 @@ impl IcebergSink {
bail!("database name must be set if you want to create table")
};

let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
// convert risingwave schema -> arrow schema -> iceberg schema
let arrow_fields = self
.param
.columns
.iter()
.map(|column| {
let mut arrow_field = IcebergArrowConvert
Ok(iceberg_create_table_arrow_convert
.to_arrow_field(&column.name, &column.data_type)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context(format!(
"failed to convert {}: {} to arrow type",
&column.name, &column.data_type
))?;
let mut metadata = HashMap::new();
metadata.insert(
PARQUET_FIELD_ID_META_KEY.to_string(),
column.column_id.get_id().to_string(),
);
metadata.insert(
COLUMN_ID_META_KEY.to_string(),
column.column_id.get_id().to_string(),
);
arrow_field.set_metadata(metadata);
Ok(arrow_field)
))?)
})
.collect::<Result<Vec<ArrowField>>>()?;
let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
Expand Down
Loading