feat(sink): support async for bigquery sink #17488

Merged
merged 23 commits on Sep 3, 2024
fix ci
save

add fix
xxhZs committed Jul 22, 2024
commit 9cc98ca83a1dab62ba7974b876b5ed9a8b594d13
83 changes: 60 additions & 23 deletions src/connector/src/sink/big_query.rs
@@ -70,6 +70,8 @@ const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
 const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
 const CONNECTION_TIMEOUT: Option<Duration> = None;
 const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
+// Requests must stay under 10 MB, so we cap each batch at 8 MB.
+const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

 #[serde_as]
 #[derive(Deserialize, Debug, Clone, WithOptions)]
@@ -90,7 +92,7 @@ pub struct BigQueryCommon {
 }

 struct BigQueryFutureManager {
-    offset_queue: VecDeque<TruncateOffset>,
+    offset_queue: VecDeque<Option<TruncateOffset>>,
     resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
 }
 impl BigQueryFutureManager {
@@ -105,12 +107,16 @@ impl BigQueryFutureManager {
         }
     }

-    pub async fn add_offset(&mut self, offset: TruncateOffset) {
-        self.offset_queue.push_back(offset);
+    pub fn add_offset(&mut self, offset: TruncateOffset, mut resp_num: usize) {
+        // Enqueue resp_num - 1 placeholders before the real offset.
+        while resp_num > 1 {
+            self.offset_queue.push_back(None);
+            resp_num -= 1;
+        }
+        self.offset_queue.push_back(Some(offset));
     }

-    pub async fn wait_next_offset(&mut self) -> Result<TruncateOffset> {
+    pub async fn wait_next_offset(&mut self) -> Result<Option<TruncateOffset>> {

Member:
I guess async fn next_offset is enough to indicate that the function call may need to be "waited".

-        if let Some(TruncateOffset::Barrier { .. }) = self.offset_queue.front() {
+        if let Some(Some(TruncateOffset::Barrier { .. })) = self.offset_queue.front() {
             return Ok(self.offset_queue.pop_front().unwrap());
         }
         self.resp_stream

Contributor:
Here, correctness depends on the number of non-barrier items in the queue (either None or Some(TruncateOffset::Chunk)) being equal to the number of in-flight requests. However, in the implementation it is possible to have resp_num == 0 when we write a chunk. For such a chunk there is no in-flight request, yet it still adds an item to the queue, which makes the number of queue items inconsistent with the number of in-flight requests.

I think that instead of using None to represent a chunk split into multiple requests, we'd better store (TruncateOffset, remaining_resp_num) as the queue item. The code would look like the following:

if let Some((offset, remaining_resp_num)) = self.offset_queue.front_mut() {
    if *remaining_resp_num == 0 {
        return Ok(self.offset_queue.pop_front().unwrap().0);
    }
    while *remaining_resp_num > 0 {
        self.resp_stream.next().await...??;
        *remaining_resp_num -= 1;
    }
    // once all responses have arrived, pop the item and return its offset
    return Ok(self.offset_queue.pop_front().unwrap().0);
} else {
    return pending().await;
}
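
For completeness, a minimal sketch of the matching add_offset under this proposed scheme (hypothetical code, not part of the PR; barriers would pass resp_num = 0, as at the call sites below):

// Hypothetical counterpart to the sketch above: each offset is enqueued
// together with the number of responses it still has to wait for.
pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
    self.offset_queue.push_back((offset, resp_num));
}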

@@ -149,21 +155,21 @@ impl LogSinker for BigQueryLogSinker {
         loop {
             tokio::select!(
                 offset = self.bigquery_future_manager.wait_next_offset() => {
-                    log_reader.truncate(offset?)?;
+                    if let Some(offset) = offset? {
+                        log_reader.truncate(offset)?;
+                    }
                 }
                 item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                     let (epoch, item) = item_result?;
                     match item {
                         LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
-                            self.writer.write_chunk(chunk).await?;
+                            let resp_num = self.writer.write_chunk(chunk).await?;
                             self.bigquery_future_manager
-                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id })
-                                .await;
+                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                         }
                         LogStoreReadItem::Barrier { .. } => {
                             self.bigquery_future_manager
-                                .add_offset(TruncateOffset::Barrier { epoch })
-                                .await;
+                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                         }
                         LogStoreReadItem::UpdateVnodeBitmap(_) => {}
                     }
@@ -657,23 +663,41 @@ impl BigQuerySinkWriter {
         Ok(serialized_rows)
     }

-    async fn write_chunk(&mut self, chunk: StreamChunk) -> Result<()> {
+    async fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
         let serialized_rows = if self.is_append_only {
             self.append_only(chunk)?
         } else {
             self.upsert(chunk)?
         };
         if serialized_rows.is_empty() {
-            return Ok(());
+            return Ok(0);
         }
-        let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
-            writer_schema: Some(self.writer_pb_schema.clone()),
-            rows: Some(ProtoRows { serialized_rows }),
-        });
-        self.client
-            .append_rows(rows, self.write_stream.clone())
-            .await?;
-        Ok(())
+        // Greedily split the rows into batches whose serialized size stays
+        // under MAX_ROW_SIZE, so each AppendRows request respects the limit.
+        let mut result = Vec::new();
+        let mut result_inner = Vec::new();
+        let mut size_count = 0;
+        for i in serialized_rows {
+            size_count += i.len();
+            if size_count > MAX_ROW_SIZE {
+                result.push(result_inner);
+                result_inner = Vec::new();
+                size_count = i.len();
+            }
+            result_inner.push(i);
+        }
+        if !result_inner.is_empty() {
+            result.push(result_inner);
+        }
+        // Return the number of requests sent so the caller can track responses.
+        let len = result.len();
+        for serialized_rows in result {
+            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
+                writer_schema: Some(self.writer_pb_schema.clone()),
+                rows: Some(ProtoRows { serialized_rows }),
+            });
+            self.client
+                .append_rows(rows, self.write_stream.clone())
+                .await?;
+        }
+        Ok(len)
     }
 }

@@ -693,10 +717,23 @@ pub async fn resp_to_stream(
         .map_err(|e| SinkError::BigQuery(e.into()))?
         .into_inner();
     loop {
-        resp_stream
-            .message()
-            .await
-            .map_err(|e| SinkError::BigQuery(e.into()))?;
+        if let Some(append_rows_response) = resp_stream
+            .message()
+            .await
+            .map_err(|e| SinkError::BigQuery(e.into()))? {
+            if !append_rows_response.row_errors.is_empty() {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "bigquery insert error {:?}",
+                    append_rows_response.row_errors
+                )));
+            }
+            if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "bigquery insert error {:?}",
+                    status
+                )));
+            }
+        }
         yield ();
     }
 }

Contributor:
When we reach the end of resp_stream, we will keep yielding (). This is not correct. Instead of using if let Some(...), we'd better just use resp_stream.message().await.ok_or_else(|| ...) to turn None into an end-of-stream error.

Contributor Author (@xxhZs, Aug 28, 2024):
It doesn't return () at the end of the stream. After receiving each reply, if there are no errors, it returns a meaningless (); the goal is to signal that a message was received with no errors.

Contributor:
I know the purpose of returning a meaningless (). What I meant was that when resp_stream.message() returns None, we should break the loop with an end-of-stream error, or simply break the loop, instead of ignoring it.

In the current code, after resp_stream returns None, a () is still yielded; when polled again, resp_stream returns None again, another () is yielded, and this repeats endlessly while the external code is never made aware that the response stream has actually stopped. A sketch of the suggested fix follows the diff.
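
A minimal sketch of that suggestion, keeping the error types already used in this file (the error message text is illustrative):

    // Inside the loop: turn end-of-stream (None) into an explicit error
    // instead of silently yielding () forever.
    let append_rows_response = resp_stream
        .message()
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .ok_or_else(|| {
            SinkError::BigQuery(anyhow::anyhow!(
                "bigquery response stream closed unexpectedly"
            ))
        })?;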