Skip to content

Commit

Permalink
add integration test
Browse files Browse the repository at this point in the history
fix

fix
  • Loading branch information
xxhZs committed Oct 9, 2023
1 parent 6623771 commit b5b982d
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 7 deletions.
56 changes: 56 additions & 0 deletions integration_tests/starrocks-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Demo: Sinking to Starrocks

In this demo, we want to showcase how RisingWave is able to sink data to Starrocks.


1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Starrocks fe and be for sink.

2. Create the Starrocks table via mysql:

Login to mysql
```sh
docker compose exec starrocks-fe mysql -uroot -P9030 -h127.0.0.1
```

Run the following queries to create database and table.
```sql
CREATE database demo;
use demo;

CREATE table demo_bhv_table(
user_id int,
target_id text,
event_timestamp datetime
) ENGINE=OLAP
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1");

CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
```

3. Execute the SQL queries in sequence:

- append-only sql:
- append-only/create_source.sql
- append-only/create_mv.sql
- append-only/create_sink.sql

- upsert sql:
- upsert/create_table.sql
- upsert/create_mv.sql
- upsert/create_sink.sql
- upsert/insert_update_delete.sql

We only support `upsert` with starrocks' `PRIMARY KEY`

Run the following query
```sql
select user_id, count(*) from demo.demo_bhv_table group by user_id;
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
14 changes: 14 additions & 0 deletions integration_tests/starrocks-sink/append-only-sql/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE SINK bhv_starrocks_sink
FROM
bhv_mv WITH (
connector = 'starrocks',
type = 'append-only',
starrocks.host = 'starrocks-fe',
starrocks.mysqlport = '9030',
starrocks.httpport = '8030',
starrocks.user = 'users',
starrocks.password = '123456',
starrocks.database = 'demo',
starrocks.table = 'demo_bhv_table',
force_append_only='true'
);
18 changes: 18 additions & 0 deletions integration_tests/starrocks-sink/append-only-sql/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
78 changes: 78 additions & 0 deletions integration_tests/starrocks-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
version: "3"
services:
starrocks-fe:
image: starrocks/fe-ubuntu:latest
hostname: starrocks-fe
container_name: starrocks-fe
command:
/opt/starrocks/fe/bin/start_fe.sh
ports:
- 8030:8030
- 9020:9020
- 9030:9030
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9030"]
interval: 5s
timeout: 5s
retries: 30
starrocks-be:
image: starrocks/be-ubuntu:latest
command:
- /bin/bash
- -c
- |
sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";"
/opt/starrocks/be/bin/start_be.sh
ports:
- 8040:8040
hostname: starrocks-be
container_name: starrocks-be
depends_on:
- starrocks-fe
compactor-0:
extends:
file: ../../docker/docker-compose.yml
service: compactor-0
compute-node-0:
extends:
file: ../../docker/docker-compose.yml
service: compute-node-0
etcd-0:
extends:
file: ../../docker/docker-compose.yml
service: etcd-0
frontend-node-0:
extends:
file: ../../docker/docker-compose.yml
service: frontend-node-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
meta-node-0:
extends:
file: ../../docker/docker-compose.yml
service: meta-node-0
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
volumes:
compute-node-0:
external: false
etcd-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
message_queue:
external: false
name: risingwave-compose
7 changes: 7 additions & 0 deletions integration_tests/starrocks-sink/upsert/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
14 changes: 14 additions & 0 deletions integration_tests/starrocks-sink/upsert/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE SINK bhv_starrocks_sink
FROM
bhv_mv WITH (
connector = 'starrocks',
type = 'upsert',
starrocks.host = 'starrocks-fe',
starrocks.mysqlport = '9030',
starrocks.httpport = '8030',
starrocks.user = 'users',
starrocks.password = '123456',
starrocks.database = 'demo',
starrocks.table = 'demo_bhv_table',
primary_key = 'user_id'
);
10 changes: 10 additions & 0 deletions integration_tests/starrocks-sink/upsert/create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'),
(2,'2','2','2020-01-01 01:01:02','2','2','2'),
(3,'3','3','2020-01-01 01:01:03','3','3','3'),
(4,'4','4','2020-01-01 01:01:04','4','4','4');

DELETE FROM user_behaviors WHERE user_id = 2;

UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3;
1 change: 1 addition & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ macro_rules! for_all_sinks {
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ Test, $crate::sink::test_sink::TestSink }
}
$(,$arg)*
Expand Down
13 changes: 6 additions & 7 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ impl StarrocksSinkWriter {
config.common.table.clone(),
header,
);
// let insert = Some(starrocks_insert_builder.build_starrocks().await?);
Ok(Self {
config,
schema: schema.clone(),
Expand Down Expand Up @@ -362,11 +361,11 @@ impl StarrocksSinkWriter {
Value::String("0".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value)
.map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
.map_err(|e| SinkError::Starrocks(format!("Json derialize error {:?}", e)))?;
self.insert
.as_mut()
.ok_or_else(|| {
SinkError::Doris("Can't find doris sink insert".to_string())
SinkError::Starrocks("Can't find starrocks sink insert".to_string())
})?
.write(row_json_string.into())
.await?;
Expand All @@ -378,11 +377,11 @@ impl StarrocksSinkWriter {
Value::String("1".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value)
.map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
.map_err(|e| SinkError::Starrocks(format!("Json derialize error {:?}", e)))?;
self.insert
.as_mut()
.ok_or_else(|| {
SinkError::Doris("Can't find doris sink insert".to_string())
SinkError::Starrocks("Can't find starrocks sink insert".to_string())
})?
.write(row_json_string.into())
.await?;
Expand All @@ -395,11 +394,11 @@ impl StarrocksSinkWriter {
Value::String("0".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value)
.map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
.map_err(|e| SinkError::Starrocks(format!("Json derialize error {:?}", e)))?;
self.insert
.as_mut()
.ok_or_else(|| {
SinkError::Doris("Can't find doris sink insert".to_string())
SinkError::Starrocks("Can't find starrocks sink insert".to_string())
})?
.write(row_json_string.into())
.await?;
Expand Down

0 comments on commit b5b982d

Please sign in to comment.