feat(Sink): Enhanced integration with StarRocks #14363
Changes from 4 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,8 @@ pub struct StarrocksCommon { | |
/// The StarRocks table you want to sink data to. | ||
#[serde(rename = "starrocks.table")] | ||
pub table: String, | ||
#[serde(rename = "starrocks.partial_update")] | ||
pub partial_update: Option<String>, | ||
} | ||
|
||
#[serde_as] | ||
|
@@ -125,8 +127,8 @@ impl StarrocksSink { | |
starrocks_columns_desc: HashMap<String, String>, | ||
) -> Result<()> { | ||
let rw_fields_name = self.schema.fields(); | ||
if rw_fields_name.len().ne(&starrocks_columns_desc.len()) { | ||
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string())); | ||
if rw_fields_name.len().gt(&starrocks_columns_desc.len()) { | ||
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string())); | ||
} | ||
|
||
for i in rw_fields_name { | ||
|
@@ -178,7 +180,7 @@ impl StarrocksSink { | |
Ok(starrocks_data_type.contains("datetime")) | ||
} | ||
risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks( | ||
"starrocks can not support Timestamptz".to_string(), | ||
"TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink as Starrocks doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(), | ||
)), | ||
risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks( | ||
Review comment: Please refine the error messages for INTERVAL and BYTEA as well. |
||
"starrocks can not support Interval".to_string(), | ||
|
@@ -193,9 +195,7 @@ impl StarrocksSink { | |
risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks( | ||
"starrocks can not support Bytea".to_string(), | ||
)), | ||
risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks( | ||
"starrocks can not support import json".to_string(), | ||
)), | ||
risingwave_common::types::DataType::Jsonb => Ok(starrocks_data_type.contains("json")), | ||
Review comment: What is StarRocks's type name for JSON?
Reply: JSON |
||
risingwave_common::types::DataType::Serial => { | ||
Ok(starrocks_data_type.contains("bigint")) | ||
} | ||
|
@@ -332,18 +332,18 @@ impl StarrocksSinkWriter { | |
decimal_map.insert(name.to_string(), (length, scale)); | ||
} | ||
} | ||
let mut fields_name = schema.names_str(); | ||
if !is_append_only { | ||
fields_name.push(STARROCKS_DELETE_SIGN); | ||
}; | ||
|
||
let builder = HeaderBuilder::new() | ||
let header = HeaderBuilder::new() | ||
.add_common_header() | ||
.set_user_password(config.common.user.clone(), config.common.password.clone()) | ||
.add_json_format(); | ||
let header = if !is_append_only { | ||
let mut fields_name = schema.names_str(); | ||
fields_name.push(STARROCKS_DELETE_SIGN); | ||
builder.set_columns_name(fields_name).build() | ||
} else { | ||
builder.build() | ||
}; | ||
.add_json_format() | ||
.set_partial_update(config.common.partial_update.clone()) | ||
.set_columns_name(fields_name) | ||
.build(); | ||
|
||
let starrocks_insert_builder = InserterInnerBuilder::new( | ||
format!("http://{}:{}", config.common.host, config.common.http_port), | ||
|
@@ -358,7 +358,7 @@ impl StarrocksSinkWriter { | |
inserter_innet_builder: starrocks_insert_builder, | ||
is_append_only, | ||
client: None, | ||
row_encoder: JsonEncoder::new_with_doris( | ||
row_encoder: JsonEncoder::new_with_starrocks( | ||
schema, | ||
None, | ||
TimestampHandlingMode::String, | ||
|
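The header change in the diff above can be sketched in isolation. This is a minimal sketch, not the RisingWave implementation: `HeaderSketch` and `build_headers` are hypothetical stand-ins for the sink's `HeaderBuilder`, the header names `partial_update` and `columns` are assumed to be the ones sent with the load request, and `__op` is assumed as the value of `STARROCKS_DELETE_SIGN`.

```rust
use std::collections::HashMap;

// Assumed value of STARROCKS_DELETE_SIGN; not confirmed by the diff above.
const DELETE_SIGN: &str = "__op";

// Hypothetical stand-in for the sink's HeaderBuilder.
#[derive(Default)]
struct HeaderSketch {
    headers: HashMap<String, String>,
}

impl HeaderSketch {
    fn new() -> Self {
        Self::default()
    }

    /// Adds the `partial_update` header only when the option was configured.
    fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
        if let Some(value) = partial_update {
            self.headers.insert("partial_update".to_string(), value);
        }
        self
    }

    /// Lists the columns carried by each row of the payload.
    fn set_columns_name(mut self, columns: Vec<&str>) -> Self {
        self.headers.insert("columns".to_string(), columns.join(","));
        self
    }

    fn build(self) -> HashMap<String, String> {
        self.headers
    }
}

/// Mirrors the new flow: the column list is always sent, and the delete
/// sign is appended only for sinks that are not append-only.
fn build_headers(
    mut fields: Vec<&str>,
    is_append_only: bool,
    partial_update: Option<String>,
) -> HashMap<String, String> {
    if !is_append_only {
        fields.push(DELETE_SIGN);
    }
    HeaderSketch::new()
        .set_partial_update(partial_update)
        .set_columns_name(fields)
        .build()
}
```

Unlike the removed code, which set the column list only for non-append-only sinks, this flow always sends it. That is what lets the sink write a subset of the target table's columns.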
Review comment: Use `>` instead of `gt`. If we do a partial update, should we ensure that the StarRocks PK columns are all included in the sink columns?
Reply: Currently we check that the PKs of RW and SR are exactly the same, and I'm inclined not to change that. If we allowed the SR and RW PKs to differ, updates could become inaccurate. Our other sinks follow similar logic.
Reply: Do you mean we should consider the case where the downstream PK in SR is changed after the sink is created? I don't think we need to consider this, because we are not expected to adapt to downstream PK changes yet, and users should be required not to change the PK.
This is not true. For downstreams that really have a PK (e.g. the JDBC sink), we do check whether the PKs specified in the WITH options match the downstream PK.
Reply: I'm just saying that we have code to check the PK, but that check is an exact match, not a superset check like the one for other columns (rw.columns.len <= sr.columns.len).
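The distinction drawn in this thread can be illustrated with a minimal sketch. The function names `check_columns` and `check_primary_key` are hypothetical, and comparing PKs as unordered sets is an assumption; the actual sink code may compare them differently.

```rust
use std::collections::{HashMap, HashSet};

/// Column check as relaxed by this PR: RisingWave may sink a subset of the
/// StarRocks columns, but every sink column must exist downstream.
fn check_columns(
    rw_columns: &[&str],
    sr_columns: &HashMap<String, String>,
) -> Result<(), String> {
    if rw_columns.len() > sr_columns.len() {
        return Err("RisingWave has more columns than the StarRocks table".to_string());
    }
    for name in rw_columns {
        if !sr_columns.contains_key(*name) {
            return Err(format!("column {name} not found in the StarRocks table"));
        }
    }
    Ok(())
}

/// PK check as described in the thread: an exact match, not a subset.
/// Assumption: the comparison ignores column order.
fn check_primary_key(rw_pk: &[&str], sr_pk: &[&str]) -> Result<(), String> {
    let rw: HashSet<&str> = rw_pk.iter().copied().collect();
    let sr: HashSet<&str> = sr_pk.iter().copied().collect();
    if rw == sr {
        Ok(())
    } else {
        Err("RisingWave and StarRocks primary keys must match exactly".to_string())
    }
}
```

Under these assumptions, a sink with columns `id, name` passes the column check against a table with `id, name, extra`, while a sink PK of `id` is rejected when the table's PK is `id, name`.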