diff --git a/Cargo.lock b/Cargo.lock index a81b66b8c154..0dc779d60bed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,6 +346,21 @@ dependencies = [ "flatbuffers", ] +[[package]] +name = "arrow-ord" +version = "46.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "584325c91293abbca7aaaabf8da9fe303245d641f5f4a18a6058dc68009c7ebf" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-row" version = "46.0.0" @@ -3834,7 +3849,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=166a36b1a40a64086db09a0e0f2ed6791cec548b#166a36b1a40a64086db09a0e0f2ed6791cec548b" +source = "git+https://github.com/icelake-io/icelake?rev=72a65aed6ed7b3d529b311031c2c8d99650990e2#72a65aed6ed7b3d529b311031c2c8d99650990e2" dependencies = [ "anyhow", "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3842,6 +3857,7 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-cast", + "arrow-ord", "arrow-row", "arrow-schema", "arrow-select", @@ -3857,7 +3873,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal", + "opendal 0.40.0", "ordered-float 3.9.1", "parquet", "regex", @@ -5246,6 +5262,39 @@ dependencies = [ "uuid", ] +[[package]] +name = "opendal" +version = "0.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddba7299bab261d3ae2f37617fb7f45b19ed872752bb4e22cf93a69d979366c5" +dependencies = [ + "anyhow", + "async-compat", + "async-trait", + "backon", + "base64 0.21.3", + "bytes", + "chrono", + "flagset", + "futures", + "http", + "hyper", + "log", + "md-5", + "once_cell", + "parking_lot 0.12.1", + "percent-encoding", + "pin-project", + "quick-xml 0.29.0", + "reqsign", + "reqwest", + "serde", + "serde_json", + "sha2", + "tokio", + "uuid", +] + [[package]] name = "openidconnect" version = "2.5.1" @@ -6895,6 +6944,7 @@ dependencies = [ "arc-swap", "arrow-array", "arrow-buffer", + "arrow-cast", "arrow-schema", "async-trait", "auto_enums", @@ -7152,7 +7202,7 @@ dependencies = [ "nexmark", "nkeys", "num-bigint", - "opendal", + "opendal 0.39.0", "parking_lot 0.12.1", "paste", "prometheus", @@ -7578,7 +7628,7 @@ dependencies = [ "itertools 0.11.0", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal", + "opendal 0.39.0", "prometheus", "risingwave_common", "spin 0.9.8", diff --git a/Cargo.toml b/Cargo.toml index 6962c92321d8..e82fb033c48d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,12 +107,14 @@ hashbrown = { version = "0.14.0", features = [ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.3.1" } tonic-build = { package = "madsim-tonic-build", version = "0.3.1" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "166a36b1a40a64086db09a0e0f2ed6791cec548b" } +icelake = { git = "https://github.com/icelake-io/icelake", rev = "72a65aed6ed7b3d529b311031c2c8d99650990e2" } arrow-array = "46" +arrow-cast = "46" arrow-schema = "46" arrow-buffer = "46" arrow-flight = "46" arrow-select = "46" +arrow-ord = "46" tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", "stats", diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index 83c0d187d6b3..c365e417319c 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -38,7 +38,8 @@ bash 
./start_spark_connect_server.sh # Don't remove the `--quiet` option since poetry has a bug when printing output, see # https://github.com/python-poetry/poetry/issues/3412 "$HOME"/.local/bin/poetry update --quiet -"$HOME"/.local/bin/poetry run python main.py +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/config.ini b/e2e_test/iceberg/config.ini index 6fa1ffbdc683..bd95eddc5b80 100644 --- a/e2e_test/iceberg/config.ini +++ b/e2e_test/iceberg/config.ini @@ -1,6 +1,3 @@ -[default] -result = data.csv - [spark] url=sc://localhost:15002 diff --git a/e2e_test/iceberg/data.csv b/e2e_test/iceberg/data.csv deleted file mode 100644 index 77ad8f16dbc9..000000000000 --- a/e2e_test/iceberg/data.csv +++ /dev/null @@ -1,5 +0,0 @@ -1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00 -2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00 -3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00 -4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00 -5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00 diff --git a/e2e_test/iceberg/main.py b/e2e_test/iceberg/main.py index 304962ef08cc..fa07aa367a9b 100644 --- a/e2e_test/iceberg/main.py +++ b/e2e_test/iceberg/main.py @@ -1,9 +1,11 @@ from pyspark.sql import SparkSession +import argparse import configparser import subprocess import csv import unittest import time +import tomli as toml from datetime import date from datetime import datetime from datetime import timezone @@ -23,25 +25,6 @@ def strtots(v): g_spark = None -init_table_sqls = [ - "CREATE SCHEMA IF NOT EXISTS demo_db", - "DROP TABLE IF EXISTS demo_db.demo_table", - """ - CREATE TABLE demo_db.demo_table ( - id long, - v_int int, - v_long long, - v_float float, - v_double double, - v_varchar string, - v_bool boolean, - v_date date, - v_timestamp timestamp, - v_ts_ntz timestamp_ntz - ) TBLPROPERTIES ('format-version'='2'); - """, -] - def get_spark(args): spark_config = args['spark'] @@ -52,54 +35,81 @@ def get_spark(args): return g_spark -def init_iceberg_table(args): +def init_iceberg_table(args,init_sqls): spark = get_spark(args) - for sql in init_table_sqls: + for sql in init_sqls: print(f"Executing sql: {sql}") spark.sql(sql) -def init_risingwave_mv(args): +def init_risingwave_mv(args,slt): rw_config = args['risingwave'] - cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} iceberg_sink_v2.slt" + cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}" print(f"Command line is [{cmd}]") subprocess.run(cmd, shell=True, check=True) - time.sleep(60) + time.sleep(10) -def verify_result(args): - sql = "SELECT * FROM demo_db.demo_table ORDER BY id ASC" +def verify_result(args,verify_sql,verify_schema,verify_data): tc = unittest.TestCase() - print(f"Executing sql: {sql}") + print(f"Executing sql: {verify_sql}") spark = get_spark(args) - df = spark.sql(sql).collect() + df = spark.sql(verify_sql).collect() for row in df: print(row) - - with open(args['default']['result'], newline='') as csv_file: - csv_result = list(csv.reader(csv_file)) - for (row1, row2) in zip(df, csv_result): - print(f"Row1: {row1}, row 2: {row2}") - tc.assertEqual(row1[0], int(row2[0])) - tc.assertEqual(row1[1], int(row2[1])) - tc.assertEqual(row1[2], int(row2[2])) - 
tc.assertEqual(round(row1[3], 5), round(float(row2[3]), 5)) - tc.assertEqual(round(row1[4], 5), round(float(row2[4]), 5)) - tc.assertEqual(row1[5], row2[5]) - tc.assertEqual(row1[6], strtobool(row2[6])) - tc.assertEqual(row1[7], strtodate(row2[7])) - tc.assertEqual(row1[8].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[8])) - tc.assertEqual(row1[9], datetime.fromisoformat(row2[9])) - - tc.assertEqual(len(df), len(csv_result)) - + rows = verify_data.splitlines() + tc.assertEqual(len(df), len(rows)) + for (row1, row2) in zip(df, rows): + print(f"Row1: {row1}, Row 2: {row2}") + row2 = row2.split(',') + for idx, ty in enumerate(verify_schema): + if ty == "int" or ty == "long": + tc.assertEqual(row1[idx], int(row2[idx])) + elif ty == "float" or ty == "double": + tc.assertEqual(round(row1[idx], 5), round(float(row2[idx]), 5)) + elif ty == "boolean": + tc.assertEqual(row1[idx], strtobool(row2[idx])) + elif ty == "date": + tc.assertEqual(row1[idx], strtodate(row2[idx])) + elif ty == "timestamp": + tc.assertEqual(row1[idx].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[idx])) + elif ty == "timestamp_ntz": + tc.assertEqual(row1[idx], datetime.fromisoformat(row2[idx])) + elif ty == "string": + tc.assertEqual(row1[idx], row2[idx]) + else: + tc.fail(f"Unsupported type {ty}") + +def drop_table(args,drop_sqls): + spark = get_spark(args) + for sql in drop_sqls: + print(f"Executing sql: {sql}") + spark.sql(sql) if __name__ == "__main__": - config = configparser.ConfigParser() - config.read("config.ini") - print({section: dict(config[section]) for section in config.sections()}) - init_iceberg_table(config) - init_risingwave_mv(config) - verify_result(config) + parser = argparse.ArgumentParser(description="Test script for iceberg") + parser.add_argument("-t", dest="test_case", type=str, help="Test case file") + with open(parser.parse_args().test_case,"rb") as test_case: + test_case = toml.load(test_case) + # Extract content from testcase + init_sqls = test_case['init_sqls'] + print(f"init_sqls:{init_sqls}") + slt = test_case['slt'] + print(f"slt:{slt}") + verify_schema = test_case['verify_schema'] + print(f"verify_schema:{verify_schema}") + verify_sql = test_case['verify_sql'] + print(f"verify_sql:{verify_sql}") + verify_data = test_case['verify_data'] + drop_sqls = test_case['drop_sqls'] + + config = configparser.ConfigParser() + config.read("config.ini") + print({section: dict(config[section]) for section in config.sections()}) + + init_iceberg_table(config,init_sqls) + init_risingwave_mv(config,slt) + verify_result(config,verify_sql,verify_schema,verify_data) + drop_table(config,drop_sqls) diff --git a/e2e_test/iceberg/pyproject.toml b/e2e_test/iceberg/pyproject.toml index 7b19ed7b044f..d13be7227759 100644 --- a/e2e_test/iceberg/pyproject.toml +++ b/e2e_test/iceberg/pyproject.toml @@ -7,7 +7,7 @@ authors = ["risingwavelabs"] [tool.poetry.dependencies] python = "^3.10" pyspark = { version = "3.4.1", extras = ["sql", "connect"] } - +tomli = "2.0" [build-system] diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt similarity index 100% rename from e2e_test/iceberg/iceberg_sink_v2.slt rename to e2e_test/iceberg/test_case/iceberg_sink_append_only.slt diff --git a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt new file mode 100644 index 000000000000..646a39cc08e2 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt @@ -0,0 +1,45 @@ +statement ok +set 
streaming_parallelism=4; + +statement ok +CREATE TABLE t6 (id int, v1 int primary key, v2 bigint, v3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; + +statement ok +CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( + connector = 'iceberg', + type = 'upsert', + force_append_only = 'false', + database.name = 'demo', + table.name = 'demo_db.demo_table', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + primary_key = 'v1' +); + +statement ok +INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1, 5, 2, '5-2'), (1, 8, 2, '8-2'), (1, 13, 2, '13-2'), (1, 21, 2, '21-2'); + +statement ok +FLUSH; + +statement ok +INSERT INTO t6 VALUES (1, 1, 50, '1-50'); + +statement ok +FLUSH; + +statement ok +DROP SINK s6; + +statement ok +DROP MATERIALIZED VIEW mv6; + +statement ok +DROP TABLE t6; diff --git a/e2e_test/iceberg/test_case/no_partition_append_only.toml b/e2e_test/iceberg/test_case/no_partition_append_only.toml new file mode 100644 index 000000000000..211407644abe --- /dev/null +++ b/e2e_test/iceberg/test_case/no_partition_append_only.toml @@ -0,0 +1,38 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.demo_table', + ''' + CREATE TABLE demo_db.demo_table ( + id long, + v_int int, + v_long long, + v_float float, + v_double double, + v_varchar string, + v_bool boolean, + v_date date, + v_timestamp timestamp, + v_ts_ntz timestamp_ntz + ) TBLPROPERTIES ('format-version'='2'); + ''' +] + +slt = 'test_case/iceberg_sink_append_only.slt' + +verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz'] + +verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC' + + +verify_data = """ +1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00 +2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00 +3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00 +4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00 +5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00 +""" + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.demo_table', + 'DROP SCHEMA IF EXISTS demo_db' +] diff --git a/e2e_test/iceberg/test_case/no_partition_upsert.toml b/e2e_test/iceberg/test_case/no_partition_upsert.toml new file mode 100644 index 000000000000..0e0215d37465 --- /dev/null +++ b/e2e_test/iceberg/test_case/no_partition_upsert.toml @@ -0,0 +1,34 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.demo_table', + ''' + CREATE TABLE demo_db.demo_table ( + id int, + v1 int, + v2 long, + v3 string + ) USING iceberg + TBLPROPERTIES ('format-version'='2'); + ''' +] + +slt = 'test_case/iceberg_sink_upsert.slt' + +verify_schema = ['int','int','long','string'] + +verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC' + +verify_data = """ +1,1,50,1-50 +1,2,2,2-2 +1,3,2,3-2 +1,5,2,5-2 +1,8,2,8-2 +1,13,2,13-2 +1,21,2,21-2 +""" + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.demo_table', + 'DROP SCHEMA IF EXISTS demo_db' +] diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 92992687c6c2..982f317cc97a 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -18,6 +18,7 @@ anyhow 
= "1" arc-swap = "1" arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } arrow-schema = { workspace = true } async-trait = "0.1" auto_enums = "0.8" diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 00d418af0010..36336ee5f819 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -17,13 +17,44 @@ use std::fmt::Write; use arrow_array::Array as ArrowArray; -use arrow_schema::{Field, Schema, DECIMAL256_MAX_PRECISION}; +use arrow_cast::cast; +use arrow_schema::{Field, Schema, SchemaRef, DECIMAL256_MAX_PRECISION}; use chrono::{NaiveDateTime, NaiveTime}; use itertools::Itertools; use super::*; use crate::types::{Int256, StructType}; -use crate::util::iter_util::ZipEqDebug; +use crate::util::iter_util::{ZipEqDebug, ZipEqFast}; + +/// Converts RisingWave array to Arrow array with the schema. +/// This function will try to convert the array if the type is not same with the schema. +pub fn to_record_batch_with_schema( + schema: SchemaRef, + chunk: &DataChunk, +) -> Result { + if !chunk.is_compacted() { + let c = chunk.clone(); + return to_record_batch_with_schema(schema, &c.compact()); + } + let columns: Vec<_> = chunk + .columns() + .iter() + .zip_eq_fast(schema.fields().iter()) + .map(|(column, field)| { + let column: arrow_array::ArrayRef = column.as_ref().try_into()?; + if column.data_type() == field.data_type() { + Ok(column) + } else { + cast(&column, field.data_type()) + .map_err(|err| ArrayError::FromArrow(err.to_string())) + } + }) + .try_collect::<_, _, ArrayError>()?; + + let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity())); + arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts) + .map_err(|err| ArrayError::ToArrow(err.to_string())) +} // Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`. impl TryFrom<&DataChunk> for arrow_array::RecordBatch { diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 2c6e72b50031..2a505cddfc5c 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -15,6 +15,7 @@ //! `Array` defines all in-memory representations of vectorized execution framework. mod arrow; +pub use arrow::to_record_batch_with_schema; mod bool_array; pub mod bytes_array; mod chrono_array; diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 24249531ffbf..dd2b7e793b63 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -12,18 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::Debug; +use std::sync::Arc; use anyhow::anyhow; use arrow_array::RecordBatch; -use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema}; +use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef}; use async_trait::async_trait; use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; +use icelake::io::file_writer::DeltaWriterResult; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, DataFile}; use icelake::{Table, TableIdentifier}; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; @@ -31,8 +34,8 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use risingwave_rpc_client::ConnectorClient; +use serde::de; use serde_derive::Deserialize; -use serde_json::Value; use url::Url; use super::{ @@ -96,6 +99,28 @@ pub struct IcebergConfig { #[serde(rename = "s3.secret.key")] pub secret_key: String, + + #[serde( + rename = "primary_key", + default, + deserialize_with = "deserialize_string_seq_from_string" + )] + pub primary_key: Option>, +} +pub(crate) fn deserialize_string_seq_from_string<'de, D>( + deserializer: D, +) -> std::result::Result>, D::Error> +where + D: de::Deserializer<'de>, +{ + let s: Option = de::Deserialize::deserialize(deserializer)?; + if let Some(s) = s { + let s = s.to_ascii_lowercase(); + let s = s.split(',').map(|s| s.trim().to_owned()).collect(); + Ok(Some(s)) + } else { + Ok(None) + } } impl IcebergConfig { @@ -112,6 +137,22 @@ impl IcebergConfig { ))); } + if config.r#type == SINK_TYPE_UPSERT { + if let Some(primary_key) = &config.primary_key { + if primary_key.is_empty() { + return Err(SinkError::Config(anyhow!( + "Primary_key must not be empty in {}", + SINK_TYPE_UPSERT + ))); + } + } else { + return Err(SinkError::Config(anyhow!( + "Must set primary_key in {}", + SINK_TYPE_UPSERT + ))); + } + } + Ok(config) } @@ -188,6 +229,12 @@ impl IcebergConfig { iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + // #TODO + // Support load config file + iceberg_configs.insert( + "iceberg.table.io.disable_config_load".to_string(), + "true".to_string(), + ); Ok(iceberg_configs) } @@ -196,6 +243,8 @@ impl IcebergConfig { pub struct IcebergSink { config: IcebergConfig, param: SinkParam, + // In upsert mode, it never be None and empty. + unique_column_ids: Option>, } impl TryFrom for IcebergSink { @@ -244,14 +293,36 @@ impl IcebergSink { } pub fn new(config: IcebergConfig, param: SinkParam) -> Result { - // TODO(ZENOTME): Only support append-only mode now. - if !config.force_append_only { - return Err(SinkError::Iceberg(anyhow!( - "Iceberg sink only support append-only mode now." 
- ))); - } - - Ok(Self { config, param }) + let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only { + if let Some(pk) = &config.primary_key { + let mut unique_column_ids = Vec::with_capacity(pk.len()); + for col_name in pk { + let id = param + .columns + .iter() + .find(|col| col.name.as_str() == col_name) + .ok_or_else(|| { + SinkError::Config(anyhow!( + "Primary key column {} not found in sink schema", + col_name + )) + })? + .column_id + .get_id() as usize; + unique_column_ids.push(id); + } + Some(unique_column_ids) + } else { + unreachable!() + } + } else { + None + }; + Ok(Self { + config, + param, + unique_column_ids, + }) } } @@ -268,14 +339,10 @@ impl Sink for IcebergSink { async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { let table = self.create_table().await?; - - let inner = IcebergWriter { - is_append_only: self.config.force_append_only, - writer: table - .task_writer() - .await - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, - table, + let inner = if let Some(unique_column_ids) = &self.unique_column_ids { + IcebergWriter::new_upsert(table, unique_column_ids.clone()).await? + } else { + IcebergWriter::new_append_only(table).await? }; Ok(CoordinatedSinkWriter::new( writer_param @@ -305,22 +372,105 @@ impl Sink for IcebergSink { } } -/// TODO(ZENOTME): Just a placeholder, we will implement it later.(#10642) -pub struct IcebergWriter { - is_append_only: bool, +pub struct IcebergWriter(IcebergWriterEnum); + +enum IcebergWriterEnum { + AppendOnly(AppendOnlyWriter), + Upsert(UpsertWriter), +} + +impl IcebergWriter { + pub async fn new_append_only(table: Table) -> Result { + Ok(Self(IcebergWriterEnum::AppendOnly( + AppendOnlyWriter::new(table).await?, + ))) + } + + pub async fn new_upsert(table: Table, unique_column_ids: Vec) -> Result { + Ok(Self(IcebergWriterEnum::Upsert( + UpsertWriter::new(table, unique_column_ids).await?, + ))) + } +} + +#[async_trait] +impl SinkWriter for IcebergWriter { + type CommitMetadata = Option; + + /// Begin a new epoch + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + // Just skip it. + Ok(()) + } + + /// Write a stream chunk to sink + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + match &mut self.0 { + IcebergWriterEnum::AppendOnly(writer) => writer.write(chunk).await?, + IcebergWriterEnum::Upsert(writer) => writer.write(chunk).await?, + } + Ok(()) + } + + /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink + /// writer should commit the current epoch. + async fn barrier(&mut self, is_checkpoint: bool) -> Result> { + // Skip it if not checkpoint + if !is_checkpoint { + return Ok(None); + } + + let res = match &mut self.0 { + IcebergWriterEnum::AppendOnly(writer) => writer.flush().await?, + IcebergWriterEnum::Upsert(writer) => writer.flush().await?, + }; + + Ok(Some(SinkMetadata::try_from(&res)?)) + } + + /// Clean up + async fn abort(&mut self) -> Result<()> { + // TODO: abort should clean up all the data written in this epoch. + Ok(()) + } +} + +struct AppendOnlyWriter { table: Table, writer: icelake::io::task_writer::TaskWriter, + schema: SchemaRef, } -impl IcebergWriter { - async fn append_only_write(&mut self, chunk: StreamChunk) -> Result<()> { +impl AppendOnlyWriter { + pub async fn new(table: Table) -> Result { + let schema = Arc::new( + table + .current_table_metadata() + .current_schema() + .map_err(|err| SinkError::Iceberg(anyhow!(err)))? 
+ .clone() + .try_into() + .map_err(|err: icelake::Error| SinkError::Iceberg(anyhow!(err)))?, + ); + + Ok(Self { + writer: table + .task_writer() + .await + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, + table, + schema, + }) + } + + pub async fn write(&mut self, chunk: StreamChunk) -> Result<()> { let (mut chunk, ops) = chunk.into_parts(); let filters = chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::(); chunk.set_visibility(filters); - let chunk = RecordBatch::try_from(&chunk.compact()) + let chunk = to_record_batch_with_schema(self.schema.clone(), &chunk.compact()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; self.writer.write(&chunk).await.map_err(|err| { @@ -329,11 +479,255 @@ impl IcebergWriter { Ok(()) } + + pub async fn flush(&mut self) -> Result { + let old_writer = std::mem::replace( + &mut self.writer, + self.table + .task_writer() + .await + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, + ); + + let data_files = old_writer + .close() + .await + .map_err(|err| SinkError::Iceberg(anyhow!("Close writer fail: {}", err)))?; + + Ok(WriteResult { + data_files, + delete_files: vec![], + }) + } } +struct UpsertWriter { + writer: UpsertWriterInner, + schema: SchemaRef, +} +enum UpsertWriterInner { + Partition(PartitionDeltaWriter), + Unpartition(UnpartitionDeltaWriter), +} + +impl UpsertWriter { + pub async fn new(table: Table, unique_column_ids: Vec) -> Result { + let schema = Arc::new( + table + .current_table_metadata() + .current_schema() + .map_err(|err| SinkError::Iceberg(anyhow!(err)))? + .clone() + .try_into() + .map_err(|err: icelake::Error| SinkError::Iceberg(anyhow!(err)))?, + ); + let inner = if let Some(partition_splitter) = table.partition_splitter()? { + UpsertWriterInner::Partition(PartitionDeltaWriter::new( + table, + partition_splitter, + unique_column_ids, + )) + } else { + UpsertWriterInner::Unpartition( + UnpartitionDeltaWriter::new(table, unique_column_ids).await?, + ) + }; + Ok(Self { + writer: inner, + schema, + }) + } + + fn partition_ops(ops: &[Op]) -> Vec<(usize, usize)> { + let mut res = vec![]; + let mut start = 0; + let mut prev_op = ops[0]; + for (i, op) in ops.iter().enumerate().skip(1) { + if *op != prev_op { + res.push((start, i)); + start = i; + prev_op = *op; + } + } + res.push((start, ops.len())); + res + } + + pub async fn write(&mut self, chunk: StreamChunk) -> Result<()> { + let (chunk, ops) = chunk.compact().into_parts(); + let chunk = to_record_batch_with_schema(self.schema.clone(), &chunk.compact()) + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; + let ranges = Self::partition_ops(&ops); + for (start, end) in ranges { + let batch = chunk.slice(start, end - start); + match ops[start] { + Op::UpdateInsert | Op::Insert => match &mut self.writer { + UpsertWriterInner::Partition(writer) => writer.write(batch).await?, + UpsertWriterInner::Unpartition(writer) => writer.write(batch).await?, + }, + + Op::UpdateDelete | Op::Delete => match &mut self.writer { + UpsertWriterInner::Partition(writer) => writer.delete(batch).await?, + UpsertWriterInner::Unpartition(writer) => writer.delete(batch).await?, + }, + } + } + Ok(()) + } + + pub async fn flush(&mut self) -> Result { + match &mut self.writer { + UpsertWriterInner::Partition(writer) => { + let mut data_files = vec![]; + let mut delete_files = vec![]; + for res in writer.flush().await? 
{ + data_files.extend(res.data); + delete_files.extend(res.pos_delete); + delete_files.extend(res.eq_delete); + } + Ok(WriteResult { + data_files, + delete_files, + }) + } + UpsertWriterInner::Unpartition(writer) => { + let res = writer.flush().await?; + let delete_files = res.pos_delete.into_iter().chain(res.eq_delete).collect(); + Ok(WriteResult { + data_files: res.data, + delete_files, + }) + } + } + } +} + +struct UnpartitionDeltaWriter { + table: Table, + writer: icelake::io::file_writer::EqualityDeltaWriter, + unique_column_ids: Vec, +} + +impl UnpartitionDeltaWriter { + pub async fn new(table: Table, unique_column_ids: Vec) -> Result { + Ok(Self { + writer: table + .writer_builder() + .await? + .build_equality_delta_writer(unique_column_ids.clone()) + .await?, + table, + unique_column_ids, + }) + } + + pub async fn write(&mut self, batch: RecordBatch) -> Result<()> { + self.writer.write(batch).await?; + Ok(()) + } + + pub async fn delete(&mut self, batch: RecordBatch) -> Result<()> { + self.writer.delete(batch).await?; + Ok(()) + } + + pub async fn flush(&mut self) -> Result { + let writer = std::mem::replace( + &mut self.writer, + self.table + .writer_builder() + .await? + .build_equality_delta_writer(self.unique_column_ids.clone()) + .await?, + ); + Ok(writer.close(None).await?) + } +} + +struct PartitionDeltaWriter { + table: Table, + writers: HashMap, + partition_splitter: icelake::types::PartitionSplitter, + unique_column_ids: Vec, +} + +impl PartitionDeltaWriter { + pub fn new( + table: Table, + partition_splitter: icelake::types::PartitionSplitter, + unique_column_ids: Vec, + ) -> Self { + Self { + table, + writers: HashMap::new(), + partition_splitter, + unique_column_ids, + } + } + + pub async fn write(&mut self, batch: RecordBatch) -> Result<()> { + let partitions = self.partition_splitter.split_by_partition(&batch)?; + for (partition_key, batch) in partitions { + match self.writers.entry(partition_key) { + Entry::Vacant(v) => { + v.insert( + self.table + .writer_builder() + .await? + .build_equality_delta_writer(self.unique_column_ids.clone()) + .await?, + ) + .write(batch) + .await? + } + Entry::Occupied(mut v) => v.get_mut().write(batch).await?, + } + } + Ok(()) + } + + pub async fn delete(&mut self, batch: RecordBatch) -> Result<()> { + let partitions = self.partition_splitter.split_by_partition(&batch)?; + for (partition_key, batch) in partitions { + match self.writers.entry(partition_key) { + Entry::Vacant(v) => { + v.insert( + self.table + .writer_builder() + .await? 
+ .build_equality_delta_writer(self.unique_column_ids.clone()) + .await?, + ) + .delete(batch) + .await + .unwrap(); + } + Entry::Occupied(mut v) => v.get_mut().delete(batch).await.unwrap(), + } + } + Ok(()) + } + + pub async fn flush(&mut self) -> Result> { + let mut res = Vec::with_capacity(self.writers.len()); + for (partition_key, writer) in self.writers.drain() { + let partition_value = self + .partition_splitter + .convert_key_to_value(partition_key)?; + let delta_result = writer.close(Some(partition_value)).await?; + res.push(delta_result); + } + Ok(res) + } +} + +const DATA_FILES: &str = "data_files"; +const DELETE_FILES: &str = "delete_files"; + #[derive(Default, Debug)] struct WriteResult { data_files: Vec, + delete_files: Vec, } impl<'a> TryFrom<&'a SinkMetadata> for WriteResult { @@ -341,20 +735,45 @@ impl<'a> TryFrom<&'a SinkMetadata> for WriteResult { fn try_from(value: &'a SinkMetadata) -> std::result::Result { if let Some(Serialized(v)) = &value.metadata { - if let Value::Array(json_values) = + let mut values = if let serde_json::Value::Object(v) = serde_json::from_slice::(&v.metadata).map_err( |e| -> SinkError { anyhow!("Can't parse iceberg sink metadata: {}", e).into() }, - )? + )? { + v + } else { + return Err(anyhow!("iceberg sink metadata should be a object").into()); + }; + + let data_files: Vec; + let delete_files: Vec; + if let serde_json::Value::Array(values) = values + .remove(DATA_FILES) + .ok_or_else(|| anyhow!("icberg sink metadata should have data_files object"))? + { + data_files = values + .into_iter() + .map(data_file_from_json) + .collect::, icelake::Error>>() + .unwrap(); + } else { + return Err(anyhow!("icberg sink metadata should have data_files object").into()); + } + if let serde_json::Value::Array(values) = values + .remove(DELETE_FILES) + .ok_or_else(|| anyhow!("icberg sink metadata should have data_files object"))? 
{ - let data_files = json_values + delete_files = values .into_iter() .map(data_file_from_json) .collect::, icelake::Error>>() .map_err(|e| anyhow!("Failed to parse data file from json: {}", e))?; - Ok(WriteResult { data_files }) } else { - Err(anyhow!("Serialized data files should be json array!").into()) + return Err(anyhow!("icberg sink metadata should have data_files object").into()); } + Ok(Self { + data_files, + delete_files, + }) } else { Err(anyhow!("Can't create iceberg sink write result from empty data!").into()) } @@ -365,7 +784,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { type Error = SinkError; fn try_from(value: &'a WriteResult) -> std::result::Result { - let json_value = serde_json::Value::Array( + let json_data_files = serde_json::Value::Array( value .data_files .iter() @@ -374,6 +793,23 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { .collect::, icelake::Error>>() .map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?, ); + let json_delete_files = serde_json::Value::Array( + value + .delete_files + .iter() + .cloned() + .map(data_file_to_json) + .collect::, icelake::Error>>() + .map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?, + ); + let json_value = serde_json::Value::Object( + vec![ + (DATA_FILES.to_string(), json_data_files), + (DELETE_FILES.to_string(), json_delete_files), + ] + .into_iter() + .collect(), + ); Ok(SinkMetadata { metadata: Some(Serialized(SerializedMetadata { metadata: serde_json::to_vec(&json_value).map_err(|e| -> SinkError { @@ -384,58 +820,6 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { } } -#[async_trait] -impl SinkWriter for IcebergWriter { - type CommitMetadata = Option; - - /// Begin a new epoch - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - // Just skip it. - Ok(()) - } - - /// Write a stream chunk to sink - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - if self.is_append_only { - self.append_only_write(chunk).await - } else { - return Err(SinkError::Iceberg(anyhow!( - "Iceberg sink only support append-only mode now." - ))); - } - } - - /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink - /// writer should commit the current epoch. - async fn barrier(&mut self, is_checkpoint: bool) -> Result> { - // Skip it if not checkpoint - if !is_checkpoint { - return Ok(None); - } - - let old_writer = std::mem::replace( - &mut self.writer, - self.table - .task_writer() - .await - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, - ); - - let data_files = old_writer - .close() - .await - .map_err(|err| SinkError::Iceberg(anyhow!("Close writer fail: {}", err)))?; - - Ok(Some(SinkMetadata::try_from(&WriteResult { data_files })?)) - } - - /// Clean up - async fn abort(&mut self) -> Result<()> { - // TODO: abort should clean up all the data written in this epoch. 
- Ok(()) - } -} - pub struct IcebergSinkCommitter { table: Table, } @@ -456,11 +840,10 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { .collect::>>()?; let mut txn = Transaction::new(&mut self.table); - txn.append_data_file( - write_results - .into_iter() - .flat_map(|s| s.data_files.into_iter()), - ); + write_results.into_iter().for_each(|s| { + txn.append_data_file(s.data_files); + txn.append_delete_file(s.delete_files); + }); txn.commit() .await .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index c67fdb8c1983..808b86fa9798 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -354,6 +354,12 @@ pub enum SinkError { Internal(anyhow::Error), } +impl From for SinkError { + fn from(value: icelake::Error) -> Self { + SinkError::Iceberg(anyhow_error!("{}", value)) + } +} + impl From for SinkError { fn from(value: RpcError) -> Self { SinkError::Remote(anyhow_error!("{}", value))
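
`to_record_batch_with_schema` (added in `src/common/src/array/arrow.rs` above) differs from the existing `TryFrom<&DataChunk>` conversion in that it takes the Arrow schema derived from the Iceberg table and casts any column whose Arrow type does not match the target field. A minimal standalone sketch of that cast-on-mismatch decision, assuming the arrow 46 crates pinned in the workspace and a single hypothetical `id` column:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow_cast::cast;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Target schema as it would come from the Iceberg table (a single `long`/Int64 column).
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));

    // Column converted from the RisingWave chunk happens to be Int32.
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // Same decision as in `to_record_batch_with_schema`: cast only when the types differ.
    let column = if column.data_type() == schema.field(0).data_type() {
        column
    } else {
        cast(&column, schema.field(0).data_type())?
    };

    let batch = RecordBatch::try_new(schema, vec![column])?;
    assert_eq!(batch.column(0).data_type(), &DataType::Int64);
    Ok(())
}
```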
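
The new `primary_key` sink option is accepted as a single comma-separated string and deserialized into `Option<Vec<String>>` by `deserialize_string_seq_from_string`. The sketch below isolates just the string handling that deserializer applies (lower-case, split on commas, trim each entry); `parse_primary_key` is a hypothetical helper name used only for illustration, not part of the PR.

```rust
/// Mirrors the string handling inside `deserialize_string_seq_from_string`:
/// the comma-separated `primary_key` option is lower-cased, split on ',',
/// and each entry is trimmed.
fn parse_primary_key(s: &str) -> Vec<String> {
    s.to_ascii_lowercase()
        .split(',')
        .map(|part| part.trim().to_owned())
        .collect()
}

fn main() {
    // `primary_key = 'V1, v2 ,v3'` in the sink definition becomes ["v1", "v2", "v3"].
    assert_eq!(parse_primary_key("V1, v2 ,v3"), vec!["v1", "v2", "v3"]);
}
```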
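
In the `UpsertWriter` changes above, the equality-delta writer receives inserts and deletes as separate record batches, so the op column of each compacted chunk is first split into runs of identical ops. The snippet below is a self-contained sketch of that grouping logic, following `UpsertWriter::partition_ops` from this diff; the local `Op` enum stands in for `risingwave_common::array::Op` (which also has `UpdateInsert`/`UpdateDelete` variants) so the example compiles on its own. Like the original, it assumes a non-empty op slice.

```rust
// Stand-in for `risingwave_common::array::Op`.
#[derive(Clone, Copy, PartialEq)]
enum Op {
    Insert,
    Delete,
}

/// Split the op column into half-open `(start, end)` ranges where the op is constant,
/// so consecutive inserts (or deletes) can be sliced into one record batch each.
fn partition_ops(ops: &[Op]) -> Vec<(usize, usize)> {
    let mut res = vec![];
    let mut start = 0;
    let mut prev_op = ops[0];
    for (i, op) in ops.iter().enumerate().skip(1) {
        if *op != prev_op {
            res.push((start, i));
            start = i;
            prev_op = *op;
        }
    }
    res.push((start, ops.len()));
    res
}

fn main() {
    let ops = [Op::Insert, Op::Insert, Op::Delete, Op::Insert, Op::Insert];
    // Yields [(0, 2), (2, 3), (3, 5)]: two inserts, one delete, two inserts.
    println!("{:?}", partition_ops(&ops));
}
```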
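
Because delete files now travel from the sink writers to the commit coordinator alongside data files, the serialized sink metadata changes from a bare JSON array to an object keyed by `data_files` and `delete_files`. A rough sketch of that shape, with the per-file entries (normally produced by icelake's `data_file_to_json`) elided as placeholders:

```rust
use serde_json::json;

fn main() {
    // Shape of the coordinator metadata after this change: an object with two arrays
    // instead of the previous bare array of data files. Entries are left empty here.
    let metadata = json!({
        "data_files": [],
        "delete_files": [],
    });

    // The committer side (`TryFrom<&SinkMetadata> for WriteResult`) pulls the two arrays
    // back out by key before handing them to `Transaction::append_data_file` and
    // `Transaction::append_delete_file`.
    let obj = metadata.as_object().expect("metadata must be a JSON object");
    assert!(obj.contains_key("data_files"));
    assert!(obj.contains_key("delete_files"));
}
```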