From 732e5826e68711f31048d7c250bf7739e4030c1f Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Sun, 21 Jan 2024 19:47:45 +0800 Subject: [PATCH] fix: Add check for iceberg test. (#14683) --- e2e_test/iceberg/start_spark_connect_server.sh | 3 ++- e2e_test/iceberg/test_case/cdc/load.slt | 12 +++--------- e2e_test/iceberg/test_case/cdc/mysql_cdc.sql | 6 +++--- .../iceberg/test_case/cdc/mysql_cdc_insert.sql | 2 +- .../iceberg/test_case/iceberg_sink_append_only.slt | 4 ++++ e2e_test/iceberg/test_case/iceberg_sink_upsert.slt | 14 ++++++++++++++ e2e_test/sink/iceberg_sink.slt | 4 ++++ src/connector/src/sink/iceberg/mod.rs | 9 +++++---- 8 files changed, 36 insertions(+), 18 deletions(-) diff --git a/e2e_test/iceberg/start_spark_connect_server.sh b/e2e_test/iceberg/start_spark_connect_server.sh index fb6a37e8135f3..cf3bff1e3991e 100755 --- a/e2e_test/iceberg/start_spark_connect_server.sh +++ b/e2e_test/iceberg/start_spark_connect_server.sh @@ -15,11 +15,12 @@ tar -xzf $SPARK_FILE --no-same-owner ./spark-${SPARK_VERSION}-bin-hadoop3/sbin/start-connect-server.sh --packages $PACKAGES \ --master local[3] \ --conf spark.driver.bindAddress=0.0.0.0 \ - --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.demo.type=hadoop \ --conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \ --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \ --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \ --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \ --conf spark.sql.defaultCatalog=demo diff --git a/e2e_test/iceberg/test_case/cdc/load.slt b/e2e_test/iceberg/test_case/cdc/load.slt index 8fa14471ceeaf..12abdd283397d 100644 --- a/e2e_test/iceberg/test_case/cdc/load.slt +++ b/e2e_test/iceberg/test_case/cdc/load.slt @@ -7,8 +7,8 @@ create source mysql_mydb with ( port = '3306', username = 'root', password = '123456', - database.name = 'my@db', - server.id = '2' + database.name = 'mydb', + server.id = '5085' ); statement ok @@ -16,7 +16,7 @@ create table products ( id INT, name STRING, description STRING, PRIMARY KEY (id) -) FROM mysql_mydb TABLE 'my@db.products'; +) FROM mysql_mydb TABLE 'mydb.products'; statement ok @@ -35,15 +35,9 @@ CREATE SINK s1 AS select * from products WITH ( primary_key = 'id' ); -statement ok -flush; - sleep 20s query I select count(*) from products; ---- 8 - -statement ok -flush; diff --git a/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql b/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql index b7b6f13af83cf..f95c6c2c8d4a8 100644 --- a/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql +++ b/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql @@ -1,7 +1,7 @@ -DROP DATABASE IF EXISTS `my@db`; -CREATE DATABASE `my@db`; +DROP DATABASE IF EXISTS `mydb`; +CREATE DATABASE `mydb`; -USE `my@db`; +USE `mydb`; CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, diff --git a/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql b/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql index 641d6220ea8dc..c7dc50316d3cb 100644 --- a/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql +++ b/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql @@ -1,4 +1,4 @@ -USE `my@db`; +USE `mydb`; INSERT INTO products VALUES (default,"109","109"), (default,"110","110"), diff --git a/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt b/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt index dff6737057363..f3156a9b40ca5 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt @@ -43,6 +43,8 @@ INSERT INTO t6 VALUES statement ok FLUSH; +sleep 5s + statement ok INSERT INTO t6 VALUES (5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00'); @@ -50,6 +52,8 @@ INSERT INTO t6 VALUES statement ok FLUSH; +sleep 5s + statement ok DROP SINK s6; diff --git a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt index 2e8ce54e1c742..f867f0d746450 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt @@ -29,12 +29,26 @@ INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1, statement ok FLUSH; +sleep 5s + statement ok INSERT INTO t6 VALUES (1, 1, 50, '1-50'); statement ok FLUSH; +sleep 10s + +query I +select count(*) from t6; +---- +7 + +statement ok +FLUSH; + +sleep 10s + statement ok DROP SINK s6; diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index dbc3163b70585..4935032e88285 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -25,12 +25,16 @@ INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2') statement ok FLUSH; +sleep 5s + statement ok INSERT INTO t6 VALUES (1, 50, '1-50'); statement ok FLUSH; +sleep 5s + statement ok DROP SINK s6; diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 4412388314cf5..856c8deea0f40 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -832,11 +832,12 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { txn.append_data_file(s.data_files); txn.append_delete_file(s.delete_files); }); - txn.commit() - .await - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; + txn.commit().await.map_err(|err| { + tracing::error!(?err, "Failed to commit iceberg table"); + SinkError::Iceberg(anyhow!(err)) + })?; - tracing::info!("Succeeded to commit ti iceberg table in epoch {epoch}."); + tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}."); Ok(()) } }