fix: deprecate SinkPayloadFormat #16723

Merged · 11 commits · May 14, 2024
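In short, this PR retires the deprecated `SinkPayloadFormat` enum: payloads are always StreamChunk-encoded, the `format` field disappears from `StartSink` requests, and the JSON code paths are deleted from the CI workflow, the shell test driver, the Python integration client, and the Java sink observer. A minimal sketch of a post-change `StartSink` request, using the python-client's generated stubs (with the proto directory on `PYTHONPATH`, as the CI script arranges; the `properties` values here are hypothetical placeholders):

```python
import connector_service_pb2

# Open a sink writer stream. After this PR there is no `format` field on
# StartSink: the server always expects StreamChunk payloads and binds a
# StreamChunkDeserializer unconditionally.
sink_param = connector_service_pb2.SinkParam(
    sink_id=0,
    properties={"connector": "file", "output.path": "/tmp/out"},  # hypothetical
)
start_request = connector_service_pb2.SinkWriterStreamRequest(
    start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
        sink_param=sink_param,
    )
)
```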
12 changes: 8 additions & 4 deletions .github/workflows/connector-node-integration.yml
@@ -2,11 +2,11 @@ name: Connector Node Integration Tests

 on:
   push:
-    branches: [main]
+    branches: [ main ]
   pull_request:
-    branches: [main]
+    branches: [ main ]
   merge_group:
-    types: [checks_requested]
+    types: [ checks_requested ]

 jobs:
   build:
@@ -42,4 +42,8 @@ jobs:
echo "--- build connector node"
cd ${RISINGWAVE_ROOT}/java
# run unit test
mvn --batch-mode --update-snapshots clean package -Dno-build-rust
# WARN: `testOnNext_writeValidation` is skipped because it relies on Rust code to decode message,
# while we don't build Rust code (`-Dno-build-rust`) here to save time
mvn --batch-mode --update-snapshots clean package -Dno-build-rust \
'-Dtest=!com.risingwave.connector.sink.SinkStreamObserverTest#testOnNext_writeValidation' \
-Dsurefire.failIfNoSpecifiedTests=false
6 changes: 3 additions & 3 deletions ci/scripts/connector-node-integration-test.sh
@@ -90,15 +90,15 @@ export PYTHONPATH=proto

echo "--- running streamchunk data format integration tests"
cd "${RISINGWAVE_ROOT}"/java/connector-node/python-client
if python3 integration_tests.py --stream_chunk_format_test --input_binary_file="./data/stream_chunk_data" --data_format_use_json=False; then
if python3 integration_tests.py --stream_chunk_format_test --input_binary_file="./data/stream_chunk_data"; then
echo "StreamChunk data format test passed"
else
echo "StreamChunk data format test failed"
exit 1
fi

sink_input_feature=("--input_binary_file=./data/sink_input --data_format_use_json=False")
upsert_sink_input_feature=("--input_binary_file=./data/upsert_sink_input --data_format_use_json=False")
sink_input_feature=("--input_binary_file=./data/sink_input")
upsert_sink_input_feature=("--input_binary_file=./data/upsert_sink_input")
type=("StreamChunk format")

${MC_PATH} mb minio/bucket
Expand Down
3 changes: 2 additions & 1 deletion java/connector-node/python-client/.gitignore
@@ -1 +1,2 @@
-.vscode
+.vscode
+sink-client-venv/
21 changes: 2 additions & 19 deletions java/connector-node/python-client/integration_tests.py
@@ -117,7 +117,7 @@ def load_stream_chunk_payload(input_file):
     return payloads


-def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
+def test_sink(prop, payload_input, table_schema, is_coordinated=False):
     sink_param = connector_service_pb2.SinkParam(
         sink_id=0,
         properties=prop,
@@ -128,7 +128,6 @@ def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
     request_list = [
         connector_service_pb2.SinkWriterStreamRequest(
             start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
-                format=format,
                 sink_param=sink_param,
             )
         )
@@ -291,9 +290,6 @@ def test_stream_chunk_data_format(param):
 parser.add_argument(
     "--deltalake_sink", action="store_true", help="run deltalake sink test"
 )
-parser.add_argument(
-    "--input_file", default="./data/sink_input.json", help="input data to run tests"
-)
 parser.add_argument(
     "--input_binary_file",
     default="./data/sink_input",
@@ -302,29 +298,18 @@ def test_stream_chunk_data_format(param):
 parser.add_argument(
     "--es_sink", action="store_true", help="run elasticsearch sink test"
 )
-parser.add_argument(
-    "--data_format_use_json", default=True, help="choose json or streamchunk"
-)
 args = parser.parse_args()
-use_json = args.data_format_use_json == True or args.data_format_use_json == "True"
-if use_json:
-    payload = load_json_payload(args.input_file)
-    format = connector_service_pb2.SinkPayloadFormat.JSON
-else:
-    payload = load_stream_chunk_payload(args.input_binary_file)
-    format = connector_service_pb2.SinkPayloadFormat.STREAM_CHUNK
+payload = load_stream_chunk_payload(args.input_binary_file)

 # stream chunk format
 if args.stream_chunk_format_test:
     param = {
-        "format": format,
         "payload_input": payload,
         "table_schema": make_mock_schema_stream_chunk(),
     }
     test_stream_chunk_data_format(param)

 param = {
-    "format": format,
     "payload_input": payload,
     "table_schema": make_mock_schema(),
 }
@@ -337,7 +322,5 @@ def test_stream_chunk_data_format(param):
     test_deltalake_sink(param)
 if args.es_sink:
     test_elasticsearch_sink(param)
-
-# json format
 if args.upsert_iceberg_sink:
     test_upsert_iceberg_sink(param)
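Taken together, the three hunks above collapse the driver to a single payload path. A condensed sketch of the resulting flow, not runnable on its own since it uses the names defined in `integration_tests.py` as shown in this diff:

```python
# The --input_file / --data_format_use_json switches are gone; input is
# always loaded as StreamChunk-encoded binary.
payload = load_stream_chunk_payload(args.input_binary_file)

# "format" no longer appears in the param dicts handed to the tests.
if args.stream_chunk_format_test:
    test_stream_chunk_data_format({
        "payload_input": payload,
        "table_schema": make_mock_schema_stream_chunk(),
    })

param = {
    "payload_input": payload,
    "table_schema": make_mock_schema(),
}
```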

This file was deleted.

@@ -206,19 +206,7 @@ private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink st
         String connectorName = getConnectorName(sinkParam);
         SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
         sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
-        switch (startSink.getFormat()) {
-            case FORMAT_UNSPECIFIED:
-            case UNRECOGNIZED:
-                throw INVALID_ARGUMENT
-                        .withDescription("should specify payload format in request")
-                        .asRuntimeException();
-            case JSON:
-                deserializer = new JsonDeserializer(tableSchema);
-                break;
-            case STREAM_CHUNK:
-                deserializer = new StreamChunkDeserializer(tableSchema);
-                break;
-        }
+        deserializer = new StreamChunkDeserializer(tableSchema);
         this.connectorName = connectorName.toUpperCase();
         ConnectorNodeMetrics.incActiveSinkConnections(connectorName, "node1");
     }
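The server side mirrors the client change: `bindSink` no longer inspects `startSink.getFormat()`, so the `FORMAT_UNSPECIFIED` validation error and the `JsonDeserializer` branch are gone and every sink stream is decoded as StreamChunk. For contrast, the client-side JSON path that is no longer served, reconstructed from the deleted Python lines earlier in this diff:

```python
# Removed JSON client path (deleted in this PR). Setting this format is
# now pointless: the server ignores it and binds StreamChunkDeserializer.
payload = load_json_payload(args.input_file)
format = connector_service_pb2.SinkPayloadFormat.JSON
```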