From 4baebceec31a81850e269042f839265e0211a823 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 27 Jun 2024 15:46:32 +0800 Subject: [PATCH 01/18] support --- src/connector/src/sink/big_query.rs | 211 ++++++++++++++-------------- 1 file changed, 107 insertions(+), 104 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index c89e200093473..c6cd86c7da860 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -16,8 +16,9 @@ use core::time::Duration; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use anyhow::anyhow; -use async_trait::async_trait; +use anyhow::{anyhow, Context}; +use futures::prelude::TryFuture; +use futures::{FutureExt}; use gcp_bigquery_client::model::query_request::QueryRequest; use gcp_bigquery_client::Client; use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient; @@ -28,7 +29,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request:: ProtoData, Rows as AppendRowsRequestRows, }; use google_cloud_googleapis::cloud::bigquery::storage::v1::{ - AppendRowsRequest, ProtoRows, ProtoSchema, + AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema }; use google_cloud_pubsub::client::google_cloud_auth; use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile; @@ -38,24 +39,25 @@ use prost_types::{ FileDescriptorSet, }; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; +use tokio::sync::{broadcast, mpsc, Mutex}; +use tonic::Streaming; use url::Url; use uuid::Uuid; use with_options::WithOptions; use yup_oauth2::ServiceAccountKey; use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo}; -use super::writer::LogSinkerOf; +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt}; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::aws_utils::load_file_descriptor_from_s3; use crate::connector_common::AwsAuthProps; -use crate::sink::writer::SinkWriterExt; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam, + DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam, }; pub const BIGQUERY_SINK: &str = "bigquery"; @@ -63,6 +65,7 @@ pub const CHANGE_TYPE: &str = "_CHANGE_TYPE"; const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4; const CONNECT_TIMEOUT: Option = Some(Duration::from_secs(30)); const CONNECTION_TIMEOUT: Option = None; +const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256; #[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] @@ -259,11 +262,11 @@ impl BigQuerySink { impl Sink for BigQuerySink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = BIGQUERY_SINK; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(BigQuerySinkWriter::new( self.config.clone(), self.schema.clone(), @@ -271,7 +274,7 @@ impl Sink for BigQuerySink { self.is_append_only, ) .await? 
- .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE)) } async fn validate(&self) -> Result<()> { @@ -329,8 +332,6 @@ pub struct BigQuerySinkWriter { message_descriptor: MessageDescriptor, write_stream: String, proto_field: Option, - write_rows: Vec, - write_rows_count: usize, } impl TryFrom for BigQuerySink { @@ -418,8 +419,6 @@ impl BigQuerySinkWriter { writer_pb_schema: ProtoSchema { proto_descriptor: Some(descriptor_proto), }, - write_rows: vec![], - write_rows_count: 0, }) } @@ -470,80 +469,63 @@ impl BigQuerySinkWriter { } Ok(serialized_rows) } - - async fn write_rows(&mut self) -> Result<()> { - if self.write_rows.is_empty() { - return Ok(()); - } - let mut errs = Vec::with_capacity(self.config.common.retry_times); - for _ in 0..self.config.common.retry_times { - match self - .client - .append_rows(self.write_rows.clone(), self.write_stream.clone()) - .await - { - Ok(_) => { - self.write_rows_count = 0; - self.write_rows.clear(); - return Ok(()); - } - Err(e) => errs.push(e), - } - } - Err(SinkError::BigQuery(anyhow::anyhow!( - "Insert error {:?}", - errs - ))) - } } -#[async_trait] -impl SinkWriter for BigQuerySinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +pub type BigQuerySinkDeliveryFuture = impl TryFuture + Unpin + 'static; + +impl AsyncTruncateSinkWriter for BigQuerySinkWriter { + type DeliveryFuture = BigQuerySinkDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { let serialized_rows = if self.is_append_only { self.append_only(chunk)? } else { self.upsert(chunk)? }; - if !serialized_rows.is_empty() { - self.write_rows_count += serialized_rows.len(); - let rows = AppendRowsRequestRows::ProtoRows(ProtoData { - writer_schema: Some(self.writer_pb_schema.clone()), - rows: Some(ProtoRows { serialized_rows }), - }); - self.write_rows.push(rows); - - if self.write_rows_count >= self.config.common.max_batch_rows { - self.write_rows().await?; - } - } - Ok(()) - } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - self.write_rows().await?; + if serialized_rows.is_empty() { + return Ok(()); } - Ok(()) - } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + let rows = AppendRowsRequestRows::ProtoRows(ProtoData { + writer_schema: Some(self.writer_pb_schema.clone()), + rows: Some(ProtoRows { serialized_rows }), + }); + let (expect_offset,mut rx) = self.client + .append_rows(rows, self.write_stream.clone()) + .await?; + let future = Box::pin(async move{ + loop { + match rx.recv().await { + Ok((result,offset)) => { + if offset == expect_offset { + if let Some(result) = result { + return Err(SinkError::BigQuery(anyhow::anyhow!(result))); + }else{ + return Ok(()); + } + } + }, + Err(e) => { + return Err(SinkError::BigQuery(anyhow::anyhow!(e))); + } + } + } + }); + add_future.add_future_may_await(future).await?; Ok(()) } } struct StorageWriterClient { - client: StreamingWriteClient, #[expect(dead_code)] environment: Environment, + sender: mpsc::Sender, + result_sender: Arc, usize)>>, + abort_handle: tokio::task::AbortHandle, + offset: usize, } impl StorageWriterClient { pub async fn new(credentials: CredentialsFile) -> Result { @@ -566,50 +548,65 @@ impl StorageWriterClient { ) 
.await .map_err(|e| SinkError::BigQuery(e.into()))?; - let client = conn.conn(); + let mut client = conn.conn(); + + let (tx,rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let (result_sender,_) = broadcast::channel(16); + let result_sender = Arc::new(result_sender); + let result_sender_clone = result_sender.clone(); + let abort_handle = tokio::spawn(async move { + let mut resp = client + .append_rows(Request::new(stream)) + .await + .map_err(|e| SinkError::BigQuery(e.into())).unwrap() + .into_inner(); + let mut offset = 0; + loop{ + let result = match resp.message().await { + Ok(Some(append_rows_response)) => { + if !append_rows_response.row_errors.is_empty() { + Some(format!("Insert error {:?}",append_rows_response.row_errors)) + }else{ + None + } + }, + Ok(None) => { + None + }, + Err(e) => { + Some(e.to_string()) + } + }; + result_sender_clone.send((result,offset)).unwrap(); + offset += 1; + } + }).abort_handle(); + Ok(StorageWriterClient { - client, environment, + sender: tx, + result_sender, + abort_handle, + offset: 0, }) } pub async fn append_rows( &mut self, - rows: Vec, + row: AppendRowsRequestRows, write_stream: String, - ) -> Result<()> { - let mut resp_count = rows.len(); - let append_req: Vec = rows - .into_iter() - .map(|row| AppendRowsRequest { + ) -> Result<(usize,broadcast::Receiver<(Option,usize)>)> { + let append_req = AppendRowsRequest { write_stream: write_stream.clone(), offset: None, trace_id: Uuid::new_v4().hyphenated().to_string(), missing_value_interpretations: HashMap::default(), rows: Some(row), - }) - .collect(); - let mut resp = self - .client - .append_rows(Request::new(tokio_stream::iter(append_req))) - .await - .map_err(|e| SinkError::BigQuery(e.into()))? - .into_inner(); - while let Some(append_rows_response) = resp - .message() - .await - .map_err(|e| SinkError::BigQuery(e.into()))? 
- { - resp_count -= 1; - if !append_rows_response.row_errors.is_empty() { - return Err(SinkError::BigQuery(anyhow::anyhow!( - "Insert error {:?}", - append_rows_response.row_errors - ))); - } - } - assert_eq!(resp_count,0,"bigquery sink insert error: the number of response inserted is not equal to the number of request"); - Ok(()) + }; + self.sender.send(append_req).await.map_err(|e| SinkError::BigQuery(e.into()))?; + self.offset += 1; + Ok((self.offset - 1, self.result_sender.subscribe())) } fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> { @@ -620,6 +617,12 @@ impl StorageWriterClient { } } } +impl Drop for StorageWriterClient { + fn drop(&mut self) { + println!("abort"); + self.abort_handle.abort(); + } +} fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> prost_reflect::DescriptorPool { let file_descriptor = FileDescriptorProto { From df1a80b2d12c39ab4979127505265952b5c4ac54 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 27 Jun 2024 18:11:50 +0800 Subject: [PATCH 02/18] support --- src/connector/src/sink/big_query.rs | 99 +++++++++++++++-------------- 1 file changed, 53 insertions(+), 46 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index c6cd86c7da860..7e8985b23a774 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -16,12 +16,10 @@ use core::time::Duration; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use anyhow::{anyhow, Context}; +use anyhow::anyhow; use futures::prelude::TryFuture; -use futures::{FutureExt}; use gcp_bigquery_client::model::query_request::QueryRequest; use gcp_bigquery_client::Client; -use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient; use google_cloud_bigquery::grpc::apiv1::conn_pool::{WriteConnectionManager, DOMAIN}; use google_cloud_gax::conn::{ConnectionOptions, Environment}; use google_cloud_gax::grpc::Request; @@ -29,7 +27,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request:: ProtoData, Rows as AppendRowsRequestRows, }; use google_cloud_googleapis::cloud::bigquery::storage::v1::{ - AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema + AppendRowsRequest, ProtoRows, ProtoSchema, }; use google_cloud_pubsub::client::google_cloud_auth; use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile; @@ -43,8 +41,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use tokio::sync::{broadcast, mpsc, Mutex}; -use tonic::Streaming; +use tokio::sync::{broadcast, mpsc}; use url::Url; use uuid::Uuid; use with_options::WithOptions; @@ -52,13 +49,13 @@ use yup_oauth2::ServiceAccountKey; use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo}; use super::log_store::DeliveryFutureManagerAddFuture; -use super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt}; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::aws_utils::load_file_descriptor_from_s3; use crate::connector_common::AwsAuthProps; -use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam, -}; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam}; pub const BIGQUERY_SINK: &str = 
"bigquery"; pub const CHANGE_TYPE: &str = "_CHANGE_TYPE"; @@ -493,21 +490,19 @@ impl AsyncTruncateSinkWriter for BigQuerySinkWriter { writer_schema: Some(self.writer_pb_schema.clone()), rows: Some(ProtoRows { serialized_rows }), }); - let (expect_offset,mut rx) = self.client - .append_rows(rows, self.write_stream.clone()) - .await?; - let future = Box::pin(async move{ + let (expect_offset, mut rx) = self.client.get_subscribe(); + let future = Box::pin(async move { loop { match rx.recv().await { - Ok((result,offset)) => { + Ok((result, offset)) => { if offset == expect_offset { if let Some(result) = result { return Err(SinkError::BigQuery(anyhow::anyhow!(result))); - }else{ + } else { return Ok(()); } } - }, + } Err(e) => { return Err(SinkError::BigQuery(anyhow::anyhow!(e))); } @@ -515,6 +510,9 @@ impl AsyncTruncateSinkWriter for BigQuerySinkWriter { } }); add_future.add_future_may_await(future).await?; + self.client + .append_rows(rows, self.write_stream.clone()) + .await?; Ok(()) } } @@ -523,9 +521,9 @@ struct StorageWriterClient { #[expect(dead_code)] environment: Environment, sender: mpsc::Sender, - result_sender: Arc, usize)>>, + result_sender: Arc, u64)>>, abort_handle: tokio::task::AbortHandle, - offset: usize, + offset: u64, } impl StorageWriterClient { pub async fn new(credentials: CredentialsFile) -> Result { @@ -550,38 +548,41 @@ impl StorageWriterClient { .map_err(|e| SinkError::BigQuery(e.into()))?; let mut client = conn.conn(); - let (tx,rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); + let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - let (result_sender,_) = broadcast::channel(16); + let (result_sender, _) = broadcast::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE + 1); let result_sender = Arc::new(result_sender); let result_sender_clone = result_sender.clone(); let abort_handle = tokio::spawn(async move { let mut resp = client - .append_rows(Request::new(stream)) - .await - .map_err(|e| SinkError::BigQuery(e.into())).unwrap() - .into_inner(); + .append_rows(Request::new(stream)) + .await + .map_err(|e| SinkError::BigQuery(e.into())) + .unwrap() + .into_inner(); let mut offset = 0; - loop{ + loop { let result = match resp.message().await { Ok(Some(append_rows_response)) => { if !append_rows_response.row_errors.is_empty() { - Some(format!("Insert error {:?}",append_rows_response.row_errors)) - }else{ + Some(format!( + "Insert error {:?}", + append_rows_response.row_errors + )) + } else { None } - }, + } Ok(None) => { - None - }, - Err(e) => { - Some(e.to_string()) + continue; } + Err(e) => Some(e.to_string()), }; - result_sender_clone.send((result,offset)).unwrap(); + result_sender_clone.send((result, offset)).unwrap(); offset += 1; } - }).abort_handle(); + }) + .abort_handle(); Ok(StorageWriterClient { environment, @@ -592,21 +593,28 @@ impl StorageWriterClient { }) } + pub fn get_subscribe(&mut self) -> (u64, broadcast::Receiver<(Option, u64)>) { + self.offset += 1; + (self.offset - 1, self.result_sender.subscribe()) + } + pub async fn append_rows( &mut self, row: AppendRowsRequestRows, write_stream: String, - ) -> Result<(usize,broadcast::Receiver<(Option,usize)>)> { + ) -> Result<()> { let append_req = AppendRowsRequest { - write_stream: write_stream.clone(), - offset: None, - trace_id: Uuid::new_v4().hyphenated().to_string(), - missing_value_interpretations: HashMap::default(), - rows: Some(row), - }; - self.sender.send(append_req).await.map_err(|e| SinkError::BigQuery(e.into()))?; 
- self.offset += 1; - Ok((self.offset - 1, self.result_sender.subscribe())) + write_stream: write_stream.clone(), + offset: None, + trace_id: Uuid::new_v4().hyphenated().to_string(), + missing_value_interpretations: HashMap::default(), + rows: Some(row), + }; + self.sender + .send(append_req) + .await + .map_err(|e| SinkError::BigQuery(e.into()))?; + Ok(()) } fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> { @@ -619,7 +627,6 @@ impl StorageWriterClient { } impl Drop for StorageWriterClient { fn drop(&mut self) { - println!("abort"); self.abort_handle.abort(); } } From eed3cac39798ecdfe69694e257f76dd986a5e8bf Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 27 Jun 2024 18:17:12 +0800 Subject: [PATCH 03/18] remove option --- src/connector/src/sink/big_query.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 7e8985b23a774..cdfb43d8772d8 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -40,7 +40,6 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use serde_derive::Deserialize; -use serde_with::{serde_as, DisplayFromStr}; use tokio::sync::{broadcast, mpsc}; use url::Url; use uuid::Uuid; @@ -64,7 +63,6 @@ const CONNECT_TIMEOUT: Option = Some(Duration::from_secs(30)); const CONNECTION_TIMEOUT: Option = None; const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256; -#[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct BigQueryCommon { #[serde(rename = "bigquery.local.path")] @@ -77,20 +75,6 @@ pub struct BigQueryCommon { pub dataset: String, #[serde(rename = "bigquery.table")] pub table: String, - #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")] - #[serde_as(as = "DisplayFromStr")] - pub max_batch_rows: usize, - #[serde(rename = "bigquery.retry_times", default = "default_retry_times")] - #[serde_as(as = "DisplayFromStr")] - pub retry_times: usize, -} - -fn default_max_batch_rows() -> usize { - 1024 -} - -fn default_retry_times() -> usize { - 5 } impl BigQueryCommon { @@ -135,7 +119,6 @@ impl BigQueryCommon { } } -#[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct BigQueryConfig { #[serde(flatten)] From 7a311dcc12e274f1063dd9a14e31c63dc10edc26 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 27 Jun 2024 18:31:42 +0800 Subject: [PATCH 04/18] fmt --- src/connector/src/sink/big_query.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 7a510fb99f36c..2f80237b17f44 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -46,6 +46,7 @@ use risingwave_common::types::DataType; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; +use thiserror_ext::AsReport; use tokio::sync::{broadcast, mpsc}; use url::Url; use uuid::Uuid; From e46e816762d875f41efa29fdb2586a445155c658 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 27 Jun 2024 18:37:55 +0800 Subject: [PATCH 05/18] recovery cargo lock recopver --- Cargo.lock | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a3f657850780..80cfce6eb09dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3425,15 +3425,16 @@ dependencies = [ [[package]] name = 
"curve25519-dalek" -version = "4.1.3" +version = "4.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "digest", "fiat-crypto", + "platforms", "rustc_version 0.4.0", "subtle", "zeroize", @@ -9251,6 +9252,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" + [[package]] name = "plotters" version = "0.3.5" From f097e351485036e0ee301a31c30c5a718f965a4d Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 28 Jun 2024 14:37:42 +0800 Subject: [PATCH 06/18] fix ci --- src/connector/with_options_sink.yaml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 7deb67a524fcb..467486532a6e6 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -17,14 +17,6 @@ BigQueryConfig: - name: bigquery.table field_type: String required: true - - name: bigquery.max_batch_rows - field_type: usize - required: false - default: '1024' - - name: bigquery.retry_times - field_type: usize - required: false - default: '5' - name: auto_create field_type: bool required: false From 9c94778d8082b8ccd9a41c58b142fb89e87359d4 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 4 Jul 2024 18:41:08 +0800 Subject: [PATCH 07/18] refactor --- src/connector/src/sink/big_query.rs | 184 +++++++++++++--------------- 1 file changed, 83 insertions(+), 101 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 2f80237b17f44..26b9cd3dffca3 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -14,10 +14,11 @@ use core::time::Duration; use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; +use std::pin::pin; use anyhow::anyhow; -use futures::prelude::TryFuture; +use rw_futures_util::drop_either_future; +use futures::future::{select, Either}; use gcp_bigquery_client::error::BQError; use gcp_bigquery_client::model::query_request::QueryRequest; use gcp_bigquery_client::model::table::Table; @@ -31,7 +32,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request:: ProtoData, Rows as AppendRowsRequestRows, }; use google_cloud_googleapis::cloud::bigquery::storage::v1::{ - AppendRowsRequest, ProtoRows, ProtoSchema, + AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema }; use google_cloud_pubsub::client::google_cloud_auth; use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile; @@ -46,19 +47,16 @@ use risingwave_common::types::DataType; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; -use thiserror_ext::AsReport; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::mpsc; +use tonic::{async_trait, Streaming}; use url::Url; use uuid::Uuid; use with_options::WithOptions; use yup_oauth2::ServiceAccountKey; use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo}; -use super::log_store::DeliveryFutureManagerAddFuture; 
-use super::writer::{ - AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, -}; -use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::log_store::{LogStoreReadItem, TruncateOffset}; +use super::{LogSinker, SinkError, SinkLogReader, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::aws_utils::load_file_descriptor_from_s3; use crate::connector_common::AwsAuthProps; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam}; @@ -88,6 +86,61 @@ pub struct BigQueryCommon { pub auto_create: bool, } +pub struct BigQueryLogSinker{ + writer: BigQuerySinkWriter, + resp_stream: Streaming, + future_num: usize, +} +impl BigQueryLogSinker { + pub async fn new(writer: BigQuerySinkWriter, resp_stream: Streaming, future_num: usize) -> Self { + Self { + writer, + resp_stream, + future_num, + } + } +} + +#[async_trait] +impl LogSinker for BigQueryLogSinker { + async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { + let (tx, mut rx) = mpsc::channel(self.future_num); + loop { + let select_result = drop_either_future( + select( + pin!(log_reader.next_item()), + pin!(rx.recv()), + ) + .await, + ); + match select_result { + Either::Left(item_result) => { + let (epoch, item) = item_result?; + match item { + LogStoreReadItem::StreamChunk { chunk_id, chunk } => { + self.writer.write_chunk(chunk).await?; + tx.send(TruncateOffset::Chunk { epoch, chunk_id }).await.map_err(|err| SinkError::BigQuery(err.into()))?; + } + LogStoreReadItem::Barrier { .. } => { + tx.send(TruncateOffset::Barrier { epoch }).await.map_err(|err| SinkError::BigQuery(err.into()))?; + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} + } + } + Either::Right(result) => { + let offset = result.ok_or_else(||{ + SinkError::BigQuery(anyhow::anyhow!("BigQuerySinkWriter error: channal unexpectedly closed")) + })?; + if matches!(offset,TruncateOffset::Chunk{..}){ + self.resp_stream.message().await.map_err(|e|SinkError::BigQuery(e.into()))?; + } + log_reader.truncate(offset)?; + } + } + } + } +} + impl BigQueryCommon { async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result { let auth_json = self.get_auth_json_from_path(aws_auth_props).await?; @@ -103,14 +156,13 @@ impl BigQueryCommon { async fn build_writer_client( &self, aws_auth_props: &AwsAuthProps, - ) -> Result { + ) -> Result<(StorageWriterClient,Streaming)> { let auth_json = self.get_auth_json_from_path(aws_auth_props).await?; let credentials_file = CredentialsFile::new_from_str(&auth_json) .await .map_err(|e| SinkError::BigQuery(e.into()))?; - let client = StorageWriterClient::new(credentials_file).await?; - Ok(client) + StorageWriterClient::new(credentials_file).await } async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result { @@ -327,19 +379,18 @@ impl BigQuerySink { impl Sink for BigQuerySink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = AsyncTruncateLogSinkerOf; + type LogSinker = BigQueryLogSinker; const SINK_NAME: &'static str = BIGQUERY_SINK; async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { - Ok(BigQuerySinkWriter::new( + let (writer,resp_stream) = BigQuerySinkWriter::new( self.config.clone(), self.schema.clone(), self.pk_indices.clone(), self.is_append_only, - ) - .await? 
- .into_log_sinker(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE)) + ).await?; + Ok(BigQueryLogSinker::new(writer,resp_stream,BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE).await) } async fn validate(&self) -> Result<()> { @@ -451,8 +502,8 @@ impl BigQuerySinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - ) -> Result { - let client = config + ) -> Result<(Self,Streaming)> { + let (client,resp_stream) = config .common .build_writer_client(&config.aws_auth_props) .await?; @@ -499,7 +550,7 @@ impl BigQuerySinkWriter { message_descriptor.clone(), ProtoHeader::None, )?; - Ok(Self { + Ok((Self { write_stream: format!( "projects/{}/datasets/{}/tables/{}/streams/_default", config.common.project, config.common.dataset, config.common.table @@ -515,7 +566,7 @@ impl BigQuerySinkWriter { writer_pb_schema: ProtoSchema { proto_descriptor: Some(descriptor_proto), }, - }) + },resp_stream)) } fn append_only(&mut self, chunk: StreamChunk) -> Result>> { @@ -565,17 +616,10 @@ impl BigQuerySinkWriter { } Ok(serialized_rows) } -} - -pub type BigQuerySinkDeliveryFuture = impl TryFuture + Unpin + 'static; - -impl AsyncTruncateSinkWriter for BigQuerySinkWriter { - type DeliveryFuture = BigQuerySinkDeliveryFuture; async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { let serialized_rows = if self.is_append_only { self.append_only(chunk)? @@ -589,26 +633,6 @@ impl AsyncTruncateSinkWriter for BigQuerySinkWriter { writer_schema: Some(self.writer_pb_schema.clone()), rows: Some(ProtoRows { serialized_rows }), }); - let (expect_offset, mut rx) = self.client.get_subscribe(); - let future = Box::pin(async move { - loop { - match rx.recv().await { - Ok((result, offset)) => { - if offset == expect_offset { - if let Some(result) = result { - return Err(SinkError::BigQuery(anyhow::anyhow!(result))); - } else { - return Ok(()); - } - } - } - Err(e) => { - return Err(SinkError::BigQuery(anyhow::anyhow!(e))); - } - } - } - }); - add_future.add_future_may_await(future).await?; self.client .append_rows(rows, self.write_stream.clone()) .await?; @@ -620,12 +644,9 @@ struct StorageWriterClient { #[expect(dead_code)] environment: Environment, request_sender: mpsc::Sender, - result_sender: Arc, u64)>>, - abort_handle: tokio::task::AbortHandle, - offset: u64, } impl StorageWriterClient { - pub async fn new(credentials: CredentialsFile) -> Result { + pub async fn new(credentials: CredentialsFile) -> Result<(Self,Streaming)> { let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials( Self::bigquery_grpc_auth_config(), Box::new(credentials), @@ -649,52 +670,18 @@ impl StorageWriterClient { let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - let (result_sender, _) = broadcast::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE + 1); - let result_sender = Arc::new(result_sender); - let result_sender_clone = result_sender.clone(); - let abort_handle = tokio::spawn(async move { - let mut resp = client - .append_rows(Request::new(stream)) - .await - .map_err(|e| SinkError::BigQuery(e.into())) - .unwrap() - .into_inner(); - let mut offset = 0; - loop { - let result = match resp.message().await { - Ok(Some(append_rows_response)) => { - if !append_rows_response.row_errors.is_empty() { - Some(format!( - "Insert error {:?}", - append_rows_response.row_errors - )) - } else { - None - } - } - Ok(None) => { - continue; - } - Err(e) => 
Some(e.to_report_string()), - }; - result_sender_clone.send((result, offset)).unwrap(); - offset += 1; - } - }) - .abort_handle(); + + let resp = client + .append_rows(Request::new(stream)) + .await + .map_err(|e| SinkError::BigQuery(e.into())) + .unwrap() + .into_inner(); - Ok(StorageWriterClient { + Ok((StorageWriterClient { environment, request_sender: tx, - result_sender, - abort_handle, - offset: 0, - }) - } - - pub fn get_subscribe(&mut self) -> (u64, broadcast::Receiver<(Option, u64)>) { - self.offset += 1; - (self.offset - 1, self.result_sender.subscribe()) + },resp)) } pub async fn append_rows( @@ -724,11 +711,6 @@ impl StorageWriterClient { } } } -impl Drop for StorageWriterClient { - fn drop(&mut self) { - self.abort_handle.abort(); - } -} fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> prost_reflect::DescriptorPool { let file_descriptor = FileDescriptorProto { From ca9e59928c8d8b7115fb24f647d8085039c89061 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 5 Jul 2024 16:28:16 +0800 Subject: [PATCH 08/18] fix --- src/bench/sink_bench/main.rs | 4 +- src/bench/sink_bench/sink_option.yml | 8 +-- src/connector/src/sink/big_query.rs | 84 +++++++++++++++++++--------- 3 files changed, 64 insertions(+), 32 deletions(-) diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 189afdf6e83d4..329db4fadb863 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -367,9 +367,7 @@ where sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1)); } let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap(); - if let Err(e) = log_sinker.consume_log_and_sink(&mut log_reader).await { - return Err(e.to_report_string()); - } + log_sinker.consume_log_and_sink(&mut log_reader).await.unwrap(); Err("Stream closed".to_string()) } diff --git a/src/bench/sink_bench/sink_option.yml b/src/bench/sink_bench/sink_option.yml index 3a942db4edb30..09bed29bd06da 100644 --- a/src/bench/sink_bench/sink_option.yml +++ b/src/bench/sink_bench/sink_option.yml @@ -100,8 +100,8 @@ Starrocks: BigQuery: connector: 'bigquery' type: 'append-only' - bigquery.local.path: 'xxx.json' - bigquery.project: 'xxx' - bigquery.dataset: 'test_bigquery_sink' - bigquery.table: 'table_bench' + bigquery.local.path: '/home/xxhx/.ssh/rwc-playground-7e95d326dfd2.json' + bigquery.project: 'rwc-playground' + bigquery.dataset: 'test_123' + bigquery.table: 'table_sink' force_append_only: 'true' \ No newline at end of file diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 26b9cd3dffca3..683f2811edcab 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use core::pin::Pin; use core::time::Duration; use std::collections::{BTreeMap, HashMap}; use std::pin::pin; use anyhow::anyhow; +use futures::prelude::Future; +use futures_async_stream::try_stream; +use futures::{Stream, StreamExt}; use rw_futures_util::drop_either_future; use futures::future::{select, Either}; use gcp_bigquery_client::error::BQError; @@ -48,7 +52,7 @@ use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; use tokio::sync::mpsc; -use tonic::{async_trait, Streaming}; +use tonic::{async_trait, Response, Status, Streaming}; use url::Url; use uuid::Uuid; use with_options::WithOptions; @@ -88,14 +92,14 @@ pub struct BigQueryCommon { pub struct BigQueryLogSinker{ writer: BigQuerySinkWriter, - resp_stream: Streaming, + resp_stream: Pin> + Send>>, future_num: usize, } impl BigQueryLogSinker { - pub async fn new(writer: BigQuerySinkWriter, resp_stream: Streaming, future_num: usize) -> Self { + pub async fn new(writer: BigQuerySinkWriter, resp_stream: impl Stream> + Send + 'static, future_num: usize) -> Self { Self { writer, - resp_stream, + resp_stream: Box::pin(resp_stream), future_num, } } @@ -106,15 +110,8 @@ impl LogSinker for BigQueryLogSinker { async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { let (tx, mut rx) = mpsc::channel(self.future_num); loop { - let select_result = drop_either_future( - select( - pin!(log_reader.next_item()), - pin!(rx.recv()), - ) - .await, - ); - match select_result { - Either::Left(item_result) => { + tokio::select! { + item_result = log_reader.next_item() => { let (epoch, item) = item_result?; match item { LogStoreReadItem::StreamChunk { chunk_id, chunk } => { @@ -127,16 +124,47 @@ impl LogSinker for BigQueryLogSinker { LogStoreReadItem::UpdateVnodeBitmap(_) => {} } } - Either::Right(result) => { + result = rx.recv() => { let offset = result.ok_or_else(||{ SinkError::BigQuery(anyhow::anyhow!("BigQuerySinkWriter error: channal unexpectedly closed")) })?; if matches!(offset,TruncateOffset::Chunk{..}){ - self.resp_stream.message().await.map_err(|e|SinkError::BigQuery(e.into()))?; + self.resp_stream.next().await.unwrap()?; } log_reader.truncate(offset)?; } } + // let select_result = drop_either_future( + // select( + // pin!(rx.recv()), + // pin!(log_reader.next_item()), + // ) + // .await, + // ); + // match select_result { + // Either::Right(item_result) => { + // let (epoch, item) = item_result?; + // match item { + // LogStoreReadItem::StreamChunk { chunk_id, chunk } => { + // self.writer.write_chunk(chunk).await?; + // tx.send(TruncateOffset::Chunk { epoch, chunk_id }).await.map_err(|err| SinkError::BigQuery(err.into()))?; + // } + // LogStoreReadItem::Barrier { .. 
} => { + // tx.send(TruncateOffset::Barrier { epoch }).await.map_err(|err| SinkError::BigQuery(err.into()))?; + // } + // LogStoreReadItem::UpdateVnodeBitmap(_) => {} + // } + // } + // Either::Left(result) => { + // let offset = result.ok_or_else(||{ + // SinkError::BigQuery(anyhow::anyhow!("BigQuerySinkWriter error: channal unexpectedly closed")) + // })?; + // if matches!(offset,TruncateOffset::Chunk{..}){ + // self.resp_stream.next().await.unwrap()?; + // } + // log_reader.truncate(offset)?; + // } + // } } } } @@ -156,7 +184,7 @@ impl BigQueryCommon { async fn build_writer_client( &self, aws_auth_props: &AwsAuthProps, - ) -> Result<(StorageWriterClient,Streaming)> { + ) -> Result<(StorageWriterClient,impl Stream>)> { let auth_json = self.get_auth_json_from_path(aws_auth_props).await?; let credentials_file = CredentialsFile::new_from_str(&auth_json) @@ -502,7 +530,7 @@ impl BigQuerySinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - ) -> Result<(Self,Streaming)> { + ) -> Result<(Self,impl Stream>)> { let (client,resp_stream) = config .common .build_writer_client(&config.aws_auth_props) @@ -640,13 +668,22 @@ impl BigQuerySinkWriter { } } +#[try_stream(ok = (), error = SinkError)] +pub async fn resp_to_stream(resp_stream: impl Future>, Status>> + 'static + Send) { + let mut resp_stream = resp_stream.await.map_err(|e| SinkError::BigQuery(e.into()))?.into_inner(); + loop { + resp_stream.message().await.map_err(|e|SinkError::BigQuery(e.into()))?; + yield (); + } +} + struct StorageWriterClient { #[expect(dead_code)] environment: Environment, request_sender: mpsc::Sender, } impl StorageWriterClient { - pub async fn new(credentials: CredentialsFile) -> Result<(Self,Streaming)> { + pub async fn new(credentials: CredentialsFile) -> Result<(Self,impl Stream>)> { let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials( Self::bigquery_grpc_auth_config(), Box::new(credentials), @@ -671,17 +708,14 @@ impl StorageWriterClient { let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - let resp = client - .append_rows(Request::new(stream)) - .await - .map_err(|e| SinkError::BigQuery(e.into())) - .unwrap() - .into_inner(); + let resp = async move {client + .append_rows(Request::new(stream)).await}; + let resp_stream = resp_to_stream(resp); Ok((StorageWriterClient { environment, request_sender: tx, - },resp)) + },resp_stream)) } pub async fn append_rows( From 09b645d174f7db7b74d6aff27790e4366bda0346 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 8 Jul 2024 12:57:37 +0800 Subject: [PATCH 09/18] save --- src/connector/src/sink/big_query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 683f2811edcab..664b35e439c82 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -70,7 +70,7 @@ pub const CHANGE_TYPE: &str = "_CHANGE_TYPE"; const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4; const CONNECT_TIMEOUT: Option = Some(Duration::from_secs(30)); const CONNECTION_TIMEOUT: Option = None; -const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256; +const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536; #[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] From 02352ace548b685d50bbc3a71370c5cda4d547da Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 8 Jul 2024 18:41:12 +0800 Subject: [PATCH 10/18] refa fix --- 
src/bench/sink_bench/main.rs | 4 +- src/bench/sink_bench/sink_option.yml | 8 +- src/connector/src/sink/big_query.rs | 232 ++++++++++++++++----------- 3 files changed, 145 insertions(+), 99 deletions(-) diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 329db4fadb863..189afdf6e83d4 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -367,7 +367,9 @@ where sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1)); } let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap(); - log_sinker.consume_log_and_sink(&mut log_reader).await.unwrap(); + if let Err(e) = log_sinker.consume_log_and_sink(&mut log_reader).await { + return Err(e.to_report_string()); + } Err("Stream closed".to_string()) } diff --git a/src/bench/sink_bench/sink_option.yml b/src/bench/sink_bench/sink_option.yml index 09bed29bd06da..3a942db4edb30 100644 --- a/src/bench/sink_bench/sink_option.yml +++ b/src/bench/sink_bench/sink_option.yml @@ -100,8 +100,8 @@ Starrocks: BigQuery: connector: 'bigquery' type: 'append-only' - bigquery.local.path: '/home/xxhx/.ssh/rwc-playground-7e95d326dfd2.json' - bigquery.project: 'rwc-playground' - bigquery.dataset: 'test_123' - bigquery.table: 'table_sink' + bigquery.local.path: 'xxx.json' + bigquery.project: 'xxx' + bigquery.dataset: 'test_bigquery_sink' + bigquery.table: 'table_bench' force_append_only: 'true' \ No newline at end of file diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 664b35e439c82..7c92f3fa98db3 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -14,15 +14,14 @@ use core::pin::Pin; use core::time::Duration; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::pin::pin; use anyhow::anyhow; +use futures::prelude::future::{select, Either}; use futures::prelude::Future; -use futures_async_stream::try_stream; use futures::{Stream, StreamExt}; -use rw_futures_util::drop_either_future; -use futures::future::{select, Either}; +use futures_async_stream::try_stream; use gcp_bigquery_client::error::BQError; use gcp_bigquery_client::model::query_request::QueryRequest; use gcp_bigquery_client::model::table::Table; @@ -36,7 +35,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request:: ProtoData, Rows as AppendRowsRequestRows, }; use google_cloud_googleapis::cloud::bigquery::storage::v1::{ - AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema + AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema, }; use google_cloud_pubsub::client::google_cloud_auth; use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile; @@ -48,6 +47,7 @@ use prost_types::{ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; +use rw_futures_util::drop_either_future; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; @@ -60,7 +60,9 @@ use yup_oauth2::ServiceAccountKey; use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo}; use super::log_store::{LogStoreReadItem, TruncateOffset}; -use super::{LogSinker, SinkError, SinkLogReader, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{ + LogSinker, SinkError, SinkLogReader, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; use crate::aws_utils::load_file_descriptor_from_s3; use 
crate::connector_common::AwsAuthProps; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam}; @@ -90,17 +92,71 @@ pub struct BigQueryCommon { pub auto_create: bool, } -pub struct BigQueryLogSinker{ - writer: BigQuerySinkWriter, +struct BigQueryFutureManager { + offset_queue: VecDeque, resp_stream: Pin> + Send>>, - future_num: usize, + max_future_num: usize, + tx: mpsc::Sender<()>, + rx: mpsc::Receiver<()>, +} +impl BigQueryFutureManager { + pub fn new( + max_future_num: usize, + resp_stream: impl Stream> + Send + 'static, + ) -> Self { + let offset_queue = VecDeque::with_capacity(max_future_num); + let (tx, rx) = mpsc::channel(1); + Self { + offset_queue, + resp_stream: Box::pin(resp_stream), + max_future_num, + tx, + rx, + } + } + + pub async fn add_offset(&mut self, offset: TruncateOffset) -> Result<()> { + self.offset_queue.push_back(offset); + let _ = self.tx.try_send(()); + while self.offset_queue.len() >= self.max_future_num { + self.wait_one_offset().await?; + } + Ok(()) + } + + pub async fn wait_next_offset(&mut self) -> Result { + loop { + self.rx.recv().await.unwrap(); + if let Some(offset) = self.wait_one_offset().await? { + return Ok(offset); + } + } + } + + pub async fn wait_one_offset(&mut self) -> Result> { + if let Some(offset) = self.offset_queue.pop_front() { + if matches!(offset, TruncateOffset::Chunk { .. }) { + self.resp_stream.next().await.unwrap()?; + } + Ok(Some(offset)) + } else { + Ok(None) + } + } +} +pub struct BigQueryLogSinker { + writer: BigQuerySinkWriter, + bigquery_future_manager: BigQueryFutureManager, } impl BigQueryLogSinker { - pub async fn new(writer: BigQuerySinkWriter, resp_stream: impl Stream> + Send + 'static, future_num: usize) -> Self { + pub fn new( + writer: BigQuerySinkWriter, + resp_stream: impl Stream> + Send + 'static, + future_num: usize, + ) -> Self { Self { writer, - resp_stream: Box::pin(resp_stream), - future_num, + bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream), } } } @@ -108,63 +164,32 @@ impl BigQueryLogSinker { #[async_trait] impl LogSinker for BigQueryLogSinker { async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { - let (tx, mut rx) = mpsc::channel(self.future_num); - loop { - tokio::select! { - item_result = log_reader.next_item() => { + loop{ + let select_result = drop_either_future( + select( + pin!(log_reader.next_item()), + pin!(self.bigquery_future_manager.wait_next_offset()), + ) + .await, + ); + match select_result { + Either::Left(item_result) => { let (epoch, item) = item_result?; match item { LogStoreReadItem::StreamChunk { chunk_id, chunk } => { self.writer.write_chunk(chunk).await?; - tx.send(TruncateOffset::Chunk { epoch, chunk_id }).await.map_err(|err| SinkError::BigQuery(err.into()))?; + self.bigquery_future_manager.add_offset(TruncateOffset::Chunk { epoch, chunk_id }).await?; } LogStoreReadItem::Barrier { .. 
} => { - tx.send(TruncateOffset::Barrier { epoch }).await.map_err(|err| SinkError::BigQuery(err.into()))?; + self.bigquery_future_manager.add_offset(TruncateOffset::Barrier { epoch }).await?; } LogStoreReadItem::UpdateVnodeBitmap(_) => {} } } - result = rx.recv() => { - let offset = result.ok_or_else(||{ - SinkError::BigQuery(anyhow::anyhow!("BigQuerySinkWriter error: channal unexpectedly closed")) - })?; - if matches!(offset,TruncateOffset::Chunk{..}){ - self.resp_stream.next().await.unwrap()?; - } - log_reader.truncate(offset)?; + Either::Right(offset) => { + log_reader.truncate(offset?)?; } } - // let select_result = drop_either_future( - // select( - // pin!(rx.recv()), - // pin!(log_reader.next_item()), - // ) - // .await, - // ); - // match select_result { - // Either::Right(item_result) => { - // let (epoch, item) = item_result?; - // match item { - // LogStoreReadItem::StreamChunk { chunk_id, chunk } => { - // self.writer.write_chunk(chunk).await?; - // tx.send(TruncateOffset::Chunk { epoch, chunk_id }).await.map_err(|err| SinkError::BigQuery(err.into()))?; - // } - // LogStoreReadItem::Barrier { .. } => { - // tx.send(TruncateOffset::Barrier { epoch }).await.map_err(|err| SinkError::BigQuery(err.into()))?; - // } - // LogStoreReadItem::UpdateVnodeBitmap(_) => {} - // } - // } - // Either::Left(result) => { - // let offset = result.ok_or_else(||{ - // SinkError::BigQuery(anyhow::anyhow!("BigQuerySinkWriter error: channal unexpectedly closed")) - // })?; - // if matches!(offset,TruncateOffset::Chunk{..}){ - // self.resp_stream.next().await.unwrap()?; - // } - // log_reader.truncate(offset)?; - // } - // } } } } @@ -184,7 +209,7 @@ impl BigQueryCommon { async fn build_writer_client( &self, aws_auth_props: &AwsAuthProps, - ) -> Result<(StorageWriterClient,impl Stream>)> { + ) -> Result<(StorageWriterClient, impl Stream>)> { let auth_json = self.get_auth_json_from_path(aws_auth_props).await?; let credentials_file = CredentialsFile::new_from_str(&auth_json) @@ -412,13 +437,18 @@ impl Sink for BigQuerySink { const SINK_NAME: &'static str = BIGQUERY_SINK; async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { - let (writer,resp_stream) = BigQuerySinkWriter::new( + let (writer, resp_stream) = BigQuerySinkWriter::new( self.config.clone(), self.schema.clone(), self.pk_indices.clone(), self.is_append_only, - ).await?; - Ok(BigQueryLogSinker::new(writer,resp_stream,BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE).await) + ) + .await?; + Ok(BigQueryLogSinker::new( + writer, + resp_stream, + BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE, + )) } async fn validate(&self) -> Result<()> { @@ -530,8 +560,8 @@ impl BigQuerySinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - ) -> Result<(Self,impl Stream>)> { - let (client,resp_stream) = config + ) -> Result<(Self, impl Stream>)> { + let (client, resp_stream) = config .common .build_writer_client(&config.aws_auth_props) .await?; @@ -578,23 +608,26 @@ impl BigQuerySinkWriter { message_descriptor.clone(), ProtoHeader::None, )?; - Ok((Self { - write_stream: format!( - "projects/{}/datasets/{}/tables/{}/streams/_default", - config.common.project, config.common.dataset, config.common.table - ), - config, - schema, - pk_indices, - client, - is_append_only, - row_encoder, - message_descriptor, - proto_field, - writer_pb_schema: ProtoSchema { - proto_descriptor: Some(descriptor_proto), + Ok(( + Self { + write_stream: format!( + "projects/{}/datasets/{}/tables/{}/streams/_default", + config.common.project, config.common.dataset, 
config.common.table + ), + config, + schema, + pk_indices, + client, + is_append_only, + row_encoder, + message_descriptor, + proto_field, + writer_pb_schema: ProtoSchema { + proto_descriptor: Some(descriptor_proto), + }, }, - },resp_stream)) + resp_stream, + )) } fn append_only(&mut self, chunk: StreamChunk) -> Result>> { @@ -645,10 +678,7 @@ impl BigQuerySinkWriter { Ok(serialized_rows) } - async fn write_chunk<'a>( - &'a mut self, - chunk: StreamChunk, - ) -> Result<()> { + async fn write_chunk(&mut self, chunk: StreamChunk) -> Result<()> { let serialized_rows = if self.is_append_only { self.append_only(chunk)? } else { @@ -669,10 +699,20 @@ impl BigQuerySinkWriter { } #[try_stream(ok = (), error = SinkError)] -pub async fn resp_to_stream(resp_stream: impl Future>, Status>> + 'static + Send) { - let mut resp_stream = resp_stream.await.map_err(|e| SinkError::BigQuery(e.into()))?.into_inner(); +pub async fn resp_to_stream( + resp_stream: impl Future>, Status>> + + 'static + + Send, +) { + let mut resp_stream = resp_stream + .await + .map_err(|e| SinkError::BigQuery(e.into()))? + .into_inner(); loop { - resp_stream.message().await.map_err(|e|SinkError::BigQuery(e.into()))?; + resp_stream + .message() + .await + .map_err(|e| SinkError::BigQuery(e.into()))?; yield (); } } @@ -683,7 +723,9 @@ struct StorageWriterClient { request_sender: mpsc::Sender, } impl StorageWriterClient { - pub async fn new(credentials: CredentialsFile) -> Result<(Self,impl Stream>)> { + pub async fn new( + credentials: CredentialsFile, + ) -> Result<(Self, impl Stream>)> { let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials( Self::bigquery_grpc_auth_config(), Box::new(credentials), @@ -707,15 +749,17 @@ impl StorageWriterClient { let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - - let resp = async move {client - .append_rows(Request::new(stream)).await}; + + let resp = async move { client.append_rows(Request::new(stream)).await }; let resp_stream = resp_to_stream(resp); - Ok((StorageWriterClient { - environment, - request_sender: tx, - },resp_stream)) + Ok(( + StorageWriterClient { + environment, + request_sender: tx, + }, + resp_stream, + )) } pub async fn append_rows( From f26bf0691d4fdd7c4e17b6382cf760608be4f912 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 9 Jul 2024 12:11:35 +0800 Subject: [PATCH 11/18] fix ci --- src/connector/src/sink/big_query.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index fee9445ba57f8..975c7503ab4c3 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -52,7 +52,7 @@ use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; use tokio::sync::mpsc; -use tonic::{async_trait, Response, Status, Streaming}; +use tonic::{async_trait, Response, Status}; use url::Url; use uuid::Uuid; use with_options::WithOptions; @@ -705,7 +705,12 @@ impl BigQuerySinkWriter { #[try_stream(ok = (), error = SinkError)] pub async fn resp_to_stream( - resp_stream: impl Future>, Status>> + resp_stream: impl Future< + Output = std::result::Result< + Response>, + Status, + >, + > + 'static + Send, ) { From 181978c4ce03e09ee30ba34636cefa49f808209c Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 19 Jul 2024 12:39:12 +0800 Subject: [PATCH 12/18] fix comm --- 
src/connector/src/sink/big_query.rs | 70 +++++++++-------------------- 1 file changed, 22 insertions(+), 48 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 975c7503ab4c3..d02a487d7b81b 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -15,10 +15,8 @@ use core::pin::Pin; use core::time::Duration; use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::pin::pin; use anyhow::anyhow; -use futures::prelude::future::{select, Either}; use futures::prelude::Future; use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; @@ -47,7 +45,6 @@ use prost_types::{ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; -use rw_futures_util::drop_either_future; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; @@ -95,9 +92,6 @@ pub struct BigQueryCommon { struct BigQueryFutureManager { offset_queue: VecDeque, resp_stream: Pin> + Send>>, - max_future_num: usize, - tx: mpsc::Sender<()>, - rx: mpsc::Receiver<()>, } impl BigQueryFutureManager { pub fn new( @@ -105,49 +99,35 @@ impl BigQueryFutureManager { resp_stream: impl Stream> + Send + 'static, ) -> Self { let offset_queue = VecDeque::with_capacity(max_future_num); - let (tx, rx) = mpsc::channel(1); Self { offset_queue, resp_stream: Box::pin(resp_stream), - max_future_num, - tx, - rx, } } - pub async fn add_offset(&mut self, offset: TruncateOffset) -> Result<()> { + pub async fn add_offset(&mut self, offset: TruncateOffset) { self.offset_queue.push_back(offset); - let _ = self.tx.try_send(()); - while self.offset_queue.len() >= self.max_future_num { - self.wait_one_offset().await?; - } - Ok(()) } pub async fn wait_next_offset(&mut self) -> Result { - loop { - if let Some(offset) = self.wait_one_offset().await? { - return Ok(offset); - } else { - self.rx.recv().await.unwrap(); - } - } - } - - pub async fn wait_one_offset(&mut self) -> Result> { - if let Some(offset) = self.offset_queue.pop_front() { - if matches!(offset, TruncateOffset::Chunk { .. }) { - self.resp_stream.next().await.unwrap()?; - } - Ok(Some(offset)) - } else { - Ok(None) + if let Some(TruncateOffset::Barrier { .. 
}) = self.offset_queue.front() { + return Ok(self.offset_queue.pop_front().unwrap()); } + self.resp_stream + .next() + .await + .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??; + self.offset_queue.pop_front().ok_or_else(|| { + SinkError::BigQuery(anyhow::anyhow!( + "should have pending chunk offset when we receive new response" + )) + }) } } pub struct BigQueryLogSinker { writer: BigQuerySinkWriter, bigquery_future_manager: BigQueryFutureManager, + future_num: usize, } impl BigQueryLogSinker { pub fn new( @@ -158,6 +138,7 @@ impl BigQueryLogSinker { Self { writer, bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream), + future_num, } } } @@ -166,35 +147,28 @@ impl BigQueryLogSinker { impl LogSinker for BigQueryLogSinker { async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { loop { - let select_result = drop_either_future( - select( - pin!(log_reader.next_item()), - pin!(self.bigquery_future_manager.wait_next_offset()), - ) - .await, - ); - match select_result { - Either::Left(item_result) => { + tokio::select!( + offset = self.bigquery_future_manager.wait_next_offset() => { + log_reader.truncate(offset?)?; + } + item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => { let (epoch, item) = item_result?; match item { LogStoreReadItem::StreamChunk { chunk_id, chunk } => { self.writer.write_chunk(chunk).await?; self.bigquery_future_manager .add_offset(TruncateOffset::Chunk { epoch, chunk_id }) - .await?; + .await; } LogStoreReadItem::Barrier { .. } => { self.bigquery_future_manager .add_offset(TruncateOffset::Barrier { epoch }) - .await?; + .await; } LogStoreReadItem::UpdateVnodeBitmap(_) => {} } } - Either::Right(offset) => { - log_reader.truncate(offset?)?; - } - } + ) } } } From 9cc98ca83a1dab62ba7974b876b5ed9a8b594d13 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 22 Jul 2024 11:22:19 +0800 Subject: [PATCH 13/18] fix ci svae add fix --- src/connector/src/sink/big_query.rs | 83 +++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index d02a487d7b81b..7c81a41386c9b 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -70,6 +70,8 @@ const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4; const CONNECT_TIMEOUT: Option = Some(Duration::from_secs(30)); const CONNECTION_TIMEOUT: Option = None; const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536; +// < 10MB, we set 8MB +const MAX_ROW_SIZE: usize = 8*1024*1024; #[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] @@ -90,7 +92,7 @@ pub struct BigQueryCommon { } struct BigQueryFutureManager { - offset_queue: VecDeque, + offset_queue: VecDeque>, resp_stream: Pin> + Send>>, } impl BigQueryFutureManager { @@ -105,12 +107,16 @@ impl BigQueryFutureManager { } } - pub async fn add_offset(&mut self, offset: TruncateOffset) { - self.offset_queue.push_back(offset); + pub fn add_offset(&mut self, offset: TruncateOffset,mut resp_num: usize) { + while resp_num >1 { + self.offset_queue.push_back(None); + resp_num -= 1; + } + self.offset_queue.push_back(Some(offset)); } - pub async fn wait_next_offset(&mut self) -> Result { - if let Some(TruncateOffset::Barrier { .. }) = self.offset_queue.front() { + pub async fn wait_next_offset(&mut self) -> Result> { + if let Some(Some(TruncateOffset::Barrier { .. 
})) = self.offset_queue.front() { return Ok(self.offset_queue.pop_front().unwrap()); } self.resp_stream @@ -149,21 +155,21 @@ impl LogSinker for BigQueryLogSinker { loop { tokio::select!( offset = self.bigquery_future_manager.wait_next_offset() => { - log_reader.truncate(offset?)?; + if let Some(offset) = offset?{ + log_reader.truncate(offset)?; + } } item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => { let (epoch, item) = item_result?; match item { LogStoreReadItem::StreamChunk { chunk_id, chunk } => { - self.writer.write_chunk(chunk).await?; + let resp_num = self.writer.write_chunk(chunk).await?; self.bigquery_future_manager - .add_offset(TruncateOffset::Chunk { epoch, chunk_id }) - .await; + .add_offset(TruncateOffset::Chunk { epoch, chunk_id },resp_num); } LogStoreReadItem::Barrier { .. } => { self.bigquery_future_manager - .add_offset(TruncateOffset::Barrier { epoch }) - .await; + .add_offset(TruncateOffset::Barrier { epoch },0); } LogStoreReadItem::UpdateVnodeBitmap(_) => {} } @@ -657,23 +663,41 @@ impl BigQuerySinkWriter { Ok(serialized_rows) } - async fn write_chunk(&mut self, chunk: StreamChunk) -> Result<()> { + async fn write_chunk(&mut self, chunk: StreamChunk) -> Result { let serialized_rows = if self.is_append_only { self.append_only(chunk)? } else { self.upsert(chunk)? }; if serialized_rows.is_empty() { - return Ok(()); + return Ok(0); } - let rows = AppendRowsRequestRows::ProtoRows(ProtoData { - writer_schema: Some(self.writer_pb_schema.clone()), - rows: Some(ProtoRows { serialized_rows }), - }); - self.client - .append_rows(rows, self.write_stream.clone()) - .await?; - Ok(()) + let mut result = Vec::new(); + let mut result_inner = Vec::new(); + let mut size_count = 0; + for i in serialized_rows{ + size_count += i.len(); + if size_count > MAX_ROW_SIZE { + result.push(result_inner); + result_inner = Vec::new(); + size_count = i.len(); + } + result_inner.push(i); + } + if !result_inner.is_empty(){ + result.push(result_inner); + } + let len = result.len(); + for serialized_rows in result{ + let rows = AppendRowsRequestRows::ProtoRows(ProtoData { + writer_schema: Some(self.writer_pb_schema.clone()), + rows: Some(ProtoRows { serialized_rows }), + }); + self.client + .append_rows(rows, self.write_stream.clone()) + .await?; + } + Ok(len) } } @@ -693,10 +717,23 @@ pub async fn resp_to_stream( .map_err(|e| SinkError::BigQuery(e.into()))? 
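The splitting loop above keeps every AppendRows request under MAX_ROW_SIZE (8 MB, safely below BigQuery's ~10 MB request limit) and returns how many requests were sent. A rough standalone sketch of the same size-capped batching, with an illustrative helper name (split_into_batches) rather than the patch's inline loop:

// Sketch only: greedy size-based batching of serialized rows, assuming the
// same MAX_ROW_SIZE budget as in the diff above.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

fn split_into_batches(serialized_rows: Vec<Vec<u8>>) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut size = 0usize;
    for row in serialized_rows {
        // Close the current batch before it would exceed the byte budget.
        if !current.is_empty() && size + row.len() > MAX_ROW_SIZE {
            batches.push(std::mem::take(&mut current));
            size = 0;
        }
        size += row.len();
        current.push(row);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

Each returned batch then becomes one ProtoRows payload, which is why the writer reports the batch count back to the offset queue.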
.into_inner(); loop { - resp_stream + if let Some(append_rows_response) = resp_stream .message() .await - .map_err(|e| SinkError::BigQuery(e.into()))?; + .map_err(|e| SinkError::BigQuery(e.into()))?{ + if !append_rows_response.row_errors.is_empty() { + return Err(SinkError::BigQuery(anyhow::anyhow!( + "bigquery insert error {:?}", + append_rows_response.row_errors + ))); + } + if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response{ + return Err(SinkError::BigQuery(anyhow::anyhow!( + "bigquery insert error {:?}", + status + ))); + } + } yield (); } } From 05d3922d6a0d86b0c12eef2588c89718f32c6e75 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 22 Jul 2024 14:00:39 +0800 Subject: [PATCH 14/18] fmt --- src/connector/src/sink/big_query.rs | 33 +++++++++++++++-------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 7c81a41386c9b..32e8c26ed255e 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -71,7 +71,7 @@ const CONNECT_TIMEOUT: Option = Some(Duration::from_secs(30)); const CONNECTION_TIMEOUT: Option = None; const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536; // < 10MB, we set 8MB -const MAX_ROW_SIZE: usize = 8*1024*1024; +const MAX_ROW_SIZE: usize = 8 * 1024 * 1024; #[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] @@ -107,8 +107,8 @@ impl BigQueryFutureManager { } } - pub fn add_offset(&mut self, offset: TruncateOffset,mut resp_num: usize) { - while resp_num >1 { + pub fn add_offset(&mut self, offset: TruncateOffset, mut resp_num: usize) { + while resp_num > 1 { self.offset_queue.push_back(None); resp_num -= 1; } @@ -675,20 +675,20 @@ impl BigQuerySinkWriter { let mut result = Vec::new(); let mut result_inner = Vec::new(); let mut size_count = 0; - for i in serialized_rows{ + for i in serialized_rows { size_count += i.len(); if size_count > MAX_ROW_SIZE { result.push(result_inner); result_inner = Vec::new(); - size_count = i.len(); + size_count = i.len(); } result_inner.push(i); } - if !result_inner.is_empty(){ + if !result_inner.is_empty() { result.push(result_inner); } let len = result.len(); - for serialized_rows in result{ + for serialized_rows in result { let rows = AppendRowsRequestRows::ProtoRows(ProtoData { writer_schema: Some(self.writer_pb_schema.clone()), rows: Some(ProtoRows { serialized_rows }), @@ -720,20 +720,21 @@ pub async fn resp_to_stream( if let Some(append_rows_response) = resp_stream .message() .await - .map_err(|e| SinkError::BigQuery(e.into()))?{ - if !append_rows_response.row_errors.is_empty() { - return Err(SinkError::BigQuery(anyhow::anyhow!( - "bigquery insert error {:?}", - append_rows_response.row_errors - ))); - } - if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response{ + .map_err(|e| SinkError::BigQuery(e.into()))? 
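The response handling being reformatted here rejects a response that carries per-row errors or a request-level error status, and otherwise treats it as the acknowledgement of one in-flight request. A self-contained sketch of that check, using a simplified stand-in type instead of the generated AppendRowsResponse (the real code matches on the google_cloud_googleapis types shown in the diff):

// Sketch only: stand-in for AppendRowsResponse with the two failure signals
// the sink cares about.
struct StubAppendRowsResponse {
    row_errors: Vec<String>,
    error_status: Option<String>,
}

fn check_append_response(resp: &StubAppendRowsResponse) -> Result<(), String> {
    // Any per-row error fails the whole request...
    if !resp.row_errors.is_empty() {
        return Err(format!("bigquery insert error {:?}", resp.row_errors));
    }
    // ...and so does a request-level error status.
    if let Some(status) = &resp.error_status {
        return Err(format!("bigquery insert error {status}"));
    }
    // Otherwise this response acknowledges exactly one in-flight AppendRows request.
    Ok(())
}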
+ { + if !append_rows_response.row_errors.is_empty() { + return Err(SinkError::BigQuery(anyhow::anyhow!( + "bigquery insert error {:?}", + append_rows_response.row_errors + ))); + } + if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response{ return Err(SinkError::BigQuery(anyhow::anyhow!( "bigquery insert error {:?}", status ))); } - } + } yield (); } } From 5721414f2c42a009d1e92ab87c328bed86147104 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 29 Jul 2024 11:50:29 +0800 Subject: [PATCH 15/18] add doc --- src/connector/src/sink/big_query.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 32e8c26ed255e..71ea512f27787 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -92,7 +92,13 @@ pub struct BigQueryCommon { } struct BigQueryFutureManager { + // `offset_queue` holds the Some corresponding to each future. + // When we receive a barrier we add a Some(barrier). + // When we receive a chunk, if the chunk is larger than `MAX_ROW_SIZE`, we split it, and then we add n None and a Some(chunk) to the queue. offset_queue: VecDeque>, + // When we pop a Some(barrier) from the queue, we don't have to wait `resp_stream`, we just truncate. + // When we pop a Some(chunk) from the queue, we have to wait `resp_stream`. + // When we pop a None from the queue, we have to wait `resp_stream`, but we don't need to truncate. resp_stream: Pin> + Send>>, } impl BigQueryFutureManager { From ecd13d6f2f1a1bd656779b1433a25dad14a42e62 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 29 Jul 2024 12:09:34 +0800 Subject: [PATCH 16/18] fix comm --- src/connector/src/sink/big_query.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index fa794f0fd00a4..d37ca49dcbaf9 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -121,7 +121,7 @@ impl BigQueryFutureManager { self.offset_queue.push_back(Some(offset)); } - pub async fn wait_next_offset(&mut self) -> Result> { + pub async fn next_offset(&mut self) -> Result> { if let Some(Some(TruncateOffset::Barrier { .. 
})) = self.offset_queue.front() { return Ok(self.offset_queue.pop_front().unwrap()); } @@ -160,7 +160,7 @@ impl LogSinker for BigQueryLogSinker { async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { loop { tokio::select!( - offset = self.bigquery_future_manager.wait_next_offset() => { + offset = self.bigquery_future_manager.next_offset() => { if let Some(offset) = offset?{ log_reader.truncate(offset)?; } From 485b0e7823bdfe3d6f65f343c84b9e1dc92425bb Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 29 Aug 2024 10:35:29 +0800 Subject: [PATCH 17/18] save save save --- src/bench/sink_bench/sink_option.yml | 8 +-- src/connector/src/sink/big_query.rs | 90 ++++++++++++++-------------- 2 files changed, 48 insertions(+), 50 deletions(-) diff --git a/src/bench/sink_bench/sink_option.yml b/src/bench/sink_bench/sink_option.yml index 3a942db4edb30..224a84f13b204 100644 --- a/src/bench/sink_bench/sink_option.yml +++ b/src/bench/sink_bench/sink_option.yml @@ -100,8 +100,8 @@ Starrocks: BigQuery: connector: 'bigquery' type: 'append-only' - bigquery.local.path: 'xxx.json' - bigquery.project: 'xxx' - bigquery.dataset: 'test_bigquery_sink' - bigquery.table: 'table_bench' + bigquery.local.path: '/home/xxhx/.ssh/rwc-playground-7e95d326dfd2.json' + bigquery.project: 'rwc-playground' + bigquery.dataset: 'test_123' + bigquery.table: 't2' force_append_only: 'true' \ No newline at end of file diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index d37ca49dcbaf9..ebfe68104ca46 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -17,6 +17,7 @@ use core::time::Duration; use std::collections::{BTreeMap, HashMap, VecDeque}; use anyhow::{anyhow, Context}; +use futures::future::pending; use futures::prelude::Future; use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; @@ -95,7 +96,7 @@ struct BigQueryFutureManager { // `offset_queue` holds the Some corresponding to each future. // When we receive a barrier we add a Some(barrier). // When we receive a chunk, if the chunk is larger than `MAX_ROW_SIZE`, we split it, and then we add n None and a Some(chunk) to the queue. - offset_queue: VecDeque>, + offset_queue: VecDeque<(TruncateOffset, usize)>, // When we pop a Some(barrier) from the queue, we don't have to wait `resp_stream`, we just truncate. // When we pop a Some(chunk) from the queue, we have to wait `resp_stream`. // When we pop a None from the queue, we have to wait `resp_stream`, but we don't need to truncate. @@ -113,27 +114,26 @@ impl BigQueryFutureManager { } } - pub fn add_offset(&mut self, offset: TruncateOffset, mut resp_num: usize) { - while resp_num > 1 { - self.offset_queue.push_back(None); - resp_num -= 1; - } - self.offset_queue.push_back(Some(offset)); + pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) { + self.offset_queue.push_back((offset, resp_num)); } - pub async fn next_offset(&mut self) -> Result> { - if let Some(Some(TruncateOffset::Barrier { .. 
})) = self.offset_queue.front() { - return Ok(self.offset_queue.pop_front().unwrap()); + pub async fn next_offset(&mut self) -> Result { + if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() { + if *remaining_resp_num == 0 { + return Ok(self.offset_queue.pop_front().unwrap().0); + } + while *remaining_resp_num > 0 { + self.resp_stream + .next() + .await + .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??; + *remaining_resp_num -= 1; + } + Ok(self.offset_queue.pop_front().unwrap().0) + } else { + pending().await } - self.resp_stream - .next() - .await - .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??; - self.offset_queue.pop_front().ok_or_else(|| { - SinkError::BigQuery(anyhow::anyhow!( - "should have pending chunk offset when we receive new response" - )) - }) } } pub struct BigQueryLogSinker { @@ -161,15 +161,13 @@ impl LogSinker for BigQueryLogSinker { loop { tokio::select!( offset = self.bigquery_future_manager.next_offset() => { - if let Some(offset) = offset?{ - log_reader.truncate(offset)?; - } + log_reader.truncate(offset?)?; } item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => { let (epoch, item) = item_result?; match item { LogStoreReadItem::StreamChunk { chunk_id, chunk } => { - let resp_num = self.writer.write_chunk(chunk).await?; + let resp_num = self.writer.write_chunk(chunk)?; self.bigquery_future_manager .add_offset(TruncateOffset::Chunk { epoch, chunk_id },resp_num); } @@ -669,7 +667,7 @@ impl BigQuerySinkWriter { Ok(serialized_rows) } - async fn write_chunk(&mut self, chunk: StreamChunk) -> Result { + fn write_chunk(&mut self, chunk: StreamChunk) -> Result { let serialized_rows = if self.is_append_only { self.append_only(chunk)? } else { @@ -699,9 +697,7 @@ impl BigQuerySinkWriter { writer_schema: Some(self.writer_pb_schema.clone()), rows: Some(ProtoRows { serialized_rows }), }); - self.client - .append_rows(rows, self.write_stream.clone()) - .await?; + self.client.append_rows(rows, self.write_stream.clone())?; } Ok(len) } @@ -723,32 +719,39 @@ pub async fn resp_to_stream( .map_err(|e| SinkError::BigQuery(e.into()))? .into_inner(); loop { - if let Some(append_rows_response) = resp_stream + match resp_stream .message() .await .map_err(|e| SinkError::BigQuery(e.into()))? 
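The consume loop above selects between retiring the oldest in-flight offset when a response arrives and pulling a new log item, and it only pulls new items while at most future_num offsets are in flight. A condensed sketch of that backpressure pattern, with plain channels standing in for the log reader and the BigQuery response stream (names and types are illustrative, not the sink's API):

// Sketch only: `acks` delivers one () per completed request, `items` delivers
// new offsets to send; the VecDeque plays the role of offset_queue.
use std::collections::VecDeque;
use tokio::sync::mpsc;

async fn consume_loop(
    mut acks: mpsc::Receiver<()>,
    mut items: mpsc::Receiver<u64>,
    max_in_flight: usize,
) {
    let mut in_flight: VecDeque<u64> = VecDeque::new();
    loop {
        tokio::select! {
            // One response retires the oldest in-flight offset ("truncate" it).
            Some(()) = acks.recv() => {
                if let Some(offset) = in_flight.pop_front() {
                    println!("truncate log up to offset {offset}");
                }
            }
            // Only accept new work while the window has room, mirroring
            // `offset_queue.len() <= self.future_num` in the diff above.
            Some(offset) = items.recv(), if in_flight.len() < max_in_flight => {
                in_flight.push_back(offset);
            }
            else => break, // both channels closed and drained
        }
    }
}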
{ - if !append_rows_response.row_errors.is_empty() { - return Err(SinkError::BigQuery(anyhow::anyhow!( - "bigquery insert error {:?}", - append_rows_response.row_errors - ))); - } - if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response{ + Some(append_rows_response) => { + if !append_rows_response.row_errors.is_empty() { return Err(SinkError::BigQuery(anyhow::anyhow!( "bigquery insert error {:?}", - status + append_rows_response.row_errors ))); } + if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response{ + return Err(SinkError::BigQuery(anyhow::anyhow!( + "bigquery insert error {:?}", + status + ))); + } + yield (); + } + None => { + return Err(SinkError::BigQuery(anyhow::anyhow!( + "bigquery insert error: end of resp stream", + ))); + } } - yield (); } } struct StorageWriterClient { #[expect(dead_code)] environment: Environment, - request_sender: mpsc::Sender, + request_sender: mpsc::UnboundedSender, } impl StorageWriterClient { pub async fn new( @@ -775,8 +778,8 @@ impl StorageWriterClient { .map_err(|e| SinkError::BigQuery(e.into()))?; let mut client = conn.conn(); - let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); - let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let (tx, rx) = mpsc::unbounded_channel(); + let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx); let resp = async move { client.append_rows(Request::new(stream)).await }; let resp_stream = resp_to_stream(resp); @@ -790,11 +793,7 @@ impl StorageWriterClient { )) } - pub async fn append_rows( - &mut self, - row: AppendRowsRequestRows, - write_stream: String, - ) -> Result<()> { + pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> { let append_req = AppendRowsRequest { write_stream: write_stream.clone(), offset: None, @@ -804,7 +803,6 @@ impl StorageWriterClient { }; self.request_sender .send(append_req) - .await .map_err(|e| SinkError::BigQuery(e.into()))?; Ok(()) } From 8f58c7fd8406d58767b53cd0d659f564a7eb2b54 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 2 Sep 2024 14:49:05 +0800 Subject: [PATCH 18/18] fix comm --- src/bench/sink_bench/sink_option.yml | 8 ++++---- src/connector/src/sink/big_query.rs | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/bench/sink_bench/sink_option.yml b/src/bench/sink_bench/sink_option.yml index 224a84f13b204..3a942db4edb30 100644 --- a/src/bench/sink_bench/sink_option.yml +++ b/src/bench/sink_bench/sink_option.yml @@ -100,8 +100,8 @@ Starrocks: BigQuery: connector: 'bigquery' type: 'append-only' - bigquery.local.path: '/home/xxhx/.ssh/rwc-playground-7e95d326dfd2.json' - bigquery.project: 'rwc-playground' - bigquery.dataset: 'test_123' - bigquery.table: 't2' + bigquery.local.path: 'xxx.json' + bigquery.project: 'xxx' + bigquery.dataset: 'test_bigquery_sink' + bigquery.table: 'table_bench' force_append_only: 'true' \ No newline at end of file diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index ebfe68104ca46..ad5660dae2627 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -94,12 +94,12 @@ pub struct BigQueryCommon { struct BigQueryFutureManager { // `offset_queue` holds the Some corresponding to each future. - // When we receive a barrier we add a Some(barrier). 
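This revision also replaces the bounded request channel with an unbounded one, so append_rows becomes a plain synchronous send; backpressure now comes from the bounded offset queue rather than from the channel. A small sketch of the unbounded-sender-plus-stream wiring, with a stand-in Req type in place of AppendRowsRequest:

// Sketch only: an unbounded sender feeding a request stream, as in the diff.
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

struct Req(u32); // stand-in for AppendRowsRequest

fn make_request_stream() -> (mpsc::UnboundedSender<Req>, UnboundedReceiverStream<Req>) {
    let (tx, rx) = mpsc::unbounded_channel();
    (tx, UnboundedReceiverStream::new(rx))
}

#[tokio::main]
async fn main() {
    let (tx, mut stream) = make_request_stream();
    // Sending never blocks or awaits; the bounded offset queue upstream is
    // what limits how many requests can be outstanding.
    tx.send(Req(1)).unwrap();
    drop(tx);
    while let Some(Req(n)) = stream.next().await {
        println!("would send request {n} over the gRPC append stream");
    }
}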
-    // When we receive a chunk, if the chunk is larger than `MAX_ROW_SIZE`, we split it, and then we add n None and a Some(chunk) to the queue.
+    // When the `TruncateOffset` is a barrier, the count is 0 and we don't need to wait for `resp_stream`.
+    // When the `TruncateOffset` is a chunk:
+    //   1. If the chunk has no rows, nothing was sent, the count is 0, and we don't wait for `resp_stream`.
+    //   2. If the chunk is smaller than `MAX_ROW_SIZE`, it was sent as a single request, the count is 1, and we wait for `resp_stream` once.
+    //   3. If the chunk is larger than `MAX_ROW_SIZE`, it was split into n requests, the count is n, and we wait for `resp_stream` n times.
     offset_queue: VecDeque<(TruncateOffset, usize)>,
-    // When we pop a Some(barrier) from the queue, we don't have to wait `resp_stream`, we just truncate.
-    // When we pop a Some(chunk) from the queue, we have to wait `resp_stream`.
-    // When we pop a None from the queue, we have to wait `resp_stream`, but we don't need to truncate.
     resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
 }
 impl BigQueryFutureManager {
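Taken together, the final design pops the front (offset, resp_num) pair only after draining resp_num responses, and parks on pending() when the queue is empty so the surrounding tokio::select! simply waits on its other arm. A condensed, self-contained sketch of that bookkeeping, with Offset and a plain channel standing in for TruncateOffset and the response stream:

// Sketch only: the offset/response accounting behind next_offset.
use std::collections::VecDeque;
use futures::future::pending;
use tokio::sync::mpsc;

enum Offset { Chunk(u64), Barrier(u64) }

struct FutureManager {
    // (offset, number of responses this offset still owes)
    queue: VecDeque<(Offset, usize)>,
    acks: mpsc::Receiver<()>,
}

impl FutureManager {
    fn add(&mut self, offset: Offset, resp_num: usize) {
        self.queue.push_back((offset, resp_num));
    }

    async fn next_offset(&mut self) -> Result<Offset, String> {
        match self.queue.front_mut() {
            Some((_, remaining)) => {
                // Barriers and empty chunks owe 0 responses and complete immediately;
                // split chunks owe one response per request that was sent.
                while *remaining > 0 {
                    self.acks.recv().await.ok_or("response stream ended")?;
                    *remaining -= 1;
                }
                Ok(self.queue.pop_front().unwrap().0)
            }
            // Nothing in flight: never resolve, so a select! caller keeps
            // polling its other arms instead of spinning here.
            None => pending().await,
        }
    }
}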