Skip to content

Commit

Permalink
add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Oct 16, 2023
1 parent 735b6d1 commit cc5c6cd
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 2 deletions.
35 changes: 35 additions & 0 deletions integration_tests/big-query-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Demo: Sinking to BigQuery

In this demo, we want to showcase how RisingWave is able to sink data to BigQuery.

1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, as well as a datagen source that generates test data.

2. Create the BigQuery table in BigQuery:

```sql
CREATE TABLE `${project_id}.${dataset_id}.${table_id}` (
    user_id int,
    target_id string,
    event_timestamp datetime
);
```

3. Execute the SQL queries in sequence:

- append-only-sql/create_source.sql
- append-only-sql/create_mv.sql
- append-only-sql/create_sink.sql

4. We need to obtain the JSON key file for a Google Cloud service account, which can be created here: https://console.cloud.google.com/iam-admin/serviceaccounts.
5. Because BigQuery has limited support for updates and deletes, we currently only support `append-only` sinks.

Run the following query in RisingWave to verify the ingested data:
```sql
select user_id, count(*) from user_behaviors group by user_id;
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Materialized view that feeds the BigQuery sink.
-- Projects exactly the three columns present in the target BigQuery
-- table schema: user_id, target_id, event_timestamp.
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT user_id, target_id, event_timestamp
FROM user_behaviors;
11 changes: 11 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Append-only sink from the bhv_mv materialized view into BigQuery.
-- The ${...} placeholders must be substituted with real values before running.
CREATE SINK bhv_big_query_sink
FROM
bhv_mv WITH (
connector = 'bigquery',
-- Only 'append-only' is supported; BigQuery has limited update/delete support.
type = 'append-only',
-- Local path to the Google Cloud service-account JSON key file.
bigquery.path= '${bigquery_service_account_json_path}',
bigquery.project= '${project_id}',
bigquery.dataset= '${dataset_id}',
bigquery.table= '${table_id}',
-- Required because the upstream table has a primary key (upsert stream);
-- forces the sink to emit inserts only.
force_append_only='true'
);
18 changes: 18 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Source table populated by the built-in datagen connector.
-- NOTE(fix): the original config generated `fields.user_name.*`, but the
-- table has no user_name column; the generator spec now matches the schema.
CREATE table user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    -- user_id: monotonically increasing ids 1..1000
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    -- target_id: random 10-character strings
    fields.target_id.kind = 'random',
    fields.target_id.length = '10',
    datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
49 changes: 49 additions & 0 deletions integration_tests/big-query-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
version: "3"
services:
  # Every service reuses the shared definitions from the repo-level
  # docker/docker-compose.yml via `extends`.
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  # One named volume per stateful service defined above.
  # NOTE(fix): removed the stale `message_queue` volume — no message_queue
  # service exists in this compose file, so the declaration was dead config.
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
name: risingwave-compose
9 changes: 7 additions & 2 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ impl BigQuerySink {
big_query_columns_desc: HashMap<String, String>,
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if big_query_columns_desc.is_empty() {
return Err(SinkError::BigQuery(
"Cannot find table in bigquery".to_string(),
));
}
if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
return Err(SinkError::BigQuery("The length of the RisingWave column must be equal to the length of the bigquery column".to_string()));
return Err(SinkError::BigQuery(format!("The length of the RisingWave column {} must be equal to the length of the bigquery column {}",rw_fields_name.len(),big_query_columns_desc.len())));
}

for i in rw_fields_name {
Expand Down Expand Up @@ -132,7 +137,7 @@ impl BigQuerySink {
DataType::Time => Err(SinkError::BigQuery(
"Bigquery cannot support Time".to_string(),
)),
DataType::Timestamp => Ok("DATATIME".to_owned()),
DataType::Timestamp => Ok("DATETIME".to_owned()),
DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
DataType::Interval => Ok("INTERVAL".to_owned()),
DataType::Struct(structs) => {
Expand Down

0 comments on commit cc5c6cd

Please sign in to comment.