Skip to content

Commit

Permalink
fix: Add check for iceberg test. (#14683)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Jan 21, 2024
1 parent 5382afc commit 732e582
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 18 deletions.
3 changes: 2 additions & 1 deletion e2e_test/iceberg/start_spark_connect_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ tar -xzf $SPARK_FILE --no-same-owner
./spark-${SPARK_VERSION}-bin-hadoop3/sbin/start-connect-server.sh --packages $PACKAGES \
--master local[3] \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
--conf spark.sql.defaultCatalog=demo
Expand Down
12 changes: 3 additions & 9 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ create source mysql_mydb with (
port = '3306',
username = 'root',
password = '123456',
database.name = 'my@db',
server.id = '2'
database.name = 'mydb',
server.id = '5085'
);

statement ok
create table products ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) FROM mysql_mydb TABLE 'my@db.products';
) FROM mysql_mydb TABLE 'mydb.products';


statement ok
Expand All @@ -35,15 +35,9 @@ CREATE SINK s1 AS select * from products WITH (
primary_key = 'id'
);

statement ok
flush;

sleep 20s

query I
select count(*) from products;
----
8

statement ok
flush;
6 changes: 3 additions & 3 deletions e2e_test/iceberg/test_case/cdc/mysql_cdc.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
DROP DATABASE IF EXISTS `my@db`;
CREATE DATABASE `my@db`;
DROP DATABASE IF EXISTS `mydb`;
CREATE DATABASE `mydb`;

USE `my@db`;
USE `mydb`;

CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
USE `my@db`;
USE `mydb`;

INSERT INTO products VALUES (default,"109","109"),
(default,"110","110"),
Expand Down
4 changes: 4 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_append_only.slt
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ INSERT INTO t6 VALUES
statement ok
FLUSH;

sleep 5s

statement ok
INSERT INTO t6 VALUES
(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00');

statement ok
FLUSH;

sleep 5s

statement ok
DROP SINK s6;

Expand Down
14 changes: 14 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,26 @@ INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1,
statement ok
FLUSH;

sleep 5s

statement ok
INSERT INTO t6 VALUES (1, 1, 50, '1-50');

statement ok
FLUSH;

sleep 10s

query I
select count(*) from t6;
----
7

statement ok
FLUSH;

sleep 10s

statement ok
DROP SINK s6;

Expand Down
4 changes: 4 additions & 0 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2')
statement ok
FLUSH;

sleep 5s

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50');

statement ok
FLUSH;

sleep 5s

statement ok
DROP SINK s6;

Expand Down
9 changes: 5 additions & 4 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,11 +832,12 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
txn.append_data_file(s.data_files);
txn.append_delete_file(s.delete_files);
});
txn.commit()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
txn.commit().await.map_err(|err| {
tracing::error!(?err, "Failed to commit iceberg table");
SinkError::Iceberg(anyhow!(err))
})?;

tracing::info!("Succeeded to commit ti iceberg table in epoch {epoch}.");
tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
Ok(())
}
}
Expand Down

0 comments on commit 732e582

Please sign in to comment.