diff --git a/integration_tests/big-query-sink/README.md b/integration_tests/big-query-sink/README.md
new file mode 100644
index 0000000000000..dd957eb4e9bb4
--- /dev/null
+++ b/integration_tests/big-query-sink/README.md
@@ -0,0 +1,35 @@
+# Demo: Sinking to BigQuery
+
+In this demo, we showcase how RisingWave sinks data to BigQuery.
+
+1. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave cluster and its necessary dependencies, as well as a datagen service that generates the data.
+
+2. Create the destination table in BigQuery:
+
+```sql
+CREATE TABLE `${project_id}`.`${dataset_id}`.`${table_id}` (
+    user_id int,
+    target_id string,
+    event_timestamp datetime
+);
+```
+
+3. Execute the SQL queries in sequence:
+
+- append-only-sql/create_source.sql
+- append-only-sql/create_mv.sql
+- append-only-sql/create_sink.sql
+
+  1. We need the JSON key file of a Google Cloud service account, which can be created here: https://console.cloud.google.com/iam-admin/serviceaccounts.
+  2. Because BigQuery has limited support for updates and deletes, we currently only support `append-only` sinks.
+
+4. Run the following query in BigQuery to verify the result:
+```sql
+select user_id, count(*) from demo.demo_bhv_table group by user_id;
+```
diff --git a/integration_tests/big-query-sink/append-only-sql/create_mv.sql b/integration_tests/big-query-sink/append-only-sql/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/big-query-sink/append-only-sql/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+    user_id,
+    target_id,
+    event_timestamp
+FROM
+    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/big-query-sink/append-only-sql/create_sink.sql b/integration_tests/big-query-sink/append-only-sql/create_sink.sql
new file mode 100644
index 0000000000000..4cc9b377a861e
--- /dev/null
+++ b/integration_tests/big-query-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,11 @@
+CREATE SINK bhv_big_query_sink
+FROM
+    bhv_mv WITH (
+    connector = 'bigquery',
+    type = 'append-only',
+    bigquery.path = '${bigquery_service_account_json_path}',
+    bigquery.project = '${project_id}',
+    bigquery.dataset = '${dataset_id}',
+    bigquery.table = '${table_id}',
+    force_append_only = 'true'
+);
\ No newline at end of file
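For reference, the sink definition above uses `${...}` template variables. A filled-in version might look like the sketch below; the project id and key-file path are hypothetical examples, and the dataset `demo` / table `demo_bhv_table` merely mirror the verification query in the README, so substitute the identifiers of the BigQuery table you actually created.

```sql
-- Sketch only: all values below are hypothetical examples and must be replaced
-- with your own project id, dataset, table, and service-account key path.
CREATE SINK bhv_big_query_sink
FROM
    bhv_mv WITH (
    connector = 'bigquery',
    type = 'append-only',
    -- local path to the service-account JSON key, readable by the RisingWave cluster
    bigquery.path = '/home/risingwave/gcp-service-account.json',
    bigquery.project = 'my-gcp-project',
    bigquery.dataset = 'demo',
    bigquery.table = 'demo_bhv_table',
    force_append_only = 'true'
);
```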
diff --git a/integration_tests/big-query-sink/append-only-sql/create_source.sql b/integration_tests/big-query-sink/append-only-sql/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/big-query-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
+CREATE TABLE user_behaviors (
+    user_id int,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMP,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+) WITH (
+    connector = 'datagen',
+    fields.user_id.kind = 'sequence',
+    fields.user_id.start = '1',
+    fields.user_id.end = '1000',
+    fields.user_name.kind = 'random',
+    fields.user_name.length = '10',
+    datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/big-query-sink/docker-compose.yml b/integration_tests/big-query-sink/docker-compose.yml
new file mode 100644
index 0000000000000..e002b72065bf1
--- /dev/null
+++ b/integration_tests/big-query-sink/docker-compose.yml
@@ -0,0 +1,49 @@
+---
+version: "3"
+services:
+  compactor-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compactor-0
+  compute-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compute-node-0
+  etcd-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: etcd-0
+  frontend-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: frontend-node-0
+  grafana-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: grafana-0
+  meta-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: meta-node-0
+  minio-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: minio-0
+  prometheus-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: prometheus-0
+volumes:
+  compute-node-0:
+    external: false
+  etcd-0:
+    external: false
+  grafana-0:
+    external: false
+  minio-0:
+    external: false
+  prometheus-0:
+    external: false
+  message_queue:
+    external: false
+name: risingwave-compose
\ No newline at end of file
diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs
index d4d4a49adbd26..83d281c5238e6 100644
--- a/src/common/src/types/mod.rs
+++ b/src/common/src/types/mod.rs
@@ -55,7 +55,7 @@ mod native_type;
 mod num256;
 mod ops;
 mod ordered;
-pub mod ordered_float;
+mod ordered_float;
 mod postgres_type;
 mod scalar_impl;
 mod serial;
diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs
index 466422f44f8d5..c7e3febd9c017 100644
--- a/src/connector/src/sink/big_query.rs
+++ b/src/connector/src/sink/big_query.rs
@@ -40,6 +40,7 @@ use crate::sink::{
 };

 pub const BIGQUERY_SINK: &str = "bigquery";
+const BIGQUERY_INSERT_MAX_NUMS: usize = 500;
 #[serde_as]
 #[derive(Clone, Debug, Deserialize)]
 pub struct BigQueryConfig {
@@ -95,8 +96,13 @@ impl BigQuerySink {
         big_query_columns_desc: HashMap<String, String>,
     ) -> Result<()> {
         let rw_fields_name = self.schema.fields();
+        if big_query_columns_desc.is_empty() {
+            return Err(SinkError::BigQuery(
+                "Cannot find the table in BigQuery".to_string(),
+            ));
+        }
         if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
-            return Err(SinkError::BigQuery("The length of the RisingWave column must be equal to the length of the bigquery column".to_string()));
+            return Err(SinkError::BigQuery(format!("The number of RisingWave columns ({}) must be equal to the number of BigQuery columns ({})", rw_fields_name.len(), big_query_columns_desc.len())));
         }

         for i in rw_fields_name {
@@ -132,7 +138,7 @@ impl BigQuerySink {
             DataType::Time => Err(SinkError::BigQuery(
                 "Bigquery cannot support Time".to_string(),
             )),
-            DataType::Timestamp => Ok("DATATIME".to_owned()),
+            DataType::Timestamp => Ok("DATETIME".to_owned()),
             DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
             DataType::Interval => Ok("INTERVAL".to_owned()),
             DataType::Struct(structs) => {
@@ -285,7 +291,7 @@ impl BigQuerySinkWriter {
         self.insert_request
             .add_rows(insert_vec)
             .map_err(|e| SinkError::BigQuery(e.to_string()))?;
-        if self.insert_request.len().ge(&1000) {
+        if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
             self.insert_data().await?;
         }
         Ok(())
     }
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index adc0015404b88..d97295cda56d9 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -151,7 +151,6 @@ fn datum_to_json_object(
             json!(v)
         }
         (DataType::Float32, ScalarRefImpl::Float32(v)) => {
-            println!("float32: {:?}", json!(f32::from(v)));
             json!(f32::from(v))
         }
         (DataType::Float64, ScalarRefImpl::Float64(v)) => {
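As an optional end-to-end sanity check for the demo, the same aggregation can be run against the materialized view inside RisingWave and compared with the BigQuery result from the README. The sketch below assumes the objects defined in the SQL files above (`bhv_mv`, `bhv_big_query_sink`) and a RisingWave version that supports `SHOW SINKS`.

```sql
-- Confirm the sink was created (run via psql against the RisingWave frontend).
SHOW SINKS;

-- Same aggregation as the BigQuery verification query in the README;
-- the two results should converge once the sink has flushed its buffered rows.
SELECT user_id, count(*) FROM bhv_mv GROUP BY user_id;
```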