diff --git a/ci/scripts/connector-node-integration-test.sh b/ci/scripts/connector-node-integration-test.sh index 0be1dcfcabd8a..6b3147bbeb193 100755 --- a/ci/scripts/connector-node-integration-test.sh +++ b/ci/scripts/connector-node-integration-test.sh @@ -115,43 +115,43 @@ for ((i=0; i<${#type[@]}; i++)); do fi # test upsert mode -# echo "--- running iceberg upsert mode ${type[i]} integration tests" -# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client -# python3 pyspark-util.py create_iceberg -# if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then -# python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json" -# echo "Upsert iceberg sink ${type[i]} test passed" -# else -# echo "Upsert iceberg sink ${type[i]} test failed" -# exit 1 -# fi -# python3 pyspark-util.py drop_iceberg + echo "--- running iceberg upsert mode ${type[i]} integration tests" + cd ${RISINGWAVE_ROOT}/java/connector-node/python-client + python3 pyspark-util.py create_iceberg + if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then + python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json" + echo "Upsert iceberg sink ${type[i]} test passed" + else + echo "Upsert iceberg sink ${type[i]} test failed" + exit 1 + fi + python3 pyspark-util.py drop_iceberg # test append-only mode -# echo "--- running iceberg append-only mode ${type[i]} integration tests" -# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client -# python3 pyspark-util.py create_iceberg -# if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then -# python3 pyspark-util.py test_iceberg -# echo "Iceberg sink ${type[i]} test passed" -# else -# echo "Iceberg sink ${type[i]} test failed" -# exit 1 -# fi -# python3 pyspark-util.py drop_iceberg + echo "--- running iceberg append-only mode ${type[i]} integration tests" + cd ${RISINGWAVE_ROOT}/java/connector-node/python-client + 
python3 pyspark-util.py create_iceberg + if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then + python3 pyspark-util.py test_iceberg + echo "Iceberg sink ${type[i]} test passed" + else + echo "Iceberg sink ${type[i]} test failed" + exit 1 + fi + python3 pyspark-util.py drop_iceberg # test append-only mode -# echo "--- running deltalake append-only mod ${type[i]} integration tests" -# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client -# python3 pyspark-util.py create_deltalake -# if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then -# python3 pyspark-util.py test_deltalake -# echo "Deltalake sink ${type[i]} test passed" -# else -# echo "Deltalake sink ${type[i]} test failed" -# exit 1 -# fi -# python3 pyspark-util.py clean_deltalake + echo "--- running deltalake append-only mod ${type[i]} integration tests" + cd ${RISINGWAVE_ROOT}/java/connector-node/python-client + python3 pyspark-util.py create_deltalake + if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then + python3 pyspark-util.py test_deltalake + echo "Deltalake sink ${type[i]} test passed" + else + echo "Deltalake sink ${type[i]} test failed" + exit 1 + fi + python3 pyspark-util.py clean_deltalake done diff --git a/e2e_test/iceberg/test_case/cdc/load.slt b/e2e_test/iceberg/test_case/cdc/load.slt index e053fd82efb63..2ac8ab2d61f25 100644 --- a/e2e_test/iceberg/test_case/cdc/load.slt +++ b/e2e_test/iceberg/test_case/cdc/load.slt @@ -38,8 +38,6 @@ CREATE SINK s1 AS select * from products WITH ( statement ok flush; -sleep 10s - query I select count(*) from products; ---- diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt index 9265940e01edf..4207c9446dc1b 100644 --- a/e2e_test/sink/deltalake_rust_sink.slt +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok create sink s6 as select * from mv6 with ( - connector = 
'deltalake', + connector = 'deltalake_rust', type = 'append-only', force_append_only = 'true', location = 's3a://deltalake/deltalake-test', diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 9e7bef8d12239..9ea6470d2d395 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -6,17 +6,15 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( - connector = 'iceberg', + connector = 'iceberg_java', type = 'upsert', primary_key = 'v1', warehouse.path = 's3://iceberg', s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - s3.region = 'us-east-1', - catalog.type = 'storage', - database.name='demo', - table.name='demo_db.demo_table' + database.name='demo_db', + table.name='demo_table' ); statement ok diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql index ad1cf0727c9c1..303d6591257a3 100644 --- a/integration_tests/iceberg-sink/create_sink.sql +++ b/integration_tests/iceberg-sink/create_sink.sql @@ -1,15 +1,13 @@ CREATE SINK bhv_iceberg_sink FROM bhv_mv WITH ( - connector = 'iceberg', + connector = 'iceberg_java', type = 'upsert', primary_key = 'user_id, target_id, event_timestamp', warehouse.path = 's3://hummock001/iceberg-data', s3.endpoint = 'http://minio-0:9301', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - s3.region = 'us-east-1', - catalog.type = 'storage', - database.name='demo', - table.name='demo_db.demo_table' + database.name='demo_db', + table.name='demo_table' ); \ No newline at end of file diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml index 9cf457d8a0b6d..751ba2f410e0b 100644 --- a/java/connector-node/assembly/assembly.xml +++ b/java/connector-node/assembly/assembly.xml @@ -42,8 +42,11 @@ *:risingwave-sink-es-7 
*:risingwave-sink-cassandra *:risingwave-sink-jdbc - *:risingwave-sink-mock-flink-http-sink + *:risingwave-sink-iceberg + *:risingwave-sink-deltalake + + *:s3-common true true diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml index 047c523c1c7db..8c11a4cacb3c0 100644 --- a/java/connector-node/risingwave-connector-service/pom.xml +++ b/java/connector-node/risingwave-connector-service/pom.xml @@ -38,11 +38,6 @@ com.risingwave connector-api - - com.google.guava - guava - - com.google.code.gson @@ -82,6 +77,16 @@ risingwave-sink-jdbc provided + + com.risingwave + risingwave-sink-iceberg + provided + + + com.risingwave + risingwave-sink-deltalake + provided + com.risingwave risingwave-sink-es-7 @@ -99,7 +104,6 @@ com.risingwave risingwave-sink-mock-flink-http-sink - provided diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index 97ddefcdc141c..8a1b9c1f7d5da 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -39,6 +39,10 @@ public static SinkFactory getSinkFactory(String sinkName) { return new FileSinkFactory(); case "jdbc": return new JDBCSinkFactory(); + case "iceberg_java": + return new IcebergSinkFactory(); + case "deltalake": + return new DeltaLakeSinkFactory(); case "elasticsearch": return new EsSinkFactory(); case "cassandra": diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml index 9c8515098d7d8..5bfe39c1518c6 100644 --- a/java/connector-node/risingwave-sink-es-7/pom.xml +++ b/java/connector-node/risingwave-sink-es-7/pom.xml @@ -51,10 +51,6 @@ org.elasticsearch.client 
elasticsearch-rest-high-level-client - - org.apache.httpcomponents - httpclient - diff --git a/java/pom.xml b/java/pom.xml index 5dffac5a81c56..7e7e554abc0d4 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -303,16 +303,6 @@ gson ${gson.version} - - com.google.guava - guava - 33.0.0-jre - - - org.apache.httpcomponents - httpclient - 4.5.10 - io.prometheus simpleclient_httpserver diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 3e786adb4e974..7bf8d3c5b5966 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -46,7 +46,7 @@ use super::{ }; use crate::sink::writer::SinkWriterExt; -pub const DELTALAKE_SINK: &str = "deltalake"; +pub const DELTALAKE_SINK: &str = "deltalake_rust"; pub const DEFAULT_REGION: &str = "us-east-1"; #[derive(Deserialize, Serialize, Debug, Clone, WithOptions)] @@ -534,7 +534,7 @@ mod test { .unwrap(); let properties = hashmap! { - "connector".to_string() => "deltalake".to_string(), + "connector".to_string() => "deltalake_rust".to_string(), "force_append_only".to_string() => "true".to_string(), "type".to_string() => "append-only".to_string(), "location".to_string() => format!("file://{}", path), diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 4b3ea60cd46b6..660fdc3f9c6b3 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -51,11 +51,22 @@ use super::{ }; use crate::deserialize_bool_from_string; use crate::sink::coordinate::CoordinatedSinkWriter; +use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". 
pub const ICEBERG_SINK: &str = "iceberg"; +pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java"; + +#[derive(Debug)] +pub struct RemoteIceberg; + +impl RemoteSinkTrait for RemoteIceberg { + const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK; +} + +pub type RemoteIcebergSink = CoordinatedRemoteSink; #[derive(Debug, Clone, Deserialize, WithOptions)] #[serde(deny_unknown_fields)] diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2809d31093b42..d31c1cb475d19 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -79,13 +79,15 @@ macro_rules! for_all_sinks { { ClickHouse, $crate::sink::clickhouse::ClickHouseSink }, { Iceberg, $crate::sink::iceberg::IcebergSink }, { Nats, $crate::sink::nats::NatsSink }, + { RemoteIceberg, $crate::sink::iceberg::RemoteIcebergSink }, { Jdbc, $crate::sink::remote::JdbcSink }, + { DeltaLake, $crate::sink::remote::DeltaLakeSink }, { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, - { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, + { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, { Table, $crate::sink::table::TableSink } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 6837e9b2c1dcc..68814531d9293 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -34,6 +34,7 @@ use risingwave_connector::sink::{ SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use tracing::info; use super::derive::{derive_columns, derive_pk}; use super::generic::GenericPlanRef; @@ -149,6 
+150,13 @@ impl StreamSink { Distribution::Single => RequiredDist::single(), _ => { match properties.get("connector") { + Some(s) if s == "deltalake" => { + // deltalake with multiple parallelism will fail easily with concurrent commit + // on metadata + // TODO: reset deltalake sink to have multiple parallelism + info!("setting iceberg sink parallelism to singleton"); + RequiredDist::single() + } Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => { if sink_type == SinkType::Upsert && downstream_pk.is_empty() { return Err(ErrorCode::SinkError(Box::new(Error::new(