feat(iceberg): support create iceberg table w/ one layer nested type & scan table w/ nested type (#18498)

Co-authored-by: xxchan <[email protected]>
chenzl25 and xxchan authored Sep 12, 2024
1 parent 317641f commit 34a2ad4
Showing 6 changed files with 130 additions and 26 deletions.
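
In short, this commit lets an iceberg sink with create_table_if_not_exists = 'true' create the table even when the schema contains one layer of nested types, and lets an iceberg source scan such a table back. A rough sketch of the user-facing surface (the table/sink names and nested columns below are illustrative, not from this diff; the WITH options mirror the test further down):

CREATE TABLE t_nested (
    id INT,
    m MAP(INT, INT),         -- one-layer nested: map
    l INT[],                 -- one-layer nested: list
    s STRUCT<a INT, b INT>   -- one-layer nested: struct
);

CREATE SINK s_nested AS SELECT * FROM t_nested WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    database.name = 'demo_db',
    table.name = 'nested_table',
    catalog.type = 'storage',
    warehouse.path = 's3a://icebergdata/demo',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    create_table_if_not_exists = 'true'
);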
9 changes: 3 additions & 6 deletions Cargo.lock

(Generated file; diff not rendered.)

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -143,9 +143,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
iceberg = "0.3.0"
iceberg-catalog-rest = "0.3.0"
iceberg-catalog-glue = "0.3.0"
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
opendal = "0.47"
arrow-array = "50"
arrow-arith = "50"
@@ -39,6 +39,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
+create_table_if_not_exists = 'true'
);

statement ok
@@ -62,6 +63,30 @@ FLUSH;

sleep 5s

statement ok
CREATE Source iceberg_s WITH (
connector = 'iceberg',
database.name = 'demo_db',
table.name = 'no_partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
);

query ?????????????? rowsort
select * from iceberg_s
----
1 1 1000 1.1 1.11 1-1 t 2022-03-11 2022-03-11 01:00:00+00:00 2022-03-11 01:00:00 1.11000 {1:100,2:200} {1,2,3} (1,2)
2 2 2000 2.2 2.22 2-2 f 2022-03-12 2022-03-12 02:00:00+00:00 2022-03-12 02:00:00 2.22000 {3:300} {1,NULL,3} (3,)
3 3 3000 3.3 3.33 3-3 t 2022-03-13 2022-03-13 03:00:00+00:00 2022-03-13 03:00:00 99999.99999 NULL NULL NULL
4 4 4000 4.4 4.44 4-4 f 2022-03-14 2022-03-14 04:00:00+00:00 2022-03-14 04:00:00 -99999.99999 NULL NULL NULL
5 5 5000 5.5 5.55 5-5 t 2022-03-15 2022-03-15 05:00:00+00:00 2022-03-15 05:00:00 NULL NULL NULL NULL


statement ok
DROP SINK s6;

@@ -70,3 +95,6 @@ DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;

statement ok
DROP SOURCE iceberg_s;
90 changes: 90 additions & 0 deletions src/common/src/array/arrow/arrow_iceberg.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;
use std::collections::HashMap;
use std::ops::{Div, Mul};
use std::sync::Arc;

@@ -171,6 +173,94 @@ impl ToArrow for IcebergArrowConvert {

impl FromArrow for IcebergArrowConvert {}

/// The iceberg sink with the `create_table_if_not_exists` option uses this struct to convert
/// RisingWave data types to arrow data types. Specifically, it adds a field id to the
/// arrow field metadata, because iceberg-rust and icelake require the field id to be set.
///
/// Note: this is different from [`IcebergArrowConvert`], which is used to read from/write to
/// an _existing_ iceberg table. In that case, we only need to make sure the data is compatible
/// with the existing schema. But to _create a new table_, we have to meet more of iceberg's requirements.
#[derive(Default)]
pub struct IcebergCreateTableArrowConvert {
next_field_id: RefCell<u32>,
}

impl IcebergCreateTableArrowConvert {
pub fn to_arrow_field(
&self,
name: &str,
data_type: &DataType,
) -> Result<arrow_schema::Field, ArrayError> {
ToArrow::to_arrow_field(self, name, data_type)
}

fn add_field_id(&self, arrow_field: &mut arrow_schema::Field) {
*self.next_field_id.borrow_mut() += 1;
let field_id = *self.next_field_id.borrow();

let mut metadata = HashMap::new();
// for iceberg-rust
metadata.insert("PARQUET:field_id".to_string(), field_id.to_string());
// for icelake
metadata.insert("column_id".to_string(), field_id.to_string());
arrow_field.set_metadata(metadata);
}
}

impl ToArrow for IcebergCreateTableArrowConvert {
#[inline]
fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
// To create an iceberg table, the decimal type must have its precision and scale set;
// we choose (28, 10) here. This arrow type is finally converted to an iceberg
// decimal(P, S): a fixed-point decimal with precision P and scale S, where the scale
// is fixed and the precision must be 38 or less.
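// For example, with Decimal128(28, 10) the value 123.45 is stored as the 128-bit
// integer 1_234_500_000_000 (123.45 * 10^10), leaving up to 18 digits in front of
// the decimal point.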
let data_type = arrow_schema::DataType::Decimal128(28, 10);

let mut arrow_field = arrow_schema::Field::new(name, data_type, true);
self.add_field_id(&mut arrow_field);
arrow_field
}

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
/// extension types which require additional metadata in the field.
fn to_arrow_field(
&self,
name: &str,
value: &DataType,
) -> Result<arrow_schema::Field, ArrayError> {
let data_type = match value {
// Primitive types are delegated to the trait's default (inline) conversion helpers.
DataType::Boolean => self.bool_type_to_arrow(),
DataType::Int16 => self.int16_type_to_arrow(),
DataType::Int32 => self.int32_type_to_arrow(),
DataType::Int64 => self.int64_type_to_arrow(),
DataType::Int256 => self.int256_type_to_arrow(),
DataType::Float32 => self.float32_type_to_arrow(),
DataType::Float64 => self.float64_type_to_arrow(),
DataType::Date => self.date_type_to_arrow(),
DataType::Time => self.time_type_to_arrow(),
DataType::Timestamp => self.timestamp_type_to_arrow(),
DataType::Timestamptz => self.timestamptz_type_to_arrow(),
DataType::Interval => self.interval_type_to_arrow(),
DataType::Varchar => self.varchar_type_to_arrow(),
DataType::Bytea => self.bytea_type_to_arrow(),
DataType::Serial => self.serial_type_to_arrow(),
DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
};

let mut arrow_field = arrow_schema::Field::new(name, data_type, true);
self.add_field_id(&mut arrow_field);
Ok(arrow_field)
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
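
For orientation, a minimal usage sketch of the new converter (not part of the commit; it assumes the crate paths shown in this diff and the 1-based sequential id assignment in add_field_id):

use risingwave_common::array::arrow::IcebergCreateTableArrowConvert;
use risingwave_common::types::DataType;

fn sketch() {
    let convert = IcebergCreateTableArrowConvert::default();

    // The first converted field gets id 1, recorded under both metadata keys:
    // "PARQUET:field_id" for iceberg-rust and "column_id" for icelake.
    let id_field = convert.to_arrow_field("id", &DataType::Int32).unwrap();
    assert_eq!(
        id_field.metadata().get("PARQUET:field_id").map(String::as_str),
        Some("1")
    );
    assert_eq!(
        id_field.metadata().get("column_id").map(String::as_str),
        Some("1")
    );

    // One layer of nesting: the list element is converted through the same
    // overridden to_arrow_field, so the inner element field gets its own id too.
    let list_field = convert
        .to_arrow_field("l", &DataType::List(Box::new(DataType::Int32)))
        .unwrap();
    println!("list field metadata: {:?}", list_field.metadata());
}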
2 changes: 1 addition & 1 deletion src/common/src/array/arrow/mod.rs
@@ -17,7 +17,7 @@ mod arrow_iceberg;
mod arrow_udf;

pub use arrow_deltalake::DeltaLakeConvert;
-pub use arrow_iceberg::IcebergArrowConvert;
+pub use arrow_iceberg::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
pub use arrow_udf::{FromArrow, ToArrow, UdfArrowConvert};

use crate::types::Interval;
21 changes: 5 additions & 16 deletions src/connector/src/sink/iceberg/mod.rs
@@ -43,11 +43,10 @@ use icelake::io_v2::{
DataFileWriterBuilder, EqualityDeltaWriterBuilder, IcebergWriterBuilder, DELETE_OP, INSERT_OP,
};
use icelake::transaction::Transaction;
-use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile, COLUMN_ID_META_KEY};
+use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
-use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
-use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
@@ -745,30 +744,20 @@ impl IcebergSink {
bail!("database name must be set if you want to create table")
};

+let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
// convert risingwave schema -> arrow schema -> iceberg schema
let arrow_fields = self
.param
.columns
.iter()
.map(|column| {
-let mut arrow_field = IcebergArrowConvert
+Ok(iceberg_create_table_arrow_convert
.to_arrow_field(&column.name, &column.data_type)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context(format!(
"failed to convert {}: {} to arrow type",
&column.name, &column.data_type
-))?;
-let mut metadata = HashMap::new();
-metadata.insert(
-PARQUET_FIELD_ID_META_KEY.to_string(),
-column.column_id.get_id().to_string(),
-);
-metadata.insert(
-COLUMN_ID_META_KEY.to_string(),
-column.column_id.get_id().to_string(),
-);
-arrow_field.set_metadata(metadata);
-Ok(arrow_field)
+))?)
})
.collect::<Result<Vec<ArrowField>>>()?;
let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
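
One behavioral detail worth noting in this hunk: the removed code stamped each field with the RisingWave catalog column id, while the new converter assigns its own 1-based sequential ids in column order. A comment-only sketch of the difference:

// Before: field id = catalog column id, i.e. column.column_id.get_id(), set only
//         on top-level fields.
// After:  field id = IcebergCreateTableArrowConvert's counter (1, 2, 3, ... in
//         column order), with nested fields also receiving their own ids, which
//         the old per-column metadata never set (presumably the reason for the
//         switch, given this commit's nested-type support).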
