From 1c673ea637996a4c58c23233f21407ed6aab722b Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 29 Dec 2023 15:01:40 +0800 Subject: [PATCH 1/7] chore(sink): exclude java iceberg and deltalake sink from connector node --- ci/scripts/connector-node-integration-test.sh | 66 +++++++++---------- e2e_test/sink/deltalake_rust_sink.slt | 2 +- e2e_test/sink/iceberg_sink.slt | 2 +- .../iceberg-sink/create_sink.sql | 2 +- java/connector-node/assembly/assembly.xml | 5 +- .../risingwave-connector-service/pom.xml | 11 +--- .../com/risingwave/connector/SinkUtils.java | 4 -- src/connector/src/sink/deltalake.rs | 4 +- src/connector/src/sink/iceberg/mod.rs | 11 ---- src/connector/src/sink/mod.rs | 4 +- 10 files changed, 41 insertions(+), 70 deletions(-) diff --git a/ci/scripts/connector-node-integration-test.sh b/ci/scripts/connector-node-integration-test.sh index 6b3147bbeb193..0be1dcfcabd8a 100755 --- a/ci/scripts/connector-node-integration-test.sh +++ b/ci/scripts/connector-node-integration-test.sh @@ -115,43 +115,43 @@ for ((i=0; i<${#type[@]}; i++)); do fi # test upsert mode - echo "--- running iceberg upsert mode ${type[i]} integration tests" - cd ${RISINGWAVE_ROOT}/java/connector-node/python-client - python3 pyspark-util.py create_iceberg - if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then - python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json" - echo "Upsert iceberg sink ${type[i]} test passed" - else - echo "Upsert iceberg sink ${type[i]} test failed" - exit 1 - fi - python3 pyspark-util.py drop_iceberg +# echo "--- running iceberg upsert mode ${type[i]} integration tests" +# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client +# python3 pyspark-util.py create_iceberg +# if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then +# python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json" +# echo "Upsert iceberg sink ${type[i]} test passed" +# else +# echo "Upsert iceberg sink ${type[i]} test failed" +# exit 1 +# fi +# python3 pyspark-util.py drop_iceberg # test append-only mode - echo "--- running iceberg append-only mode ${type[i]} integration tests" - cd ${RISINGWAVE_ROOT}/java/connector-node/python-client - python3 pyspark-util.py create_iceberg - if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then - python3 pyspark-util.py test_iceberg - echo "Iceberg sink ${type[i]} test passed" - else - echo "Iceberg sink ${type[i]} test failed" - exit 1 - fi - python3 pyspark-util.py drop_iceberg +# echo "--- running iceberg append-only mode ${type[i]} integration tests" +# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client +# python3 pyspark-util.py create_iceberg +# if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then +# python3 pyspark-util.py test_iceberg +# echo "Iceberg sink ${type[i]} test passed" +# else +# echo "Iceberg sink ${type[i]} test failed" +# exit 1 +# fi +# python3 pyspark-util.py drop_iceberg # test append-only mode - echo "--- running deltalake append-only mod ${type[i]} integration tests" - cd ${RISINGWAVE_ROOT}/java/connector-node/python-client - python3 pyspark-util.py create_deltalake - if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then - python3 pyspark-util.py test_deltalake - echo "Deltalake sink ${type[i]} test passed" - else - echo "Deltalake sink ${type[i]} test failed" - exit 1 - fi - python3 pyspark-util.py clean_deltalake +# echo "--- running deltalake append-only mod ${type[i]} integration tests" +# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client +# python3 pyspark-util.py create_deltalake +# if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then +# python3 pyspark-util.py test_deltalake +# echo "Deltalake sink ${type[i]} test passed" +# else +# echo "Deltalake sink ${type[i]} test failed" +# exit 1 +# fi +# python3 pyspark-util.py clean_deltalake done diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt index 4207c9446dc1b..9265940e01edf 100644 --- a/e2e_test/sink/deltalake_rust_sink.slt +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok create sink s6 as select * from mv6 with ( - connector = 'deltalake_rust', + connector = 'deltalake', type = 'append-only', force_append_only = 'true', location = 's3a://deltalake/deltalake-test', diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 9ea6470d2d395..2a214c0710ddd 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -6,7 +6,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( - connector = 'iceberg_java', + connector = 'iceberg', type = 'upsert', primary_key = 'v1', warehouse.path = 's3://iceberg', diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql index 303d6591257a3..9306b836f7744 100644 --- a/integration_tests/iceberg-sink/create_sink.sql +++ b/integration_tests/iceberg-sink/create_sink.sql @@ -1,7 +1,7 @@ CREATE SINK bhv_iceberg_sink FROM bhv_mv WITH ( - connector = 'iceberg_java', + connector = 'iceberg', type = 'upsert', primary_key = 'user_id, target_id, event_timestamp', warehouse.path = 's3://hummock001/iceberg-data', diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml index 751ba2f410e0b..9cf457d8a0b6d 100644 --- a/java/connector-node/assembly/assembly.xml +++ b/java/connector-node/assembly/assembly.xml @@ -42,11 +42,8 @@ *:risingwave-sink-es-7 *:risingwave-sink-cassandra *:risingwave-sink-jdbc - *:risingwave-sink-iceberg - *:risingwave-sink-deltalake + *:risingwave-sink-mock-flink-http-sink - - *:s3-common true true diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml index 8c11a4cacb3c0..74579879616cb 100644 --- a/java/connector-node/risingwave-connector-service/pom.xml +++ b/java/connector-node/risingwave-connector-service/pom.xml @@ -77,16 +77,6 @@ risingwave-sink-jdbc provided - - com.risingwave - risingwave-sink-iceberg - provided - - - com.risingwave - risingwave-sink-deltalake - provided - com.risingwave risingwave-sink-es-7 @@ -104,6 +94,7 @@ com.risingwave risingwave-sink-mock-flink-http-sink + provided diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index 8a1b9c1f7d5da..97ddefcdc141c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -39,10 +39,6 @@ public static SinkFactory getSinkFactory(String sinkName) { return new FileSinkFactory(); case "jdbc": return new JDBCSinkFactory(); - case "iceberg_java": - return new IcebergSinkFactory(); - case "deltalake": - return new DeltaLakeSinkFactory(); case "elasticsearch": return new EsSinkFactory(); case "cassandra": diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 7bf8d3c5b5966..3e786adb4e974 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -46,7 +46,7 @@ use super::{ }; use crate::sink::writer::SinkWriterExt; -pub const DELTALAKE_SINK: &str = "deltalake_rust"; +pub const DELTALAKE_SINK: &str = "deltalake"; pub const DEFAULT_REGION: &str = "us-east-1"; #[derive(Deserialize, Serialize, Debug, Clone, WithOptions)] @@ -534,7 +534,7 @@ mod test { .unwrap(); let properties = hashmap! { - "connector".to_string() => "deltalake_rust".to_string(), + "connector".to_string() => "deltalake".to_string(), "force_append_only".to_string() => "true".to_string(), "type".to_string() => "append-only".to_string(), "location".to_string() => format!("file://{}", path), diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 660fdc3f9c6b3..4b3ea60cd46b6 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -51,22 +51,11 @@ use super::{ }; use crate::deserialize_bool_from_string; use crate::sink::coordinate::CoordinatedSinkWriter; -use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". pub const ICEBERG_SINK: &str = "iceberg"; -pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java"; - -#[derive(Debug)] -pub struct RemoteIceberg; - -impl RemoteSinkTrait for RemoteIceberg { - const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK; -} - -pub type RemoteIcebergSink = CoordinatedRemoteSink; #[derive(Debug, Clone, Deserialize, WithOptions)] #[serde(deny_unknown_fields)] diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index d31c1cb475d19..2809d31093b42 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -79,15 +79,13 @@ macro_rules! for_all_sinks { { ClickHouse, $crate::sink::clickhouse::ClickHouseSink }, { Iceberg, $crate::sink::iceberg::IcebergSink }, { Nats, $crate::sink::nats::NatsSink }, - { RemoteIceberg, $crate::sink::iceberg::RemoteIcebergSink }, { Jdbc, $crate::sink::remote::JdbcSink }, - { DeltaLake, $crate::sink::remote::DeltaLakeSink }, { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, - { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink }, + { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, { Table, $crate::sink::table::TableSink } From 2471b5ca037bac58735c0070f2701a14b3f27d21 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 29 Dec 2023 15:46:53 +0800 Subject: [PATCH 2/7] fix --- e2e_test/sink/deltalake_rust_sink.slt | 1 + e2e_test/sink/iceberg_sink.slt | 1 + integration_tests/iceberg-sink/create_sink.sql | 1 + java/connector-node/risingwave-connector-service/pom.xml | 5 +++++ java/pom.xml | 5 +++++ src/frontend/src/optimizer/plan_node/stream_sink.rs | 8 -------- 6 files changed, 13 insertions(+), 8 deletions(-) diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt index 9265940e01edf..6c078afba9715 100644 --- a/e2e_test/sink/deltalake_rust_sink.slt +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -13,6 +13,7 @@ with ( location = 's3a://deltalake/deltalake-test', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', + s3.region = 'us-east-1', s3.endpoint = 'http://127.0.0.1:9301' ); diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 2a214c0710ddd..b664a986f967b 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -13,6 +13,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', + s3.region = 'us-east-1', database.name='demo_db', table.name='demo_table' ); diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql index 9306b836f7744..dd3fd5a9d7ced 100644 --- a/integration_tests/iceberg-sink/create_sink.sql +++ b/integration_tests/iceberg-sink/create_sink.sql @@ -8,6 +8,7 @@ FROM s3.endpoint = 'http://minio-0:9301', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', + s3.region = 'us-east-1', database.name='demo_db', table.name='demo_table' ); \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml index 74579879616cb..047c523c1c7db 100644 --- a/java/connector-node/risingwave-connector-service/pom.xml +++ b/java/connector-node/risingwave-connector-service/pom.xml @@ -38,6 +38,11 @@ com.risingwave connector-api + + com.google.guava + guava + + com.google.code.gson diff --git a/java/pom.xml b/java/pom.xml index 7e7e554abc0d4..26251df28502c 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -303,6 +303,11 @@ gson ${gson.version} + + com.google.guava + guava + 33.0.0-jre + io.prometheus simpleclient_httpserver diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 68814531d9293..6837e9b2c1dcc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -34,7 +34,6 @@ use risingwave_connector::sink::{ SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use tracing::info; use super::derive::{derive_columns, derive_pk}; use super::generic::GenericPlanRef; @@ -150,13 +149,6 @@ impl StreamSink { Distribution::Single => RequiredDist::single(), _ => { match properties.get("connector") { - Some(s) if s == "deltalake" => { - // iceberg with multiple parallelism will fail easily with concurrent commit - // on metadata - // TODO: reset iceberg sink to have multiple parallelism - info!("setting iceberg sink parallelism to singleton"); - RequiredDist::single() - } Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => { if sink_type == SinkType::Upsert && downstream_pk.is_empty() { return Err(ErrorCode::SinkError(Box::new(Error::new( From a58e3adede2daf4a734a77d9ffaddb8e5197cf42 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 29 Dec 2023 15:48:06 +0800 Subject: [PATCH 3/7] remoev delta lake test region --- e2e_test/sink/deltalake_rust_sink.slt | 1 - 1 file changed, 1 deletion(-) diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt index 6c078afba9715..9265940e01edf 100644 --- a/e2e_test/sink/deltalake_rust_sink.slt +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -13,7 +13,6 @@ with ( location = 's3a://deltalake/deltalake-test', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - s3.region = 'us-east-1', s3.endpoint = 'http://127.0.0.1:9301' ); From 349bae3dd54de5dd57542e325905ca84027d9d5f Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 29 Dec 2023 16:28:06 +0800 Subject: [PATCH 4/7] fix es deps --- java/connector-node/risingwave-sink-es-7/pom.xml | 4 ++++ java/pom.xml | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml index 5bfe39c1518c6..9c8515098d7d8 100644 --- a/java/connector-node/risingwave-sink-es-7/pom.xml +++ b/java/connector-node/risingwave-sink-es-7/pom.xml @@ -51,6 +51,10 @@ org.elasticsearch.client elasticsearch-rest-high-level-client + + org.apache.httpcomponents + httpclient + diff --git a/java/pom.xml b/java/pom.xml index 26251df28502c..5dffac5a81c56 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -308,6 +308,11 @@ guava 33.0.0-jre + + org.apache.httpcomponents + httpclient + 4.5.10 + io.prometheus simpleclient_httpserver From 6e7323039ea00e2dc7a995718ef4c68ff1400677 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 29 Dec 2023 16:37:09 +0800 Subject: [PATCH 5/7] add catalog.type --- e2e_test/sink/iceberg_sink.slt | 1 + integration_tests/iceberg-sink/create_sink.sql | 1 + 2 files changed, 2 insertions(+) diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index b664a986f967b..dbc3163b70585 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -14,6 +14,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', s3.region = 'us-east-1', + catalog.type = 'storage', database.name='demo_db', table.name='demo_table' ); diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql index dd3fd5a9d7ced..009f7346578ea 100644 --- a/integration_tests/iceberg-sink/create_sink.sql +++ b/integration_tests/iceberg-sink/create_sink.sql @@ -9,6 +9,7 @@ FROM s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', s3.region = 'us-east-1', + catalog.type = 'storage', database.name='demo_db', table.name='demo_table' ); \ No newline at end of file From 53976078f899ddccd86401df92cb9092c3ef1d29 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 29 Dec 2023 17:49:56 +0800 Subject: [PATCH 6/7] fix it --- e2e_test/sink/iceberg_sink.slt | 4 ++-- integration_tests/iceberg-sink/create_sink.sql | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index dbc3163b70585..9e7bef8d12239 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -15,8 +15,8 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH s3.secret.key = 'hummockadmin', s3.region = 'us-east-1', catalog.type = 'storage', - database.name='demo_db', - table.name='demo_table' + database.name='demo', + table.name='demo_db.demo_table' ); statement ok diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql index 009f7346578ea..ad1cf0727c9c1 100644 --- a/integration_tests/iceberg-sink/create_sink.sql +++ b/integration_tests/iceberg-sink/create_sink.sql @@ -10,6 +10,6 @@ FROM s3.secret.key = 'hummockadmin', s3.region = 'us-east-1', catalog.type = 'storage', - database.name='demo_db', - table.name='demo_table' + database.name='demo', + table.name='demo_db.demo_table' ); \ No newline at end of file From bd1e6e8ac0264e62f1c3e1ba8779b6d2bbd32160 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 2 Jan 2024 14:34:54 +0800 Subject: [PATCH 7/7] remove deadcode --- ci/scripts/connector-node-integration-test.sh | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/ci/scripts/connector-node-integration-test.sh b/ci/scripts/connector-node-integration-test.sh index 0be1dcfcabd8a..a6b748c5728c7 100755 --- a/ci/scripts/connector-node-integration-test.sh +++ b/ci/scripts/connector-node-integration-test.sh @@ -113,45 +113,6 @@ for ((i=0; i<${#type[@]}; i++)); do echo "File sink ${type[i]} test failed" exit 1 fi - - # test upsert mode -# echo "--- running iceberg upsert mode ${type[i]} integration tests" -# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client -# python3 pyspark-util.py create_iceberg -# if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then -# python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json" -# echo "Upsert iceberg sink ${type[i]} test passed" -# else -# echo "Upsert iceberg sink ${type[i]} test failed" -# exit 1 -# fi -# python3 pyspark-util.py drop_iceberg - - # test append-only mode -# echo "--- running iceberg append-only mode ${type[i]} integration tests" -# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client -# python3 pyspark-util.py create_iceberg -# if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then -# python3 pyspark-util.py test_iceberg -# echo "Iceberg sink ${type[i]} test passed" -# else -# echo "Iceberg sink ${type[i]} test failed" -# exit 1 -# fi -# python3 pyspark-util.py drop_iceberg - - # test append-only mode -# echo "--- running deltalake append-only mod ${type[i]} integration tests" -# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client -# python3 pyspark-util.py create_deltalake -# if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then -# python3 pyspark-util.py test_deltalake -# echo "Deltalake sink ${type[i]} test passed" -# else -# echo "Deltalake sink ${type[i]} test failed" -# exit 1 -# fi -# python3 pyspark-util.py clean_deltalake done