feat(sink): support doris sink (risingwavelabs#12336)
xxhZs authored Sep 21, 2023
1 parent a12611d commit d8ec952
Showing 21 changed files with 1,270 additions and 32 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


64 changes: 64 additions & 0 deletions integration_tests/doris-sink/README.md
@@ -0,0 +1,64 @@
# Demo: Sinking to Doris

In this demo, we want to showcase how RisingWave is able to sink data to Doris.

1. Modify max_map_count

```sh
sysctl -w vm.max_map_count=2000000
```

If, after running this command, Docker still fails to start Doris, please refer to: https://doris.apache.org/docs/dev/install/construct-docker/run-docker-cluster
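To persist the setting across reboots (optional; the exact mechanism depends on your host OS), one common approach is to also write it to `/etc/sysctl.conf`:

```sh
# Optional: persist vm.max_map_count across reboots and verify the value.
echo "vm.max_map_count=2000000" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
sysctl vm.max_map_count   # expect: vm.max_map_count = 2000000
```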


2. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and a Doris FE and BE that serve as the sink target.
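Before continuing, it can help to confirm that the containers are up. This is a rough check; the service and port names follow the `docker-compose.yml` in this directory:

```sh
# List the compose services and their states.
docker compose ps
# The Doris FE web UI is published on port 8030; an HTTP response means the FE is reachable.
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8030
```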

3. Create the Doris table via the MySQL client:

Log in to MySQL:
```sh
docker compose exec fe mysql -uroot -P9030 -h127.0.0.1
```

Run the following queries to create the database and table.
```sql
CREATE DATABASE demo;
USE demo;
CREATE TABLE demo_bhv_table(
    user_id int,
    target_id text,
    event_timestamp datetime
) UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
```
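Optionally, verify that the table was created with the `UNIQUE KEY` model, since the upsert sink relies on it:

```sql
-- Sanity check: the output should show UNIQUE KEY(`user_id`).
SHOW CREATE TABLE demo.demo_bhv_table;
```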

4. Execute the SQL queries in sequence (a connection example follows this list):

- append-only sql:
    - append-only-sql/create_source.sql
    - append-only-sql/create_mv.sql
    - append-only-sql/create_sink.sql

- upsert sql:
    - upsert/create_table.sql
    - upsert/create_mv.sql
    - upsert/create_sink.sql
    - upsert/insert_update_delete.sql
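A sketch of how to run these files against RisingWave, assuming the default frontend address used by the docker-compose setups (`localhost:4566`, database `dev`, user `root`):

```sh
# Append-only variant; substitute the upsert/ files for the upsert variant.
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_source.sql
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_mv.sql
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_sink.sql
```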

Note: `upsert` mode is only supported with Doris' `UNIQUE KEY` table model.

Run the following query in Doris to check the sink results:
```sql
select user_id, count(*) from demo.demo_bhv_table group by user_id;
```
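To cross-check, the same aggregation can be run against the materialized view on the RisingWave side (using the connection assumptions above) and compared with the Doris result:

```sql
-- Run in RisingWave, e.g. via psql, and compare with the Doris-side counts.
select user_id, count(*) from bhv_mv group by user_id;
```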
7 changes: 7 additions & 0 deletions integration_tests/doris-sink/append-only-sql/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
12 changes: 12 additions & 0 deletions integration_tests/doris-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,12 @@
CREATE SINK bhv_doris_sink
FROM
    bhv_mv WITH (
    connector = 'doris',
    type = 'append-only',
    doris.url = 'http://fe:8030',
    doris.user = 'users',
    doris.password = '123456',
    doris.database = 'demo',
    doris.table = 'demo_bhv_table',
    force_append_only = 'true'
);
18 changes: 18 additions & 0 deletions integration_tests/doris-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
CREATE TABLE user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    fields.user_name.kind = 'random',
    fields.user_name.length = '10',
    datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
104 changes: 104 additions & 0 deletions integration_tests/doris-sink/docker-compose.yml
@@ -0,0 +1,104 @@
---
version: "3"
services:
  fe:
    image: apache/doris:2.0.0_alpha-fe-x86_64
    hostname: fe
    environment:
      - FE_SERVERS=fe1:172.21.0.2:9010
      - FE_ID=1
    ports:
      - "8030:8030"
      - "9030:9030"
    networks:
      mynetwork:
        ipv4_address: 172.21.0.2
  be:
    image: apache/doris:2.0.0_alpha-be-x86_64
    hostname: be
    environment:
      - FE_SERVERS=fe1:172.21.0.2:9010
      - BE_ADDR=172.21.0.3:9050
    depends_on:
      - fe
    ports:
      - "9050:9050"
    networks:
      mynetwork:
        ipv4_address: 172.21.0.3
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.4
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.5
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.6
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.7
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.8
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.9
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.10
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
    networks:
      mynetwork:
        ipv4_address: 172.21.0.11
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
networks:
  mynetwork:
    ipam:
      config:
        - subnet: 172.21.80.0/16
  default:
7 changes: 7 additions & 0 deletions integration_tests/doris-sink/upsert/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
12 changes: 12 additions & 0 deletions integration_tests/doris-sink/upsert/create_sink.sql
@@ -0,0 +1,12 @@
CREATE SINK bhv_doris_sink
FROM
    bhv_mv WITH (
    connector = 'doris',
    type = 'upsert',
    doris.url = 'http://fe:8030',
    doris.user = 'users',
    doris.password = '123456',
    doris.database = 'demo',
    doris.table = 'demo_bhv_table',
    primary_key = 'user_id'
);
10 changes: 10 additions & 0 deletions integration_tests/doris-sink/upsert/create_table.sql
@@ -0,0 +1,10 @@
CREATE TABLE user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
);
8 changes: 8 additions & 0 deletions integration_tests/doris-sink/upsert/insert_update_delete.sql
@@ -0,0 +1,8 @@
INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'),
(2,'2','2','2020-01-01 01:01:02','2','2','2'),
(3,'3','3','2020-01-01 01:01:03','3','3','3'),
(4,'4','4','2020-01-01 01:01:04','4','4','4');

DELETE FROM user_behaviors WHERE user_id = 2;

UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3;
6 changes: 6 additions & 0 deletions src/common/src/types/decimal.rs
@@ -416,6 +416,12 @@ impl Decimal {
        Some(d.scale() as _)
    }

    /// Rescale to `scale` fractional digits. Only finite (`Normalized`) values are
    /// affected; `NaN` and the infinities are left unchanged.
    pub fn rescale(&mut self, scale: u32) {
        if let Normalized(a) = self {
            a.rescale(scale);
        }
    }

    #[must_use]
    pub fn round_dp_ties_away(&self, dp: u32) -> Self {
        match self {
4 changes: 3 additions & 1 deletion src/connector/Cargo.toml
@@ -51,9 +51,11 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
glob = "0.3"
google-cloud-pubsub = "0.20"
hyper = "0.14"
http = "0.2"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2", "stream"] }
hyper-tls = "0.5"
icelake = { workspace = true }
indexmap = { version = "1.9.3", features = ["serde"] }
itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
27 changes: 27 additions & 0 deletions src/connector/src/common.rs
@@ -37,6 +37,7 @@ use url::Url;
use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::deserialize_duration_from_string;
use crate::sink::doris_connector::DorisGet;
use crate::sink::SinkError;
use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
@@ -439,6 +440,32 @@ impl ClickHouseCommon {
    }
}

/// Doris connection options, deserialized from the sink's `WITH` clause via the
/// `doris.*` serde renames below.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct DorisCommon {
    #[serde(rename = "doris.url")]
    pub url: String,
    #[serde(rename = "doris.user")]
    pub user: String,
    #[serde(rename = "doris.password")]
    pub password: String,
    #[serde(rename = "doris.database")]
    pub database: String,
    #[serde(rename = "doris.table")]
    pub table: String,
}

impl DorisCommon {
    /// Build a `DorisGet` helper from these connection options.
    pub(crate) fn build_get_client(&self) -> DorisGet {
        DorisGet::new(
            self.url.clone(),
            self.table.clone(),
            self.database.clone(),
            self.user.clone(),
            self.password.clone(),
        )
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
    #[serde(borrow)]
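As a side note, the serde renames on `DorisCommon` are what let the sink map `WITH`-clause options such as `doris.url` onto typed fields. A minimal sketch of that round trip, assuming `serde_json` is available and `DorisCommon`/`DorisGet` are in scope; the helper function and option map below are illustrative, not part of this commit:

```rust
use std::collections::HashMap;

// Hypothetical helper: turn a map of WITH-clause options into a DorisCommon.
// Each key (e.g. "doris.url") matches a #[serde(rename = ...)] attribute above.
fn parse_doris_common(options: &HashMap<String, String>) -> Result<DorisCommon, serde_json::Error> {
    serde_json::from_value(serde_json::to_value(options)?)
}

fn example() -> Result<(), serde_json::Error> {
    let options: HashMap<String, String> = [
        ("doris.url", "http://fe:8030"),
        ("doris.user", "users"),
        ("doris.password", "123456"),
        ("doris.database", "demo"),
        ("doris.table", "demo_bhv_table"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    let common = parse_doris_common(&options)?;
    let _client = common.build_get_client(); // helper client built from the same options
    Ok(())
}
```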