diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 43f9d908c27a9..b5a4a4c72d517 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -29,7 +29,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request:: ProtoData, Rows as AppendRowsRequestRows, }; use google_cloud_googleapis::cloud::bigquery::storage::v1::{ - AppendRowsRequest, ProtoRows, ProtoSchema, + AppendRowsRequest, ProtoRows, ProtoSchema }; use google_cloud_pubsub::client::google_cloud_auth; use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile; @@ -494,7 +494,6 @@ impl SinkWriter for BigQuerySinkWriter { } async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - self.write_rows().await?; Ok(()) } @@ -502,7 +501,10 @@ impl SinkWriter for BigQuerySinkWriter { Ok(()) } - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if is_checkpoint{ + self.write_rows().await?; + } Ok(()) } @@ -548,6 +550,7 @@ impl StorageWriterClient { rows: Vec, write_stream: String, ) -> Result<()> { + let mut resp_count = rows.len(); let append_req: Vec = rows .into_iter() .map(|row| AppendRowsRequest { @@ -564,18 +567,20 @@ impl StorageWriterClient { .await .map_err(|e| SinkError::BigQuery(e.into()))? .into_inner(); - while let Some(i) = resp + while let Some(append_rows_response) = resp .message() .await .map_err(|e| SinkError::BigQuery(e.into()))? { - if !i.row_errors.is_empty() { + resp_count -= 1; + if !append_rows_response.row_errors.is_empty() { return Err(SinkError::BigQuery(anyhow::anyhow!( "Insert error {:?}", - i.row_errors + append_rows_response.row_errors ))); } } + assert_eq!(resp_count,0,"bigquery sink insert error: the number of response inserted is not equal to the number of request"); Ok(()) }