Skip to content

Commit

Permalink
feat(Sink): Enhanced integration with StarRocks (#14363) (#15555)
Browse files Browse the repository at this point in the history
Co-authored-by: Xinhao Xu <[email protected]>
  • Loading branch information
github-actions[bot] and xxhZs authored Mar 8, 2024
1 parent f661fbe commit 423b0d9
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl DorisSink {
Ok(doris_data_type.contains("DATETIME"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
"doris can not support Timestamptz".to_string(),
"TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
"doris can not support Interval".to_string(),
Expand Down
8 changes: 8 additions & 0 deletions src/connector/src/sink/doris_starrocks_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ impl HeaderBuilder {
self
}

pub fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
self.header.insert(
"partial_update".to_string(),
partial_update.unwrap_or_else(|| "false".to_string()),
);
self
}

pub fn build(self) -> HashMap<String, String> {
self.header
}
Expand Down
31 changes: 28 additions & 3 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ impl JsonEncoder {
}
}

pub fn new_with_starrocks(
schema: Schema,
col_indices: Option<Vec<usize>>,
timestamp_handling_mode: TimestampHandlingMode,
map: HashMap<String, (u8, u8)>,
) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks(map),
kafka_connect: None,
}
}

pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
Self {
kafka_connect: Some(Arc::new(kafka_connect)),
Expand Down Expand Up @@ -203,7 +221,7 @@ fn datum_to_json_object(
json!(v)
}
(DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
CustomJsonType::Doris(map) => {
CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => {
if !matches!(v, Decimal::Normalized(_)) {
return Err(ArrayError::internal(
"doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(),
Expand Down Expand Up @@ -270,8 +288,10 @@ fn datum_to_json_object(
json!(v.as_iso_8601())
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()),
CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None => {
json!(jsonb_ref.to_string())
}
},
(DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
let elems = list_ref.iter();
Expand Down Expand Up @@ -315,6 +335,11 @@ fn datum_to_json_object(
ArrayError::internal(format!("Json to string err{:?}", err))
})?)
}
CustomJsonType::StarRocks(_) => {
return Err(ArrayError::internal(
"starrocks can't support struct".to_string(),
));
}
CustomJsonType::Es | CustomJsonType::None => {
let mut map = Map::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ pub enum CustomJsonType {
Doris(HashMap<String, (u8, u8)>),
// Es's json need jsonb is struct
Es,
// starrocks' need jsonb is struct
StarRocks(HashMap<String, (u8, u8)>),
None,
}

Expand Down
32 changes: 16 additions & 16 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct StarrocksCommon {
/// The StarRocks table you want to sink data to.
#[serde(rename = "starrocks.table")]
pub table: String,
#[serde(rename = "starrocks.partial_update")]
pub partial_update: Option<String>,
}

#[serde_as]
Expand Down Expand Up @@ -125,8 +127,8 @@ impl StarrocksSink {
starrocks_columns_desc: HashMap<String, String>,
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if rw_fields_name.len().ne(&starrocks_columns_desc.len()) {
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string()));
if rw_fields_name.len() > starrocks_columns_desc.len() {
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string()));
}

for i in rw_fields_name {
Expand Down Expand Up @@ -178,7 +180,7 @@ impl StarrocksSink {
Ok(starrocks_data_type.contains("datetime"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks(
"starrocks can not support Timestamptz".to_string(),
"TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink as Starrocks doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks(
"INTERVAL is not supported for Starrocks sink. Please convert to VARCHAR or other supported types.".to_string(),
Expand All @@ -193,9 +195,7 @@ impl StarrocksSink {
risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks(
"BYTEA is not supported for Starrocks sink. Please convert to VARCHAR or other supported types.".to_string(),
)),
risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks(
"starrocks can not support import json".to_string(),
)),
risingwave_common::types::DataType::Jsonb => Ok(starrocks_data_type.contains("json")),
risingwave_common::types::DataType::Serial => {
Ok(starrocks_data_type.contains("bigint"))
}
Expand Down Expand Up @@ -332,18 +332,18 @@ impl StarrocksSinkWriter {
decimal_map.insert(name.to_string(), (length, scale));
}
}
let mut fields_name = schema.names_str();
if !is_append_only {
fields_name.push(STARROCKS_DELETE_SIGN);
};

let builder = HeaderBuilder::new()
let header = HeaderBuilder::new()
.add_common_header()
.set_user_password(config.common.user.clone(), config.common.password.clone())
.add_json_format();
let header = if !is_append_only {
let mut fields_name = schema.names_str();
fields_name.push(STARROCKS_DELETE_SIGN);
builder.set_columns_name(fields_name).build()
} else {
builder.build()
};
.add_json_format()
.set_partial_update(config.common.partial_update.clone())
.set_columns_name(fields_name)
.build();

let starrocks_insert_builder = InserterInnerBuilder::new(
format!("http://{}:{}", config.common.host, config.common.http_port),
Expand All @@ -358,7 +358,7 @@ impl StarrocksSinkWriter {
inserter_innet_builder: starrocks_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_doris(
row_encoder: JsonEncoder::new_with_starrocks(
schema,
None,
TimestampHandlingMode::String,
Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ StarrocksConfig:
field_type: String
comments: The StarRocks table you want to sink data to.
required: true
- name: starrocks.partial_update
field_type: String
required: false
- name: r#type
field_type: String
required: true

0 comments on commit 423b0d9

Please sign in to comment.