Skip to content

Commit

Permalink
add integration test
Browse files Browse the repository at this point in the history
fix

fix

fmt
  • Loading branch information
xxhZs committed Oct 16, 2023
1 parent 735b6d1 commit 0469554
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 5 deletions.
35 changes: 35 additions & 0 deletions integration_tests/big-query-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Demo: Sinking to BigQuery

In this demo, we want to showcase how RisingWave is able to sink data to BigQuery.

1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, as well as a datagen service that generates the data.

2. Create the BigQuery table in BigQuery:

```sql
CREATE TABLE `${project_id}.${dataset_id}.${table_id}` (
    user_id int,
    target_id string,
    event_timestamp datetime
);
```

3. Execute the SQL queries in sequence:

- append-only/create_source.sql
- append-only/create_mv.sql
- append-only/create_sink.sql

4. Obtain the JSON key file for a Google Cloud service account, which can be created here: https://console.cloud.google.com/iam-admin/serviceaccounts.
5. Because BigQuery has limited support for updates and deletes, we currently only support the 'append-only' sink type.

Run the following query in BigQuery to verify that the data has been delivered:
```sql
select user_id, count(*) from `${project_id}.${dataset_id}.${table_id}` group by user_id;
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Materialized view exposing only the columns the BigQuery sink needs
-- (user_id, target_id, event_timestamp) from the datagen-backed source.
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT user_id, target_id, event_timestamp
FROM user_behaviors;
11 changes: 11 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Append-only BigQuery sink fed from the bhv_mv materialized view.
-- Replace the ${...} placeholders with real values before running.
CREATE SINK bhv_big_query_sink
FROM
bhv_mv WITH (
connector = 'bigquery',
-- BigQuery sink currently supports append-only delivery only
type = 'append-only',
-- local path to the Google Cloud service-account JSON key file
bigquery.path= '${bigquery_service_account_json_path}',
bigquery.project= '${project_id}',
bigquery.dataset= '${dataset_id}',
bigquery.table= '${table_id}',
-- required together with type = 'append-only'; updates/deletes are dropped
force_append_only='true'
);
18 changes: 18 additions & 0 deletions integration_tests/big-query-sink/append-only-sql/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Source table populated by the built-in datagen connector.
-- user_id is a sequence (also the primary key); target_id is random text.
CREATE TABLE user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    -- fix: the table has no user_name column, so the previous
    -- fields.user_name.* settings were dead config; generate target_id instead
    fields.target_id.kind = 'random',
    fields.target_id.length = '10',
    datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
49 changes: 49 additions & 0 deletions integration_tests/big-query-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
# Docker Compose stack for the BigQuery sink integration test.
# Every service extends the shared definition in ../../docker/docker-compose.yml.
# NOTE(review): the README mentions a datagen service, but none is defined
# here — confirm whether it should be added to this stack.
version: "3"
services:
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  # NOTE(review): no message_queue service is defined above — this volume
  # looks unused in this stack; confirm before removing.
  message_queue:
    external: false
name: risingwave-compose
2 changes: 1 addition & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ mod native_type;
mod num256;
mod ops;
mod ordered;
pub mod ordered_float;
mod ordered_float;
mod postgres_type;
mod scalar_impl;
mod serial;
Expand Down
12 changes: 9 additions & 3 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::sink::{
};

pub const BIGQUERY_SINK: &str = "bigquery";
const BIGQUERY_INSERT_MAX_NUMS: usize = 500;
#[serde_as]
#[derive(Clone, Debug, Deserialize)]
pub struct BigQueryConfig {
Expand Down Expand Up @@ -95,8 +96,13 @@ impl BigQuerySink {
big_query_columns_desc: HashMap<String, String>,
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if big_query_columns_desc.is_empty() {
return Err(SinkError::BigQuery(
"Cannot find table in bigquery".to_string(),
));
}
if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
return Err(SinkError::BigQuery("The length of the RisingWave column must be equal to the length of the bigquery column".to_string()));
return Err(SinkError::BigQuery(format!("The length of the RisingWave column {} must be equal to the length of the bigquery column {}",rw_fields_name.len(),big_query_columns_desc.len())));
}

for i in rw_fields_name {
Expand Down Expand Up @@ -132,7 +138,7 @@ impl BigQuerySink {
DataType::Time => Err(SinkError::BigQuery(
"Bigquery cannot support Time".to_string(),
)),
DataType::Timestamp => Ok("DATATIME".to_owned()),
DataType::Timestamp => Ok("DATETIME".to_owned()),
DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
DataType::Interval => Ok("INTERVAL".to_owned()),
DataType::Struct(structs) => {
Expand Down Expand Up @@ -285,7 +291,7 @@ impl BigQuerySinkWriter {
self.insert_request
.add_rows(insert_vec)
.map_err(|e| SinkError::BigQuery(e.to_string()))?;
if self.insert_request.len().ge(&1000) {
if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
self.insert_data().await?;
}
Ok(())
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ fn datum_to_json_object(
json!(v)
}
(DataType::Float32, ScalarRefImpl::Float32(v)) => {
println!("float32: {:?}", json!(f32::from(v)));
json!(f32::from(v))
}
(DataType::Float64, ScalarRefImpl::Float64(v)) => {
Expand Down

0 comments on commit 0469554

Please sign in to comment.