From 256cf92c1ac04af885ca406a5a9d97a6acb7d4e0 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 20 Feb 2024 14:51:25 +0800
Subject: [PATCH 1/5] add http sink demo
---
integration_tests/http-sink/README.md | 26 +++++++++++++
integration_tests/http-sink/create_mv.sql | 6 +++
integration_tests/http-sink/create_sink.sql | 11 ++++++
integration_tests/http-sink/create_source.sql | 18 +++++++++
.../http-sink/docker-compose.yml | 37 +++++++++++++++++++
.../risingwave-connector-service/pom.xml | 1 -
6 files changed, 98 insertions(+), 1 deletion(-)
create mode 100644 integration_tests/http-sink/README.md
create mode 100644 integration_tests/http-sink/create_mv.sql
create mode 100644 integration_tests/http-sink/create_sink.sql
create mode 100644 integration_tests/http-sink/create_source.sql
create mode 100644 integration_tests/http-sink/docker-compose.yml
diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md
new file mode 100644
index 0000000000000..cdfcfe71f95e4
--- /dev/null
+++ b/integration_tests/http-sink/README.md
@@ -0,0 +1,26 @@
+# Demo: Sinking to Http
+
+In this demo, we want to showcase how RisingWave is able to sink data to Http.
+
+1. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data.
+
+2. Build an http server that can be built on its own
+
+3. Execute the SQL queries in sequence:
+
+- create_source.sql
+- create_mv.sql
+- create_sink.sql
+
+3. Check the contents in Http:
+On the http server side it will receive the json string, something like:
+```
+{"user_id":5,"target_id":"siFqrkdlCn"}
+```
+The number of json strings is 1000
diff --git a/integration_tests/http-sink/create_mv.sql b/integration_tests/http-sink/create_mv.sql
new file mode 100644
index 0000000000000..8a291a3c95ea7
--- /dev/null
+++ b/integration_tests/http-sink/create_mv.sql
@@ -0,0 +1,6 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+ user_id,
+ target_id
+FROM
+ user_behaviors;
diff --git a/integration_tests/http-sink/create_sink.sql b/integration_tests/http-sink/create_sink.sql
new file mode 100644
index 0000000000000..0644d1d51934b
--- /dev/null
+++ b/integration_tests/http-sink/create_sink.sql
@@ -0,0 +1,11 @@
+CREATE sink bhv_http_sink FROM bhv_mv WITH (
+ connector = 'http',
+ url = 'http://localhost:8080/endpoint',
+ format = 'json',
+ type = 'append-only',
+ force_append_only='true',
+ primary_key = 'user_id',
+ gid.connector.http.sink.header.Origin = '*',
+ "gid.connector.http.sink.header.X-Content-Type-Options" = 'nosniff',
+ "gid.connector.http.sink.header.Content-Type" = 'application/json'
+);
\ No newline at end of file
diff --git a/integration_tests/http-sink/create_source.sql b/integration_tests/http-sink/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/http-sink/create_source.sql
@@ -0,0 +1,18 @@
+CREATE table user_behaviors (
+ user_id int,
+ target_id VARCHAR,
+ target_type VARCHAR,
+ event_timestamp TIMESTAMP,
+ behavior_type VARCHAR,
+ parent_target_type VARCHAR,
+ parent_target_id VARCHAR,
+ PRIMARY KEY(user_id)
+) WITH (
+ connector = 'datagen',
+ fields.user_id.kind = 'sequence',
+ fields.user_id.start = '1',
+ fields.user_id.end = '1000',
+ fields.user_name.kind = 'random',
+ fields.user_name.length = '10',
+ datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/http-sink/docker-compose.yml b/integration_tests/http-sink/docker-compose.yml
new file mode 100644
index 0000000000000..8fba5ff352dc0
--- /dev/null
+++ b/integration_tests/http-sink/docker-compose.yml
@@ -0,0 +1,37 @@
+---
+version: "3"
+services:
+ risingwave-standalone:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: risingwave-standalone
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+volumes:
+ risingwave-standalone:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+ message_queue:
+ external: false
+name: risingwave-compose
diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml
index 047c523c1c7db..d51d67497ce05 100644
--- a/java/connector-node/risingwave-connector-service/pom.xml
+++ b/java/connector-node/risingwave-connector-service/pom.xml
@@ -99,7 +99,6 @@
com.risingwave
risingwave-sink-mock-flink-http-sink
- provided
From a33c65a1dda35958cf0e9eb988bf709546eee9bc Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 20 Feb 2024 14:53:43 +0800
Subject: [PATCH 2/5] fix
---
integration_tests/http-sink/README.md | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md
index cdfcfe71f95e4..ed7d6d7d3d739 100644
--- a/integration_tests/http-sink/README.md
+++ b/integration_tests/http-sink/README.md
@@ -10,7 +10,7 @@ docker-compose up -d
The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data.
-2. Build an http server that can be built on its own
+2. Build an Http Server that can be built on its own
3. Execute the SQL queries in sequence:
@@ -18,9 +18,9 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data
- create_mv.sql
- create_sink.sql
-3. Check the contents in Http:
-On the http server side it will receive the json string, something like:
+3. Check the contents in Http Server:
+On the Http Server side it will receive the json string, something like:
```
{"user_id":5,"target_id":"siFqrkdlCn"}
```
-The number of json strings is 1000
+The number of json is 1000
From f95d334c69c37675a4e254a44e020790e6495f2e Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 20 Feb 2024 14:55:31 +0800
Subject: [PATCH 3/5] add
---
integration_tests/http-sink/README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md
index ed7d6d7d3d739..dd59abe462c87 100644
--- a/integration_tests/http-sink/README.md
+++ b/integration_tests/http-sink/README.md
@@ -18,7 +18,7 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data
- create_mv.sql
- create_sink.sql
-3. Check the contents in Http Server:
+4. Check the contents in Http Server:
On the Http Server side it will receive the json string, something like:
```
{"user_id":5,"target_id":"siFqrkdlCn"}
From fc3df74671a8ac453b6a8b4166a73c23ec6d3691 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Wed, 21 Feb 2024 17:38:40 +0800
Subject: [PATCH 4/5] add doc
---
integration_tests/http-sink/README.md | 10 +++++++++-
.../mock/flink/http/HttpFlinkMockSinkFactory.java | 2 ++
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/integration_tests/http-sink/README.md b/integration_tests/http-sink/README.md
index dd59abe462c87..d956cb4ea95a4 100644
--- a/integration_tests/http-sink/README.md
+++ b/integration_tests/http-sink/README.md
@@ -1,7 +1,15 @@
# Demo: Sinking to Http
-In this demo, we want to showcase how RisingWave is able to sink data to Http.
+In this demo, we want to showcase how RisingWave is able to sink data to Http. This feature is depended on https://github.com/getindata/flink-http-connector.
+It has a few limitations:
+1. It offers only two options for HTTP method, i.e, PUT and POST.
+2. It can only execute one request-reply round to the service (session-less).
+3. It cannot handle status codes in the SQL API.
+
+Therefore, we suggest you to try Python UDF at first.
+
+### Demo:
1. Launch the cluster:
```sh
diff --git a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
index a969dddd620f7..d981917635298 100644
--- a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
+++ b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
@@ -26,6 +26,8 @@
/**
* The `FlinkMockSinkFactory` implementation of the http sink is responsible for creating the http
* counterpart of the `DynamicTableSinkFactory`. And `validate` don't need to do anything.
+ *
+ * This feature is depended on https://github.com/getindata/flink-http-connector
*/
public class HttpFlinkMockSinkFactory implements FlinkMockSinkFactory {
@Override
From 768f803952e9118b1c73680ceb05628af6132642 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Wed, 21 Feb 2024 18:01:30 +0800
Subject: [PATCH 5/5] fix
---
.../risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
index d981917635298..d316eeae74bed 100644
--- a/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
+++ b/java/connector-node/risingwave-sink-mock-flink/risingwave-sink-mock-flink-http-sink/src/main/java/com/risingwave/mock/flink/http/HttpFlinkMockSinkFactory.java
@@ -27,7 +27,7 @@
* The `FlinkMockSinkFactory` implementation of the http sink is responsible for creating the http
* counterpart of the `DynamicTableSinkFactory`. And `validate` don't need to do anything.
*
- * This feature is depended on https://github.com/getindata/flink-http-connector
+ *
This feature is depended on https://github.com/getindata/flink-http-connector
*/
public class HttpFlinkMockSinkFactory implements FlinkMockSinkFactory {
@Override