feat(sink): support async for bigquery sink #17488
Changes from 7 commits
@@ -17,14 +17,13 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::prelude::TryFuture;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use gcp_bigquery_client::Client;
use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient;
use google_cloud_bigquery::grpc::apiv1::conn_pool::{WriteConnectionManager, DOMAIN};
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
@@ -42,32 +41,34 @@ use prost_types::{
    FileDescriptorSet,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use simd_json::prelude::ArrayTrait;
use thiserror_ext::AsReport;
use tokio::sync::{broadcast, mpsc};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::writer::LogSinkerOf;
use super::log_store::DeliveryFutureManagerAddFuture;
use super::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
};
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::sink::writer::SinkWriterExt;
use crate::sink::{
    DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
@@ -82,25 +83,11 @@ pub struct BigQueryCommon {
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_batch_rows: usize,
    #[serde(rename = "bigquery.retry_times", default = "default_retry_times")]
    #[serde_as(as = "DisplayFromStr")]
    pub retry_times: usize,
    #[serde(default)] // default false
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
}

fn default_max_batch_rows() -> usize {
    1024
}

fn default_retry_times() -> usize {
    5
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
@@ -340,19 +327,19 @@ impl BigQuerySink {

impl Sink for BigQuerySink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = LogSinkerOf<BigQuerySinkWriter>;
    type LogSinker = AsyncTruncateLogSinkerOf<BigQuerySinkWriter>;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?
        .into_log_sinker(writer_param.sink_metrics))
        .into_log_sinker(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE))
    }

    async fn validate(&self) -> Result<()> {
@@ -441,8 +428,6 @@ pub struct BigQuerySinkWriter {
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
    write_rows: Vec<AppendRowsRequestRows>,
    write_rows_count: usize,
}

impl TryFrom<SinkParam> for BigQuerySink {
@@ -530,8 +515,6 @@ impl BigQuerySinkWriter {
            writer_pb_schema: ProtoSchema {
                proto_descriptor: Some(descriptor_proto),
            },
            write_rows: vec![],
            write_rows_count: 0,
        })
    }

@@ -582,80 +565,64 @@ impl BigQuerySinkWriter {
        }
        Ok(serialized_rows)
    }

    async fn write_rows(&mut self) -> Result<()> {
        if self.write_rows.is_empty() {
            return Ok(());
        }
        let mut errs = Vec::with_capacity(self.config.common.retry_times);
        for _ in 0..self.config.common.retry_times {
            match self
                .client
                .append_rows(self.write_rows.clone(), self.write_stream.clone())
                .await
            {
                Ok(_) => {
                    self.write_rows_count = 0;
                    self.write_rows.clear();
                    return Ok(());
                }
                Err(e) => errs.push(e),
            }
        }
        Err(SinkError::BigQuery(anyhow::anyhow!(
            "Insert error {:?}",
            errs
        )))
    }
}

#[async_trait]
impl SinkWriter for BigQuerySinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
pub type BigQuerySinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl AsyncTruncateSinkWriter for BigQuerySinkWriter {
    type DeliveryFuture = BigQuerySinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if !serialized_rows.is_empty() {
            self.write_rows_count += serialized_rows.len();
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.write_rows.push(rows);

            if self.write_rows_count >= self.config.common.max_batch_rows {
                self.write_rows().await?;
            }
        }
        Ok(())
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint {
            self.write_rows().await?;
        if serialized_rows.is_empty() {
            return Ok(());
        }
        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
        let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
            writer_schema: Some(self.writer_pb_schema.clone()),
            rows: Some(ProtoRows { serialized_rows }),
        });
        let (expect_offset, mut rx) = self.client.get_subscribe();
        let future = Box::pin(async move {
            loop {
                match rx.recv().await {
                    Ok((result, offset)) => {
                        if offset == expect_offset {
                            if let Some(result) = result {
                                return Err(SinkError::BigQuery(anyhow::anyhow!(result)));
                            } else {
                                return Ok(());
                            }
                        }
                    }
                    Err(e) => {
                        return Err(SinkError::BigQuery(anyhow::anyhow!(e)));
                    }
                }
            }
        });
        add_future.add_future_may_await(future).await?;
        self.client
            .append_rows(rows, self.write_stream.clone())
            .await?;
        Ok(())
    }
}

struct StorageWriterClient {
    client: StreamingWriteClient,
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::Sender<AppendRowsRequest>,
    result_sender: Arc<broadcast::Sender<(Option<String>, u64)>>,
    abort_handle: tokio::task::AbortHandle,
    offset: u64,
}
impl StorageWriterClient {
    pub async fn new(credentials: CredentialsFile) -> Result<Self> {

@@ -678,49 +645,74 @@ impl StorageWriterClient {
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let client = conn.conn();
        let mut client = conn.conn();

        let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE);
Review comment: We can use a bounded channel here since we already limit the queue size in the delivery future manager.
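A minimal, runnable sketch of the idea in the comment above, using placeholder names (`CAPACITY`, a plain `u64` payload) rather than the PR's actual types: a bounded `tokio::sync::mpsc` channel wrapped in a `ReceiverStream` gives natural backpressure, because `send().await` suspends once the buffer is full, so the producer can never get more than the configured number of requests ahead of the consumer.

```rust
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

// Placeholder for BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE; the value is illustrative.
const CAPACITY: usize = 4;

#[tokio::main]
async fn main() {
    // Bounded channel: at most CAPACITY requests can be queued at once.
    let (tx, rx) = mpsc::channel::<u64>(CAPACITY);

    // The receiving half is wrapped into a Stream, mirroring how the diff feeds
    // a ReceiverStream into the gRPC append_rows call.
    let consumer = tokio::spawn(async move {
        let mut stream = ReceiverStream::new(rx);
        while let Some(req) = stream.next().await {
            println!("handled request {req}");
        }
    });

    for i in 0..10u64 {
        // Suspends whenever the buffer already holds CAPACITY pending requests,
        // so the producer cannot run arbitrarily far ahead of the consumer.
        tx.send(i).await.expect("receiver dropped");
    }
    drop(tx); // closing the channel lets the consumer loop terminate
    consumer.await.unwrap();
}
```

In the PR itself the same constant, `BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE`, bounds both this request channel and the delivery-future queue passed to `into_log_sinker`, so neither side can grow without limit.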
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        let (result_sender, _) = broadcast::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE + 1);
        let result_sender = Arc::new(result_sender);
        let result_sender_clone = result_sender.clone();
        let abort_handle = tokio::spawn(async move {
            let mut resp = client
                .append_rows(Request::new(stream))
                .await
                .map_err(|e| SinkError::BigQuery(e.into()))
                .unwrap()
                .into_inner();
            let mut offset = 0;
            loop {
                let result = match resp.message().await {
                    Ok(Some(append_rows_response)) => {
                        if !append_rows_response.row_errors.is_empty() {
                            Some(format!(
                                "Insert error {:?}",
                                append_rows_response.row_errors
                            ))
                        } else {
                            None
                        }
                    }
                    Ok(None) => {
                        continue;
                    }
                    Err(e) => Some(e.to_report_string()),
                };
                result_sender_clone.send((result, offset)).unwrap();
                offset += 1;
            }
        })
        .abort_handle();

        Ok(StorageWriterClient {
            client,
            environment,
            request_sender: tx,
            result_sender,
            abort_handle,
            offset: 0,
        })
    }

    pub fn get_subscribe(&mut self) -> (u64, broadcast::Receiver<(Option<String>, u64)>) {
        self.offset += 1;
        (self.offset - 1, self.result_sender.subscribe())
    }

    pub async fn append_rows(
        &mut self,
        rows: Vec<AppendRowsRequestRows>,
        row: AppendRowsRequestRows,
        write_stream: String,
    ) -> Result<()> {
        let mut resp_count = rows.len();
        let append_req: Vec<AppendRowsRequest> = rows
            .into_iter()
            .map(|row| AppendRowsRequest {
                write_stream: write_stream.clone(),
                offset: None,
                trace_id: Uuid::new_v4().hyphenated().to_string(),
                missing_value_interpretations: HashMap::default(),
                rows: Some(row),
            })
            .collect();
        let mut resp = self
            .client
            .append_rows(Request::new(tokio_stream::iter(append_req)))
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
            .into_inner();
        while let Some(append_rows_response) = resp
            .message()
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
Review comment: Can you track the `trace_id` here?
Reply: Tried it, but there is no ID in the returned message.
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
        };
        self.request_sender
            .send(append_req)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            resp_count -= 1;
            if !append_rows_response.row_errors.is_empty() {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Insert error {:?}",
                    append_rows_response.row_errors
                )));
            }
        }
        assert_eq!(resp_count,0,"bigquery sink insert error: the number of response inserted is not equal to the number of request");
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

@@ -732,6 +724,11 @@ impl StorageWriterClient {
        }
    }
}
impl Drop for StorageWriterClient {
    fn drop(&mut self) {
        self.abort_handle.abort();
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> prost_reflect::DescriptorPool {
    let file_descriptor = FileDescriptorProto {
Review comment: I think instead you can create a oneshot channel for each `AppendRowsRequest`, so that you won't need this shared broadcast channel. When the spawned worker knows that a request has been handled, it notifies the receiver through the oneshot sender, and the delivery future here can simply be the receiver.
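A rough sketch of that suggestion, under assumed names (`SketchClient`, `PendingRequest`, `DeliveryResult`) rather than the sink's real types, and collapsing the request stream and the response-polling worker into a single loop for brevity: each request carries its own oneshot sender, the worker completes it when the request has been handled, and the delivery future is simply the oneshot receiver, with no shared broadcast channel or offset bookkeeping.

```rust
use tokio::sync::{mpsc, oneshot};

/// Ok(()) on success, or an error string taken from the append response.
type DeliveryResult = Result<(), String>;

/// One queued append request plus the oneshot used to report its outcome.
struct PendingRequest {
    payload: Vec<u8>,                      // stands in for AppendRowsRequest
    done: oneshot::Sender<DeliveryResult>, // completed by the worker
}

struct SketchClient {
    request_tx: mpsc::Sender<PendingRequest>,
}

impl SketchClient {
    /// Enqueue a request and return a future that resolves once it is handled.
    async fn append_rows(&self, payload: Vec<u8>) -> oneshot::Receiver<DeliveryResult> {
        let (done, rx) = oneshot::channel();
        self.request_tx
            .send(PendingRequest { payload, done })
            .await
            .expect("worker stopped");
        rx
    }
}

/// Worker loop standing in for the spawned task that owns the gRPC stream:
/// it would forward the payload, await the matching response, and then
/// complete that request's own oneshot sender.
async fn worker(mut request_rx: mpsc::Receiver<PendingRequest>) {
    while let Some(req) = request_rx.recv().await {
        let _bytes = req.payload; // would be sent on the gRPC request stream here
        let result: DeliveryResult = Ok(()); // would come from the AppendRowsResponse
        let _ = req.done.send(result);
    }
}

#[tokio::main]
async fn main() {
    let (request_tx, request_rx) = mpsc::channel(16);
    tokio::spawn(worker(request_rx));

    let client = SketchClient { request_tx };
    let delivery = client.append_rows(b"row bytes".to_vec()).await;
    // The caller (or a delivery-future manager) awaits the receiver later.
    assert!(delivery.await.expect("worker dropped the sender").is_ok());
}
```

Pairing each response with the oldest outstanding oneshot sender relies on responses arriving in request order on the stream, which is the same ordering assumption the PR's incrementing offset counter already makes.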