diff --git a/Cargo.lock b/Cargo.lock
index cf1f1f0e493e9..fa9b96ca6f3b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3363,6 +3363,29 @@ dependencies = [
  "byteorder",
 ]
 
+[[package]]
+name = "gcp-bigquery-client"
+version = "0.17.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "576b349942a1b96327b1b2e50d271d3d54dc9669eb38271c830916168e753820"
+dependencies = [
+ "async-stream",
+ "async-trait",
+ "dyn-clone",
+ "hyper",
+ "hyper-rustls 0.24.1",
+ "log",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "thiserror",
+ "time",
+ "tokio",
+ "tokio-stream",
+ "url",
+ "yup-oauth2",
+]
+
 [[package]]
 name = "generator"
 version = "0.7.5"
@@ -3786,7 +3809,9 @@ dependencies = [
  "futures-util",
  "http",
  "hyper",
+ "log",
  "rustls 0.21.7",
+ "rustls-native-certs",
  "tokio",
  "tokio-rustls 0.24.1",
 ]
@@ -7305,6 +7330,7 @@ dependencies = [
  "enum-as-inner",
  "futures",
  "futures-async-stream",
+ "gcp-bigquery-client",
  "glob",
  "google-cloud-pubsub",
  "http",
@@ -11154,6 +11180,33 @@
 version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
+[[package]]
+name = "yup-oauth2"
+version = "8.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "364ca376b5c04d9b2be9693054e3e0d2d146b363819d0f9a10c6ee66e4c8406b"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.13.1",
+ "futures",
+ "http",
+ "hyper",
+ "hyper-rustls 0.24.1",
+ "itertools 0.10.5",
+ "log",
+ "percent-encoding",
+ "rustls 0.21.7",
+ "rustls-pemfile",
+ "seahash",
+ "serde",
+ "serde_json",
+ "time",
+ "tokio",
+ "tower-service",
+ "url",
+]
+
 [[package]]
 name = "zerocopy"
 version = "0.6.4"
diff --git a/integration_tests/big-query-sink/README.md b/integration_tests/big-query-sink/README.md
new file mode 100644
index 0000000000000..dd957eb4e9bb4
--- /dev/null
+++ b/integration_tests/big-query-sink/README.md
@@ -0,0 +1,35 @@
+# Demo: Sinking to BigQuery
+
+In this demo, we want to showcase how RisingWave is able to sink data to BigQuery.
+
+1. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave cluster and its necessary dependencies; the data is produced by RisingWave's built-in datagen source (see create_source.sql).
+
+2. Create the table in BigQuery:
+
+```sql
+CREATE table `${project_id}`.`${dataset_id}`.`${table_id}`(
+    user_id int,
+    target_id string,
+    event_timestamp datetime
+);
+```
+
+3. Execute the SQL queries in sequence:
+
+- append-only-sql/create_source.sql
+- append-only-sql/create_mv.sql
+- append-only-sql/create_sink.sql
+
+ 1. We need to obtain the JSON key file of a Google Cloud service account, which can be configured here: https://console.cloud.google.com/iam-admin/serviceaccounts.
+ 2. Because BigQuery has limited support for updates and deletes, we currently only support the 'append-only' sink type.
+
+4. Run the following query in BigQuery to check the sink results:
+```sql
+select user_id, count(*) from `${project_id}`.`${dataset_id}`.`${table_id}` group by user_id;
+```
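As a side note on step 3 of the README above: the demo does not spell out how to submit the SQL files to RisingWave. One minimal way to do it from the host (not prescribed by the PR), assuming the default frontend port (4566), database (`dev`), and user (`root`) of the bundled docker-compose setup, is:

```sh
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_source.sql
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_mv.sql
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_sink.sql
```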
diff --git a/integration_tests/big-query-sink/append-only-sql/create_mv.sql b/integration_tests/big-query-sink/append-only-sql/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/big-query-sink/append-only-sql/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+    user_id,
+    target_id,
+    event_timestamp
+FROM
+    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/big-query-sink/append-only-sql/create_sink.sql b/integration_tests/big-query-sink/append-only-sql/create_sink.sql
new file mode 100644
index 0000000000000..4cc9b377a861e
--- /dev/null
+++ b/integration_tests/big-query-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,11 @@
+CREATE SINK bhv_big_query_sink
+FROM
+    bhv_mv WITH (
+    connector = 'bigquery',
+    type = 'append-only',
+    bigquery.path= '${bigquery_service_account_json_path}',
+    bigquery.project= '${project_id}',
+    bigquery.dataset= '${dataset_id}',
+    bigquery.table= '${table_id}',
+    force_append_only='true'
+);
\ No newline at end of file
diff --git a/integration_tests/big-query-sink/append-only-sql/create_source.sql b/integration_tests/big-query-sink/append-only-sql/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/big-query-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
+CREATE table user_behaviors (
+    user_id int,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMP,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+) WITH (
+    connector = 'datagen',
+    fields.user_id.kind = 'sequence',
+    fields.user_id.start = '1',
+    fields.user_id.end = '1000',
+    fields.user_name.kind = 'random',
+    fields.user_name.length = '10',
+    datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/big-query-sink/docker-compose.yml b/integration_tests/big-query-sink/docker-compose.yml
new file mode 100644
index 0000000000000..e002b72065bf1
--- /dev/null
+++ b/integration_tests/big-query-sink/docker-compose.yml
@@ -0,0 +1,49 @@
+---
+version: "3"
+services:
+  compactor-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compactor-0
+  compute-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compute-node-0
+  etcd-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: etcd-0
+  frontend-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: frontend-node-0
+  grafana-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: grafana-0
+  meta-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: meta-node-0
+  minio-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: minio-0
+  prometheus-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: prometheus-0
+volumes:
+  compute-node-0:
+    external: false
+  etcd-0:
+    external: false
+  grafana-0:
+    external: false
+  minio-0:
+    external: false
+  prometheus-0:
+    external: false
+  message_queue:
+    external: false
+name: risingwave-compose
\ No newline at end of file
= "1" enum-as-inner = "0.6" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } +gcp-bigquery-client = "0.17.1" glob = "0.3" google-cloud-pubsub = "0.20" http = "0.2" diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs new file mode 100644 index 0000000000000..9b62ddda37588 --- /dev/null +++ b/src/connector/src/sink/big_query.rs @@ -0,0 +1,382 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::mem; +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::anyhow; +use async_trait::async_trait; +use gcp_bigquery_client::model::query_request::QueryRequest; +use gcp_bigquery_client::model::table_data_insert_all_request::TableDataInsertAllRequest; +use gcp_bigquery_client::model::table_data_insert_all_request_rows::TableDataInsertAllRequestRows; +use gcp_bigquery_client::Client; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; +use serde_with::serde_as; + +use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use super::writer::LogSinkerOf; +use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::sink::writer::SinkWriterExt; +use crate::sink::{ + DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam, +}; + +pub const BIGQUERY_SINK: &str = "bigquery"; +const BIGQUERY_INSERT_MAX_NUMS: usize = 1024; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct BigQueryCommon { + #[serde(rename = "bigquery.path")] + pub file_path: String, + #[serde(rename = "bigquery.project")] + pub project: String, + #[serde(rename = "bigquery.dataset")] + pub dataset: String, + #[serde(rename = "bigquery.table")] + pub table: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize)] +pub struct BigQueryConfig { + #[serde(flatten)] + pub common: BigQueryCommon, + + pub r#type: String, // accept "append-only" or "upsert" +} +impl BigQueryConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = + serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } +} + +#[derive(Debug)] +pub struct BigQuerySink { + pub config: BigQueryConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, +} + +impl BigQuerySink { + pub fn new( + config: BigQueryConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Result { + Ok(Self { + config, + schema, + pk_indices, + is_append_only, + }) + } +} + +impl BigQuerySink { + fn 
+#[derive(Debug)]
+pub struct BigQuerySink {
+    pub config: BigQueryConfig,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    is_append_only: bool,
+}
+
+impl BigQuerySink {
+    pub fn new(
+        config: BigQueryConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            config,
+            schema,
+            pk_indices,
+            is_append_only,
+        })
+    }
+}
+
+impl BigQuerySink {
+    fn check_column_name_and_type(
+        &self,
+        big_query_columns_desc: HashMap<String, String>,
+    ) -> Result<()> {
+        let rw_fields_name = self.schema.fields();
+        if big_query_columns_desc.is_empty() {
+            return Err(SinkError::BigQuery(anyhow::anyhow!(
+                "Cannot find the table in BigQuery"
+            )));
+        }
+        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
+            return Err(SinkError::BigQuery(anyhow::anyhow!(
+                "The number of RisingWave columns ({}) must be equal to the number of BigQuery columns ({})",
+                rw_fields_name.len(),
+                big_query_columns_desc.len()
+            )));
+        }
+
+        for i in rw_fields_name {
+            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
+                SinkError::BigQuery(anyhow::anyhow!(
+                    "Column {:?} exists in RisingWave but cannot be found in BigQuery",
+                    i.name
+                ))
+            })?;
+            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
+            if data_type_string.ne(value) {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "Column type mismatch for column {:?}: the BigQuery type is {:?}, the RisingWave type is {:?}",
+                    i.name, value, data_type_string
+                )));
+            };
+        }
+        Ok(())
+    }
+
+    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
+        match rw_data_type {
+            DataType::Boolean => Ok("BOOL".to_owned()),
+            DataType::Int16 => Ok("INT64".to_owned()),
+            DataType::Int32 => Ok("INT64".to_owned()),
+            DataType::Int64 => Ok("INT64".to_owned()),
+            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
+                "BigQuery does not support real (32-bit float)"
+            ))),
+            DataType::Float64 => Ok("FLOAT64".to_owned()),
+            DataType::Decimal => Ok("NUMERIC".to_owned()),
+            DataType::Date => Ok("DATE".to_owned()),
+            DataType::Varchar => Ok("STRING".to_owned()),
+            DataType::Time => Err(SinkError::BigQuery(anyhow::anyhow!(
+                "BigQuery sink does not support Time yet"
+            ))),
+            DataType::Timestamp => Ok("DATETIME".to_owned()),
+            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
+            DataType::Interval => Ok("INTERVAL".to_owned()),
+            DataType::Struct(structs) => {
+                let mut elements_vec = vec![];
+                for (name, datatype) in structs.iter() {
+                    let element_string =
+                        Self::get_string_and_check_support_from_datatype(datatype)?;
+                    elements_vec.push(format!("{} {}", name, element_string));
+                }
+                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
+            }
+            DataType::List(l) => {
+                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
+                Ok(format!("ARRAY<{}>", element_string))
+            }
+            DataType::Bytea => Ok("BYTES".to_owned()),
+            DataType::Jsonb => Ok("JSON".to_owned()),
+            DataType::Serial => Ok("INT64".to_owned()),
+            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
+                "BigQuery sink does not support Int256"
+            ))),
+        }
+    }
+}
+
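For reference, a companion test along the lines of the existing `test_type_check` at the bottom of this file could pin down a couple of the scalar mappings above. This is only a sketch: it assumes placement inside the same `mod test`, since `get_string_and_check_support_from_datatype` is private to this module, and it relies on the `DataType` import already present there.

```rust
#[test]
fn test_scalar_type_check() {
    // Mirrors the match arms above: TIMESTAMP maps to DATETIME, VARCHAR to STRING.
    assert_eq!(
        BigQuerySink::get_string_and_check_support_from_datatype(&DataType::Timestamp).unwrap(),
        "DATETIME"
    );
    assert_eq!(
        BigQuerySink::get_string_and_check_support_from_datatype(&DataType::Varchar).unwrap(),
        "STRING"
    );
}
```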
+impl Sink for BigQuerySink {
+    type Coordinator = DummySinkCommitCoordinator;
+    type LogSinker = LogSinkerOf<BigQuerySinkWriter>;
+
+    const SINK_NAME: &'static str = BIGQUERY_SINK;
+
+    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
+        Ok(BigQuerySinkWriter::new(
+            self.config.clone(),
+            self.schema.clone(),
+            self.pk_indices.clone(),
+            self.is_append_only,
+        )
+        .await?
+        .into_log_sinker(writer_param.sink_metrics))
+    }
+
+    async fn validate(&self) -> Result<()> {
+        if !self.is_append_only {
+            return Err(SinkError::Config(anyhow!(
+                "BigQuery sink doesn't support upsert yet"
+            )));
+        }
+
+        let client = Client::from_service_account_key_file(&self.config.common.file_path)
+            .await
+            .map_err(|e| SinkError::BigQuery(e.into()))?;
+        let mut rs = client
+            .job()
+            .query(
+                &self.config.common.project,
+                QueryRequest::new(format!(
+                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
+                    self.config.common.project, self.config.common.dataset, self.config.common.table,
+                )),
+            )
+            .await.map_err(|e| SinkError::BigQuery(e.into()))?;
+        let mut big_query_schema = HashMap::default();
+        while rs.next_row() {
+            big_query_schema.insert(
+                rs.get_string_by_name("column_name")
+                    .map_err(|e| SinkError::BigQuery(e.into()))?
+                    .ok_or_else(|| {
+                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
+                    })?,
+                rs.get_string_by_name("data_type")
+                    .map_err(|e| SinkError::BigQuery(e.into()))?
+                    .ok_or_else(|| {
+                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
+                    })?,
+            );
+        }
+
+        self.check_column_name_and_type(big_query_schema)?;
+        Ok(())
+    }
+}
+
+pub struct BigQuerySinkWriter {
+    pub config: BigQueryConfig,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    client: Client,
+    is_append_only: bool,
+    insert_request: TableDataInsertAllRequest,
+    row_encoder: JsonEncoder,
+}
+
+impl TryFrom<SinkParam> for BigQuerySink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = BigQueryConfig::from_hashmap(param.properties)?;
+        BigQuerySink::new(
+            config,
+            schema,
+            param.downstream_pk,
+            param.sink_type.is_append_only(),
+        )
+    }
+}
+
+impl BigQuerySinkWriter {
+    pub async fn new(
+        config: BigQueryConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        let client = Client::from_service_account_key_file(&config.common.file_path)
+            .await
+            .map_err(|e| SinkError::BigQuery(e.into()))?;
+        Ok(Self {
+            config,
+            schema: schema.clone(),
+            pk_indices,
+            client,
+            is_append_only,
+            insert_request: TableDataInsertAllRequest::new(),
+            row_encoder: JsonEncoder::new_with_big_query(
+                schema,
+                None,
+                TimestampHandlingMode::String,
+            ),
+        })
+    }
+
+    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        let mut insert_vec = Vec::with_capacity(chunk.capacity());
+        for (op, row) in chunk.rows() {
+            if op != Op::Insert {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "Only insert operations are supported in append-only mode"
+                )));
+            }
+            insert_vec.push(TableDataInsertAllRequestRows {
+                insert_id: None,
+                json: Value::Object(self.row_encoder.encode(row)?),
+            })
+        }
+        self.insert_request
+            .add_rows(insert_vec)
+            .map_err(|e| SinkError::BigQuery(e.into()))?;
+        if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
+            self.insert_data().await?;
+        }
+        Ok(())
+    }
+
+    async fn insert_data(&mut self) -> Result<()> {
+        if !self.insert_request.is_empty() {
+            let insert_request =
+                mem::replace(&mut self.insert_request, TableDataInsertAllRequest::new());
+            self.client
+                .tabledata()
+                .insert_all(
+                    &self.config.common.project,
+                    &self.config.common.dataset,
+                    &self.config.common.table,
+                    insert_request,
+                )
+                .await
+                .map_err(|e| SinkError::BigQuery(e.into()))?;
+        }
+        Ok(())
+    }
+}
+
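To make the control flow above easier to follow: rows are buffered into a `TableDataInsertAllRequest` and flushed either when the buffer reaches `BIGQUERY_INSERT_MAX_NUMS` rows or when a checkpoint barrier arrives (see `barrier` in the `SinkWriter` impl below). The following self-contained sketch of that buffer-and-flush pattern uses illustrative names only; it is not part of the connector.

```rust
use std::mem;

const MAX_BATCH: usize = 1024; // stands in for BIGQUERY_INSERT_MAX_NUMS

struct Batcher {
    buffer: Vec<String>, // stands in for the rows queued in TableDataInsertAllRequest
}

impl Batcher {
    fn push(&mut self, row: String) {
        self.buffer.push(row);
        // Size-based flush, as in `append_only`.
        if self.buffer.len() >= MAX_BATCH {
            self.flush();
        }
    }

    fn on_checkpoint(&mut self) {
        // Barrier-based flush, as in `barrier`: everything buffered before the
        // checkpoint is pushed out before the checkpoint completes.
        self.flush();
    }

    fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        // Swap in an empty buffer, like the `mem::replace` in `insert_data`.
        let batch = mem::take(&mut self.buffer);
        send(batch);
    }
}

fn send(_batch: Vec<String>) {
    // Placeholder for `tabledata().insert_all(...)`.
}
```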
+#[async_trait]
+impl SinkWriter for BigQuerySinkWriter {
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.is_append_only {
+            self.append_only(chunk).await
+        } else {
+            Err(SinkError::BigQuery(anyhow::anyhow!(
+                "BigQuery sink doesn't support upsert"
+            )))
+        }
+    }
+
+    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
+        self.insert_data().await
+    }
+
+    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use risingwave_common::types::{DataType, StructType};
+
+    use crate::sink::big_query::BigQuerySink;
+
+    #[tokio::test]
+    async fn test_type_check() {
+        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
+        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
+            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
+            (
+                "v2".to_owned(),
+                DataType::Struct(StructType::new(vec![
+                    ("v1".to_owned(), DataType::Int64),
+                    ("v2".to_owned(), DataType::Int64),
+                ])),
+            ),
+        ]))));
+        assert_eq!(
+            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
+            big_query_type_string
+        );
+    }
+}
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index 12176b491d6ab..c8bfc489c70c9 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -77,6 +77,20 @@ impl JsonEncoder {
             ..self
         }
     }
+
+    pub fn new_with_big_query(
+        schema: Schema,
+        col_indices: Option<Vec<usize>>,
+        timestamp_handling_mode: TimestampHandlingMode,
+    ) -> Self {
+        Self {
+            schema,
+            col_indices,
+            timestamp_handling_mode,
+            custom_json_type: CustomJsonType::Bigquery,
+            kafka_connect: None,
+        }
+    }
 }
 
 impl RowEncoder for JsonEncoder {
@@ -187,7 +201,7 @@ fn datum_to_json_object(
                 }
                 json!(v_string)
             }
-            CustomJsonType::None => {
+            CustomJsonType::None | CustomJsonType::Bigquery => {
                 json!(v.to_text())
             }
         },
@@ -204,7 +218,7 @@
         (DataType::Date, ScalarRefImpl::Date(v)) => match custom_json_type {
             CustomJsonType::None => json!(v.0.num_days_from_ce()),
-            CustomJsonType::Doris(_) => {
+            CustomJsonType::Bigquery | CustomJsonType::Doris(_) => {
                 let a = v.0.format("%Y-%m-%d").to_string();
                 json!(a)
             }
@@ -259,7 +273,7 @@
                     ArrayError::internal(format!("Json to string err{:?}", err))
                 })?)
             }
-            CustomJsonType::None => {
+            CustomJsonType::None | CustomJsonType::Bigquery => {
                 let mut map = Map::with_capacity(st.len());
                 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
                     st.iter()
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 83b2ab4f09df0..ec532c70b2c9e 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -91,7 +91,12 @@ pub enum TimestampHandlingMode {
 
 #[derive(Clone)]
 pub enum CustomJsonType {
+    // Doris's JSON requires dates to be encoded as strings.
+    // Struct fields must keep their insertion order.
+    // Decimal values need verification and calibration.
     Doris(HashMap<String, (u8, u8)>),
+    // BigQuery's JSON requires dates to be encoded as strings.
+    Bigquery,
     None,
 }
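A brief illustration (not part of the PR) of the date handling that `CustomJsonType::Bigquery` selects in `datum_to_json_object`: DATE values are serialized as `"%Y-%m-%d"` strings rather than as day counts, which matches what BigQuery accepts for its DATE type in insert payloads. The snippet assumes the `chrono` and `serde_json` crates.

```rust
use chrono::NaiveDate;

fn main() {
    let d = NaiveDate::from_ymd_opt(2023, 10, 1).unwrap();
    // With CustomJsonType::None a DATE becomes a day number; with
    // CustomJsonType::Bigquery (and Doris) it becomes a plain date string.
    let as_string = serde_json::json!(d.format("%Y-%m-%d").to_string());
    assert_eq!(as_string, serde_json::json!("2023-10-01"));
}
```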
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 6afd08778cd96..fc590d2fa6935 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod big_query;
 pub mod blackhole;
 pub mod boxed;
 pub mod catalog;
@@ -78,6 +79,7 @@ macro_rules! for_all_sinks {
         { ElasticSearch, $crate::sink::remote::ElasticSearchSink },
         { Cassandra, $crate::sink::remote::CassandraSink },
         { Doris, $crate::sink::doris::DorisSink },
+        { BigQuery, $crate::sink::big_query::BigQuerySink },
         { Test, $crate::sink::test_sink::TestSink }
     }
     $(,$arg)*
@@ -393,6 +395,8 @@ pub enum SinkError {
     Pulsar(anyhow::Error),
     #[error("Internal error: {0}")]
     Internal(anyhow::Error),
+    #[error("BigQuery error: {0}")]
+    BigQuery(anyhow::Error),
 }
 
 impl From for SinkError {