feat(sink): support starrocks sink #12681

Merged on Nov 8, 2023 (11 commits)
Changes from 10 commits
56 changes: 56 additions & 0 deletions integration_tests/starrocks-sink/README.md
@@ -0,0 +1,56 @@
# Demo: Sinking to StarRocks

This demo shows how RisingWave sinks data to StarRocks.


1. Launch the cluster:

```sh
docker compose up -d
```

The cluster contains a RisingWave cluster and its dependencies, a datagen source that generates the data, and a StarRocks frontend (FE) and backend (BE) that serve as the sink target.
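
The StarRocks services may need some time before they accept connections (the backend service in this demo's compose file sleeps 15 seconds before registering itself with the frontend). The following is a minimal sketch of a retry helper for a POSIX shell; `wait_for` is a hypothetical name and not part of the demo.

```sh
# wait_for ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: re-runs COMMAND until it succeeds or ATTEMPTS is exhausted.
wait_for() {
    attempts=$1
    delay=$2
    shift 2
    i=1
    while [ "$i" -le "$attempts" ]; do
        # Discard the probe's output; only its exit status matters here.
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    return 1
}
```

For example, `wait_for 30 2 docker compose exec -T starrocks-fe mysql -uroot -P9030 -h127.0.0.1 -e 'SELECT 1'` would poll the frontend up to 30 times, two seconds apart, before continuing with the next step.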

2. Create the StarRocks table via the MySQL client:

Log in to the StarRocks frontend:
```sh
docker compose exec starrocks-fe mysql -uroot -P9030 -h127.0.0.1
```

Run the following statements to create the database, the table, and the user that the sink connects as:
```sql
CREATE database demo;
use demo;

CREATE table demo_bhv_table(
    user_id int,
    target_id text,
    event_timestamp datetime
) ENGINE=OLAP
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1");

CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
```

3. Execute the SQL queries in sequence:

- append-only sql:
    - append-only-sql/create_source.sql
    - append-only-sql/create_mv.sql
    - append-only-sql/create_sink.sql

- upsert sql:
    - upsert/create_table.sql
    - upsert/create_mv.sql
    - upsert/create_sink.sql
    - upsert/insert_update_delete.sql

Both scenarios create objects with the same names (`user_behaviors`, `bhv_mv`, `bhv_starrocks_sink`), so run one scenario at a time.

The `upsert` sink type works only with StarRocks `PRIMARY KEY` tables.
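
The sequence above can be scripted. The sketch below assumes a POSIX shell and that RisingWave is reachable through `psql` on `localhost:4566` with database `dev` and user `root` (the usual defaults of the RisingWave compose setup, not stated in this demo). `run_sql_files` and `SQL_RUNNER` are hypothetical names, and the directory names follow the file paths in this change (`append-only-sql/`, `upsert/`).

```sh
# run_sql_files DIR FILE [FILE...]
# Hypothetical helper: runs each SQL file in the given order, stopping at the first failure.
# SQL_RUNNER (hypothetical) overrides the command that receives the file path as its last argument.
run_sql_files() {
    dir=$1
    shift
    for f in "$@"; do
        path="$dir/$f"
        if [ ! -f "$path" ]; then
            echo "missing: $path" >&2
            return 1
        fi
        echo "running $path" >&2
        # Intentionally unquoted so the runner command splits into words.
        ${SQL_RUNNER:-psql -h localhost -p 4566 -d dev -U root -f} "$path" || return 1
    done
}
```

For the upsert scenario this would be called as `run_sql_files upsert create_table.sql create_mv.sql create_sink.sql insert_update_delete.sql`.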

Then run the following query in the StarRocks MySQL client to check the sunk rows:
```sql
select user_id, count(*) from demo.demo_bhv_table group by user_id;
```
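
Because `demo_bhv_table` is declared with `PRIMARY KEY(user_id)`, each `user_id` should appear exactly once, so every count returned by this query is expected to be 1. The following sketch checks that automatically, assuming the result is fed in as tab-separated `user_id<TAB>count` lines (what `mysql -N -B` prints); `check_unique_counts` is a hypothetical name.

```sh
# check_unique_counts: reads "user_id<TAB>count" lines on stdin.
# Hypothetical helper: exits non-zero if any user_id has a count other than 1.
# Empty input passes, so an empty table is not reported as an error.
check_unique_counts() {
    awk -F '\t' '
        $2 != 1 { printf "user_id %s has %s rows\n", $1, $2; bad = 1 }
        END { exit (bad ? 1 : 0) }
    '
}
```

One possible invocation is `docker compose exec -T starrocks-fe mysql -uroot -P9030 -h127.0.0.1 -N -B -e 'select user_id, count(*) from demo.demo_bhv_table group by user_id;' | check_unique_counts`.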
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
14 changes: 14 additions & 0 deletions integration_tests/starrocks-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,14 @@
CREATE SINK bhv_starrocks_sink
FROM
    bhv_mv WITH (
        connector = 'starrocks',
        type = 'append-only',
        starrocks.host = 'starrocks-fe',
        starrocks.mysqlport = '9030',
        starrocks.httpport = '8030',
        starrocks.user = 'users',
        starrocks.password = '123456',
        starrocks.database = 'demo',
        starrocks.table = 'demo_bhv_table',
        force_append_only = 'true'
);
18 changes: 18 additions & 0 deletions integration_tests/starrocks-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    fields.target_id.kind = 'random',
    fields.target_id.length = '10',
    datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
78 changes: 78 additions & 0 deletions integration_tests/starrocks-sink/docker-compose.yml
@@ -0,0 +1,78 @@
---
version: "3"
services:
  starrocks-fe:
    image: starrocks/fe-ubuntu:latest
    hostname: starrocks-fe
    container_name: starrocks-fe
    command:
      /opt/starrocks/fe/bin/start_fe.sh
    ports:
      - 8030:8030
      - 9020:9020
      - 9030:9030
    healthcheck:
      # 8030 is the FE HTTP port (9030 is the MySQL protocol port).
      test: ["CMD", "curl", "-f", "http://localhost:8030/api/health"]
      interval: 5s
      timeout: 5s
      retries: 30
  starrocks-be:
    image: starrocks/be-ubuntu:latest
    command:
      - /bin/bash
      - -c
      - |
        sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";"
        /opt/starrocks/be/bin/start_be.sh
    ports:
      - 8040:8040
    hostname: starrocks-be
    container_name: starrocks-be
    depends_on:
      - starrocks-fe
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
7 changes: 7 additions & 0 deletions integration_tests/starrocks-sink/upsert/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
14 changes: 14 additions & 0 deletions integration_tests/starrocks-sink/upsert/create_sink.sql
@@ -0,0 +1,14 @@
CREATE SINK bhv_starrocks_sink
FROM
    bhv_mv WITH (
        connector = 'starrocks',
        type = 'upsert',
        starrocks.host = 'starrocks-fe',
        starrocks.mysqlport = '9030',
        starrocks.httpport = '8030',
        starrocks.user = 'users',
        starrocks.password = '123456',
        starrocks.database = 'demo',
        starrocks.table = 'demo_bhv_table',
        primary_key = 'user_id'
);
10 changes: 10 additions & 0 deletions integration_tests/starrocks-sink/upsert/create_table.sql
@@ -0,0 +1,10 @@
CREATE table user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
);
@@ -0,0 +1,8 @@
INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'),
(2,'2','2','2020-01-01 01:01:02','2','2','2'),
(3,'3','3','2020-01-01 01:01:03','3','3','3'),
(4,'4','4','2020-01-01 01:01:04','4','4','4');

DELETE FROM user_behaviors WHERE user_id = 2;

UPDATE user_behaviors SET target_id = '30' WHERE user_id = 3;
1 change: 0 additions & 1 deletion src/connector/src/common.rs
@@ -403,7 +403,6 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]