chore(sink): exclude java iceberg and deltalake sink from connector node #14277

Merged (9 commits) on Jan 2, 2024
39 changes: 0 additions & 39 deletions ci/scripts/connector-node-integration-test.sh
@@ -113,45 +113,6 @@ for ((i=0; i<${#type[@]}; i++)); do
echo "File sink ${type[i]} test failed"
exit 1
fi

# test upsert mode
echo "--- running iceberg upsert mode ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_iceberg
if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then
python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json"
echo "Upsert iceberg sink ${type[i]} test passed"
else
echo "Upsert iceberg sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py drop_iceberg

# test append-only mode
echo "--- running iceberg append-only mode ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_iceberg
if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then
python3 pyspark-util.py test_iceberg
echo "Iceberg sink ${type[i]} test passed"
else
echo "Iceberg sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py drop_iceberg

# test append-only mode
echo "--- running deltalake append-only mod ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_deltalake
if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then
python3 pyspark-util.py test_deltalake
echo "Deltalake sink ${type[i]} test passed"
else
echo "Deltalake sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py clean_deltalake
done


2 changes: 1 addition & 1 deletion e2e_test/sink/deltalake_rust_sink.slt
@@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
create sink s6 as select * from mv6
with (
connector = 'deltalake_rust',
connector = 'deltalake',

Contributor:
Is it a user facing change?

type = 'append-only',
force_append_only = 'true',
location = 's3a://deltalake/deltalake-test',
8 changes: 5 additions & 3 deletions e2e_test/sink/iceberg_sink.slt
@@ -6,15 +6,17 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg_java',
connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
);

statement ok
8 changes: 5 additions & 3 deletions integration_tests/iceberg-sink/create_sink.sql
@@ -1,13 +1,15 @@
CREATE SINK bhv_iceberg_sink
FROM
bhv_mv WITH (
connector = 'iceberg_java',
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id, target_id, event_timestamp',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
);
5 changes: 1 addition & 4 deletions java/connector-node/assembly/assembly.xml
@@ -42,11 +42,8 @@
<include>*:risingwave-sink-es-7</include>
<include>*:risingwave-sink-cassandra</include>
<include>*:risingwave-sink-jdbc</include>
<include>*:risingwave-sink-iceberg</include>
<include>*:risingwave-sink-deltalake</include>
<include>*:risingwave-sink-mock-flink-http-sink</include>

<!-- For S3-->
<include>*:s3-common</include>
</includes>
<useTransitiveDependencies>true</useTransitiveDependencies>
<useTransitiveFiltering>true</useTransitiveFiltering>
16 changes: 6 additions & 10 deletions java/connector-node/risingwave-connector-service/pom.xml
@@ -38,6 +38,11 @@
<groupId>com.risingwave</groupId>
<artifactId>connector-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>


<dependency>
<groupId>com.google.code.gson</groupId>
@@ -77,16 +82,6 @@
<artifactId>risingwave-sink-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-iceberg</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-deltalake</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-es-7</artifactId>
@@ -104,6 +99,7 @@
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-http-sink</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
@@ -39,10 +39,6 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new FileSinkFactory();
case "jdbc":
return new JDBCSinkFactory();
case "iceberg_java":
return new IcebergSinkFactory();
case "deltalake":
return new DeltaLakeSinkFactory();
case "elasticsearch":
return new EsSinkFactory();
case "cassandra":
4 changes: 4 additions & 0 deletions java/connector-node/risingwave-sink-es-7/pom.xml
@@ -51,6 +51,10 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>

</project>
10 changes: 10 additions & 0 deletions java/pom.xml
@@ -309,6 +309,16 @@
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
4 changes: 2 additions & 2 deletions src/connector/src/sink/deltalake.rs
@@ -46,7 +46,7 @@ use super::{
};
use crate::sink::writer::SinkWriterExt;

pub const DELTALAKE_SINK: &str = "deltalake_rust";
pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
@@ -534,7 +534,7 @@ mod test {
.unwrap();

let properties = hashmap! {
"connector".to_string() => "deltalake_rust".to_string(),
"connector".to_string() => "deltalake".to_string(),
"force_append_only".to_string() => "true".to_string(),
"type".to_string() => "append-only".to_string(),
"location".to_string() => format!("file://{}", path),
11 changes: 0 additions & 11 deletions src/connector/src/sink/iceberg/mod.rs
@@ -51,22 +51,11 @@ use super::{
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";
pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java";

#[derive(Debug)]
pub struct RemoteIceberg;

impl RemoteSinkTrait for RemoteIceberg {
const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK;
}

pub type RemoteIcebergSink = CoordinatedRemoteSink<RemoteIceberg>;

#[derive(Debug, Clone, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
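
For readers less familiar with the pattern deleted in this hunk: RemoteIceberg was a zero-sized marker type whose associated const carried the connector name, and CoordinatedRemoteSink<RemoteIceberg> reused the shared Java-backed remote sink under the name "iceberg_java". Below is a minimal, self-contained sketch of that marker-type pattern; only the names mirror the removed code, while the trait body and the generic wrapper are simplified assumptions, not RisingWave's actual definitions.

trait RemoteSinkTrait {
    // Each marker type supplies the connector name it is registered under.
    const SINK_NAME: &'static str;
}

// One generic remote-sink wrapper serves many connector names.
struct CoordinatedRemoteSink<R: RemoteSinkTrait>(std::marker::PhantomData<R>);

impl<R: RemoteSinkTrait> CoordinatedRemoteSink<R> {
    fn name() -> &'static str {
        R::SINK_NAME
    }
}

// The variant removed by this PR: the Java-backed iceberg sink.
struct RemoteIceberg;
impl RemoteSinkTrait for RemoteIceberg {
    const SINK_NAME: &'static str = "iceberg_java";
}

fn main() {
    assert_eq!(CoordinatedRemoteSink::<RemoteIceberg>::name(), "iceberg_java");
}

With this hunk applied, only the native Rust IcebergSink registered as "iceberg" remains.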
4 changes: 1 addition & 3 deletions src/connector/src/sink/mod.rs
@@ -79,15 +79,13 @@ macro_rules! for_all_sinks {
{ ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
{ Iceberg, $crate::sink::iceberg::IcebergSink },
{ Nats, $crate::sink::nats::NatsSink },
{ RemoteIceberg, $crate::sink::iceberg::RemoteIcebergSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ DeltaLake, $crate::sink::remote::DeltaLakeSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink },
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
{ Table, $crate::sink::table::TableSink }
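
To make the mod.rs change above easier to read: for_all_sinks! is a callback-style macro, so each { Variant, Type } row is one registered sink, and this PR drops the RemoteIceberg and DeltaLakeRust rows while pointing the DeltaLake variant at the native Rust implementation. The following is a runnable toy sketch of the callback-macro pattern only; the consumer macro impl_sink_names and the function sink_variants are invented for illustration and do not exist in the codebase.

// Callback-style registry: the list of sinks lives in one place and is
// handed to whatever macro needs to generate per-sink code.
macro_rules! for_all_sinks {
    ($macro:ident) => {
        $macro! {
            { Iceberg, IcebergSink },
            { DeltaLake, DeltaLakeSink }
        }
    };
}

// Example consumer: collect the variant names.
macro_rules! impl_sink_names {
    ($({ $variant:ident, $ty:ident }),*) => {
        fn sink_variants() -> Vec<&'static str> {
            vec![$(stringify!($variant)),*]
        }
    };
}

#[allow(dead_code)]
struct IcebergSink;
#[allow(dead_code)]
struct DeltaLakeSink;

for_all_sinks! { impl_sink_names }

fn main() {
    // After this PR there is a single entry per sink: ["Iceberg", "DeltaLake"].
    println!("{:?}", sink_variants());
}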
8 changes: 0 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -34,7 +34,6 @@ use risingwave_connector::sink::{
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;

use super::derive::{derive_columns, derive_pk};
use super::generic::GenericPlanRef;
@@ -150,13 +149,6 @@ impl StreamSink {
Distribution::Single => RequiredDist::single(),
_ => {
match properties.get("connector") {
Some(s) if s == "deltalake" => {
// iceberg with multiple parallelism will fail easily with concurrent commit
// on metadata
// TODO: reset iceberg sink to have multiple parallelism

Comment on lines -154 to -156

Contributor: What's the parallelism requirement for the iceberg rust version?

Contributor Author: The comment was for the previous iceberg sink. Now there is no singleton requirement on either the iceberg sink or the delta lake sink.

info!("setting iceberg sink parallelism to singleton");
RequiredDist::single()
}
Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
if sink_type == SinkType::Upsert && downstream_pk.is_empty() {
return Err(ErrorCode::SinkError(Box::new(Error::new(
Expand Down