feat(Sink): Enhanced integration with StarRocks #14363
I wonder if we are using the same API for StarRocks and Doris? It seems we tried to differentiate them but nothing is really different.
BTW, would you mind sharing the behavior of 'partial_update'? I am not sure whether it is an option that should be documented.
Doris and StarRocks use the same HTTP API, but the related configurations are different.
https://docs.starrocks.io/docs/loading/Load_to_Primary_Key_tables/#partial-updates It seems that only StarRocks supports it.
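For reference, the linked StarRocks documentation describes a partial update over Stream Load as two extra HTTP headers, `partial_update` and `columns`. A minimal Rust sketch of assembling them follows. The helper name `partial_update_headers` is hypothetical and is not taken from this PR.

```rust
/// Builds the extra Stream Load headers for a partial update.
/// `columns` lists the columns present in the payload; per the StarRocks
/// docs it must include the primary key columns.
/// Hypothetical helper for illustration only, not the PR's implementation.
fn partial_update_headers(columns: &[&str]) -> Vec<(String, String)> {
    vec![
        ("partial_update".to_string(), "true".to_string()),
        ("columns".to_string(), columns.join(",")),
    ]
}
```

Calling it with `&["id", "name"]` yields the header pairs `partial_update: true` and `columns: id,name`.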
LGTM But I am probably not appropriate for approving. Please ensure another code owner approves as well.
Generally LGTM. If this PR is urgent, we can have it merged first, but later we should add some integration tests in CI.
risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks(
"starrocks can not support import json".to_string(),
)),
risingwave_common::types::DataType::Jsonb => Ok(starrocks_data_type.contains("json")),
What is the starrocks's type name for json?
JSON
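The substring check in the diff above can be sketched in isolation as follows. `RwType` is a simplified stand-in for `risingwave_common::types::DataType`, `check_type` is a hypothetical name, and the lowercasing step is an assumption added here so that `JSON` and `json` both match; the PR itself compares against the string as reported by StarRocks.

```rust
/// Simplified stand-in for RisingWave's `DataType` (assumption: only the
/// variants discussed in this thread are modeled).
#[derive(Debug, Clone, Copy)]
enum RwType {
    Timestamp,
    Timestamptz,
    Jsonb,
}

/// Returns Ok(true) when the StarRocks column type can accept the RisingWave
/// type, Ok(false) on a mismatch, and Err for types the sink rejects outright.
fn check_type(rw: RwType, starrocks_data_type: &str) -> Result<bool, String> {
    // Assumption: normalize case before the substring match.
    let sr = starrocks_data_type.to_ascii_lowercase();
    match rw {
        RwType::Timestamp => Ok(sr.contains("datetime")),
        RwType::Jsonb => Ok(sr.contains("json")),
        RwType::Timestamptz => Err(
            "TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink".to_string(),
        ),
    }
}
```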
src/connector/src/sink/starrocks.rs
Outdated
@@ -125,8 +127,8 @@ impl StarrocksSink {
starrocks_columns_desc: HashMap<String, String>,
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if rw_fields_name.len().ne(&starrocks_columns_desc.len()) {
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string()));
if rw_fields_name.len().gt(&starrocks_columns_desc.len()) {
Use > instead of gt.
If we do a partial update, should we ensure that the StarRocks pk columns are all included in the sink columns?
Currently we verify that the PKs of RW and SR are exactly consistent, and I am inclined not to change that. If we allowed the SR and RW PKs to differ, it could lead to accuracy issues with updates. Our other sinks also follow a similar logic.
Do you mean we should consider the case that the downstream pk of SR is changed after the sink is created? I don't think this is what we should consider, because we are not expected to adapt to downstream pk change yet, and users should be required not to change the pk.
> Our other sinks also follow a similar logic
This is not true. For downstreams that really have a pk (e.g. jdbc sink), we do check whether the pks specified in the with options match the downstream pk.
I'm just saying that we have code to check the PK, but that verification requires an exact match, not a superset as for the other columns (rw.columns.len <= sr.columns.len).
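The two rules discussed in this thread (sink columns may be a subset of the StarRocks columns, while the primary key must match exactly) can be sketched as below. This is an illustration under assumptions, not the PR's code: the function names, the `&[&str]` inputs, and the `String` error type are all hypothetical simplifications of `StarrocksSink` and `SinkError`.

```rust
use std::collections::{HashMap, HashSet};

/// Subset rule: every sink column must exist downstream, and the sink may
/// have fewer columns than the StarRocks table (rw.len <= sr.len).
fn check_columns(
    rw_columns: &[&str],
    starrocks_columns_desc: &HashMap<String, String>,
) -> Result<(), String> {
    if rw_columns.len() > starrocks_columns_desc.len() {
        return Err("the sink has more columns than the StarRocks table".to_string());
    }
    for name in rw_columns {
        if !starrocks_columns_desc.contains_key(*name) {
            return Err(format!("column `{name}` not found in the StarRocks table"));
        }
    }
    Ok(())
}

/// Exact-match rule: the two primary keys must contain the same columns.
fn check_primary_key(rw_pk: &[&str], sr_pk: &[&str]) -> Result<(), String> {
    let rw: HashSet<&str> = rw_pk.iter().copied().collect();
    let sr: HashSet<&str> = sr_pk.iter().copied().collect();
    if rw == sr {
        Ok(())
    } else {
        Err("the primary keys of RisingWave and StarRocks differ".to_string())
    }
}
```

Because the primary key check is an exact match and every primary key column is itself a sink column, a sink that passes both checks necessarily includes all StarRocks pk columns, which is the condition asked about for partial updates.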
@@ -178,7 +180,7 @@ impl StarrocksSink {
Ok(starrocks_data_type.contains("datetime"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks(
"starrocks can not support Timestamptz".to_string(),
"TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink as Starrocks doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks(
Please refine the error for INTERVAL and BYTEA as well.
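One way to keep these messages consistent is a small formatting helper that follows the wording of the new Timestamptz error. The sketch below is only a suggestion: `unsupported_type_error` is a hypothetical name, and the conversion target passed for each type is left to the caller because this thread does not state what INTERVAL or BYTEA should be converted to.

```rust
/// Formats an "unsupported type" message in the same shape as the refined
/// Timestamptz error. Hypothetical helper; the conversion target is an
/// input, not something this thread specifies.
fn unsupported_type_error(type_name: &str, convert_to: &str) -> String {
    format!(
        "{type_name} is not supported for Starrocks sink. Please convert to {convert_to} first."
    )
}
```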
Co-authored-by: Xinhao Xu <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add a partial_update with-option to the StarRocks sink.
This also supports issue #14822.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
A new option:
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.