Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into bz/embedded-jaeger-ui
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 3, 2024
2 parents 8a079a0 + 035da38 commit b4bbb1a
Show file tree
Hide file tree
Showing 107 changed files with 3,511 additions and 1,968 deletions.
39 changes: 0 additions & 39 deletions ci/scripts/connector-node-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,45 +113,6 @@ for ((i=0; i<${#type[@]}; i++)); do
echo "File sink ${type[i]} test failed"
exit 1
fi

# test upsert mode
echo "--- running iceberg upsert mode ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_iceberg
if python3 integration_tests.py --upsert_iceberg_sink ${upsert_sink_input_feature[i]}; then
python3 pyspark-util.py test_upsert_iceberg --input_file="./data/upsert_sink_input.json"
echo "Upsert iceberg sink ${type[i]} test passed"
else
echo "Upsert iceberg sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py drop_iceberg

# test append-only mode
echo "--- running iceberg append-only mode ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_iceberg
if python3 integration_tests.py --iceberg_sink ${sink_input_feature[i]}; then
python3 pyspark-util.py test_iceberg
echo "Iceberg sink ${type[i]} test passed"
else
echo "Iceberg sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py drop_iceberg

# test append-only mode
echo "--- running deltalake append-only mod ${type[i]} integration tests"
cd ${RISINGWAVE_ROOT}/java/connector-node/python-client
python3 pyspark-util.py create_deltalake
if python3 integration_tests.py --deltalake_sink ${sink_input_feature[i]}; then
python3 pyspark-util.py test_deltalake
echo "Deltalake sink ${type[i]} test passed"
else
echo "Deltalake sink ${type[i]} test failed"
exit 1
fi
python3 pyspark-util.py clean_deltalake
done


Expand Down
1 change: 1 addition & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table.slt'
sleep 1

echo "--- testing remote sinks"
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion e2e_test/sink/deltalake_rust_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
create sink s6 as select * from mv6
with (
connector = 'deltalake_rust',
connector = 'deltalake',
type = 'append-only',
force_append_only = 'true',
location = 's3a://deltalake/deltalake-test',
Expand Down
8 changes: 5 additions & 3 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg_java',
connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
);

statement ok
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ SET RW_IMPLICIT_FLUSH TO true;
statement ok
create table t_simple (v1 int, v2 int);

statement error unsupported sink type table
create sink table_sink from t_simple with (connector = 'table');

statement ok
create table m_simple (v1 int primary key, v2 int);

Expand Down
30 changes: 14 additions & 16 deletions e2e_test/streaming/over_window/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ set rw_streaming_over_window_cache_policy = full;

include ./generated/main.slt.part

# TODO(rc): The following tests are temporarily commented out because of recovery test failure.

#statement ok
#set rw_streaming_over_window_cache_policy = recent;
#
#include ./generated/main.slt.part
#
#statement ok
#set rw_streaming_over_window_cache_policy = recent_first_n;
#
#include ./generated/main.slt.part
#
#statement ok
#set rw_streaming_over_window_cache_policy = recent_last_n;
#
#include ./generated/main.slt.part
statement ok
set rw_streaming_over_window_cache_policy = recent;

include ./generated/main.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_first_n;

include ./generated/main.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_last_n;

include ./generated/main.slt.part

statement ok
set rw_streaming_over_window_cache_policy = default;
8 changes: 7 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ def section_compaction(outer_panels):
[
panels.target(
f"avg({metric('storage_compact_task_pending_num')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_split_count - {{%s}} @ {{%s}}"
"compactor_task_count - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),

panels.target(
f"avg({metric('storage_compact_task_pending_parallelism')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_pending_parallelism - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions integration_tests/iceberg-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
CREATE SINK bhv_iceberg_sink
FROM
bhv_mv WITH (
connector = 'iceberg_java',
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id, target_id, event_timestamp',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
);
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public MetaClient(String metaAddr, ScheduledExecutorService scheduler) {
.build();
AddWorkerNodeResponse resp = clusterStub.addWorkerNode(req);

this.workerId = resp.getNode().getId();
this.workerId = resp.getNodeId();
}

public HummockVersion pinVersion() {
Expand Down
5 changes: 1 addition & 4 deletions java/connector-node/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,8 @@
<include>*:risingwave-sink-es-7</include>
<include>*:risingwave-sink-cassandra</include>
<include>*:risingwave-sink-jdbc</include>
<include>*:risingwave-sink-iceberg</include>
<include>*:risingwave-sink-deltalake</include>
<include>*:risingwave-sink-mock-flink-http-sink</include>

<!-- For S3-->
<include>*:s3-common</include>
</includes>
<useTransitiveDependencies>true</useTransitiveDependencies>
<useTransitiveFiltering>true</useTransitiveFiltering>
Expand Down
16 changes: 6 additions & 10 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<groupId>com.risingwave</groupId>
<artifactId>connector-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>


<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down Expand Up @@ -77,16 +82,6 @@
<artifactId>risingwave-sink-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-iceberg</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-deltalake</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-es-7</artifactId>
Expand All @@ -104,6 +99,7 @@
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-http-sink</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new FileSinkFactory();
case "jdbc":
return new JDBCSinkFactory();
case "iceberg_java":
return new IcebergSinkFactory();
case "deltalake":
return new DeltaLakeSinkFactory();
case "elasticsearch":
return new EsSinkFactory();
case "cassandra":
Expand Down
4 changes: 4 additions & 0 deletions java/connector-node/risingwave-sink-es-7/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit b4bbb1a

Please sign in to comment.