Skip to content

Commit

Permalink
temp recover to main
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 29, 2023
1 parent 904bffa commit b0f3eee
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 70 deletions.
66 changes: 33 additions & 33 deletions ci/scripts/connector-node-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,43 +115,43 @@ for ((i=0; i<${#type[@]}; i++)); do
fi

# test upsert mode
# echo "--- running iceberg upsert mode ${type[i]} integration tests"
# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
# python3 pyspark-util.py create_iceberg
# if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then
# python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json"
# echo "Upsert iceberg sink ${type[i]} test passed"
# else
# echo "Upsert iceberg sink ${type[i]} test failed"
# exit 1
# fi
# python3 pyspark-util.py drop_iceberg
echo "--- running iceberg upsert mode ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_iceberg
if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then
python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json"
echo "Upsert iceberg sink ${type[i]} test passed"
else
echo "Upsert iceberg sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py drop_iceberg

# test append-only mode
# echo "--- running iceberg append-only mode ${type[i]} integration tests"
# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
# python3 pyspark-util.py create_iceberg
# if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then
# python3 pyspark-util.py test_iceberg
# echo "Iceberg sink ${type[i]} test passed"
# else
# echo "Iceberg sink ${type[i]} test failed"
# exit 1
# fi
# python3 pyspark-util.py drop_iceberg
echo "--- running iceberg append-only mode ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_iceberg
if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then
python3 pyspark-util.py test_iceberg
echo "Iceberg sink ${type[i]} test passed"
else
echo "Iceberg sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py drop_iceberg

# test append-only mode
# echo "--- running deltalake append-only mod ${type[i]} integration tests"
# cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
# python3 pyspark-util.py create_deltalake
# if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then
# python3 pyspark-util.py test_deltalake
# echo "Deltalake sink ${type[i]} test passed"
# else
# echo "Deltalake sink ${type[i]} test failed"
# exit 1
# fi
# python3 pyspark-util.py clean_deltalake
echo "--- running deltalake append-only mod ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_deltalake
if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then
python3 pyspark-util.py test_deltalake
echo "Deltalake sink ${type[i]} test passed"
else
echo "Deltalake sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py clean_deltalake
done


Expand Down
2 changes: 0 additions & 2 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ CREATE SINK s1 AS select * from products WITH (
statement ok
flush;

sleep 10s

query I
select count(*) from products;
----
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/deltalake_rust_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
create sink s6 as select * from mv6
with (
connector = 'deltalake',
connector = 'deltalake_rust',
type = 'append-only',
force_append_only = 'true',
location = 's3a://deltalake/deltalake-test',
Expand Down
8 changes: 3 additions & 5 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
connector = 'iceberg_java',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
database.name='demo_db',
table.name='demo_table'
);

statement ok
Expand Down
8 changes: 3 additions & 5 deletions integration_tests/iceberg-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
CREATE SINK bhv_iceberg_sink
FROM
bhv_mv WITH (
connector = 'iceberg',
connector = 'iceberg_java',
type = 'upsert',
primary_key = 'user_id, target_id, event_timestamp',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
database.name='demo_db',
table.name='demo_table'
);
5 changes: 4 additions & 1 deletion java/connector-node/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@
<include>*:risingwave-sink-es-7</include>
<include>*:risingwave-sink-cassandra</include>
<include>*:risingwave-sink-jdbc</include>
<include>*:risingwave-sink-mock-flink-http-sink</include>
<include>*:risingwave-sink-iceberg</include>
<include>*:risingwave-sink-deltalake</include>

<!-- For S3-->
<include>*:s3-common</include>
</includes>
<useTransitiveDependencies>true</useTransitiveDependencies>
<useTransitiveFiltering>true</useTransitiveFiltering>
Expand Down
16 changes: 10 additions & 6 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@
<groupId>com.risingwave</groupId>
<artifactId>connector-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>


<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down Expand Up @@ -82,6 +77,16 @@
<artifactId>risingwave-sink-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-iceberg</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-deltalake</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-es-7</artifactId>
Expand All @@ -99,7 +104,6 @@
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-http-sink</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new FileSinkFactory();
case "jdbc":
return new JDBCSinkFactory();
case "iceberg_java":
return new IcebergSinkFactory();
case "deltalake":
return new DeltaLakeSinkFactory();
case "elasticsearch":
return new EsSinkFactory();
case "cassandra":
Expand Down
4 changes: 0 additions & 4 deletions java/connector-node/risingwave-sink-es-7/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>

</project>
10 changes: 0 additions & 10 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,16 +303,6 @@
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::{
};
use crate::sink::writer::SinkWriterExt;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DELTALAKE_SINK: &str = "deltalake_rust";
pub const DEFAULT_REGION: &str = "us-east-1";

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
Expand Down Expand Up @@ -534,7 +534,7 @@ mod test {
.unwrap();

let properties = hashmap! {
"connector".to_string() => "deltalake".to_string(),
"connector".to_string() => "deltalake_rust".to_string(),
"force_append_only".to_string() => "true".to_string(),
"type".to_string() => "append-only".to_string(),
"location".to_string() => format!("file://{}", path),
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,22 @@ use super::{
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";
pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java";

#[derive(Debug)]
pub struct RemoteIceberg;

impl RemoteSinkTrait for RemoteIceberg {
const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK;
}

pub type RemoteIcebergSink = CoordinatedRemoteSink<RemoteIceberg>;

#[derive(Debug, Clone, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ macro_rules! for_all_sinks {
{ ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
{ Iceberg, $crate::sink::iceberg::IcebergSink },
{ Nats, $crate::sink::nats::NatsSink },
{ RemoteIceberg, $crate::sink::iceberg::RemoteIcebergSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ DeltaLake, $crate::sink::remote::DeltaLakeSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
{ Table, $crate::sink::table::TableSink }
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use risingwave_connector::sink::{
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;

use super::derive::{derive_columns, derive_pk};
use super::generic::GenericPlanRef;
Expand Down Expand Up @@ -149,6 +150,13 @@ impl StreamSink {
Distribution::Single => RequiredDist::single(),
_ => {
match properties.get("connector") {
Some(s) if s == "deltalake" => {
                        // NOTE(review): this arm matches connector == "deltalake", but this
                        // comment and the log message below refer to *iceberg* — it looks
                        // copied from the iceberg arm. Confirm which connector actually needs
                        // singleton parallelism to avoid concurrent commits on metadata, and
                        // update the wording (and the log text) to match the matched connector.
                        // TODO: restore multiple parallelism for this sink once concurrent
                        // metadata commits are safe.
info!("setting iceberg sink parallelism to singleton");
RequiredDist::single()
}
Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
if sink_type == SinkType::Upsert && downstream_pk.is_empty() {
return Err(ErrorCode::SinkError(Box::new(Error::new(
Expand Down

0 comments on commit b0f3eee

Please sign in to comment.