-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support bigquery sink upsert #15780
Conversation
5cc2579
to
2b2402e
Compare
@@ -364,18 +409,61 @@ fn encode_field<D: MaybeData>( | |||
Ok(Value::Message(message.transcode_to_dynamic())) | |||
})? | |||
} | |||
(false, Kind::String) if is_big_query => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the newly added custom_proto_type
is only used to generate the is_big_query
flag, and the flag is only used to control the logic when seeing different datatypes. If so, instead of adding this new parameter, we can just add new methods like on_timestamptz
, on_jsonb
... to the MaybeData
trait and then call the corresponding trait methods here. And then for bigquery we can implement its own MaybeData
with new customized logic and then pass it to the encoder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's rw and proto type match , bigquery has some special matches (many of which are converted to string) that require is_big_query
to determine whether the match holds or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The newly added logic does not look like specially for bigquery. It's more like an extension of the original type compatibility and can be generalized for proto encoding used in sinks other than bigquery (cc @xiangjinwu ).
If so, we can remove the is_big_query
flag and custom_proto_type
and make it a general way of processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree it can be more general. It is better to introduce a TimestamptzHandlingMode
and bigquery can just select one of the string formats, similar to the json encoder.
However it may not be another implementation of MaybeData
(for bigquery). That trait is only meant to be implemented twice: once with type info alone (for validation), and once with concrete datum (for encoding). Given that we want to affect both validation and encoding here, it is supposed to be in encode_field
here.
@@ -364,18 +409,61 @@ fn encode_field<D: MaybeData>( | |||
Ok(Value::Message(message.transcode_to_dynamic())) | |||
})? | |||
} | |||
(false, Kind::String) if is_big_query => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The newly added logic does not look like specially for bigquery. It's more like an extension of the original type compatibility and can be generalized for proto encoding used in sinks other than bigquery (cc @xiangjinwu ).
If so, we can remove the is_big_query
flag and custom_proto_type
and make it a general way of processing.
2b2402e
to
eaba3b6
Compare
* Group C: experimental */ | ||
}, | ||
DataType::Int16 => match (expect_list, proto_field.kind()) { | ||
(false, Kind::Int64) => maybe.on_base(|s| Ok(Value::I64(s.into_int16() as i64)))?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May also add support for casting to Int32
and Int16
by the way.
src/connector/src/sink/big_query.rs
Outdated
message_descriptor, | ||
writer_pb_schema: ProtoSchema { | ||
proto_descriptor: Some(descriptor_proto), | ||
}, | ||
}) | ||
} | ||
|
||
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this special append_only
method? It is only a subset of upsert
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One difference is that append-only does not add the CHANGE_TYPE column
d112c89
to
11f2ffb
Compare
} | ||
|
||
async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> { | ||
if let Some(local_path) = &self.local_path { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not related to this PR: can we possibly not use separate local_path
and s3_path
and use a single option path
? The path type can be distinguished by the path prefix such as file://
and s3://
?
Besides, bigquery is on google cloud platform while users should upload the auth file to S3. It looks strange to me. Does gcp have S3 compatible object store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed, references to s3 or aws in general always confused me. Is there a gcp alternative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, this file is necessary to connect to gcp, so it can't be saved at gcp, another alternative is to import the json from this file by means of the string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May provide an extra option for users to include the raw auth string in the options.
Can do it in a separate PR since the current PR is already large and this feature is independent to the current PR.
fmt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM.
} | ||
|
||
async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> { | ||
if let Some(local_path) = &self.local_path { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May provide an extra option for users to include the raw auth string in the options.
Can do it in a separate PR since the current PR is already large and this feature is independent to the current PR.
checksum = "c48abc8687f4c4cc143dd5bd3da5f1d7ef38334e4af5cef6de4c39295c6a3fd0" | ||
dependencies = [ | ||
"anyhow", | ||
"arrow 50.0.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We introduce full arrow 50 dependencies here. We should verify whether it will greatly affect the compile time of DEBUG and RELEASE build. If yes, we may need to consider how to avoid this dependency given that we technically don't use arrow
in google-cloud-bigquery
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test with my computer
branch | time | target size | build num |
---|---|---|---|
release main | 27m 24s | 27173092317 | 1471 |
release current pr | 28m 14s | 27275146355 | 1476 |
debug main | 4m 31s | 48776880726 | 1413 |
debug current pr | 4m 51s | 48920738495 | 1418 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The result looks good. Thanks for the efforts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
use gcp_bigquery_client::Client; | ||
use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we using 2 bigquery clients? Can we only use one? (The former gcp_bigquery_client
is quite old now)
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#14882
refactor bigquery with storage write api. And support bigquery sink upsert
https://cloud.google.com/bigquery/docs/write-api
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
In bigquery sink, Support upsert,
Users need to set corresponding permissions and pk based on the document in bigquery
https://cloud.google.com/bigquery/docs/change-data-capture?hl=zh-cn