From 256cf92c1ac04af885ca406a5a9d97a6acb7d4e0 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 20 Feb 2024 14:51:25 +0800 Subject: [PATCH 1/5] add http sink demo --- integration_tests/http-sink/README.md | 26 +++++++++++++ integration_tests/http-sink/create_mv.sql | 6 +++ integration_tests/http-sink/create_sink.sql | 11 ++++++ integration_tests/http-sink/create_source.sql | 18 +++++++++ .../http-sink/docker-compose.yml | 37 +++++++++++++++++++ .../risingwave-connector-service/pom.xml | 1 - 6 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 integration_tests/http-sink/README.md create mode 100644 integration_tests/http-sink/create_mv.sql create mode 100644 integration_tests/http-sink/create_sink.sql create mode 100644 integration_tests/http-sink/create_source.sql create mode 100644 integration_tests/http-sink/docker-compose.yml diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md new file mode 100644 index 0000000000000..cdfcfe71f95e4 --- /dev/null +++ b/integration_tests/http-sink/README.md @@ -0,0 +1,26 @@ +# Demo: Sinking to Http + +In this demo, we want to showcase how RisingWave is able to sink data to Http. + +1. Launch the cluster: + +```sh +docker-compose up -d +``` + +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data. + +2. Build an http server that can be built on its own + +3. Execute the SQL queries in sequence: + +- create_source.sql +- create_mv.sql +- create_sink.sql + +3. Check the contents in Http: +On the http server side it will receive the json string, something like: +``` +{"user_id":5,"target_id":"siFqrkdlCn"} +``` +The number of json strings is 1000 diff --git a/integration_tests/http-sink/create_mv.sql b/integration_tests/http-sink/create_mv.sql new file mode 100644 index 0000000000000..8a291a3c95ea7 --- /dev/null +++ b/integration_tests/http-sink/create_mv.sql @@ -0,0 +1,6 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id +FROM + user_behaviors; diff --git a/integration_tests/http-sink/create_sink.sql b/integration_tests/http-sink/create_sink.sql new file mode 100644 index 0000000000000..0644d1d51934b --- /dev/null +++ b/integration_tests/http-sink/create_sink.sql @@ -0,0 +1,11 @@ +CREATE sink bhv_http_sink FROM bhv_mv WITH ( + connector = 'http', + url = 'http://localhost:8080/endpoint', + format = 'json', + type = 'append-only', + force_append_only='true', + primary_key = 'user_id', + gid.connector.http.sink.header.Origin = '*', + "gid.connector.http.sink.header.X-Content-Type-Options" = 'nosniff', + "gid.connector.http.sink.header.Content-Type" = 'application/json' +); \ No newline at end of file diff --git a/integration_tests/http-sink/create_source.sql b/integration_tests/http-sink/create_source.sql new file mode 100644 index 0000000000000..c28c10f3616da --- /dev/null +++ b/integration_tests/http-sink/create_source.sql @@ -0,0 +1,18 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', + fields.user_name.kind = 'random', + fields.user_name.length = '10', + datagen.rows.per.second = '10' +) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/http-sink/docker-compose.yml b/integration_tests/http-sink/docker-compose.yml new file mode 100644 index 0000000000000..8fba5ff352dc0 --- /dev/null +++ b/integration_tests/http-sink/docker-compose.yml @@ -0,0 +1,37 @@ +--- +version: "3" +services: + risingwave-standalone: + extends: + file: ../../docker/docker-compose.yml + service: risingwave-standalone + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 +volumes: + risingwave-standalone: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml index 047c523c1c7db..d51d67497ce05 100644 --- a/java/connector-node/risingwave-connector-service/pom.xml +++ b/java/connector-node/risingwave-connector-service/pom.xml @@ -99,7 +99,6 @@ com.risingwave risingwave-sink-mock-flink-http-sink - provided From a33c65a1dda35958cf0e9eb988bf709546eee9bc Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 20 Feb 2024 14:53:43 +0800 Subject: [PATCH 2/5] fix --- integration_tests/http-sink/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md index cdfcfe71f95e4..ed7d6d7d3d739 100644 --- a/integration_tests/http-sink/README.md +++ b/integration_tests/http-sink/README.md @@ -10,7 +10,7 @@ docker-compose up -d The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data. -2. Build an http server that can be built on its own +2. Build an Http Server that can be built on its own 3. Execute the SQL queries in sequence: @@ -18,9 +18,9 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data - create_mv.sql - create_sink.sql -3. Check the contents in Http: -On the http server side it will receive the json string, something like: +3. Check the contents in Http Server: +On the Http Server side it will receive the json string, something like: ``` {"user_id":5,"target_id":"siFqrkdlCn"} ``` -The number of json strings is 1000 +The number of json is 1000 From f95d334c69c37675a4e254a44e020790e6495f2e Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 20 Feb 2024 14:55:31 +0800 Subject: [PATCH 3/5] add --- integration_tests/http-sink/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md index ed7d6d7d3d739..dd59abe462c87 100644 --- a/integration_tests/http-sink/README.md +++ b/integration_tests/http-sink/README.md @@ -18,7 +18,7 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data - create_mv.sql - create_sink.sql -3. Check the contents in Http Server: +4. Check the contents in Http Server: On the Http Server side it will receive the json string, something like: ``` {"user_id":5,"target_id":"siFqrkdlCn"} From fc3df74671a8ac453b6a8b4166a73c23ec6d3691 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 21 Feb 2024 17:38:40 +0800 Subject: [PATCH 4/5] add doc --- integration_tests/http-sink/README.md | 10 +++++++++- .../mock/flink/http/HttpFlinkMockSinkFactory.java | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md index dd59abe462c87..d956cb4ea95a4 100644 --- a/integration_tests/http-sink/README.md +++ b/integration_tests/http-sink/README.md @@ -1,7 +1,15 @@ # Demo: Sinking to Http -In this demo, we want to showcase how RisingWave is able to sink data to Http. +In this demo, we want to showcase how RisingWave is able to sink data to Http. This feature is depended on https://github.com/getindata/flink-http-connector. +It has a few limitations: +1. It offers only two options for HTTP method, i.e, PUT and POST. +2. It can only execute one request-reply round to the service (session-less). +3. It cannot handle status codes in the SQL API. + +Therefore, we suggest you to try Python UDF at first. + +### Demo: 1. Launch the cluster: ```sh diff --git a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java index a969dddd620f7..d981917635298 100644 --- a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java +++ b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java @@ -26,6 +26,8 @@ /** * The `FlinkMockSinkFactory` implementation of the http sink is responsible for creating the http * counterpart of the `DynamicTableSinkFactory`. And `validate` don't need to do anything. + * + * This feature is depended on https://github.com/getindata/flink-http-connector */ public class HttpFlinkMockSinkFactory implements FlinkMockSinkFactory { @Override From 768f803952e9118b1c73680ceb05628af6132642 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 21 Feb 2024 18:01:30 +0800 Subject: [PATCH 5/5] fix --- .../risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java index d981917635298..d316eeae74bed 100644 --- a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java +++ b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java @@ -27,7 +27,7 @@ * The `FlinkMockSinkFactory` implementation of the http sink is responsible for creating the http * counterpart of the `DynamicTableSinkFactory`. And `validate` don't need to do anything. * - * This feature is depended on https://github.com/getindata/flink-http-connector + *

This feature is depended on https://github.com/getindata/flink-http-connector */ public class HttpFlinkMockSinkFactory implements FlinkMockSinkFactory { @Override