From cc5c6cd2755f8c13a514d027527c59e5b80327b7 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 16 Oct 2023 17:56:36 +0800 Subject: [PATCH] add integration test --- integration_tests/big-query-sink/README.md | 35 +++++++++++++ .../append-only-sql/create_mv.sql | 7 +++ .../append-only-sql/create_sink.sql | 11 +++++ .../append-only-sql/create_source.sql | 18 +++++++ .../big-query-sink/docker-compose.yml | 49 +++++++++++++++++++ src/connector/src/sink/big_query.rs | 9 +++- 6 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 integration_tests/big-query-sink/README.md create mode 100644 integration_tests/big-query-sink/append-only-sql/create_mv.sql create mode 100644 integration_tests/big-query-sink/append-only-sql/create_sink.sql create mode 100644 integration_tests/big-query-sink/append-only-sql/create_source.sql create mode 100644 integration_tests/big-query-sink/docker-compose.yml diff --git a/integration_tests/big-query-sink/README.md b/integration_tests/big-query-sink/README.md new file mode 100644 index 0000000000000..dd957eb4e9bb4 --- /dev/null +++ b/integration_tests/big-query-sink/README.md @@ -0,0 +1,35 @@ +# Demo: Sinking to Bigquery + +In this demo, we want to showcase how RisingWave is able to sink data to Bigquery. + +1. Launch the cluster: + +```sh +docker-compose up -d +``` + +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data. + +3. Create the Bigquery table in Bigquery + +```sql +CREATE table '${project_id}'.'${dataset_id}'.'${table_id}'( + user_id int, + target_id string, + event_timestamp datetime +); +``` + +4. Execute the SQL queries in sequence: + +- append-only/create_source.sql +- append-only/create_mv.sql +- append-only/create_sink.sql + + 1. We need to obtain the JSON file for Google Cloud service accounts, which can be configured here: https://console.cloud.google.com/iam-admin/serviceaccounts. + 2. 
Because BigQuery has limited support for updates and deletes, we currently only support `append-only` sinks. + +Run the following query to check the sink results: +```sql +select user_id, count(*) from demo.demo_bhv_table group by user_id; +``` diff --git a/integration_tests/big-query-sink/append-only-sql/create_mv.sql b/integration_tests/big-query-sink/append-only-sql/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/big-query-sink/append-only-sql/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/big-query-sink/append-only-sql/create_sink.sql b/integration_tests/big-query-sink/append-only-sql/create_sink.sql new file mode 100644 index 0000000000000..4cc9b377a861e --- /dev/null +++ b/integration_tests/big-query-sink/append-only-sql/create_sink.sql @@ -0,0 +1,11 @@ +CREATE SINK bhv_big_query_sink +FROM + bhv_mv WITH ( + connector = 'bigquery', + type = 'append-only', + bigquery.path= '${bigquery_service_account_json_path}', + bigquery.project= '${project_id}', + bigquery.dataset= '${dataset_id}', + bigquery.table= '${table_id}', + force_append_only='true' +); \ No newline at end of file diff --git a/integration_tests/big-query-sink/append-only-sql/create_source.sql b/integration_tests/big-query-sink/append-only-sql/create_source.sql new file mode 100644 index 0000000000000..c28c10f3616da --- /dev/null +++ b/integration_tests/big-query-sink/append-only-sql/create_source.sql @@ -0,0 +1,18 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', + fields.user_name.kind = 'random', + 
fields.user_name.length = '10', + datagen.rows.per.second = '10' +) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/big-query-sink/docker-compose.yml b/integration_tests/big-query-sink/docker-compose.yml new file mode 100644 index 0000000000000..e002b72065bf1 --- /dev/null +++ b/integration_tests/big-query-sink/docker-compose.yml @@ -0,0 +1,49 @@ +--- +version: "3" +services: + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose \ No newline at end of file diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 466422f44f8d5..1319b135ca7d5 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -95,8 +95,13 @@ impl BigQuerySink { big_query_columns_desc: HashMap, ) -> Result<()> { let rw_fields_name = self.schema.fields(); + if big_query_columns_desc.is_empty() { + return Err(SinkError::BigQuery( + "Cannot find table in bigquery".to_string(), + )); + } if rw_fields_name.len().ne(&big_query_columns_desc.len()) { - return Err(SinkError::BigQuery("The length of the RisingWave column must be 
equal to the length of the bigquery column".to_string())); + return Err(SinkError::BigQuery(format!("The length of the RisingWave column {} must be equal to the length of the bigquery column {}",rw_fields_name.len(),big_query_columns_desc.len()))); } for i in rw_fields_name { @@ -132,7 +137,7 @@ impl BigQuerySink { DataType::Time => Err(SinkError::BigQuery( "Bigquery cannot support Time".to_string(), )), - DataType::Timestamp => Ok("DATATIME".to_owned()), + DataType::Timestamp => Ok("DATETIME".to_owned()), DataType::Timestamptz => Ok("TIMESTAMP".to_owned()), DataType::Interval => Ok("INTERVAL".to_owned()), DataType::Struct(structs) => {