From e69f5132d7fc8208324e08100a68d0140bfea054 Mon Sep 17 00:00:00 2001
From: ZENOTME <43447882+ZENOTME@users.noreply.github.com>
Date: Sun, 8 Oct 2023 15:23:18 +0800
Subject: [PATCH] feat: support iceberg sink partition write (#12664)

Co-authored-by: ZENOTME
---
 Cargo.lock                                  |  3 +-
 Cargo.toml                                  |  2 +-
 ci/scripts/e2e-iceberg-sink-v2-test.sh      |  2 +
 .../test_case/partition_append_only.toml    | 40 +++++++++++++++++++
 .../iceberg/test_case/partition_upsert.toml | 35 ++++++++++++++++
 src/connector/src/sink/iceberg.rs           | 39 +++++++++++-------
 6 files changed, 105 insertions(+), 16 deletions(-)
 create mode 100644 e2e_test/iceberg/test_case/partition_append_only.toml
 create mode 100644 e2e_test/iceberg/test_case/partition_upsert.toml

diff --git a/Cargo.lock b/Cargo.lock
index 47e85c6a612b5..950a49e1b2296 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3851,7 +3851,7 @@ dependencies = [
 [[package]]
 name = "icelake"
 version = "0.0.10"
-source = "git+https://github.com/icelake-io/icelake?rev=72a65aed6ed7b3d529b311031c2c8d99650990e2#72a65aed6ed7b3d529b311031c2c8d99650990e2"
+source = "git+https://github.com/icelake-io/icelake?rev=16dab0e36ab337e58ee8002d828def2d212fa116#16dab0e36ab337e58ee8002d828def2d212fa116"
 dependencies = [
  "anyhow",
  "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5294,6 +5294,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "percent-encoding",
  "pin-project",
+ "prometheus",
  "quick-xml 0.29.0",
  "reqsign",
  "reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index 43316c1288f09..c674a38f2fea7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -108,7 +108,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
 tonic = { package = "madsim-tonic", version = "0.4.0" }
 tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
 prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "72a65aed6ed7b3d529b311031c2c8d99650990e2" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
 arrow-array = "47"
 arrow-cast = "47"
 arrow-schema = "47"
diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh
index c365e417319c2..0e8054a4946af 100755
--- a/ci/scripts/e2e-iceberg-sink-v2-test.sh
+++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -40,6 +40,8 @@ bash ./start_spark_connect_server.sh
 "$HOME"/.local/bin/poetry update --quiet
 "$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml
 "$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml
+"$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_append_only.toml
+"$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_upsert.toml
 
 
 echo "--- Kill cluster"
diff --git a/e2e_test/iceberg/test_case/partition_append_only.toml b/e2e_test/iceberg/test_case/partition_append_only.toml
new file mode 100644
index 0000000000000..4721ef11c5ba6
--- /dev/null
+++ b/e2e_test/iceberg/test_case/partition_append_only.toml
@@ -0,0 +1,40 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    '''
+    CREATE TABLE demo_db.demo_table (
+        id long,
+        v_int int,
+        v_long long,
+        v_float float,
+        v_double double,
+        v_varchar string,
+        v_bool boolean,
+        v_date date,
+        v_timestamp timestamp,
+        v_ts_ntz timestamp_ntz
+    )
+    PARTITIONED BY (v_int,v_long,v_float,v_double,v_varchar,v_bool,v_date,v_timestamp,v_ts_ntz)
+    TBLPROPERTIES ('format-version'='2');
+    '''
+]
+
+slt = 'test_case/iceberg_sink_append_only.slt'
+
+verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz']
+
+verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'
+
+
+verify_data = """
+1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00
+2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00
+3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00
+4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00
+5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00
+"""
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    'DROP SCHEMA IF EXISTS demo_db'
+]
diff --git a/e2e_test/iceberg/test_case/partition_upsert.toml b/e2e_test/iceberg/test_case/partition_upsert.toml
new file mode 100644
index 0000000000000..d95178ed893fa
--- /dev/null
+++ b/e2e_test/iceberg/test_case/partition_upsert.toml
@@ -0,0 +1,35 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    '''
+    CREATE TABLE demo_db.demo_table (
+        id int,
+        v1 int,
+        v2 long,
+        v3 string
+    ) USING iceberg
+    PARTITIONED BY (v1,v2)
+    TBLPROPERTIES ('format-version'='2');
+    '''
+]
+
+slt = 'test_case/iceberg_sink_upsert.slt'
+
+verify_schema = ['int','int','long','string']
+
+verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC'
+
+verify_data = """
+1,1,50,1-50
+1,2,2,2-2
+1,3,2,3-2
+1,5,2,5-2
+1,8,2,8-2
+1,13,2,13-2
+1,21,2,21-2
+"""
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    'DROP SCHEMA IF EXISTS demo_db'
+]
diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs
index dd2b7e793b634..88ff53cccb714 100644
--- a/src/connector/src/sink/iceberg.rs
+++ b/src/connector/src/sink/iceberg.rs
@@ -23,8 +23,9 @@ use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef};
 use async_trait::async_trait;
 use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
 use icelake::io::file_writer::DeltaWriterResult;
+use icelake::io::EmptyLayer;
 use icelake::transaction::Transaction;
-use icelake::types::{data_file_from_json, data_file_to_json, DataFile};
+use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
 use icelake::{Table, TableIdentifier};
 use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
@@ -367,8 +368,12 @@ impl Sink for IcebergSink {
         &self,
         _connector_client: Option<ConnectorClient>,
     ) -> Result<Self::Coordinator> {
         let table = self.create_table().await?;
+        let partition_type = table.current_partition_type()?;
 
-        Ok(IcebergSinkCommitter { table })
+        Ok(IcebergSinkCommitter {
+            table,
+            partition_type,
+        })
     }
 }
@@ -437,7 +442,7 @@ impl SinkWriter for IcebergWriter {
 
 struct AppendOnlyWriter {
     table: Table,
-    writer: icelake::io::task_writer::TaskWriter,
+    writer: icelake::io::task_writer::TaskWriter<EmptyLayer>,
     schema: SchemaRef,
 }
 
@@ -455,7 +460,9 @@ impl AppendOnlyWriter {
 
         Ok(Self {
             writer: table
-                .task_writer()
+                .writer_builder()
+                .await?
+                .build_task_writer()
                 .await
                 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
             table,
@@ -484,7 +491,9 @@ impl AppendOnlyWriter {
         let old_writer = std::mem::replace(
            &mut self.writer,
            self.table
-                .task_writer()
+                .writer_builder()
+                .await?
+                .build_task_writer()
                 .await
                 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
         );
@@ -604,7 +613,7 @@ impl UpsertWriter {
 
 struct UnpartitionDeltaWriter {
     table: Table,
-    writer: icelake::io::file_writer::EqualityDeltaWriter,
+    writer: icelake::io::file_writer::EqualityDeltaWriter<EmptyLayer>,
     unique_column_ids: Vec<usize>,
 }
 
@@ -646,7 +655,10 @@ impl UnpartitionDeltaWriter {
 
 struct PartitionDeltaWriter {
     table: Table,
-    writers: HashMap<icelake::types::PartitionKey, icelake::io::file_writer::EqualityDeltaWriter>,
+    writers: HashMap<
+        icelake::types::PartitionKey,
+        icelake::io::file_writer::EqualityDeltaWriter<EmptyLayer>,
+    >,
     partition_splitter: icelake::types::PartitionSplitter,
     unique_column_ids: Vec<usize>,
 }
@@ -730,10 +742,8 @@ struct WriteResult {
     delete_files: Vec<DataFile>,
 }
 
-impl<'a> TryFrom<&'a SinkMetadata> for WriteResult {
-    type Error = SinkError;
-
-    fn try_from(value: &'a SinkMetadata) -> std::result::Result<Self, Self::Error> {
+impl WriteResult {
+    fn try_from(value: &SinkMetadata, partition_type: &Any) -> Result<Self> {
         if let Some(Serialized(v)) = &value.metadata {
             let mut values = if let serde_json::Value::Object(v) =
                 serde_json::from_slice::<serde_json::Value>(&v.metadata).map_err(
@@ -752,7 +762,7 @@ impl<'a> TryFrom<&'a SinkMetadata> for WriteResult {
         {
             data_files = values
                 .into_iter()
-                .map(data_file_from_json)
+                .map(|value| data_file_from_json(value, partition_type.clone()))
                 .collect::<std::result::Result<Vec<DataFile>, icelake::Error>>()
                 .unwrap();
         } else {
@@ -764,7 +774,7 @@ impl<'a> TryFrom<&'a SinkMetadata> for WriteResult {
         {
             delete_files = values
                 .into_iter()
-                .map(data_file_from_json)
+                .map(|value| data_file_from_json(value, partition_type.clone()))
                 .collect::<std::result::Result<Vec<DataFile>, icelake::Error>>()
                 .map_err(|e| anyhow!("Failed to parse data file from json: {}", e))?;
         } else {
@@ -822,6 +832,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata {
 
 pub struct IcebergSinkCommitter {
     table: Table,
+    partition_type: Any,
 }
 
 #[async_trait::async_trait]
@@ -836,7 +847,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
 
         let write_results = metadata
             .iter()
-            .map(WriteResult::try_from)
+            .map(|meta| WriteResult::try_from(meta, &self.partition_type))
             .collect::<Result<Vec<WriteResult>>>()?;
 
         let mut txn = Transaction::new(&mut self.table);