Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Apr 9, 2024
1 parent 11f2ffb commit bd5181b
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::
ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
AppendRowsRequest, ProtoRows, ProtoSchema,
AppendRowsRequest, ProtoRows, ProtoSchema
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
Expand Down Expand Up @@ -494,15 +494,17 @@ impl SinkWriter for BigQuerySinkWriter {
}

async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
self.write_rows().await?;
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
if is_checkpoint{
self.write_rows().await?;
}
Ok(())
}

Expand Down Expand Up @@ -548,6 +550,7 @@ impl StorageWriterClient {
rows: Vec<AppendRowsRequestRows>,
write_stream: String,
) -> Result<()> {
let mut resp_count = rows.len();
let append_req: Vec<AppendRowsRequest> = rows
.into_iter()
.map(|row| AppendRowsRequest {
Expand All @@ -564,18 +567,20 @@ impl StorageWriterClient {
.await
.map_err(|e| SinkError::BigQuery(e.into()))?
.into_inner();
while let Some(i) = resp
while let Some(append_rows_response) = resp
.message()
.await
.map_err(|e| SinkError::BigQuery(e.into()))?
{
if !i.row_errors.is_empty() {
resp_count -= 1;
if !append_rows_response.row_errors.is_empty() {
return Err(SinkError::BigQuery(anyhow::anyhow!(
"Insert error {:?}",
i.row_errors
append_rows_response.row_errors
)));
}
}
assert_eq!(resp_count,0,"bigquery sink insert error: the number of response inserted is not equal to the number of request");
Ok(())
}

Expand Down

0 comments on commit bd5181b

Please sign in to comment.