Skip to content

Commit

Permalink
feat(sink demo): Add http sink demo (#15149)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Feb 23, 2024
1 parent d8cca2a commit 316f180
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 1 deletion.
34 changes: 34 additions & 0 deletions integration_tests/http-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Demo: Sinking to Http

In this demo, we want to showcase how RisingWave is able to sink data to Http. This feature is depended on https://github.com/getindata/flink-http-connector.

It has a few limitations:
1. It offers only two options for HTTP method, i.e, PUT and POST.
2. It can only execute one request-reply round to the service (session-less).
3. It cannot handle status codes in the SQL API.

Therefore, we suggest you to try Python UDF at first.

### Demo:
1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data.

2. Build an Http Server that can be built on its own

3. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_sink.sql

4. Check the contents in Http Server:
On the Http Server side it will receive the json string, something like:
```
{"user_id":5,"target_id":"siFqrkdlCn"}
```
The number of json is 1000
6 changes: 6 additions & 0 deletions integration_tests/http-sink/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id
FROM
user_behaviors;
11 changes: 11 additions & 0 deletions integration_tests/http-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE sink bhv_http_sink FROM bhv_mv WITH (
connector = 'http',
url = 'http://localhost:8080/endpoint',
format = 'json',
type = 'append-only',
force_append_only='true',
primary_key = 'user_id',
gid.connector.http.sink.header.Origin = '*',
"gid.connector.http.sink.header.X-Content-Type-Options" = 'nosniff',
"gid.connector.http.sink.header.Content-Type" = 'application/json'
);
18 changes: 18 additions & 0 deletions integration_tests/http-sink/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
37 changes: 37 additions & 0 deletions integration_tests/http-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
version: "3"
services:
risingwave-standalone:
extends:
file: ../../docker/docker-compose.yml
service: risingwave-standalone
etcd-0:
extends:
file: ../../docker/docker-compose.yml
service: etcd-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
volumes:
risingwave-standalone:
external: false
etcd-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
message_queue:
external: false
name: risingwave-compose
1 change: 0 additions & 1 deletion java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-http-sink</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
/**
* The `FlinkMockSinkFactory` implementation of the http sink is responsible for creating the http
* counterpart of the `DynamicTableSinkFactory`. And `validate` don't need to do anything.
*
* <p>This feature is depended on https://github.com/getindata/flink-http-connector
*/
public class HttpFlinkMockSinkFactory implements FlinkMockSinkFactory {
@Override
Expand Down

0 comments on commit 316f180

Please sign in to comment.