From c0ce8a8a6b4d51e463a605fb307fa4b014e0f609 Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Tue, 3 Sep 2024 16:53:03 +0800
Subject: [PATCH] feat(sink): support async for bigquery sink (#17488)

---
 src/connector/src/sink/big_query.rs  | 379 ++++++++++++++++----------
 src/connector/with_options_sink.yaml |   8 -
 2 files changed, 231 insertions(+), 156 deletions(-)

diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs
index 22146e86d0d1..235b1ff5b653 100644
--- a/src/connector/src/sink/big_query.rs
+++ b/src/connector/src/sink/big_query.rs
@@ -12,19 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::pin::Pin;
 use core::time::Duration;
-use std::collections::{BTreeMap, HashMap};
-use std::sync::Arc;
+use std::collections::{BTreeMap, HashMap, VecDeque};
 
 use anyhow::{anyhow, Context};
-use async_trait::async_trait;
+use futures::future::pending;
+use futures::prelude::Future;
+use futures::{Stream, StreamExt};
+use futures_async_stream::try_stream;
 use gcp_bigquery_client::error::BQError;
 use gcp_bigquery_client::model::query_request::QueryRequest;
 use gcp_bigquery_client::model::table::Table;
 use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
 use gcp_bigquery_client::model::table_schema::TableSchema;
 use gcp_bigquery_client::Client;
-use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient;
 use google_cloud_bigquery::grpc::apiv1::conn_pool::{WriteConnectionManager, DOMAIN};
 use google_cloud_gax::conn::{ConnectionOptions, Environment};
 use google_cloud_gax::grpc::Request;
@@ -32,7 +34,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::
     ProtoData, Rows as AppendRowsRequestRows,
 };
 use google_cloud_googleapis::cloud::bigquery::storage::v1::{
-    AppendRowsRequest, ProtoRows, ProtoSchema,
+    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
 };
 use google_cloud_pubsub::client::google_cloud_auth;
 use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
@@ -42,32 +44,35 @@ use prost_types::{
     FileDescriptorSet,
 };
 use risingwave_common::array::{Op, StreamChunk};
-use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::types::DataType;
 use serde_derive::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 use simd_json::prelude::ArrayTrait;
+use tokio::sync::mpsc;
+use tonic::{async_trait, Response, Status};
 use url::Url;
 use uuid::Uuid;
 use with_options::WithOptions;
 use yup_oauth2::ServiceAccountKey;
 
 use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
-use super::writer::LogSinkerOf;
-use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
+use super::log_store::{LogStoreReadItem, TruncateOffset};
+use super::{
+    LogSinker, SinkError, SinkLogReader, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+};
 use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::connector_common::AwsAuthProps;
-use crate::sink::writer::SinkWriterExt;
-use crate::sink::{
-    DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
-};
+use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};
 
 pub const BIGQUERY_SINK: &str = "bigquery";
 pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
 const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
 const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
 const CONNECTION_TIMEOUT: Option<Duration> = None;
+const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
+// Each AppendRows request must stay < 10MB; we cap the payload at 8MB.
+const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;
 
 #[serde_as]
 #[derive(Deserialize, Debug, Clone, WithOptions)]
@@ -82,23 +87,100 @@ pub struct BigQueryCommon {
     pub dataset: String,
     #[serde(rename = "bigquery.table")]
     pub table: String,
-    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
-    #[serde_as(as = "DisplayFromStr")]
-    pub max_batch_rows: usize,
-    #[serde(rename = "bigquery.retry_times", default = "default_retry_times")]
-    #[serde_as(as = "DisplayFromStr")]
-    pub retry_times: usize,
     #[serde(default)] // default false
     #[serde_as(as = "DisplayFromStr")]
     pub auto_create: bool,
 }
 
-fn default_max_batch_rows() -> usize {
-    1024
+struct BigQueryFutureManager {
+    // `offset_queue` pairs each `TruncateOffset` with the number of responses it still has to
+    // wait for on `resp_stream`.
+    // When the TruncateOffset is a barrier, the num is 0 and we don't wait for `resp_stream`.
+    // When the TruncateOffset is a chunk:
+    //   1. the chunk has no rows: nothing was sent, the num is 0, and we don't wait for `resp_stream`;
+    //   2. the chunk fits in `MAX_ROW_SIZE`: we sent one request, the num is 1, and we wait for `resp_stream` once;
+    //   3. the chunk is larger than `MAX_ROW_SIZE`: we split it into n requests, the num is n, and we wait for `resp_stream` n times.
+    offset_queue: VecDeque<(TruncateOffset, usize)>,
+    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
 }
+impl BigQueryFutureManager {
+    pub fn new(
+        max_future_num: usize,
+        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
+    ) -> Self {
+        let offset_queue = VecDeque::with_capacity(max_future_num);
+        Self {
+            offset_queue,
+            resp_stream: Box::pin(resp_stream),
+        }
+    }
+
+    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
+        self.offset_queue.push_back((offset, resp_num));
+    }
 
-fn default_retry_times() -> usize {
-    5
+    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
+        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
+            if *remaining_resp_num == 0 {
+                return Ok(self.offset_queue.pop_front().unwrap().0);
+            }
+            while *remaining_resp_num > 0 {
+                self.resp_stream
+                    .next()
+                    .await
+                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
+                *remaining_resp_num -= 1;
+            }
+            Ok(self.offset_queue.pop_front().unwrap().0)
+        } else {
+            pending().await
+        }
+    }
+}
+pub struct BigQueryLogSinker {
+    writer: BigQuerySinkWriter,
+    bigquery_future_manager: BigQueryFutureManager,
+    future_num: usize,
+}
+impl BigQueryLogSinker {
+    pub fn new(
+        writer: BigQuerySinkWriter,
+        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
+        future_num: usize,
+    ) -> Self {
+        Self {
+            writer,
+            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
+            future_num,
+        }
+    }
+}
+
+#[async_trait]
+impl LogSinker for BigQueryLogSinker {
+    async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<!> {
+        loop {
+            tokio::select!(
+                offset = self.bigquery_future_manager.next_offset() => {
+                    log_reader.truncate(offset?)?;
+                }
+                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
+                    let (epoch, item) = item_result?;
+                    match item {
+                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
+                            let resp_num = self.writer.write_chunk(chunk)?;
+                            self.bigquery_future_manager
+                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
+                        }
+                        LogStoreReadItem::Barrier { .. } => {
+                            self.bigquery_future_manager
+                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
+                        }
+                        LogStoreReadItem::UpdateVnodeBitmap(_) => {}
+                    }
+                }
+            )
+        }
+    }
 }
 
 impl BigQueryCommon {
@@ -116,14 +198,13 @@ impl BigQueryCommon {
     async fn build_writer_client(
         &self,
         aws_auth_props: &AwsAuthProps,
-    ) -> Result<StorageWriterClient> {
+    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>>)> {
         let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
 
         let credentials_file = CredentialsFile::new_from_str(&auth_json)
             .await
             .map_err(|e| SinkError::BigQuery(e.into()))?;
-        let client = StorageWriterClient::new(credentials_file).await?;
-        Ok(client)
+        StorageWriterClient::new(credentials_file).await
     }
 
     async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
@@ -342,19 +423,23 @@ impl BigQuerySink {
 
 impl Sink for BigQuerySink {
     type Coordinator = DummySinkCommitCoordinator;
-    type LogSinker = LogSinkerOf<BigQuerySinkWriter>;
+    type LogSinker = BigQueryLogSinker;
 
     const SINK_NAME: &'static str = BIGQUERY_SINK;
 
-    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
-        Ok(BigQuerySinkWriter::new(
+    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
+        let (writer, resp_stream) = BigQuerySinkWriter::new(
             self.config.clone(),
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
         )
-        .await?
-        .into_log_sinker(writer_param.sink_metrics))
+        .await?;
+        Ok(BigQueryLogSinker::new(
+            writer,
+            resp_stream,
+            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
+        ))
     }
 
     async fn validate(&self) -> Result<()> {
@@ -446,8 +531,6 @@ pub struct BigQuerySinkWriter {
     message_descriptor: MessageDescriptor,
     write_stream: String,
     proto_field: Option<FieldDescriptor>,
-    write_rows: Vec<AppendRowsRequestRows>,
-    write_rows_count: usize,
 }
 
 impl TryFrom<SinkParam> for BigQuerySink {
@@ -471,8 +554,8 @@ impl BigQuerySinkWriter {
         schema: Schema,
         pk_indices: Vec<usize>,
         is_append_only: bool,
-    ) -> Result<Self> {
-        let client = config
+    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
+        let (client, resp_stream) = config
             .common
             .build_writer_client(&config.aws_auth_props)
             .await?;
@@ -519,25 +602,26 @@ impl BigQuerySinkWriter {
             message_descriptor.clone(),
             ProtoHeader::None,
         )?;
-        Ok(Self {
-            write_stream: format!(
-                "projects/{}/datasets/{}/tables/{}/streams/_default",
-                config.common.project, config.common.dataset, config.common.table
-            ),
-            config,
-            schema,
-            pk_indices,
-            client,
-            is_append_only,
-            row_encoder,
-            message_descriptor,
-            proto_field,
-            writer_pb_schema: ProtoSchema {
-                proto_descriptor: Some(descriptor_proto),
+        Ok((
+            Self {
+                write_stream: format!(
+                    "projects/{}/datasets/{}/tables/{}/streams/_default",
+                    config.common.project, config.common.dataset, config.common.table
+                ),
+                config,
+                schema,
+                pk_indices,
+                client,
+                is_append_only,
+                row_encoder,
+                message_descriptor,
+                proto_field,
+                writer_pb_schema: ProtoSchema {
+                    proto_descriptor: Some(descriptor_proto),
+                },
             },
-            write_rows: vec![],
-            write_rows_count: 0,
-        })
+            resp_stream,
+        ))
     }
 
     fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
@@ -588,82 +672,96 @@ impl BigQuerySinkWriter {
         Ok(serialized_rows)
     }
 
-    async fn write_rows(&mut self) -> Result<()> {
-        if self.write_rows.is_empty() {
-            return Ok(());
-        }
-        let mut errs = Vec::with_capacity(self.config.common.retry_times);
-        for _ in 0..self.config.common.retry_times {
-            match self
-                .client
-                .append_rows(self.write_rows.clone(), self.write_stream.clone())
-                .await
-            {
-                Ok(_) => {
-                    self.write_rows_count = 0;
-                    self.write_rows.clear();
-                    return Ok(());
-                }
-                Err(e) => errs.push(e),
-            }
-        }
-        Err(SinkError::BigQuery(anyhow::anyhow!(
"Insert error {:?}", - errs - ))) - } -} - -#[async_trait] -impl SinkWriter for BigQuerySinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + fn write_chunk(&mut self, chunk: StreamChunk) -> Result { let serialized_rows = if self.is_append_only { self.append_only(chunk)? } else { self.upsert(chunk)? }; - if !serialized_rows.is_empty() { - self.write_rows_count += serialized_rows.len(); + if serialized_rows.is_empty() { + return Ok(0); + } + let mut result = Vec::new(); + let mut result_inner = Vec::new(); + let mut size_count = 0; + for i in serialized_rows { + size_count += i.len(); + if size_count > MAX_ROW_SIZE { + result.push(result_inner); + result_inner = Vec::new(); + size_count = i.len(); + } + result_inner.push(i); + } + if !result_inner.is_empty() { + result.push(result_inner); + } + let len = result.len(); + for serialized_rows in result { let rows = AppendRowsRequestRows::ProtoRows(ProtoData { writer_schema: Some(self.writer_pb_schema.clone()), rows: Some(ProtoRows { serialized_rows }), }); - self.write_rows.push(rows); - - if self.write_rows_count >= self.config.common.max_batch_rows { - self.write_rows().await?; - } + self.client.append_rows(rows, self.write_stream.clone())?; } - Ok(()) - } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) + Ok(len) } +} - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - self.write_rows().await?; +#[try_stream(ok = (), error = SinkError)] +pub async fn resp_to_stream( + resp_stream: impl Future< + Output = std::result::Result< + Response>, + Status, + >, + > + + 'static + + Send, +) { + let mut resp_stream = resp_stream + .await + .map_err(|e| SinkError::BigQuery(e.into()))? + .into_inner(); + loop { + match resp_stream + .message() + .await + .map_err(|e| SinkError::BigQuery(e.into()))? 
+        {
+            Some(append_rows_response) => {
+                if !append_rows_response.row_errors.is_empty() {
+                    return Err(SinkError::BigQuery(anyhow::anyhow!(
+                        "bigquery insert error {:?}",
+                        append_rows_response.row_errors
+                    )));
+                }
+                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response{
+                    return Err(SinkError::BigQuery(anyhow::anyhow!(
+                        "bigquery insert error {:?}",
+                        status
+                    )));
+                }
+                yield ();
+            }
+            None => {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "bigquery insert error: end of resp stream",
+                )));
+            }
         }
-        Ok(())
-    }
-
-    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
-        Ok(())
     }
 }
 
 struct StorageWriterClient {
-    client: StreamingWriteClient,
     #[expect(dead_code)]
     environment: Environment,
+    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
 }
 impl StorageWriterClient {
-    pub async fn new(credentials: CredentialsFile) -> Result<Self> {
+    pub async fn new(
+        credentials: CredentialsFile,
+    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
         let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
             Self::bigquery_grpc_auth_config(),
             Box::new(credentials),
         )
@@ -683,49 +781,34 @@ impl StorageWriterClient {
         )
         .await
         .map_err(|e| SinkError::BigQuery(e.into()))?;
-        let client = conn.conn();
-        Ok(StorageWriterClient {
-            client,
-            environment,
-        })
+        let mut client = conn.conn();
+
+        let (tx, rx) = mpsc::unbounded_channel();
+        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
+
+        let resp = async move { client.append_rows(Request::new(stream)).await };
+        let resp_stream = resp_to_stream(resp);
+
+        Ok((
+            StorageWriterClient {
+                environment,
+                request_sender: tx,
+            },
+            resp_stream,
+        ))
     }
 
-    pub async fn append_rows(
-        &mut self,
-        rows: Vec<AppendRowsRequestRows>,
-        write_stream: String,
-    ) -> Result<()> {
-        let mut resp_count = rows.len();
-        let append_req: Vec<AppendRowsRequest> = rows
-            .into_iter()
-            .map(|row| AppendRowsRequest {
-                write_stream: write_stream.clone(),
-                offset: None,
-                trace_id: Uuid::new_v4().hyphenated().to_string(),
-                missing_value_interpretations: HashMap::default(),
-                rows: Some(row),
-            })
-            .collect();
-        let mut resp = self
-            .client
-            .append_rows(Request::new(tokio_stream::iter(append_req)))
-            .await
-            .map_err(|e| SinkError::BigQuery(e.into()))?
-            .into_inner();
-        while let Some(append_rows_response) = resp
-            .message()
-            .await
-            .map_err(|e| SinkError::BigQuery(e.into()))?
-        {
-            resp_count -= 1;
-            if !append_rows_response.row_errors.is_empty() {
-                return Err(SinkError::BigQuery(anyhow::anyhow!(
-                    "Insert error {:?}",
-                    append_rows_response.row_errors
-                )));
-            }
-        }
-        assert_eq!(resp_count,0,"bigquery sink insert error: the number of response inserted is not equal to the number of request");
+    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
+        let append_req = AppendRowsRequest {
+            write_stream: write_stream.clone(),
+            offset: None,
+            trace_id: Uuid::new_v4().hyphenated().to_string(),
+            missing_value_interpretations: HashMap::default(),
+            rows: Some(row),
+        };
+        self.request_sender
+            .send(append_req)
+            .map_err(|e| SinkError::BigQuery(e.into()))?;
         Ok(())
     }
 
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index d028ef5e3019..cc92f9a0a664 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -41,14 +41,6 @@ BigQueryConfig:
   - name: bigquery.table
     field_type: String
     required: true
-  - name: bigquery.max_batch_rows
-    field_type: usize
-    required: false
-    default: '1024'
-  - name: bigquery.retry_times
-    field_type: usize
-    required: false
-    default: '5'
   - name: auto_create
     field_type: bool
     required: false
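
The core of the new async path is the bookkeeping in BigQueryFutureManager: every truncate offset is queued together with the number of AppendRows responses it still has to see, and it is handed back to the log reader for truncation only after that many acknowledgements have been consumed from resp_stream. Below is a minimal, self-contained sketch of that idea; the Offset enum and the synchronous ack iterator are stand-ins for RisingWave's TruncateOffset and the async response stream, not the actual types.

use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Offset {
    Chunk { epoch: u64, chunk_id: usize },
    Barrier { epoch: u64 },
}

struct FutureManager<I: Iterator<Item = ()>> {
    // Each entry pairs an offset with the number of acknowledgements still expected for it.
    queue: VecDeque<(Offset, usize)>,
    acks: I,
}

impl<I: Iterator<Item = ()>> FutureManager<I> {
    fn add_offset(&mut self, offset: Offset, resp_num: usize) {
        self.queue.push_back((offset, resp_num));
    }

    // Release the oldest offset only after all acknowledgements it waits for have arrived;
    // in the sink this loop is the `resp_stream.next().await` inside `next_offset`.
    fn next_offset(&mut self) -> Option<Offset> {
        let (_, remaining) = self.queue.front_mut()?;
        while *remaining > 0 {
            self.acks.next()?;
            *remaining -= 1;
        }
        self.queue.pop_front().map(|(offset, _)| offset)
    }
}

fn main() {
    // Pretend BigQuery acknowledges every AppendRows request exactly once.
    let mut mgr = FutureManager {
        queue: VecDeque::new(),
        acks: std::iter::repeat(()),
    };

    mgr.add_offset(Offset::Chunk { epoch: 1, chunk_id: 0 }, 2); // chunk split into 2 requests
    mgr.add_offset(Offset::Barrier { epoch: 1 }, 0); // barriers never wait

    assert_eq!(mgr.next_offset(), Some(Offset::Chunk { epoch: 1, chunk_id: 0 }));
    assert_eq!(mgr.next_offset(), Some(Offset::Barrier { epoch: 1 }));
    println!("all offsets released in order");
}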
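
write_chunk no longer buffers rows across calls; it packs the serialized rows of one chunk into requests whose payload stays under MAX_ROW_SIZE and returns how many requests were sent, which becomes the resp_num recorded for that chunk. A standalone sketch of that packing step follows; the function name and the demo values are illustrative only.

// Mirrors the splitting loop in `write_chunk`: start a new AppendRows batch whenever the
// running payload size exceeds the cap.
fn split_into_batches(rows: Vec<Vec<u8>>, max_bytes: usize) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut size = 0;
    for row in rows {
        size += row.len();
        if size > max_bytes {
            // Seal the current batch; the oversized running total restarts with this row.
            batches.push(std::mem::take(&mut current));
            size = row.len();
        }
        current.push(row);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    // Five 3-byte rows with an 8-byte cap pack as [2, 2, 1] -> three AppendRows requests,
    // so the chunk's `resp_num` would be 3.
    let rows: Vec<Vec<u8>> = (0..5).map(|_| vec![0u8; 3]).collect();
    let batches = split_into_batches(rows, 8);
    assert_eq!(batches.iter().map(|b| b.len()).collect::<Vec<_>>(), vec![2, 2, 1]);
    println!("{} AppendRows requests would be sent", batches.len());
}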
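
StorageWriterClient no longer drives one RPC per batch: it keeps the sender half of an unbounded tokio mpsc channel, wraps the receiver in an UnboundedReceiverStream, and feeds that stream to a single long-lived append_rows call, while responses are surfaced separately through resp_to_stream. A minimal sketch of that wiring is below, with a spawned task standing in for the gRPC call; the String payload and names are illustrative, and the example assumes the tokio, tokio-stream, and futures crates.

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<String>();
    // The receiver half becomes a `Stream` that the streaming RPC would consume.
    let mut requests = UnboundedReceiverStream::new(rx);

    // Stand-in for `client.append_rows(Request::new(stream))`: drain the stream and count acks.
    let server = tokio::spawn(async move {
        let mut acked = 0;
        while let Some(req) = requests.next().await {
            println!("sending {req}");
            acked += 1;
        }
        acked
    });

    // The writer side only needs the sender, so `append_rows` becomes a non-async `send`.
    for i in 0..3 {
        tx.send(format!("AppendRowsRequest #{i}")).unwrap();
    }
    drop(tx); // closing the sender ends the request stream

    assert_eq!(server.await.unwrap(), 3);
}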
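
consume_log_and_sink ties the pieces together with a single tokio::select! loop: one branch retires the oldest in-flight offset once its responses have arrived, the other pulls the next log item but is guarded by offset_queue.len() <= future_num so the number of unacknowledged requests stays bounded. A toy sketch of that control flow follows; the channels stand in for the log reader and the response stream, MAX_IN_FLIGHT stands in for BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE, and none of the names here are RisingWave APIs.

use std::collections::VecDeque;
use std::time::Duration;

use tokio::sync::mpsc;

// Kept tiny so the bound is visible in the output.
const MAX_IN_FLIGHT: usize = 2;

#[tokio::main]
async fn main() {
    let (ack_tx, mut ack_rx) = mpsc::unbounded_channel::<()>();
    let (item_tx, mut item_rx) = mpsc::unbounded_channel::<u64>();

    // Producer: five "chunks" coming from the log reader.
    tokio::spawn(async move {
        for id in 0..5u64 {
            item_tx.send(id).unwrap();
        }
    });

    // Acker: stands in for `resp_stream`, acknowledging one request every 10ms.
    let acker = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(10)).await;
            if ack_tx.send(()).is_err() {
                break;
            }
        }
    });

    let mut in_flight: VecDeque<u64> = VecDeque::new();
    let mut truncated = 0;
    while truncated < 5 {
        tokio::select! {
            // Retire the oldest in-flight chunk once it has been acknowledged.
            Some(()) = ack_rx.recv(), if !in_flight.is_empty() => {
                let id = in_flight.pop_front().unwrap();
                println!("truncate up to chunk {id}");
                truncated += 1;
            }
            // Pull a new item only while the in-flight queue is within the bound,
            // mirroring `if offset_queue.len() <= future_num` in `consume_log_and_sink`.
            Some(id) = item_rx.recv(), if in_flight.len() <= MAX_IN_FLIGHT => {
                println!("send chunk {id}");
                in_flight.push_back(id);
            }
        }
    }
    acker.abort();
}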