diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md new file mode 100644 index 0000000000000..f65b4b9406685 --- /dev/null +++ b/integration_tests/starrocks-sink/README.md @@ -0,0 +1,56 @@ +# Demo: Sinking to Starrocks + +In this demo, we want to showcase how RisingWave is able to sink data to Starrocks. + + +1. Launch the cluster: + +```sh +docker-compose up -d +``` + +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Starrocks fe and be for sink. + +2. Create the Starrocks table via mysql: + +Login to mysql +```sh +docker compose exec starrocks-fe mysql -uroot -P9030 -h127.0.0.1 +``` + +Run the following queries to create database and table. +```sql +CREATE database demo; +use demo; + +CREATE table demo_bhv_table( + user_id int, + target_id text, + event_timestamp datetime +) ENGINE=OLAP +PRIMARY KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); + +CREATE USER 'users'@'%' IDENTIFIED BY '123456'; +GRANT ALL ON *.* TO 'users'@'%'; +``` + +3. Execute the SQL queries in sequence: + +- append-only sql: + - append-only/create_source.sql + - append-only/create_mv.sql + - append-only/create_sink.sql + +- upsert sql: + - upsert/create_table.sql + - upsert/create_mv.sql + - upsert/create_sink.sql + - upsert/insert_update_delete.sql + +We only support `upsert` with starrocks' `PRIMARY KEY` + +Run the following query +```sql +select user_id, count(*) from demo.demo_bhv_table group by user_id; +``` diff --git a/integration_tests/starrocks-sink/append-only-sql/create_mv.sql b/integration_tests/starrocks-sink/append-only-sql/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/starrocks-sink/append-only-sql/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/append-only-sql/create_sink.sql b/integration_tests/starrocks-sink/append-only-sql/create_sink.sql new file mode 100644 index 0000000000000..56d1b227512de --- /dev/null +++ b/integration_tests/starrocks-sink/append-only-sql/create_sink.sql @@ -0,0 +1,14 @@ +CREATE SINK bhv_starrocks_sink +FROM + bhv_mv WITH ( + connector = 'starrocks', + type = 'append-only', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'demo_bhv_table', + force_append_only='true' +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/append-only-sql/create_source.sql b/integration_tests/starrocks-sink/append-only-sql/create_source.sql new file mode 100644 index 0000000000000..c28c10f3616da --- /dev/null +++ b/integration_tests/starrocks-sink/append-only-sql/create_source.sql @@ -0,0 +1,18 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', + fields.user_name.kind = 'random', + fields.user_name.length = '10', + datagen.rows.per.second = '10' +) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml new file mode 100644 index 0000000000000..1933853c16915 --- /dev/null +++ b/integration_tests/starrocks-sink/docker-compose.yml @@ -0,0 +1,78 @@ +--- +version: "3" +services: + starrocks-fe: + image: starrocks/fe-ubuntu:latest + hostname: starrocks-fe + container_name: starrocks-fe + command: + /opt/starrocks/fe/bin/start_fe.sh + ports: + - 8030:8030 + - 9020:9020 + - 9030:9030 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9030"] + interval: 5s + timeout: 5s + retries: 30 + starrocks-be: + image: starrocks/be-ubuntu:latest + command: + - /bin/bash + - -c + - | + sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";" + /opt/starrocks/be/bin/start_be.sh + ports: + - 8040:8040 + hostname: starrocks-be + container_name: starrocks-be + depends_on: + - starrocks-fe + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_mv.sql b/integration_tests/starrocks-sink/upsert/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_sink.sql b/integration_tests/starrocks-sink/upsert/create_sink.sql new file mode 100644 index 0000000000000..d7557bc1bd4fc --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_sink.sql @@ -0,0 +1,14 @@ +CREATE SINK bhv_starrocks_sink +FROM + bhv_mv WITH ( + connector = 'starrocks', + type = 'upsert', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'demo_bhv_table', + primary_key = 'user_id' +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_table.sql b/integration_tests/starrocks-sink/upsert/create_table.sql new file mode 100644 index 0000000000000..6c98f88a0b510 --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_table.sql @@ -0,0 +1,10 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql new file mode 100644 index 0000000000000..73d5cda442258 --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql @@ -0,0 +1,8 @@ +INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'), +(2,'2','2','2020-01-01 01:01:02','2','2','2'), +(3,'3','3','2020-01-01 01:01:03','3','3','3'), +(4,'4','4','2020-01-01 01:01:04','4','4','4'); + +DELETE FROM user_behaviors WHERE user_id = 2; + +UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; \ No newline at end of file diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 74d7f048bdb1d..9fbcf143d724a 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -76,6 +76,7 @@ macro_rules! for_all_sinks { { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { Doris, $crate::sink::doris::DorisSink }, + { Starrocks, $crate::sink::starrocks::StarrocksSink }, { Test, $crate::sink::test_sink::TestSink } } $(,$arg)* diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index c879d24468bc5..6d79a051d23fc 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -318,7 +318,6 @@ impl StarrocksSinkWriter { config.common.table.clone(), header, ); - // let insert = Some(starrocks_insert_builder.build_starrocks().await?); Ok(Self { config, schema: schema.clone(), @@ -362,11 +361,11 @@ impl StarrocksSinkWriter { Value::String("0".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + .map_err(|e| SinkError::Starrocks(format!("Json derialize error {:?}", e)))?; self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -378,11 +377,11 @@ impl StarrocksSinkWriter { Value::String("1".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + .map_err(|e| SinkError::Starrocks(format!("Json derialize error {:?}", e)))?; self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -395,11 +394,11 @@ impl StarrocksSinkWriter { Value::String("0".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + .map_err(|e| SinkError::Starrocks(format!("Json derialize error {:?}", e)))?; self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?;