
feat(sink): Support big query sink #12873

Merged · 11 commits · Nov 1, 2023

Changes from all commits
54 changes: 54 additions & 0 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

36 changes: 36 additions & 0 deletions integration_tests/big-query-sink/README.md
@@ -0,0 +1,36 @@
# Demo: Sinking to BigQuery

In this demo, we want to showcase how RisingWave is able to sink data to BigQuery.

1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster, its necessary dependencies, and a datagen service that generates the data.

2. Create the BigQuery table in BigQuery:

```sql
CREATE TABLE `${project_id}.${dataset_id}.${table_id}` (
    user_id INT64,
    target_id STRING,
    event_timestamp DATETIME
);
```
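
If you prefer the command line over the console, roughly the same table can be created with the `bq` CLI; this is a sketch assuming the dataset already exists and that the placeholders match the SQL above:

```sh
bq mk --table "${project_id}:${dataset_id}.${table_id}" \
    user_id:INT64,target_id:STRING,event_timestamp:DATETIME
```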

3. Execute the SQL queries in sequence:

    - append-only-sql/create_source.sql
    - append-only-sql/create_mv.sql
    - append-only-sql/create_sink.sql

Notes:

1. We need to obtain the JSON key file for a Google Cloud service account, which can be created at https://console.cloud.google.com/iam-admin/serviceaccounts (a sample key file is shown below).
2. Because BigQuery has limited support for updates and deletes, we currently only support `append-only` sinks.
3. The service account JSON key file can be loaded from either S3 or a local path; the corresponding SQL statements are in `create_sink.sql`.
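
For reference, a downloaded service-account key file has roughly the following shape (abridged; every value below is a placeholder):

```json
{
  "type": "service_account",
  "project_id": "<project_id>",
  "private_key_id": "<key_id>",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "<service-account-name>@<project_id>.iam.gserviceaccount.com",
  "client_id": "<client_id>",
  "token_uri": "https://oauth2.googleapis.com/token"
}
```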

Run the following query in BigQuery to verify that the data has been delivered:
```sql
SELECT user_id, COUNT(*) FROM demo.demo_bhv_table GROUP BY user_id;
```
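
To cross-check, you can run the equivalent aggregation against the materialized view in RisingWave (connection details assume the default docker-compose setup) and compare the two results:

```sql
-- e.g. via: psql -h localhost -p 4566 -d dev -U root
SELECT user_id, COUNT(*) FROM bhv_mv GROUP BY user_id;
```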
7 changes: 7 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
29 changes: 29 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,29 @@
-- Option 1: create the sink, loading the service-account key from a local file.
CREATE SINK bhv_big_query_sink
FROM
    bhv_mv WITH (
    connector = 'bigquery',
    type = 'append-only',
    bigquery.local.path = '${bigquery_service_account_json_path}',
    bigquery.project = '${project_id}',
    bigquery.dataset = '${dataset_id}',
    bigquery.table = '${table_id}',
    force_append_only = 'true'
);


-- Option 2: create the sink, loading the service-account key from S3.
CREATE SINK bhv_big_query_sink
FROM
    bhv_mv WITH (
    connector = 'bigquery',
    type = 'append-only',
    bigquery.s3.path = '${s3_service_account_json_path}',
    bigquery.project = '${project_id}',
    bigquery.dataset = '${dataset_id}',
    bigquery.table = '${table_id}',
    access_key = '${aws_access_key}',
    secret_access = '${aws_secret_access}',
    region = '${aws_region}',
    force_append_only = 'true'
);
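
The two statements above are alternatives; create only one of them. If you want to switch from the local-file variant to the S3 variant (or vice versa), drop the existing sink first; both statements below are standard RisingWave SQL:

```sql
SHOW SINKS;
DROP SINK bhv_big_query_sink;
```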
18 changes: 18 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
-- Generate mock user-behavior events with the datagen connector.
CREATE TABLE user_behaviors (
    user_id INT,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    fields.target_id.kind = 'random',
    fields.target_id.length = '10',
    datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
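
After creating the source table, you can sanity-check the rows that datagen is producing before wiring up the sink:

```sql
SELECT * FROM user_behaviors LIMIT 5;
```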
49 changes: 49 additions & 0 deletions integration_tests/big-query-sink/docker-compose.yml
@@ -0,0 +1,49 @@
---
version: "3"
services:
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
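
Once `docker-compose up -d` returns, it is worth confirming that all services are healthy before moving on; these are standard Compose commands:

```sh
# All containers should report a running/healthy state.
docker-compose ps
# Tail a service's logs if something fails to start.
docker-compose logs -f frontend-node-0
```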
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -49,6 +49,7 @@
easy-ext = "1"
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
gcp-bigquery-client = "0.17.1"
glob = "0.3"
google-cloud-pubsub = "0.20"
http = "0.2"
@@ -131,6 +132,7 @@
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
url = "2"
urlencoding = "2"
yup-oauth2 = "8.3"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
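
These two new dependencies carry the sink: `gcp-bigquery-client` talks to the BigQuery API and `yup-oauth2` handles service-account authentication. As a rough illustration of the streaming-insert flow this enables (a minimal sketch based on the crate's public API, not the PR's actual sink implementation; all names and values are placeholders):

```rust
use gcp_bigquery_client::{
    model::table_data_insert_all_request::TableDataInsertAllRequest, Client,
};
use serde::Serialize;

// Mirrors the BigQuery table created in the demo README.
#[derive(Serialize)]
struct UserBehaviorRow {
    user_id: i64,
    target_id: String,
    event_timestamp: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Authenticate with the service-account key file
    // (the same file `bigquery.local.path` points at).
    let client = Client::from_service_account_key_file("service_account.json").await?;

    // Build a streaming-insert request and append one row.
    let mut request = TableDataInsertAllRequest::new();
    request.add_row(
        None, // no explicit insert_id; BigQuery assigns one
        UserBehaviorRow {
            user_id: 1,
            target_id: "42".into(),
            event_timestamp: "2023-11-01T00:00:00".into(),
        },
    )?;

    // Stream the row into `${project_id}.${dataset_id}.${table_id}`.
    client
        .tabledata()
        .insert_all("my-project", "my_dataset", "my_table", request)
        .await?;
    Ok(())
}
```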