feat(sink): support async for bigquery sink #17488

Merged: 23 commits, Sep 3, 2024
239 changes: 118 additions & 121 deletions src/connector/src/sink/big_query.rs
@@ -17,14 +17,13 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::prelude::TryFuture;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use gcp_bigquery_client::Client;
use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient;
use google_cloud_bigquery::grpc::apiv1::conn_pool::{WriteConnectionManager, DOMAIN};
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
@@ -42,32 +41,34 @@ use prost_types::{
FileDescriptorSet,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use simd_json::prelude::ArrayTrait;
use thiserror_ext::AsReport;
use tokio::sync::{broadcast, mpsc};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::writer::LogSinkerOf;
use super::log_store::DeliveryFutureManagerAddFuture;
use super::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
};
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::sink::writer::SinkWriterExt;
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
@@ -82,25 +83,11 @@ pub struct BigQueryCommon {
pub dataset: String,
#[serde(rename = "bigquery.table")]
pub table: String,
#[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
#[serde_as(as = "DisplayFromStr")]
pub max_batch_rows: usize,
#[serde(rename = "bigquery.retry_times", default = "default_retry_times")]
#[serde_as(as = "DisplayFromStr")]
pub retry_times: usize,
#[serde(default)] // default false
#[serde_as(as = "DisplayFromStr")]
pub auto_create: bool,
}

fn default_max_batch_rows() -> usize {
1024
}

fn default_retry_times() -> usize {
5
}

impl BigQueryCommon {
async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
@@ -340,19 +327,19 @@ impl BigQuerySink {

impl Sink for BigQuerySink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = LogSinkerOf<BigQuerySinkWriter>;
type LogSinker = AsyncTruncateLogSinkerOf<BigQuerySinkWriter>;

const SINK_NAME: &'static str = BIGQUERY_SINK;

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(BigQuerySinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE))
}

async fn validate(&self) -> Result<()> {
@@ -441,8 +428,6 @@ pub struct BigQuerySinkWriter {
message_descriptor: MessageDescriptor,
write_stream: String,
proto_field: Option<FieldDescriptor>,
write_rows: Vec<AppendRowsRequestRows>,
write_rows_count: usize,
}

impl TryFrom<SinkParam> for BigQuerySink {
@@ -530,8 +515,6 @@ impl BigQuerySinkWriter {
writer_pb_schema: ProtoSchema {
proto_descriptor: Some(descriptor_proto),
},
write_rows: vec![],
write_rows_count: 0,
})
}

@@ -582,80 +565,64 @@ impl BigQuerySinkWriter {
}
Ok(serialized_rows)
}

async fn write_rows(&mut self) -> Result<()> {
if self.write_rows.is_empty() {
return Ok(());
}
let mut errs = Vec::with_capacity(self.config.common.retry_times);
for _ in 0..self.config.common.retry_times {
match self
.client
.append_rows(self.write_rows.clone(), self.write_stream.clone())
.await
{
Ok(_) => {
self.write_rows_count = 0;
self.write_rows.clear();
return Ok(());
}
Err(e) => errs.push(e),
}
}
Err(SinkError::BigQuery(anyhow::anyhow!(
"Insert error {:?}",
errs
)))
}
}

#[async_trait]
impl SinkWriter for BigQuerySinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
pub type BigQuerySinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl AsyncTruncateSinkWriter for BigQuerySinkWriter {
type DeliveryFuture = BigQuerySinkDeliveryFuture;

async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
let serialized_rows = if self.is_append_only {
self.append_only(chunk)?
} else {
self.upsert(chunk)?
};
if !serialized_rows.is_empty() {
self.write_rows_count += serialized_rows.len();
let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
writer_schema: Some(self.writer_pb_schema.clone()),
rows: Some(ProtoRows { serialized_rows }),
});
self.write_rows.push(rows);

if self.write_rows_count >= self.config.common.max_batch_rows {
self.write_rows().await?;
}
}
Ok(())
}

async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
if is_checkpoint {
self.write_rows().await?;
if serialized_rows.is_empty() {
return Ok(());
}
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
writer_schema: Some(self.writer_pb_schema.clone()),
rows: Some(ProtoRows { serialized_rows }),
});
let (expect_offset, mut rx) = self.client.get_subscribe();
let future = Box::pin(async move {
loop {
match rx.recv().await {
Contributor: I think instead you can create a oneshot channel for each AppendRowsRequest so that you won't need this shared broadcast channel. When the spawned worker knows that a request is handled, it notifies the rx with the oneshot tx, and here the future can be simply the rx.

Ok((result, offset)) => {
if offset == expect_offset {
if let Some(result) = result {
return Err(SinkError::BigQuery(anyhow::anyhow!(result)));
} else {
return Ok(());
}
}
}
Err(e) => {
return Err(SinkError::BigQuery(anyhow::anyhow!(e)));
}
}
}
});
add_future.add_future_may_await(future).await?;
self.client
.append_rows(rows, self.write_stream.clone())
.await?;
Ok(())
}
}
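
The inline review comment above suggests replacing the shared broadcast channel and offset matching with a oneshot channel per AppendRowsRequest, so that each delivery future is simply the oneshot receiver. Below is a minimal, standalone sketch of that pattern; the QueuedAppend type, append_worker task, and channel capacity are hypothetical stand-ins rather than the types in this PR, and it assumes tokio with the rt-multi-thread, sync, and macros features enabled.

use tokio::sync::{mpsc, oneshot};

// Hypothetical stand-in for an AppendRowsRequest paired with its completion channel;
// not the actual gcp_bigquery_client / google_cloud_bigquery types.
struct QueuedAppend {
    payload: Vec<u8>,
    // The background worker reports success or an error message through this.
    done: oneshot::Sender<Result<(), String>>,
}

// Background worker standing in for the task that drives the gRPC append stream:
// it handles each queued request and notifies that request's dedicated oneshot sender.
async fn append_worker(mut rx: mpsc::Receiver<QueuedAppend>) {
    while let Some(req) = rx.recv().await {
        // Pretend the remote write succeeded unless the batch is empty.
        let result = if req.payload.is_empty() {
            Err("empty row batch".to_owned())
        } else {
            Ok(())
        };
        // Each request owns its own sender, so no offsets or broadcast fan-out are needed.
        let _ = req.done.send(result);
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(append_worker(rx));

    // Writer side: enqueue the request and keep the oneshot receiver as the delivery future.
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(QueuedAppend {
        payload: b"serialized rows".to_vec(),
        done: done_tx,
    })
    .await
    .expect("worker dropped");

    // Awaiting the receiver resolves once the worker has handled exactly this request.
    match done_rx.await {
        Ok(Ok(())) => println!("append acknowledged"),
        Ok(Err(e)) => println!("append failed: {e}"),
        Err(_) => println!("worker exited before responding"),
    }
}

Because each request owns its own completion sender, the writer needs no offset bookkeeping or broadcast subscription to tell which response belongs to which request.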

struct StorageWriterClient {
client: StreamingWriteClient,
#[expect(dead_code)]
environment: Environment,
request_sender: mpsc::Sender<AppendRowsRequest>,
result_sender: Arc<broadcast::Sender<(Option<String>, u64)>>,
abort_handle: tokio::task::AbortHandle,
offset: u64,
}
impl StorageWriterClient {
pub async fn new(credentials: CredentialsFile) -> Result<Self> {
@@ -678,49 +645,74 @@ impl StorageWriterClient {
)
.await
.map_err(|e| SinkError::BigQuery(e.into()))?;
let client = conn.conn();
let mut client = conn.conn();

let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE);
Contributor: We can use a bounded channel here since we already limit the queue size in the select!

let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let (result_sender, _) = broadcast::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE + 1);
let result_sender = Arc::new(result_sender);
let result_sender_clone = result_sender.clone();
let abort_handle = tokio::spawn(async move {
let mut resp = client
.append_rows(Request::new(stream))
.await
.map_err(|e| SinkError::BigQuery(e.into()))
.unwrap()
.into_inner();
let mut offset = 0;
loop {
let result = match resp.message().await {
Ok(Some(append_rows_response)) => {
if !append_rows_response.row_errors.is_empty() {
Some(format!(
"Insert error {:?}",
append_rows_response.row_errors
))
} else {
None
}
}
Ok(None) => {
continue;
}
Err(e) => Some(e.to_report_string()),
};
result_sender_clone.send((result, offset)).unwrap();
offset += 1;
}
})
.abort_handle();

Ok(StorageWriterClient {
client,
environment,
request_sender: tx,
result_sender,
abort_handle,
offset: 0,
})
}

pub fn get_subscribe(&mut self) -> (u64, broadcast::Receiver<(Option<String>, u64)>) {
self.offset += 1;
(self.offset - 1, self.result_sender.subscribe())
}

pub async fn append_rows(
&mut self,
rows: Vec<AppendRowsRequestRows>,
row: AppendRowsRequestRows,
write_stream: String,
) -> Result<()> {
let mut resp_count = rows.len();
let append_req: Vec<AppendRowsRequest> = rows
.into_iter()
.map(|row| AppendRowsRequest {
write_stream: write_stream.clone(),
offset: None,
trace_id: Uuid::new_v4().hyphenated().to_string(),
missing_value_interpretations: HashMap::default(),
rows: Some(row),
})
.collect();
let mut resp = self
.client
.append_rows(Request::new(tokio_stream::iter(append_req)))
.await
.map_err(|e| SinkError::BigQuery(e.into()))?
.into_inner();
while let Some(append_rows_response) = resp
.message()
let append_req = AppendRowsRequest {
write_stream: write_stream.clone(),
offset: None,
trace_id: Uuid::new_v4().hyphenated().to_string(),
Contributor: Can you track the trace_id locally instead of incrementing self.offset? I think the response stream should return the same trace_id in the corresponding response.

Contributor (Author): Tried it, but there is no ID in the returned message.

missing_value_interpretations: HashMap::default(),
rows: Some(row),
};
self.request_sender
.send(append_req)
.await
.map_err(|e| SinkError::BigQuery(e.into()))?
{
resp_count -= 1;
if !append_rows_response.row_errors.is_empty() {
return Err(SinkError::BigQuery(anyhow::anyhow!(
"Insert error {:?}",
append_rows_response.row_errors
)));
}
}
assert_eq!(resp_count,0,"bigquery sink insert error: the number of response inserted is not equal to the number of request");
.map_err(|e| SinkError::BigQuery(e.into()))?;
Ok(())
}

Expand All @@ -732,6 +724,11 @@ impl StorageWriterClient {
}
}
}
impl Drop for StorageWriterClient {
fn drop(&mut self) {
self.abort_handle.abort();
}
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> prost_reflect::DescriptorPool {
let file_descriptor = FileDescriptorProto {
8 changes: 0 additions & 8 deletions src/connector/with_options_sink.yaml
@@ -17,14 +17,6 @@ BigQueryConfig:
- name: bigquery.table
field_type: String
required: true
- name: bigquery.max_batch_rows
field_type: usize
required: false
default: '1024'
- name: bigquery.retry_times
field_type: usize
required: false
default: '5'
- name: auto_create
field_type: bool
required: false