From b38da9cead2dbda82409b3e3926a85537253b309 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Tue, 2 Jan 2024 14:58:54 +0800
Subject: [PATCH 01/12] chore(sink): exclude java iceberg and deltalake sink
from connector node (#14277)
---
ci/scripts/connector-node-integration-test.sh | 39 -------------------
e2e_test/sink/deltalake_rust_sink.slt | 2 +-
e2e_test/sink/iceberg_sink.slt | 8 ++--
.../iceberg-sink/create_sink.sql | 8 ++--
java/connector-node/assembly/assembly.xml | 5 +--
.../risingwave-connector-service/pom.xml | 16 +++-----
.../com/risingwave/connector/SinkUtils.java | 4 --
.../risingwave-sink-es-7/pom.xml | 4 ++
java/pom.xml | 10 +++++
src/connector/src/sink/deltalake.rs | 4 +-
src/connector/src/sink/iceberg/mod.rs | 11 ------
src/connector/src/sink/mod.rs | 4 +-
.../src/optimizer/plan_node/stream_sink.rs | 8 ----
13 files changed, 35 insertions(+), 88 deletions(-)
diff --git a/ci/scripts/connector-node-integration-test.sh b/ci/scripts/connector-node-integration-test.sh
index 6b3147bbeb193..a6b748c5728c7 100755
--- a/ci/scripts/connector-node-integration-test.sh
+++ b/ci/scripts/connector-node-integration-test.sh
@@ -113,45 +113,6 @@ for ((i=0; i<${#type[@]}; i++)); do
echo "File sink ${type[i]} test failed"
exit 1
fi
-
- # test upsert mode
- echo "--- running iceberg upsert mode ${type[i]} integration tests"
- cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
- python3 pyspark-util.py create_iceberg
- if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then
- python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json"
- echo "Upsert iceberg sink ${type[i]} test passed"
- else
- echo "Upsert iceberg sink ${type[i]} test failed"
- exit 1
- fi
- python3 pyspark-util.py drop_iceberg
-
- # test append-only mode
- echo "--- running iceberg append-only mode ${type[i]} integration tests"
- cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
- python3 pyspark-util.py create_iceberg
- if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then
- python3 pyspark-util.py test_iceberg
- echo "Iceberg sink ${type[i]} test passed"
- else
- echo "Iceberg sink ${type[i]} test failed"
- exit 1
- fi
- python3 pyspark-util.py drop_iceberg
-
- # test append-only mode
- echo "--- running deltalake append-only mod ${type[i]} integration tests"
- cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
- python3 pyspark-util.py create_deltalake
- if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then
- python3 pyspark-util.py test_deltalake
- echo "Deltalake sink ${type[i]} test passed"
- else
- echo "Deltalake sink ${type[i]} test failed"
- exit 1
- fi
- python3 pyspark-util.py clean_deltalake
done
diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt
index 4207c9446dc1b..9265940e01edf 100644
--- a/e2e_test/sink/deltalake_rust_sink.slt
+++ b/e2e_test/sink/deltalake_rust_sink.slt
@@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
create sink s6 as select * from mv6
with (
- connector = 'deltalake_rust',
+ connector = 'deltalake',
type = 'append-only',
force_append_only = 'true',
location = 's3a://deltalake/deltalake-test',
diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt
index 9ea6470d2d395..9e7bef8d12239 100644
--- a/e2e_test/sink/iceberg_sink.slt
+++ b/e2e_test/sink/iceberg_sink.slt
@@ -6,15 +6,17 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
- connector = 'iceberg_java',
+ connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
- database.name='demo_db',
- table.name='demo_table'
+ s3.region = 'us-east-1',
+ catalog.type = 'storage',
+ database.name='demo',
+ table.name='demo_db.demo_table'
);
statement ok
diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql
index 303d6591257a3..ad1cf0727c9c1 100644
--- a/integration_tests/iceberg-sink/create_sink.sql
+++ b/integration_tests/iceberg-sink/create_sink.sql
@@ -1,13 +1,15 @@
CREATE SINK bhv_iceberg_sink
FROM
bhv_mv WITH (
- connector = 'iceberg_java',
+ connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id, target_id, event_timestamp',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
- database.name='demo_db',
- table.name='demo_table'
+ s3.region = 'us-east-1',
+ catalog.type = 'storage',
+ database.name='demo',
+ table.name='demo_db.demo_table'
);
\ No newline at end of file
diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml
index 751ba2f410e0b..9cf457d8a0b6d 100644
--- a/java/connector-node/assembly/assembly.xml
+++ b/java/connector-node/assembly/assembly.xml
@@ -42,11 +42,8 @@
*:risingwave-sink-es-7
*:risingwave-sink-cassandra
*:risingwave-sink-jdbc
- *:risingwave-sink-iceberg
- *:risingwave-sink-deltalake
+ *:risingwave-sink-mock-flink-http-sink
-
- *:s3-common
true
true
diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml
index 8c11a4cacb3c0..047c523c1c7db 100644
--- a/java/connector-node/risingwave-connector-service/pom.xml
+++ b/java/connector-node/risingwave-connector-service/pom.xml
@@ -38,6 +38,11 @@
com.risingwave
connector-api
+
+ com.google.guava
+ guava
+
+
com.google.code.gson
@@ -77,16 +82,6 @@
risingwave-sink-jdbc
provided
-
- com.risingwave
- risingwave-sink-iceberg
- provided
-
-
- com.risingwave
- risingwave-sink-deltalake
- provided
-
com.risingwave
risingwave-sink-es-7
@@ -104,6 +99,7 @@
com.risingwave
risingwave-sink-mock-flink-http-sink
+ provided
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
index 326962ddb8d62..679deedebcabf 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
@@ -39,10 +39,6 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new FileSinkFactory();
case "jdbc":
return new JDBCSinkFactory();
- case "iceberg_java":
- return new IcebergSinkFactory();
- case "deltalake":
- return new DeltaLakeSinkFactory();
case "elasticsearch":
return new EsSinkFactory();
case "cassandra":
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index 5bfe39c1518c6..9c8515098d7d8 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -51,6 +51,10 @@
org.elasticsearch.client
elasticsearch-rest-high-level-client
+
+ org.apache.httpcomponents
+ httpclient
+
diff --git a/java/pom.xml b/java/pom.xml
index ab7a32d874174..5f168c48bd9ef 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -309,6 +309,16 @@
gson
${gson.version}
+
+ com.google.guava
+ guava
+ 33.0.0-jre
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.10
+
io.prometheus
simpleclient_httpserver
diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs
index d49045b65e3cb..0ea787d2735d8 100644
--- a/src/connector/src/sink/deltalake.rs
+++ b/src/connector/src/sink/deltalake.rs
@@ -46,7 +46,7 @@ use super::{
};
use crate::sink::writer::SinkWriterExt;
-pub const DELTALAKE_SINK: &str = "deltalake_rust";
+pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
@@ -534,7 +534,7 @@ mod test {
.unwrap();
let properties = hashmap! {
- "connector".to_string() => "deltalake_rust".to_string(),
+ "connector".to_string() => "deltalake".to_string(),
"force_append_only".to_string() => "true".to_string(),
"type".to_string() => "append-only".to_string(),
"location".to_string() => format!("file://{}", path),
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index fae27991f9e39..b2cbcef9f6712 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -51,22 +51,11 @@ use super::{
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
-use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";
-pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java";
-
-#[derive(Debug)]
-pub struct RemoteIceberg;
-
-impl RemoteSinkTrait for RemoteIceberg {
- const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK;
-}
-
-pub type RemoteIcebergSink = CoordinatedRemoteSink;
#[derive(Debug, Clone, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 393cbc916f464..945835449dcad 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -79,15 +79,13 @@ macro_rules! for_all_sinks {
{ ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
{ Iceberg, $crate::sink::iceberg::IcebergSink },
{ Nats, $crate::sink::nats::NatsSink },
- { RemoteIceberg, $crate::sink::iceberg::RemoteIcebergSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
- { DeltaLake, $crate::sink::remote::DeltaLakeSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
- { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink },
+ { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
{ Table, $crate::sink::table::TableSink }
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index ad37dd876fd79..8c690d6047445 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -34,7 +34,6 @@ use risingwave_connector::sink::{
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use tracing::info;
use super::derive::{derive_columns, derive_pk};
use super::generic::GenericPlanRef;
@@ -150,13 +149,6 @@ impl StreamSink {
Distribution::Single => RequiredDist::single(),
_ => {
match properties.get("connector") {
- Some(s) if s == "deltalake" => {
- // iceberg with multiple parallelism will fail easily with concurrent commit
- // on metadata
- // TODO: reset iceberg sink to have multiple parallelism
- info!("setting iceberg sink parallelism to singleton");
- RequiredDist::single()
- }
Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
if sink_type == SinkType::Upsert && downstream_pk.is_empty() {
return Err(ErrorCode::SinkError(Box::new(Error::new(
From 7c84d06ff4f8b705117b862619a1f6766ca2d4c0 Mon Sep 17 00:00:00 2001
From: lmatz
Date: Tue, 2 Jan 2024 15:24:22 +0800
Subject: [PATCH 02/12] chore: make some source log debug instead of info
(#14225)
---
src/stream/src/executor/source/fs_source_executor.rs | 2 +-
src/stream/src/executor/source/source_executor.rs | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs
index 3f7853ad9dd1e..85967a253ba91 100644
--- a/src/stream/src/executor/source/fs_source_executor.rs
+++ b/src/stream/src/executor/source/fs_source_executor.rs
@@ -329,7 +329,7 @@ impl FsSourceExecutor {
// init in-memory split states with persisted state if any
self.stream_source_core.init_split_state(boot_state.clone());
let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
- tracing::info!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state");
+ tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state");
let source_chunk_reader = self
.build_stream_source_reader(&source_desc, recover_state)
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index c3066074c288c..433d54431ced6 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -382,7 +382,7 @@ impl SourceExecutor {
..
}) => {
if let Some(splits) = splits.get(&self.actor_ctx.id) {
- tracing::info!(
+ tracing::debug!(
"source exector: actor {:?} boot with splits: {:?}",
self.actor_ctx.id,
splits
@@ -414,7 +414,7 @@ impl SourceExecutor {
self.stream_source_core = Some(core);
let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
- tracing::info!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state");
+ tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state");
let source_chunk_reader = self
.build_stream_source_reader(&source_desc, recover_state)
.instrument_await("source_build_reader")
From 43383aaacbf1896b0430b253f30aea8c75f8d8e0 Mon Sep 17 00:00:00 2001
From: August
Date: Tue, 2 Jan 2024 17:21:19 +0800
Subject: [PATCH 03/12] fix: dummy table fragments are not persisted when
dropping sink with table replacement (#14299)
---
src/meta/src/rpc/ddl_controller.rs | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 9221af4d4e6e3..123f239fcb0fa 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1180,6 +1180,11 @@ impl DdlController {
)
.await?;
+ // Add table fragments to meta store with state: `State::Initial`.
+ mgr.fragment_manager
+ .start_create_table_fragments(table_fragments.clone())
+ .await?;
+
self.stream_manager
.replace_table(table_fragments, context)
.await?;
From f54cb93d9a16b382ab1a4809318fc9c64cd1c29e Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Tue, 2 Jan 2024 18:00:44 +0800
Subject: [PATCH 04/12] refactor(sink): unify log write path and distinguish
table sink from external sink (#14260)
---
ci/scripts/e2e-sink-test.sh | 1 +
e2e_test/sink/sink_into_table.slt | 3 +
proto/connector_service.proto | 1 -
src/connector/src/sink/log_store.rs | 6 +-
src/connector/src/sink/mod.rs | 29 ++-
src/connector/src/sink/table.rs | 76 -------
.../src/sink/{blackhole.rs => trivial.rs} | 39 +++-
.../src/optimizer/plan_node/stream_sink.rs | 28 ++-
src/meta/src/hummock/manager/mod.rs | 7 +-
.../src/manager/sink_coordination/manager.rs | 5 -
.../src/executor/monitor/streaming_stats.rs | 8 +-
src/stream/src/executor/sink.rs | 189 +++++++++---------
src/stream/src/from_proto/sink.rs | 51 +++--
13 files changed, 200 insertions(+), 243 deletions(-)
delete mode 100644 src/connector/src/sink/table.rs
rename src/connector/src/sink/{blackhole.rs => trivial.rs} (71%)
diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh
index 0dbdce47abf55..92f3e30f6cf4c 100755
--- a/ci/scripts/e2e-sink-test.sh
+++ b/ci/scripts/e2e-sink-test.sh
@@ -61,6 +61,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt'
+sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table.slt'
sleep 1
echo "--- testing remote sinks"
diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table.slt
index a3df061fc8db4..15a6a37e17c4c 100644
--- a/e2e_test/sink/sink_into_table.slt
+++ b/e2e_test/sink/sink_into_table.slt
@@ -6,6 +6,9 @@ SET RW_IMPLICIT_FLUSH TO true;
statement ok
create table t_simple (v1 int, v2 int);
+statement error unsupported sink type table
+create sink table_sink from t_simple with (connector = 'table');
+
statement ok
create table m_simple (v1 int primary key, v2 int);
diff --git a/proto/connector_service.proto b/proto/connector_service.proto
index 7bc46cab9c187..ddda62c6aace5 100644
--- a/proto/connector_service.proto
+++ b/proto/connector_service.proto
@@ -27,7 +27,6 @@ message SinkParam {
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
- optional uint32 target_table = 8;
}
enum SinkPayloadFormat {
diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs
index 548f33389c3c5..f74a22d3b80e5 100644
--- a/src/connector/src/sink/log_store.rs
+++ b/src/connector/src/sink/log_store.rs
@@ -174,9 +174,9 @@ pub trait LogReader: Send + Sized + 'static {
) -> impl Future