feat(Sink): Enhanced integration with StarRocks #14363

Merged · 5 commits · Feb 7, 2024 · Changes from 4 commits
2 changes: 1 addition & 1 deletion src/connector/src/sink/doris.rs
@@ -170,7 +170,7 @@ impl DorisSink {
Ok(doris_data_type.contains("DATETIME"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
"doris can not support Timestamptz".to_string(),
"TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
"doris can not support Interval".to_string(),
8 changes: 8 additions & 0 deletions src/connector/src/sink/doris_starrocks_connector.rs
@@ -140,6 +140,14 @@ impl HeaderBuilder {
self
}

pub fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
self.header.insert(
"partial_update".to_string(),
partial_update.unwrap_or_else(|| "false".to_string()),
);
self
}

pub fn build(self) -> HashMap<String, String> {
self.header
}
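The new builder method can be exercised in isolation. Below is a minimal stand-in for the connector's `HeaderBuilder` (illustration only; the real builder carries many more stream-load headers), showing that an unset `partial_update` defaults to `"false"`:

```rust
use std::collections::HashMap;

// Minimal stand-in for the connector's HeaderBuilder, for illustration only.
struct HeaderBuilder {
    header: HashMap<String, String>,
}

impl HeaderBuilder {
    fn new() -> Self {
        Self {
            header: HashMap::new(),
        }
    }

    // Mirrors the PR's method: an absent setting defaults to "false".
    fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
        self.header.insert(
            "partial_update".to_string(),
            partial_update.unwrap_or_else(|| "false".to_string()),
        );
        self
    }

    fn build(self) -> HashMap<String, String> {
        self.header
    }
}
```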
31 changes: 28 additions & 3 deletions src/connector/src/sink/encoder/json.rs
@@ -96,6 +96,24 @@ impl JsonEncoder {
}
}

pub fn new_with_starrocks(
schema: Schema,
col_indices: Option<Vec<usize>>,
timestamp_handling_mode: TimestampHandlingMode,
map: HashMap<String, (u8, u8)>,
) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks(map),
kafka_connect: None,
}
}

pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
Self {
kafka_connect: Some(Arc::new(kafka_connect)),
@@ -203,7 +221,7 @@ fn datum_to_json_object(
json!(v)
}
(DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
CustomJsonType::Doris(map) => {
CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => {
if !matches!(v, Decimal::Normalized(_)) {
return Err(ArrayError::internal(
"doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(),
@@ -270,8 +288,10 @@
json!(v.as_iso_8601())
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()),
CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None => {
json!(jsonb_ref.to_string())
}
},
(DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
let elems = list_ref.iter();
@@ -315,6 +335,11 @@
ArrayError::internal(format!("Json to string err{:?}", err))
})?)
}
CustomJsonType::StarRocks(_) => {
return Err(ArrayError::internal(
"starrocks can't support struct".to_string(),
));
}
CustomJsonType::Es | CustomJsonType::None => {
let mut map = Map::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
2 changes: 2 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
@@ -142,6 +142,8 @@ pub enum CustomJsonType {
Doris(HashMap<String, (u8, u8)>),
// Es's json need jsonb is struct
Es,
// StarRocks requires JSONB values to be passed through as structured JSON
StarRocks(HashMap<String, (u8, u8)>),
None,
}
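The variants above drive how a JSONB datum is rendered. The sketch below is a simplified string-level illustration of the encoder's `Jsonb` match arm (not the connector's serde_json-based code): Es and StarRocks receive the JSON structure as-is, while Doris and the default encoder send it as an escaped string:

```rust
// Illustrative only: variants mirror CustomJsonType, minus the payload maps.
enum CustomJsonType {
    Doris,
    Es,
    StarRocks,
    None,
}

fn encode_jsonb(raw_json: &str, ty: &CustomJsonType) -> String {
    match ty {
        // Es and StarRocks (JSON column type) accept structured JSON as-is.
        CustomJsonType::Es | CustomJsonType::StarRocks => raw_json.to_string(),
        // Doris and the default encoder stringify the JSONB, escaping quotes.
        CustomJsonType::Doris | CustomJsonType::None => {
            format!("\"{}\"", raw_json.replace('"', "\\\""))
        }
    }
}
```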

32 changes: 16 additions & 16 deletions src/connector/src/sink/starrocks.rs
@@ -68,6 +68,8 @@ pub struct StarrocksCommon {
/// The StarRocks table you want to sink data to.
#[serde(rename = "starrocks.table")]
pub table: String,
/// Enable StarRocks partial update ("true" or "false").
#[serde(rename = "starrocks.partial_update")]
pub partial_update: Option<String>,
}

#[serde_as]
@@ -125,8 +127,8 @@ impl StarrocksSink {
starrocks_columns_desc: HashMap<String, String>,
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if rw_fields_name.len().ne(&starrocks_columns_desc.len()) {
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string()));
if rw_fields_name.len() > starrocks_columns_desc.len() {
Contributor: Use `>` instead of `gt`.

If we do partial update, should we ensure that the StarRocks pk columns are all included in the sink columns?

Contributor (author): Currently we verify that the pks of RisingWave and StarRocks match exactly, and we tend not to change that. If the pks of StarRocks and RisingWave were allowed to differ, updates could become inaccurate. Our other sinks also follow a similar logic.

Contributor: Do you mean we should consider the case where the downstream pk of StarRocks is changed after the sink is created? I don't think that is what we should consider, because we are not expected to adapt to downstream pk changes yet, and users should be required not to change the pk.

> Our other sinks also follow a similar logic

This is not true. For downstreams that really have a pk (e.g. the JDBC sink), we do check whether the pks specified in the WITH options match the downstream pk.

Contributor (author): I'm just saying that we have code to check the pk, but that verification is an exact match, not a superset check like the other columns (rw.columns.len <= sr.columns.len).
return Err(SinkError::Starrocks("The number of RisingWave columns must be less than or equal to the number of StarRocks columns".to_string()));
}
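The relaxed check above can be summarized as a small standalone function (a sketch, not the connector's API): with partial update, the RisingWave sink schema may cover only a subset of the StarRocks table's columns, so only a strict excess is an error:

```rust
// Sketch of the relaxed validation: rw columns must be a subset-sized list.
fn check_column_count(rw_columns: usize, starrocks_columns: usize) -> Result<(), String> {
    if rw_columns > starrocks_columns {
        return Err(
            "the number of RisingWave columns must be less than or equal to \
             the number of StarRocks columns"
                .to_string(),
        );
    }
    Ok(())
}
```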

for i in rw_fields_name {
@@ -178,7 +180,7 @@ impl StarrocksSink {
Ok(starrocks_data_type.contains("datetime"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks(
"starrocks can not support Timestamptz".to_string(),
"TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink as Starrocks doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks(
Contributor (@neverchanje, Feb 7, 2024): Please refine the error for INTERVAL and BYTEA as well.
"starrocks can not support Interval".to_string(),
@@ -193,9 +195,7 @@
risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks(
"starrocks can not support Bytea".to_string(),
)),
risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks(
"starrocks can not support import json".to_string(),
)),
risingwave_common::types::DataType::Jsonb => Ok(starrocks_data_type.contains("json")),
Contributor: What is StarRocks's type name for JSON?

Contributor: JSON.

risingwave_common::types::DataType::Serial => {
Ok(starrocks_data_type.contains("bigint"))
}
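The type-mapping change can be illustrated with a simplified compatibility check (the type names here are illustrative strings, not the connector's `DataType` enum): after this PR, JSONB maps onto StarRocks's JSON column type instead of being rejected outright:

```rust
// Simplified view of the RisingWave -> StarRocks type-compatibility check.
fn starrocks_type_compatible(rw_type: &str, starrocks_type: &str) -> Result<bool, String> {
    match rw_type {
        "timestamp" => Ok(starrocks_type.contains("datetime")),
        // New in this PR: JSONB is accepted when the column type is JSON.
        "jsonb" => Ok(starrocks_type.contains("json")),
        "serial" => Ok(starrocks_type.contains("bigint")),
        "timestamptz" => Err(
            "TIMESTAMP WITH TIMEZONE is not supported for StarRocks sink; \
             convert to TIMESTAMP first"
                .to_string(),
        ),
        other => Err(format!("type {other} not covered in this sketch")),
    }
}
```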
@@ -332,18 +332,18 @@ impl StarrocksSinkWriter {
decimal_map.insert(name.to_string(), (length, scale));
}
}
let mut fields_name = schema.names_str();
if !is_append_only {
fields_name.push(STARROCKS_DELETE_SIGN);
};

let builder = HeaderBuilder::new()
let header = HeaderBuilder::new()
.add_common_header()
.set_user_password(config.common.user.clone(), config.common.password.clone())
.add_json_format();
let header = if !is_append_only {
let mut fields_name = schema.names_str();
fields_name.push(STARROCKS_DELETE_SIGN);
builder.set_columns_name(fields_name).build()
} else {
builder.build()
};
.add_json_format()
.set_partial_update(config.common.partial_update.clone())
.set_columns_name(fields_name)
.build();
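The refactor above always sets the column-list header, appending the delete-sign column only for upsert (non-append-only) sinks. A standalone sketch of that logic follows; the `"__op"` constant value is an assumption for illustration, not confirmed from the connector source:

```rust
// Assumed delete-sign column name, for illustration only.
const STARROCKS_DELETE_SIGN: &str = "__op";

// Column list is always built; the delete sign is added for upsert sinks.
fn build_column_list(mut fields: Vec<String>, is_append_only: bool) -> Vec<String> {
    if !is_append_only {
        fields.push(STARROCKS_DELETE_SIGN.to_string());
    }
    fields
}
```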

let starrocks_insert_builder = InserterInnerBuilder::new(
format!("http://{}:{}", config.common.host, config.common.http_port),
@@ -358,7 +358,7 @@
inserter_innet_builder: starrocks_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_doris(
row_encoder: JsonEncoder::new_with_starrocks(
schema,
None,
TimestampHandlingMode::String,