feat(sink): Support big query sink (#12873)
xxhZs authored Oct 25, 2023
1 parent e48547d commit 05bb5cc
Showing 11 changed files with 582 additions and 3 deletions.
53 changes: 53 additions & 0 deletions Cargo.lock

35 changes: 35 additions & 0 deletions integration_tests/big-query-sink/README.md
@@ -0,0 +1,35 @@
# Demo: Sinking to BigQuery

In this demo, we showcase how RisingWave can sink data to BigQuery.

1. Launch the cluster:

```sh
docker-compose up -d
```

The deployment contains a RisingWave cluster and its necessary dependencies. The demo data is generated by the `datagen` connector.

2. Create the table in BigQuery:

```sql
CREATE TABLE `${project_id}.${dataset_id}.${table_id}` (
  user_id int,
  target_id string,
  event_timestamp datetime
);
```

3. Execute the SQL queries in sequence:

- append-only-sql/create_source.sql
- append-only-sql/create_mv.sql
- append-only-sql/create_sink.sql

1. The sink needs the JSON key file of a Google Cloud service account, which can be created here: https://console.cloud.google.com/iam-admin/serviceaccounts.
2. Because BigQuery has limited support for updates and deletes, the sink currently supports only `append-only` mode.
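
Before creating the sink, it can help to confirm that the downloaded file looks like a service account key. The sketch below is one way to do that with `grep`. The function name `check_service_account_key` is hypothetical, and the field list reflects the usual layout of a Google Cloud service account key file, not a check that this demo performs itself.

```sh
# Hypothetical helper, not part of the demo: sanity-check a service account key file.
# Assumes the usual key layout with "type", "project_id", "private_key" and "client_email".
check_service_account_key() {
  key_file="$1"

  # The file must exist and be readable.
  if [ ! -r "$key_file" ]; then
    echo "cannot read key file: $key_file" >&2
    return 1
  fi

  # Service account keys declare their type explicitly.
  if ! grep -Eq '"type"[[:space:]]*:[[:space:]]*"service_account"' "$key_file"; then
    echo "not a service account key: $key_file" >&2
    return 1
  fi

  # Fields expected in a service account key.
  for field in project_id private_key client_email; do
    if ! grep -q "\"$field\"" "$key_file"; then
      echo "missing field \"$field\" in $key_file" >&2
      return 1
    fi
  done

  echo "ok: $key_file"
}
```

Calling `check_service_account_key /path/to/key.json` prints `ok: /path/to/key.json` when all checks pass and returns a non-zero status otherwise. It only inspects the file's shape and does not contact Google Cloud.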

Run the following query to check the result:
```sql
select user_id, count(*) from demo.demo_bhv_table group by user_id;
```
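
The SQL files listed in the steps above can also be run by a small script instead of by hand. The following is a minimal sketch, not part of this commit: the function name `run_demo_sql` and the `RUN_SQL` override are hypothetical, the directory name is taken from the file paths in this commit, and the `psql` connection settings assume RisingWave's defaults (port 4566, database `dev`, user `root`).

```sh
# Hypothetical helper: run the demo's SQL files in the order the README lists them.
# Assumes RisingWave's default frontend settings (localhost:4566, database "dev", user "root").
# RUN_SQL can be overridden, e.g. RUN_SQL="echo" for a dry run that only prints the file names.
run_demo_sql() {
  sql_dir="${1:-append-only-sql}"
  runner="${RUN_SQL:-psql -h localhost -p 4566 -d dev -U root -v ON_ERROR_STOP=1 -f}"

  for name in create_source.sql create_mv.sql create_sink.sql; do
    file="$sql_dir/$name"
    if [ ! -f "$file" ]; then
      echo "missing SQL file: $file" >&2
      return 1
    fi
    # Stop at the first failure so later statements do not run on a half-built pipeline.
    # $runner is intentionally unquoted so the command and its flags are split into words.
    $runner "$file" || return 1
  done
}
```

Note that `create_sink.sql` contains `${...}` placeholders, so it has to be filled in with real values before a run like this can succeed.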
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
11 changes: 11 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,11 @@
CREATE SINK bhv_big_query_sink
FROM
bhv_mv WITH (
connector = 'bigquery',
type = 'append-only',
bigquery.path = '${bigquery_service_account_json_path}',
bigquery.project = '${project_id}',
bigquery.dataset = '${dataset_id}',
bigquery.table = '${table_id}',
force_append_only = 'true'
);
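
The `${...}` placeholders in the statement above have to be replaced with real values before it is executed. One possible way to do that is sketched below with `sed`. The function name `render_sink_sql` is hypothetical and not part of this commit. The shell variable names simply mirror the placeholder names, and the values are assumed to contain no `|`, `&`, or backslash characters.

```sh
# Hypothetical helper: fill the ${...} placeholders of a SQL template from shell variables.
# The variable names mirror the placeholders in create_sink.sql.
# Assumes the values contain no '|', '&' or backslash, which sed would treat specially.
render_sink_sql() {
  template="$1"

  # Refuse to render with a missing value, which would leave a broken statement.
  for var in bigquery_service_account_json_path project_id dataset_id table_id; do
    eval "value=\${$var:-}"
    if [ -z "$value" ]; then
      echo "placeholder \${$var} has no value" >&2
      return 1
    fi
  done

  # [$] matches a literal dollar sign, so each pattern matches the placeholder text itself.
  sed \
    -e 's|[$]{bigquery_service_account_json_path}|'"$bigquery_service_account_json_path"'|g' \
    -e 's|[$]{project_id}|'"$project_id"'|g' \
    -e 's|[$]{dataset_id}|'"$dataset_id"'|g' \
    -e 's|[$]{table_id}|'"$table_id"'|g' \
    "$template"
}
```

With the four variables set, `render_sink_sql create_sink.sql > rendered_sink.sql` writes a statement that can be passed to `psql`. The output file name here is only an example.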
18 changes: 18 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
49 changes: 49 additions & 0 deletions integration_tests/big-query-sink/docker-compose.yml
@@ -0,0 +1,49 @@
---
version: "3"
services:
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -49,6 +49,7 @@ easy-ext = "1"
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
gcp-bigquery-client = "0.17.1"
glob = "0.3"
google-cloud-pubsub = "0.20"
http = "0.2"