From 80e6391eb9d5f70ee24d41c81ea87efbf1f19e79 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 26 Sep 2023 17:26:17 +0800 Subject: [PATCH 1/8] add --- src/connector/src/sink/doris_connector.rs | 76 ++++++++++++++++------- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/src/connector/src/sink/doris_connector.rs b/src/connector/src/sink/doris_connector.rs index 116cd91d86542..1ba59315dc6e0 100644 --- a/src/connector/src/sink/doris_connector.rs +++ b/src/connector/src/sink/doris_connector.rs @@ -171,7 +171,7 @@ impl DorisInsertClient { .map_err(|err| SinkError::Http(err.into()))?; let feature = client.request(request); - let handle: JoinHandle> = tokio::spawn(async move { + let handle: JoinHandle> = tokio::spawn(async move { let response = feature.await.map_err(|err| SinkError::Http(err.into()))?; let status = response.status(); let raw_string = String::from_utf8( @@ -185,6 +185,7 @@ impl DorisInsertClient { if status == StatusCode::OK && !raw_string.is_empty() { let response: DorisInsertResultResponse = serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; + let response = InsertResultResponse::Doris(response); Ok(response) } else { Err(SinkError::Http(anyhow::anyhow!( @@ -200,18 +201,55 @@ impl DorisInsertClient { } pub struct DorisInsert { - sender: Option, - join_handle: Option>>, - buffer: BytesMut, + insert:Inserter, is_first_record: bool, } impl DorisInsert { - pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { + pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { + Self { insert: Inserter::new(sender, join_handle), is_first_record: true } + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + let mut data_build = BytesMut::new(); + if self.is_first_record { + self.is_first_record = false; + } else { + data_build.put_slice("\n".as_bytes().into()); + } + data_build.put_slice(&data); + self.insert.write(data_build.into()).await?; + Ok(()) + } + + pub async fn finish(self) -> Result { + let res = match self.insert.finish().await?{ + InsertResultResponse::Doris(doris_res) => doris_res, + InsertResultResponse::Starrocks(_) => return Err(SinkError::Http(anyhow::anyhow!( + "Response is not doris"))), + }; + + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::Http(anyhow::anyhow!( + "Insert error: {:?}, error url: {:?}", + res.message, + res.err_url + ))); + }; + Ok(res) + } +} + +struct Inserter{ + sender: Option, + join_handle: Option>>, + buffer: BytesMut, +} +impl Inserter{ + pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { Self { sender: Some(sender), join_handle: Some(join_handle), - buffer: BytesMut::with_capacity(BUFFER_SIZE), - is_first_record: true, + buffer: BytesMut::with_capacity(BUFFER_SIZE) } } @@ -252,11 +290,6 @@ impl DorisInsert { } pub async fn write(&mut self, data: Bytes) -> Result<()> { - if self.is_first_record { - self.is_first_record = false; - } else { - self.buffer.put_slice("\n".as_bytes()); - } self.buffer.put_slice(&data); if self.buffer.len() >= MIN_CHUNK_SIZE { self.send_chunk().await?; @@ -264,7 +297,7 @@ impl DorisInsert { Ok(()) } - async fn wait_handle(&mut self) -> Result { + async fn wait_handle(&mut self) -> Result { let res = match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) .await @@ -272,17 +305,10 @@ impl DorisInsert { Ok(res) => res.map_err(|err| SinkError::Http(err.into()))??, Err(err) => return Err(SinkError::Http(err.into())), }; - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { - return Err(SinkError::Http(anyhow::anyhow!( - "Insert error: {:?}, error url: {:?}", - res.message, - res.err_url - ))); - }; Ok(res) } - pub async fn finish(mut self) -> Result { + pub async fn finish(mut self) -> Result { if !self.buffer.is_empty() { self.send_chunk().await?; } @@ -384,6 +410,10 @@ impl DorisField { } } +pub enum InsertResultResponse{ + Doris(DorisInsertResultResponse), + Starrocks(StarrocksInsertResultResponse), +} #[derive(Debug, Serialize, Deserialize)] pub struct DorisInsertResultResponse { #[serde(rename = "TxnId")] @@ -421,3 +451,7 @@ pub struct DorisInsertResultResponse { #[serde(rename = "ErrorURL")] err_url: Option, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct StarrocksInsertResultResponse { +} \ No newline at end of file From 68126b3ad8a7a0001bfac89eecdcddf388ab2ee3 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 1 Oct 2023 02:05:46 +0800 Subject: [PATCH 2/8] save --- Cargo.lock | 34 +-- src/connector/Cargo.toml | 1 + src/connector/src/sink/doris.rs | 37 ++-- src/connector/src/sink/doris_connector.rs | 253 ++++++++++++++++++---- src/connector/src/sink/starrocks.rs | 0 5 files changed, 242 insertions(+), 83 deletions(-) create mode 100644 src/connector/src/sink/starrocks.rs diff --git a/Cargo.lock b/Cargo.lock index 3cbeb7ef0c52b..fe4b5d76082e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2828,13 +2828,12 @@ dependencies = [ [[package]] name = "flume" -version = "0.10.14" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" dependencies = [ "futures-core", "futures-sink", - "pin-project", "spin 0.9.8", ] @@ -6934,6 +6933,7 @@ dependencies = [ "serde_json", "serde_with 3.3.0", "simd-json", + "sqlx", "tempfile", "thiserror", "time", @@ -8650,9 +8650,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e58421b6bc416714d5115a2ca953718f6c621a51b68e4f4922aea5a4391a721" +checksum = "0e50c216e3624ec8e7ecd14c6a6a6370aad6ee5d8cfc3ab30b5162eeeef2ed33" dependencies = [ "sqlx-core", "sqlx-macros", @@ -8663,9 +8663,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4cef4251aabbae751a3710927945901ee1d97ee96d757f6880ebb9a79bfd53" +checksum = "8d6753e460c998bbd4cd8c6f0ed9a64346fcca0723d6e75e52fdc351c5d2169d" dependencies = [ "ahash 0.8.3", "atoi", @@ -8704,9 +8704,9 @@ dependencies = [ [[package]] name = "sqlx-macros" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "208e3165167afd7f3881b16c1ef3f2af69fa75980897aac8874a0696516d12c2" +checksum = "9a793bb3ba331ec8359c1853bd39eed32cdd7baaf22c35ccf5c92a7e8d1189ec" dependencies = [ "proc-macro2", "quote", @@ -8717,9 +8717,9 @@ dependencies = [ [[package]] name = "sqlx-macros-core" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a4a8336d278c62231d87f24e8a7a74898156e34c1c18942857be2acb29c7dfc" +checksum = "0a4ee1e104e00dedb6aa5ffdd1343107b0a4702e862a84320ee7cc74782d96fc" dependencies = [ "dotenvy", "either", @@ -8743,9 +8743,9 @@ dependencies = [ [[package]] name = "sqlx-mysql" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" +checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" dependencies = [ "atoi", "base64 0.21.3", @@ -8786,9 +8786,9 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" +checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" dependencies = [ "atoi", "base64 0.21.3", @@ -8826,9 +8826,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4c21bf34c7cae5b283efb3ac1bcc7670df7561124dc2f8bdc0b59be40f79a2" +checksum = "d59dc83cf45d89c555a577694534fcd1b55c545a816c816ce51f20bbe56a4f3f" dependencies = [ "atoi", "chrono", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index bca5acedbba40..b2eab585ecb71 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -70,6 +70,7 @@ mysql_common = { version = "0.30", default-features = false, features = [ nexmark = { version = "0.2", features = ["serde"] } nkeys = "0.3.2" num-bigint = "0.4" +sqlx = { version = "0.7.2", features = ["mysql"] } opendal = "0.39" parking_lot = "0.12" paste = "1" diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index c19365fcc51c4..46c8f3687ce27 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -25,7 +25,7 @@ use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; -use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_DELETE_SIGN}; +use super::doris_connector::{DorisField, DorisInsert, DORIS_DELETE_SIGN, InserterBuilder, HeaderBuilder}; use super::utils::doris_rows_to_json; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::common::DorisCommon; @@ -189,7 +189,7 @@ pub struct DorisSinkWriter { pub config: DorisConfig, schema: Schema, pk_indices: Vec, - client: DorisInsertClient, + builder: InserterBuilder, is_append_only: bool, insert: Option, decimal_map: HashMap, @@ -213,28 +213,29 @@ impl DorisSinkWriter { decimal_map.insert(s.name.clone(), v); } }); - let mut map = HashMap::new(); - map.insert("format".to_string(), "json".to_string()); - map.insert("read_json_by_line".to_string(), "true".to_string()); - let doris_insert_client = DorisInsertClient::new( - config.common.url.clone(), - config.common.database.clone(), - config.common.table.clone(), - ) - .add_common_header() + + let header = HeaderBuilder::new().add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) - .set_properties(map); - let mut doris_insert_client = if !is_append_only { - doris_insert_client.add_hidden_column() + .add_json_format() + .add_read_json_by_line(); + let header = if !is_append_only { + header.add_hidden_column().build() } else { - doris_insert_client + header.build() }; - let insert = Some(doris_insert_client.build().await?); + + let mut doris_insert_builder = InserterBuilder::new( + config.common.url.clone(), + config.common.database.clone(), + config.common.table.clone(), + header, + ); + let insert = Some(doris_insert_builder.build_doris().await?); Ok(Self { config, schema, pk_indices, - client: doris_insert_client, + builder:doris_insert_builder, is_append_only, insert, decimal_map, @@ -323,7 +324,7 @@ impl DorisSinkWriter { impl SinkWriter for DorisSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { if self.insert.is_none() { - self.insert = Some(self.client.build().await?); + self.insert = Some(self.builder.build_doris().await?); } if self.is_append_only { self.append_only(chunk).await diff --git a/src/connector/src/sink/doris_connector.rs b/src/connector/src/sink/doris_connector.rs index 1ba59315dc6e0..ff03a7f2e5370 100644 --- a/src/connector/src/sink/doris_connector.rs +++ b/src/connector/src/sink/doris_connector.rs @@ -26,6 +26,7 @@ use hyper::{body, Client, Request, StatusCode}; use hyper_tls::HttpsConnector; use serde::{Deserialize, Serialize}; use serde_json::Value; +use sqlx::{MySqlPool, Row}; use tokio::task::JoinHandle; use super::{Result, SinkError}; @@ -37,26 +38,17 @@ pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; const SEND_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); -pub struct DorisInsertClient { - url: String, +const DORIS:&str = "doris"; +const STARROCKS:&str = "starrocks"; +pub struct HeaderBuilder { header: HashMap, - sender: Option, } -impl DorisInsertClient { - pub fn new(url: String, db: String, table: String) -> Self { - let url = format!("{}/api/{}/{}/_stream_load", url, db, table); +impl HeaderBuilder{ + pub fn new() -> Self { Self { - url, header: HashMap::default(), - sender: None, } } - - pub fn set_url(mut self, url: String) -> Self { - self.url = url; - self - } - pub fn add_common_header(mut self) -> Self { self.header .insert("expect".to_string(), "100-continue".to_string()); @@ -78,6 +70,7 @@ impl DorisInsertClient { } /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris pub fn enable_2_pc(mut self) -> Self { self.header .insert("two_phase_commit".to_string(), "true".to_string()); @@ -94,6 +87,7 @@ impl DorisInsertClient { } /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris pub fn set_txn_id(mut self, txn_id: i64) -> Self { self.header .insert("txn_operation".to_string(), txn_id.to_string()); @@ -101,6 +95,7 @@ impl DorisInsertClient { } /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris pub fn add_commit(mut self) -> Self { self.header .insert("txn_operation".to_string(), "commit".to_string()); @@ -108,18 +103,54 @@ impl DorisInsertClient { } /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris pub fn add_abort(mut self) -> Self { self.header .insert("txn_operation".to_string(), "abort".to_string()); self } - /// This method is used to add custom message headers, such as the data import format. - pub fn set_properties(mut self, properties: HashMap) -> Self { - self.header.extend(properties); + pub fn add_json_format(mut self) -> Self { + self.header + .insert("format".to_string(), "json".to_string()); + self + } + + /// Only use in Doris + pub fn add_read_json_by_line(mut self) -> Self { + self.header + .insert("read_json_by_line".to_string(), "true".to_string()); + self + } + + /// Only use in Starrocks + pub fn add_strip_outer_array(mut self) -> Self { + self.header + .insert("strip_outer_array".to_string(), "true".to_string()); self } + pub fn build(self) -> HashMap{ + self.header + } +} + +pub struct InserterBuilder { + url: String, + header: HashMap, + sender: Option, +} +impl InserterBuilder { + pub fn new(url: String, db: String, table: String,header: HashMap) -> Self { + let url = format!("{}/api/{}/{}/_stream_load", url, db, table); + + Self { + url, + sender: None, + header, + } + } + fn build_request_and_client( &self, uri: String, @@ -137,34 +168,8 @@ impl DorisInsertClient { (builder, client) } - pub async fn build(&mut self) -> Result { - let (builder, client) = self.build_request_and_client(self.url.clone()); - - let request_get_url = builder - .body(Body::empty()) - .map_err(|err| SinkError::Http(err.into()))?; - let resp = client - .request(request_get_url) - .await - .map_err(|err| SinkError::Http(err.into()))?; - let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { - resp.headers() - .get("location") - .ok_or_else(|| { - SinkError::Http(anyhow::anyhow!("Can't get doris BE url in header",)) - })? - .to_str() - .map_err(|err| { - SinkError::Http(anyhow::anyhow!( - "Can't get doris BE url in header {:?}", - err - )) - })? - } else { - return Err(SinkError::Http(anyhow::anyhow!("Can't get doris BE url",))); - }; - - let (builder, client) = self.build_request_and_client(be_url.to_string()); + fn build_http_send_feature(&self, uri: &str, send_type: &'static str) -> Result<(JoinHandle>,Sender)>{ + let (builder, client) = self.build_request_and_client(uri.to_string()); let (sender, body) = Body::channel(); let request = builder .body(body) @@ -183,9 +188,18 @@ impl DorisInsertClient { .map_err(|err| SinkError::Http(err.into()))?; if status == StatusCode::OK && !raw_string.is_empty() { - let response: DorisInsertResultResponse = - serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; - let response = InsertResultResponse::Doris(response); + let response = if send_type.eq(DORIS){ + let response: DorisInsertResultResponse =serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; + InsertResultResponse::Doris(response) + }else if send_type.eq(STARROCKS) { + let response: StarrocksInsertResultResponse =serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; + InsertResultResponse::Starrocks(response) + }else{ + return Err(SinkError::Http(anyhow::anyhow!( + "Can't convert {:?}'s http response to struct", + send_type + ))) + }; Ok(response) } else { Err(SinkError::Http(anyhow::anyhow!( @@ -195,9 +209,46 @@ impl DorisInsertClient { ))) } }); + Ok((handle,sender)) + } + + pub async fn build_doris(&mut self) -> Result { + let (builder, client) = self.build_request_and_client(self.url.clone()); + + let request_get_url = builder + .body(Body::empty()) + .map_err(|err| SinkError::Http(err.into()))?; + let resp = client + .request(request_get_url) + .await + .map_err(|err| SinkError::Http(err.into()))?; + let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { + resp.headers() + .get("location") + .ok_or_else(|| { + SinkError::Http(anyhow::anyhow!("Can't get doris BE url in header",)) + })? + .to_str() + .map_err(|err| { + SinkError::Http(anyhow::anyhow!( + "Can't get doris BE url in header {:?}", + err + )) + })? + } else { + return Err(SinkError::Http(anyhow::anyhow!("Can't get doris BE url",))); + }; + + let (handle,sender) = self.build_http_send_feature(&be_url, DORIS)?; Ok(DorisInsert::new(sender, handle)) } + + pub async fn build_starrocks(&mut self) -> Result { + let (handle,sender) = self.build_http_send_feature(&self.url, STARROCKS)?; + + Ok(StarrocksInsert::new(sender, handle)) + } } pub struct DorisInsert { @@ -239,6 +290,37 @@ impl DorisInsert { } } +pub struct StarrocksInsert { + insert:Inserter, + is_first_record: bool, +} +impl StarrocksInsert { + pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { + Self { insert: Inserter::new(sender, join_handle), is_first_record: true } + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + self.insert.write(data).await?; + Ok(()) + } + + pub async fn finish(self) -> Result { + let res = match self.insert.finish().await?{ + InsertResultResponse::Doris(_) => return Err(SinkError::Http(anyhow::anyhow!( + "Response is not starrocks"))), + InsertResultResponse::Starrocks(res) => res, + }; + + if res.status.ne("OK") { + return Err(SinkError::Http(anyhow::anyhow!( + "Insert error: {:?}", + res.message, + ))); + }; + Ok(res) + } +} + struct Inserter{ sender: Option, join_handle: Option>>, @@ -454,4 +536,79 @@ pub struct DorisInsertResultResponse { #[derive(Debug, Serialize, Deserialize)] pub struct StarrocksInsertResultResponse { + #[serde(rename = "TxnId")] + txn_id: i64, + #[serde(rename = "Label")] + label: String, + #[serde(rename = "Status")] + status: String, + #[serde(rename = "Message")] + message: String, + #[serde(rename = "NumberTotalRows")] + number_total_rows: i64, + #[serde(rename = "NumberLoadedRows")] + number_loaded_rows: i64, + #[serde(rename = "NumberFilteredRows")] + number_filtered_rows: i32, + #[serde(rename = "NumberUnselectedRows")] + number_unselected_rows: i32, + #[serde(rename = "LoadBytes")] + load_bytes: i64, + #[serde(rename = "LoadTimeMs")] + load_time_ms: i32, + #[serde(rename = "BeginTxnTimeMs")] + begin_txn_time_ms: i32, + #[serde(rename = "ReadDataTimeMs")] + read_data_time_ms: i32, + #[serde(rename = "WriteDataTimeMs")] + write_data_time_ms: i32, + #[serde(rename = "CommitAndPublishTimeMs")] + commit_and_publish_time_ms: i32, + #[serde(rename = "StreamLoadPlanTimeMs")] + stream_load_plan_time_ms: Option, +} + +pub struct StarrocksMysqlQuery{ + table: String, + db: String, + pool: MySqlPool, +} + +impl StarrocksMysqlQuery{ + pub async fn new(url: String, table: String, db: String, user: String, password: String) -> Result{ + let mysql_url = format!("mysql://{}:{}@{}/{}",user,password,url,db); + println!("{:?}",mysql_url); + + let pool = MySqlPool::connect("mysql://xxhx:123456@127.0.0.1:9030/demo") + .await + .map_err(|err| SinkError::Http(err.into()))?; + + Ok(Self { table, db, pool }) + } + + pub async fn get_schema_from_starrocks(&self) -> Result>{ + let query = format!("select column_name, column_type from information_schema.columns where table_name = {:?} and database_name = {:?};",self.table,self.db); + let row = sqlx::query(&query).fetch_all(&self.pool).await + .map_err(|err| SinkError::Http(err.into()))?; + + let query_map: HashMap = row.into_iter().map(|row| (row.get(0), row.get(1))).collect(); + Ok(query_map) + } +} + +#[cfg(test)] +mod tests { + use crate::sink::doris_connector::StarrocksMysqlQuery; + + + #[tokio::test] + async fn test_get_schema() { + let a = StarrocksMysqlQuery::new("127.0.0.1:9030".to_string(),"detailDemo".to_string(),"demo".to_string(),"xxhx".to_string(),"123456".to_string()).await; + let b = a.unwrap().get_schema_from_starrocks().await.unwrap(); + println!("{:?}",b); + } + #[tokio::test] + async fn test_insert() { + + } } \ No newline at end of file diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs new file mode 100644 index 0000000000000..e69de29bb2d1d From 023d847ee71c19c119700224e5b31a447103f525 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 8 Oct 2023 17:32:56 +0800 Subject: [PATCH 3/8] support starrocks --- Cargo.lock | 1 - src/connector/Cargo.toml | 1 - src/connector/src/common.rs | 18 + src/connector/src/sink/doris.rs | 55 ++- src/connector/src/sink/doris_connector.rs | 247 ++++++++----- src/connector/src/sink/mod.rs | 20 +- src/connector/src/sink/starrocks.rs | 429 ++++++++++++++++++++++ 7 files changed, 653 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe4b5d76082e6..dad996d5ca065 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6933,7 +6933,6 @@ dependencies = [ "serde_json", "serde_with 3.3.0", "simd-json", - "sqlx", "tempfile", "thiserror", "time", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index b2eab585ecb71..bca5acedbba40 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -70,7 +70,6 @@ mysql_common = { version = "0.30", default-features = false, features = [ nexmark = { version = "0.2", features = ["serde"] } nkeys = "0.3.2" num-bigint = "0.4" -sqlx = { version = "0.7.2", features = ["mysql"] } opendal = "0.39" parking_lot = "0.12" paste = "1" diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 2031cd8eb0b55..c92f209c0fb60 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -466,6 +466,24 @@ impl DorisCommon { } } +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct StarrocksCommon { + #[serde(rename = "starrocks.host")] + pub host: String, + #[serde(rename = "starrocks.mysqlport")] + pub mysql_port: String, + #[serde(rename = "starrocks.httpport")] + pub http_port: String, + #[serde(rename = "starrocks.user")] + pub user: String, + #[serde(rename = "starrocks.password")] + pub password: String, + #[serde(rename = "starrocks.database")] + pub database: String, + #[serde(rename = "starrocks.table")] + pub table: String, +} + #[derive(Debug, Serialize, Deserialize)] pub struct UpsertMessage<'a> { #[serde(borrow)] diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 46c8f3687ce27..9457d26b34b30 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -25,7 +25,9 @@ use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; -use super::doris_connector::{DorisField, DorisInsert, DORIS_DELETE_SIGN, InserterBuilder, HeaderBuilder}; +use super::doris_connector::{ + DorisField, DorisInsert, HeaderBuilder, InserterBuilder, DORIS_DELETE_SIGN, +}; use super::utils::doris_rows_to_json; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::common::DorisCommon; @@ -214,28 +216,29 @@ impl DorisSinkWriter { } }); - let header = HeaderBuilder::new().add_common_header() - .set_user_password(config.common.user.clone(), config.common.password.clone()) - .add_json_format() - .add_read_json_by_line(); + let header_builder = HeaderBuilder::new() + .add_common_header() + .set_user_password(config.common.user.clone(), config.common.password.clone()) + .add_json_format() + .add_read_json_by_line(); let header = if !is_append_only { - header.add_hidden_column().build() + header_builder.add_hidden_column().build() } else { - header.build() + header_builder.build() }; let mut doris_insert_builder = InserterBuilder::new( - config.common.url.clone(), - config.common.database.clone(), - config.common.table.clone(), - header, + config.common.url.clone(), + config.common.database.clone(), + config.common.table.clone(), + header, ); let insert = Some(doris_insert_builder.build_doris().await?); Ok(Self { config, schema, pk_indices, - builder:doris_insert_builder, + builder: doris_insert_builder, is_append_only, insert, decimal_map, @@ -252,7 +255,7 @@ impl DorisSinkWriter { .to_string(); self.insert .as_mut() - .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_string()))? + .ok_or_else(|| SinkError::Doris("Can't find starrocks sink insert".to_string()))? .write(row_json_string.into()) .await?; } @@ -274,7 +277,7 @@ impl DorisSinkWriter { self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Doris("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -291,12 +294,28 @@ impl DorisSinkWriter { self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Doris("Can't find starrocks sink insert".to_string()) + })? + .write(row_json_string.into()) + .await?; + } + Op::UpdateDelete => { + let mut row_json_value = + doris_rows_to_json(row, &self.schema, &self.decimal_map)?; + row_json_value.insert( + DORIS_DELETE_SIGN.to_string(), + Value::String("1".to_string()), + ); + let row_json_string = serde_json::to_string(&row_json_value) + .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + self.insert + .as_mut() + .ok_or_else(|| { + SinkError::Doris("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; } - Op::UpdateDelete => {} Op::UpdateInsert => { let mut row_json_value = doris_rows_to_json(row, &self.schema, &self.decimal_map)?; @@ -309,7 +328,7 @@ impl DorisSinkWriter { self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Doris("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -346,7 +365,7 @@ impl SinkWriter for DorisSinkWriter { let insert = self .insert .take() - .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_string()))?; + .ok_or_else(|| SinkError::Doris("Can't find starrocks inserter".to_string()))?; insert.finish().await?; } Ok(()) diff --git a/src/connector/src/sink/doris_connector.rs b/src/connector/src/sink/doris_connector.rs index ff03a7f2e5370..0a31a87addfdb 100644 --- a/src/connector/src/sink/doris_connector.rs +++ b/src/connector/src/sink/doris_connector.rs @@ -24,9 +24,10 @@ use hyper::body::{Body, Sender}; use hyper::client::HttpConnector; use hyper::{body, Client, Request, StatusCode}; use hyper_tls::HttpsConnector; +use mysql_async::prelude::Queryable; +use mysql_async::Opts; use serde::{Deserialize, Serialize}; use serde_json::Value; -use sqlx::{MySqlPool, Row}; use tokio::task::JoinHandle; use super::{Result, SinkError}; @@ -35,20 +36,30 @@ const BUFFER_SIZE: usize = 64 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; +pub(crate) const STARROCKS_DELETE_SIGN: &str = "__op"; +const STARROCK_MYSQL_PREFER_SOCKET: &str = "false"; +const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; +const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 31536000; const SEND_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); -const DORIS:&str = "doris"; -const STARROCKS:&str = "starrocks"; +const DORIS: &str = "doris"; +const STARROCKS: &str = "starrocks"; pub struct HeaderBuilder { header: HashMap, } -impl HeaderBuilder{ - pub fn new() -> Self { +impl Default for HeaderBuilder { + fn default() -> Self { + Self::new() + } +} +impl HeaderBuilder { + pub fn new() -> Self { Self { header: HashMap::default(), } } + pub fn add_common_header(mut self) -> Self { self.header .insert("expect".to_string(), "100-continue".to_string()); @@ -62,6 +73,12 @@ impl HeaderBuilder{ self } + pub fn set_columns_name(mut self, columns_name: Vec<&str>) -> Self { + let columns_name_str = columns_name.join(","); + self.header.insert("columns".to_string(), columns_name_str); + self + } + /// This method is only called during upsert operations. pub fn add_hidden_column(mut self) -> Self { self.header @@ -111,8 +128,7 @@ impl HeaderBuilder{ } pub fn add_json_format(mut self) -> Self { - self.header - .insert("format".to_string(), "json".to_string()); + self.header.insert("format".to_string(), "json".to_string()); self } @@ -130,7 +146,7 @@ impl HeaderBuilder{ self } - pub fn build(self) -> HashMap{ + pub fn build(self) -> HashMap { self.header } } @@ -141,9 +157,9 @@ pub struct InserterBuilder { sender: Option, } impl InserterBuilder { - pub fn new(url: String, db: String, table: String,header: HashMap) -> Self { + pub fn new(url: String, db: String, table: String, header: HashMap) -> Self { let url = format!("{}/api/{}/{}/_stream_load", url, db, table); - + Self { url, sender: None, @@ -168,8 +184,36 @@ impl InserterBuilder { (builder, client) } - fn build_http_send_feature(&self, uri: &str, send_type: &'static str) -> Result<(JoinHandle>,Sender)>{ - let (builder, client) = self.build_request_and_client(uri.to_string()); + async fn build_http_send_feature( + &self, + send_type: &'static str, + ) -> Result<(JoinHandle>, Sender)> { + let (builder, client) = self.build_request_and_client(self.url.clone()); + let request_get_url = builder + .body(Body::empty()) + .map_err(|err| SinkError::Http(err.into()))?; + let resp = client + .request(request_get_url) + .await + .map_err(|err| SinkError::Http(err.into()))?; + let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { + resp.headers() + .get("location") + .ok_or_else(|| { + SinkError::Http(anyhow::anyhow!("Can't get doris BE url in header",)) + })? + .to_str() + .map_err(|err| { + SinkError::Http(anyhow::anyhow!( + "Can't get doris BE url in header {:?}", + err + )) + })? + } else { + return Err(SinkError::Http(anyhow::anyhow!("Can't get doris BE url",))); + }; + + let (builder, client) = self.build_request_and_client(be_url.to_string()); let (sender, body) = Body::channel(); let request = builder .body(body) @@ -188,17 +232,19 @@ impl InserterBuilder { .map_err(|err| SinkError::Http(err.into()))?; if status == StatusCode::OK && !raw_string.is_empty() { - let response = if send_type.eq(DORIS){ - let response: DorisInsertResultResponse =serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; + let response = if send_type.eq(DORIS) { + let response: DorisInsertResultResponse = serde_json::from_str(&raw_string) + .map_err(|err| SinkError::Http(err.into()))?; InsertResultResponse::Doris(response) - }else if send_type.eq(STARROCKS) { - let response: StarrocksInsertResultResponse =serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; + } else if send_type.eq(STARROCKS) { + let response: StarrocksInsertResultResponse = serde_json::from_str(&raw_string) + .map_err(|err| SinkError::Http(err.into()))?; InsertResultResponse::Starrocks(response) - }else{ + } else { return Err(SinkError::Http(anyhow::anyhow!( "Can't convert {:?}'s http response to struct", send_type - ))) + ))); }; Ok(response) } else { @@ -209,55 +255,32 @@ impl InserterBuilder { ))) } }); - Ok((handle,sender)) + Ok((handle, sender)) } pub async fn build_doris(&mut self) -> Result { - let (builder, client) = self.build_request_and_client(self.url.clone()); - - let request_get_url = builder - .body(Body::empty()) - .map_err(|err| SinkError::Http(err.into()))?; - let resp = client - .request(request_get_url) - .await - .map_err(|err| SinkError::Http(err.into()))?; - let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { - resp.headers() - .get("location") - .ok_or_else(|| { - SinkError::Http(anyhow::anyhow!("Can't get doris BE url in header",)) - })? - .to_str() - .map_err(|err| { - SinkError::Http(anyhow::anyhow!( - "Can't get doris BE url in header {:?}", - err - )) - })? - } else { - return Err(SinkError::Http(anyhow::anyhow!("Can't get doris BE url",))); - }; - - let (handle,sender) = self.build_http_send_feature(&be_url, DORIS)?; + let (handle, sender) = self.build_http_send_feature(DORIS).await?; Ok(DorisInsert::new(sender, handle)) } pub async fn build_starrocks(&mut self) -> Result { - let (handle,sender) = self.build_http_send_feature(&self.url, STARROCKS)?; + let (handle, sender) = self.build_http_send_feature(STARROCKS).await?; Ok(StarrocksInsert::new(sender, handle)) } } pub struct DorisInsert { - insert:Inserter, + insert: InserterInner, is_first_record: bool, } impl DorisInsert { pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { - Self { insert: Inserter::new(sender, join_handle), is_first_record: true } + Self { + insert: InserterInner::new(sender, join_handle), + is_first_record: true, + } } pub async fn write(&mut self, data: Bytes) -> Result<()> { @@ -265,7 +288,7 @@ impl DorisInsert { if self.is_first_record { self.is_first_record = false; } else { - data_build.put_slice("\n".as_bytes().into()); + data_build.put_slice("\n".as_bytes()); } data_build.put_slice(&data); self.insert.write(data_build.into()).await?; @@ -273,10 +296,11 @@ impl DorisInsert { } pub async fn finish(self) -> Result { - let res = match self.insert.finish().await?{ + let res = match self.insert.finish().await? { InsertResultResponse::Doris(doris_res) => doris_res, - InsertResultResponse::Starrocks(_) => return Err(SinkError::Http(anyhow::anyhow!( - "Response is not doris"))), + InsertResultResponse::Starrocks(_) => { + return Err(SinkError::Http(anyhow::anyhow!("Response is not doris"))) + } }; if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { @@ -291,12 +315,13 @@ impl DorisInsert { } pub struct StarrocksInsert { - insert:Inserter, - is_first_record: bool, + insert: InserterInner, } impl StarrocksInsert { pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { - Self { insert: Inserter::new(sender, join_handle), is_first_record: true } + Self { + insert: InserterInner::new(sender, join_handle), + } } pub async fn write(&mut self, data: Bytes) -> Result<()> { @@ -305,13 +330,16 @@ impl StarrocksInsert { } pub async fn finish(self) -> Result { - let res = match self.insert.finish().await?{ - InsertResultResponse::Doris(_) => return Err(SinkError::Http(anyhow::anyhow!( - "Response is not starrocks"))), + let res = match self.insert.finish().await? { + InsertResultResponse::Doris(_) => { + return Err(SinkError::Http(anyhow::anyhow!( + "Response is not starrocks" + ))) + } InsertResultResponse::Starrocks(res) => res, }; - if res.status.ne("OK") { + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { return Err(SinkError::Http(anyhow::anyhow!( "Insert error: {:?}", res.message, @@ -321,17 +349,17 @@ impl StarrocksInsert { } } -struct Inserter{ +struct InserterInner { sender: Option, join_handle: Option>>, buffer: BytesMut, } -impl Inserter{ +impl InserterInner { pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { Self { sender: Some(sender), join_handle: Some(join_handle), - buffer: BytesMut::with_capacity(BUFFER_SIZE) + buffer: BytesMut::with_capacity(BUFFER_SIZE), } } @@ -492,7 +520,7 @@ impl DorisField { } } -pub enum InsertResultResponse{ +pub enum InsertResultResponse { Doris(DorisInsertResultResponse), Starrocks(StarrocksInsertResultResponse), } @@ -568,47 +596,72 @@ pub struct StarrocksInsertResultResponse { stream_load_plan_time_ms: Option, } -pub struct StarrocksMysqlQuery{ +pub struct StarrocksMysqlQuery { table: String, db: String, - pool: MySqlPool, + conn: mysql_async::Conn, } -impl StarrocksMysqlQuery{ - pub async fn new(url: String, table: String, db: String, user: String, password: String) -> Result{ - let mysql_url = format!("mysql://{}:{}@{}/{}",user,password,url,db); - println!("{:?}",mysql_url); - - let pool = MySqlPool::connect("mysql://xxhx:123456@127.0.0.1:9030/demo") - .await - .map_err(|err| SinkError::Http(err.into()))?; +impl StarrocksMysqlQuery { + pub async fn new( + host: String, + port: String, + table: String, + db: String, + user: String, + password: String, + ) -> Result { + let conn_uri = format!( + "mysql://{}:{}@{}:{}/{}?prefer_socket={}&max_allowed_packet={}&wait_timeout={}", + user, + password, + host, + port, + db, + STARROCK_MYSQL_PREFER_SOCKET, + STARROCK_MYSQL_MAX_ALLOWED_PACKET, + STARROCK_MYSQL_WAIT_TIMEOUT + ); + let pool = mysql_async::Pool::new( + Opts::from_url(&conn_uri).map_err(|err| SinkError::Http(err.into()))?, + ); + let conn = pool + .get_conn() + .await + .map_err(|err| SinkError::Http(err.into()))?; - Ok(Self { table, db, pool }) + Ok(Self { table, db, conn }) } - pub async fn get_schema_from_starrocks(&self) -> Result>{ - let query = format!("select column_name, column_type from information_schema.columns where table_name = {:?} and database_name = {:?};",self.table,self.db); - let row = sqlx::query(&query).fetch_all(&self.pool).await - .map_err(|err| SinkError::Http(err.into()))?; - - let query_map: HashMap = row.into_iter().map(|row| (row.get(0), row.get(1))).collect(); + pub async fn get_columns_from_starrocks(&mut self) -> Result> { + let query = format!("select column_name, column_type from information_schema.columns where table_name = {:?} and table_schema = {:?};",self.table,self.db); + let mut query_map: HashMap = HashMap::default(); + self.conn + .query_map(query, |(column_name, column_type)| { + query_map.insert(column_name, column_type) + }) + .await + .map_err(|err| SinkError::Http(err.into()))?; Ok(query_map) } -} - -#[cfg(test)] -mod tests { - use crate::sink::doris_connector::StarrocksMysqlQuery; - - - #[tokio::test] - async fn test_get_schema() { - let a = StarrocksMysqlQuery::new("127.0.0.1:9030".to_string(),"detailDemo".to_string(),"demo".to_string(),"xxhx".to_string(),"123456".to_string()).await; - let b = a.unwrap().get_schema_from_starrocks().await.unwrap(); - println!("{:?}",b); - } - #[tokio::test] - async fn test_insert() { + pub async fn get_pk_from_starrocks(&mut self) -> Result<(String, String)> { + let query = format!("select table_model, primary_key from information_schema.tables_config where table_name = {:?} and table_schema = {:?};",self.table,self.db); + let table_mode_pk: (String, String) = self + .conn + .query_map(query, |(table_model, primary_key)| { + (table_model, primary_key) + }) + .await + .map_err(|err| SinkError::Http(err.into()))? + .get(0) + .ok_or_else(|| { + SinkError::Starrocks(format!( + "Can't find schema with table {:?} and database {:?}", + self.table, self.db + )) + })? + .clone(); + Ok(table_mode_pk) } -} \ No newline at end of file +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index f1f12ed323d7b..dd0baadbb33a7 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -27,6 +27,7 @@ pub mod nats; pub mod pulsar; pub mod redis; pub mod remote; +pub mod starrocks; #[cfg(any(test, madsim))] pub mod test_sink; pub mod utils; @@ -56,6 +57,7 @@ use self::encoder::SerTo; use self::formatter::SinkFormatter; use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK}; use self::pulsar::{PulsarConfig, PulsarSink}; +use self::starrocks::{StarrocksConfig, StarrocksSink}; use crate::sink::boxed::BoxSink; use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::clickhouse::CLICKHOUSE_SINK; @@ -67,6 +69,7 @@ use crate::sink::nats::{NatsConfig, NatsSink, NATS_SINK}; use crate::sink::pulsar::PULSAR_SINK; use crate::sink::redis::{RedisConfig, RedisSink}; use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig, RemoteSink}; +use crate::sink::starrocks::STARROCKS_SINK; #[cfg(any(test, madsim))] use crate::sink::test_sink::{build_test_sink, TEST_SINK_NAME}; use crate::ConnectorParams; @@ -332,6 +335,7 @@ pub enum SinkConfig { BlackHole, ClickHouse(Box), Doris(Box), + Starrocks(Box), Nats(NatsConfig), #[cfg(any(test, madsim))] Test, @@ -397,6 +401,9 @@ impl SinkConfig { DORIS_SINK => Ok(SinkConfig::Doris(Box::new(DorisConfig::from_hashmap( properties, )?))), + STARROCKS_SINK => Ok(SinkConfig::Starrocks(Box::new( + StarrocksConfig::from_hashmap(properties)?, + ))), BLACKHOLE_SINK => Ok(SinkConfig::BlackHole), PULSAR_SINK => Ok(SinkConfig::Pulsar(PulsarConfig::from_hashmap(properties)?)), REMOTE_ICEBERG_SINK => Ok(SinkConfig::RemoteIceberg( @@ -429,6 +436,7 @@ pub enum SinkImpl { Kinesis(KinesisSink), ClickHouse(ClickHouseSink), Doris(DorisSink), + Starrocks(StarrocksSink), Iceberg(IcebergSink), Nats(NatsSink), RemoteIceberg(RemoteIcebergSink), @@ -450,6 +458,7 @@ impl SinkImpl { SinkImpl::RemoteIceberg(_) => "iceberg_java", SinkImpl::TestSink(_) => "test", SinkImpl::Doris(_) => "doris", + SinkImpl::Starrocks(_) => "starrocks", } } } @@ -468,6 +477,7 @@ macro_rules! dispatch_sink { SinkImpl::Kinesis($sink) => $body, SinkImpl::ClickHouse($sink) => $body, SinkImpl::Doris($sink) => $body, + SinkImpl::Starrocks($sink) => $body, SinkImpl::Iceberg($sink) => $body, SinkImpl::Nats($sink) => $body, SinkImpl::RemoteIceberg($sink) => $body, @@ -508,6 +518,12 @@ impl SinkImpl { param.downstream_pk, param.sink_type.is_append_only(), )?), + SinkConfig::Starrocks(cfg) => SinkImpl::Starrocks(StarrocksSink::new( + *cfg, + param.schema(), + param.downstream_pk, + param.sink_type.is_append_only(), + )?), }) } } @@ -534,10 +550,12 @@ pub enum SinkError { ClickHouse(String), #[error("Nats error: {0}")] Nats(anyhow::Error), - #[error("Doris http error: {0}")] + #[error("Doris/Starrocks connect error: {0}")] Http(anyhow::Error), #[error("Doris error: {0}")] Doris(String), + #[error("Starrocks error: {0}")] + Starrocks(String), #[error("Pulsar error: {0}")] Pulsar(anyhow::Error), #[error("Internal error: {0}")] diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index e69de29bb2d1d..dd1e1abf50d6f 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -0,0 +1,429 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::anyhow; +use async_trait::async_trait; +use itertools::Itertools; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde::Deserialize; +use serde_json::Value; +use serde_with::serde_as; + +use super::doris_connector::{ + HeaderBuilder, InserterBuilder, StarrocksInsert, StarrocksMysqlQuery, STARROCKS_DELETE_SIGN, +}; +use super::utils::doris_rows_to_json; +use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::common::StarrocksCommon; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; + +pub const STARROCKS_SINK: &str = "starrocks"; +#[serde_as] +#[derive(Clone, Debug, Deserialize)] +pub struct StarrocksConfig { + #[serde(flatten)] + pub common: StarrocksCommon, + + pub r#type: String, // accept "append-only" or "upsert" +} +impl StarrocksConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = + serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } +} + +#[derive(Debug)] +pub struct StarrocksSink { + pub config: StarrocksConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, +} + +impl StarrocksSink { + pub fn new( + config: StarrocksConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Result { + Ok(Self { + config, + schema, + pk_indices, + is_append_only, + }) + } +} + +impl StarrocksSink { + fn check_column_name_and_type( + &self, + starrocks_columns_desc: HashMap, + ) -> Result<()> { + let rw_fields_name = self.schema.fields(); + if rw_fields_name.len().ne(&starrocks_columns_desc.len()) { + return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string())); + } + + for i in rw_fields_name { + let value = starrocks_columns_desc.get(&i.name).ok_or_else(|| { + SinkError::Starrocks(format!( + "Column name don't find in starrocks, risingwave is {:?} ", + i.name + )) + })?; + if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? { + return Err(SinkError::Starrocks(format!( + "Column type don't match, column name is {:?}. starrocks type is {:?} risingwave type is {:?} ",i.name,value,i.data_type + ))); + } + } + Ok(()) + } + + fn check_and_correct_column_type( + rw_data_type: &DataType, + starrocks_data_type: String, + ) -> Result { + match rw_data_type { + risingwave_common::types::DataType::Boolean => { + Ok(starrocks_data_type.contains("tinyint")) + } + risingwave_common::types::DataType::Int16 => { + Ok(starrocks_data_type.contains("smallint")) + } + risingwave_common::types::DataType::Int32 => Ok(starrocks_data_type.contains("int")), + risingwave_common::types::DataType::Int64 => Ok(starrocks_data_type.contains("bigint")), + risingwave_common::types::DataType::Float32 => { + Ok(starrocks_data_type.contains("float")) + } + risingwave_common::types::DataType::Float64 => { + Ok(starrocks_data_type.contains("double")) + } + risingwave_common::types::DataType::Decimal => { + Ok(starrocks_data_type.contains("decimal")) + } + risingwave_common::types::DataType::Date => Ok(starrocks_data_type.contains("date")), + risingwave_common::types::DataType::Varchar => { + Ok(starrocks_data_type.contains("varchar")) + } + risingwave_common::types::DataType::Time => Err(SinkError::Starrocks( + "starrocks can not support Time".to_string(), + )), + risingwave_common::types::DataType::Timestamp => { + Ok(starrocks_data_type.contains("datetime")) + } + risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks( + "starrocks can not support Timestamptz".to_string(), + )), + risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks( + "starrocks can not support Interval".to_string(), + )), + // todo! Validate the type struct and list + risingwave_common::types::DataType::Struct(_) => Err(SinkError::Starrocks( + "starrocks can not support import struct".to_string(), + )), + risingwave_common::types::DataType::List(_) => { + Ok(starrocks_data_type.contains("unknown")) + } + risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks( + "starrocks can not support Bytea".to_string(), + )), + risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks( + "starrocks can not support import json".to_string(), + )), + risingwave_common::types::DataType::Serial => { + Ok(starrocks_data_type.contains("bigint")) + } + risingwave_common::types::DataType::Int256 => Err(SinkError::Starrocks( + "starrocks can not support Int256".to_string(), + )), + } + } +} + +#[async_trait] +impl Sink for StarrocksSink { + type Coordinator = DummySinkCommitCoordinator; + type Writer = StarrocksSinkWriter; + + async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { + Ok(StarrocksSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + ) + .await?) + } + + async fn validate(&self) -> Result<()> { + if !self.is_append_only && self.pk_indices.is_empty() { + return Err(SinkError::Config(anyhow!( + "Primary key not defined for upsert starrocks sink (please define in `primary_key` field)"))); + } + // check reachability + let mut client = StarrocksMysqlQuery::new( + self.config.common.host.clone(), + self.config.common.mysql_port.clone(), + self.config.common.table.clone(), + self.config.common.database.clone(), + self.config.common.user.clone(), + self.config.common.password.clone(), + ) + .await?; + let (read_model, pks) = client.get_pk_from_starrocks().await?; + + if !self.is_append_only && read_model.ne("PRIMARY_KEYS") { + return Err(SinkError::Config(anyhow!( + "If you want to use upsert, please set the keysType of starrocks to PRIMARY_KEY" + ))); + } + + for (index, filed) in self.schema.fields().iter().enumerate() { + if self.pk_indices.contains(&index) && !pks.contains(&filed.name) { + return Err(SinkError::Starrocks(format!( + "Can't find pk {:?} in starrocks", + filed.name + ))); + } + } + + let starrocks_columns_desc = client.get_columns_from_starrocks().await?; + + self.check_column_name_and_type(starrocks_columns_desc)?; + Ok(()) + } +} + +pub struct StarrocksSinkWriter { + pub config: StarrocksConfig, + schema: Schema, + pk_indices: Vec, + builder: InserterBuilder, + is_append_only: bool, + insert: Option, + decimal_map: HashMap, +} + +impl StarrocksSinkWriter { + pub async fn new( + config: StarrocksConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Result { + let mut decimal_map = HashMap::default(); + let starrocks_columns = StarrocksMysqlQuery::new( + config.common.host.clone(), + config.common.mysql_port.clone(), + config.common.table.clone(), + config.common.database.clone(), + config.common.user.clone(), + config.common.password.clone(), + ) + .await? + .get_columns_from_starrocks() + .await?; + + for (name, column_type) in &starrocks_columns { + if column_type.contains("decimal") { + let decimal_all = column_type + .split("decimal(") + .last() + .ok_or_else(|| SinkError::Starrocks("must have last".to_string()))? + .split(')') + .next() + .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? + .split(',') + .collect_vec(); + let length = decimal_all + .first() + .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? + .parse::() + .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?; + + let scale = decimal_all + .last() + .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? + .parse::() + .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?; + decimal_map.insert(name.to_string(), (length, scale)); + } + } + + let builder = HeaderBuilder::new() + .add_common_header() + .set_user_password(config.common.user.clone(), config.common.password.clone()) + .add_json_format(); + let header = if !is_append_only { + let mut fields_name = schema.names_str(); + fields_name.push(STARROCKS_DELETE_SIGN); + builder.set_columns_name(fields_name).build() + } else { + builder.build() + }; + + let starrocks_insert_builder = InserterBuilder::new( + format!("http://{}:{}", config.common.host, config.common.http_port), + config.common.database.clone(), + config.common.table.clone(), + header, + ); + // let insert = Some(starrocks_insert_builder.build_starrocks().await?); + Ok(Self { + config, + schema, + pk_indices, + builder: starrocks_insert_builder, + is_append_only, + insert: None, + decimal_map, + }) + } + + async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let row_json_string = + Value::Object(doris_rows_to_json(row, &self.schema, &self.decimal_map)?) + .to_string(); + self.insert + .as_mut() + .ok_or_else(|| { + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) + })? + .write(row_json_string.into()) + .await?; + } + Ok(()) + } + + async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { + for (op, row) in chunk.rows() { + match op { + Op::Insert => { + let mut row_json_value = + doris_rows_to_json(row, &self.schema, &self.decimal_map)?; + row_json_value.insert( + STARROCKS_DELETE_SIGN.to_string(), + Value::String("0".to_string()), + ); + let row_json_string = serde_json::to_string(&row_json_value) + .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + self.insert + .as_mut() + .ok_or_else(|| { + SinkError::Doris("Can't find doris sink insert".to_string()) + })? + .write(row_json_string.into()) + .await?; + } + Op::Delete => { + let mut row_json_value = + doris_rows_to_json(row, &self.schema, &self.decimal_map)?; + row_json_value.insert( + STARROCKS_DELETE_SIGN.to_string(), + Value::String("1".to_string()), + ); + let row_json_string = serde_json::to_string(&row_json_value) + .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + self.insert + .as_mut() + .ok_or_else(|| { + SinkError::Doris("Can't find doris sink insert".to_string()) + })? + .write(row_json_string.into()) + .await?; + } + Op::UpdateDelete => {} + Op::UpdateInsert => { + let mut row_json_value = + doris_rows_to_json(row, &self.schema, &self.decimal_map)?; + row_json_value.insert( + STARROCKS_DELETE_SIGN.to_string(), + Value::String("0".to_string()), + ); + let row_json_string = serde_json::to_string(&row_json_value) + .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + self.insert + .as_mut() + .ok_or_else(|| { + SinkError::Doris("Can't find doris sink insert".to_string()) + })? + .write(row_json_string.into()) + .await?; + } + } + } + Ok(()) + } +} + +#[async_trait] +impl SinkWriter for StarrocksSinkWriter { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + if self.insert.is_none() { + self.insert = Some(self.builder.build_starrocks().await?); + } + if self.is_append_only { + self.append_only(chunk).await + } else { + self.upsert(chunk).await + } + } + + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { + if self.insert.is_some() { + let insert = self + .insert + .take() + .ok_or_else(|| SinkError::Starrocks("Can't find starrocks inserter".to_string()))?; + insert.finish().await?; + } + Ok(()) + } + + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } +} From 6815c9dfad18966c2100a58855aeb43c47bf9783 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 8 Oct 2023 17:46:21 +0800 Subject: [PATCH 4/8] fix cargo lock --- Cargo.lock | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dad996d5ca065..3cbeb7ef0c52b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2828,12 +2828,13 @@ dependencies = [ [[package]] name = "flume" -version = "0.11.0" +version = "0.10.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" dependencies = [ "futures-core", "futures-sink", + "pin-project", "spin 0.9.8", ] @@ -8649,9 +8650,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e50c216e3624ec8e7ecd14c6a6a6370aad6ee5d8cfc3ab30b5162eeeef2ed33" +checksum = "8e58421b6bc416714d5115a2ca953718f6c621a51b68e4f4922aea5a4391a721" dependencies = [ "sqlx-core", "sqlx-macros", @@ -8662,9 +8663,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d6753e460c998bbd4cd8c6f0ed9a64346fcca0723d6e75e52fdc351c5d2169d" +checksum = "dd4cef4251aabbae751a3710927945901ee1d97ee96d757f6880ebb9a79bfd53" dependencies = [ "ahash 0.8.3", "atoi", @@ -8703,9 +8704,9 @@ dependencies = [ [[package]] name = "sqlx-macros" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a793bb3ba331ec8359c1853bd39eed32cdd7baaf22c35ccf5c92a7e8d1189ec" +checksum = "208e3165167afd7f3881b16c1ef3f2af69fa75980897aac8874a0696516d12c2" dependencies = [ "proc-macro2", "quote", @@ -8716,9 +8717,9 @@ dependencies = [ [[package]] name = "sqlx-macros-core" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a4ee1e104e00dedb6aa5ffdd1343107b0a4702e862a84320ee7cc74782d96fc" +checksum = "8a4a8336d278c62231d87f24e8a7a74898156e34c1c18942857be2acb29c7dfc" dependencies = [ "dotenvy", "either", @@ -8742,9 +8743,9 @@ dependencies = [ [[package]] name = "sqlx-mysql" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" +checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" dependencies = [ "atoi", "base64 0.21.3", @@ -8785,9 +8786,9 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" +checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" dependencies = [ "atoi", "base64 0.21.3", @@ -8825,9 +8826,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59dc83cf45d89c555a577694534fcd1b55c545a816c816ce51f20bbe56a4f3f" +checksum = "be4c21bf34c7cae5b283efb3ac1bcc7670df7561124dc2f8bdc0b59be40f79a2" dependencies = [ "atoi", "chrono", From 662377155608774961d9ad6feb304be3d3488ec6 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 8 Oct 2023 18:12:19 +0800 Subject: [PATCH 5/8] fmt --- src/connector/src/sink/doris.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 5287772b976ea..075d0474f4c1b 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -278,7 +278,7 @@ impl DorisSinkWriter { let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); self.insert .as_mut() - .ok_or_else(|| SinkError::Doris("Can't find starrocks sink insert".to_string()))? + .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_string()))? .write(row_json_string.into()) .await?; } @@ -299,7 +299,7 @@ impl DorisSinkWriter { self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find starrocks sink insert".to_string()) + SinkError::Doris("Can't find doris sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -315,7 +315,7 @@ impl DorisSinkWriter { self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find starrocks sink insert".to_string()) + SinkError::Doris("Can't find doris sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -332,7 +332,7 @@ impl DorisSinkWriter { self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find starrocks sink insert".to_string()) + SinkError::Doris("Can't find doris sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -369,7 +369,7 @@ impl SinkWriter for DorisSinkWriter { let insert = self .insert .take() - .ok_or_else(|| SinkError::Doris("Can't find starrocks inserter".to_string()))?; + .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_string()))?; insert.finish().await?; } Ok(()) From aed8ef7e6a4214e198d589ec32ba93c983cfbf96 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 8 Oct 2023 18:21:11 +0800 Subject: [PATCH 6/8] add integration test fix fix fix --- integration_tests/starrocks-sink/README.md | 56 +++++++++++++ .../append-only-sql/create_mv.sql | 7 ++ .../append-only-sql/create_sink.sql | 14 ++++ .../append-only-sql/create_source.sql | 18 +++++ .../starrocks-sink/docker-compose.yml | 78 +++++++++++++++++++ .../starrocks-sink/upsert/create_mv.sql | 7 ++ .../starrocks-sink/upsert/create_sink.sql | 14 ++++ .../starrocks-sink/upsert/create_table.sql | 10 +++ .../upsert/insert_update_delete.sql | 8 ++ src/connector/src/sink/encoder/json.rs | 7 +- src/connector/src/sink/mod.rs | 1 + src/connector/src/sink/starrocks.rs | 22 +++--- 12 files changed, 229 insertions(+), 13 deletions(-) create mode 100644 integration_tests/starrocks-sink/README.md create mode 100644 integration_tests/starrocks-sink/append-only-sql/create_mv.sql create mode 100644 integration_tests/starrocks-sink/append-only-sql/create_sink.sql create mode 100644 integration_tests/starrocks-sink/append-only-sql/create_source.sql create mode 100644 integration_tests/starrocks-sink/docker-compose.yml create mode 100644 integration_tests/starrocks-sink/upsert/create_mv.sql create mode 100644 integration_tests/starrocks-sink/upsert/create_sink.sql create mode 100644 integration_tests/starrocks-sink/upsert/create_table.sql create mode 100644 integration_tests/starrocks-sink/upsert/insert_update_delete.sql diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md new file mode 100644 index 0000000000000..f65b4b9406685 --- /dev/null +++ b/integration_tests/starrocks-sink/README.md @@ -0,0 +1,56 @@ +# Demo: Sinking to Starrocks + +In this demo, we want to showcase how RisingWave is able to sink data to Starrocks. + + +1. Launch the cluster: + +```sh +docker-compose up -d +``` + +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Starrocks fe and be for sink. + +2. Create the Starrocks table via mysql: + +Login to mysql +```sh +docker compose exec starrocks-fe mysql -uroot -P9030 -h127.0.0.1 +``` + +Run the following queries to create database and table. +```sql +CREATE database demo; +use demo; + +CREATE table demo_bhv_table( + user_id int, + target_id text, + event_timestamp datetime +) ENGINE=OLAP +PRIMARY KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); + +CREATE USER 'users'@'%' IDENTIFIED BY '123456'; +GRANT ALL ON *.* TO 'users'@'%'; +``` + +3. Execute the SQL queries in sequence: + +- append-only sql: + - append-only/create_source.sql + - append-only/create_mv.sql + - append-only/create_sink.sql + +- upsert sql: + - upsert/create_table.sql + - upsert/create_mv.sql + - upsert/create_sink.sql + - upsert/insert_update_delete.sql + +We only support `upsert` with starrocks' `PRIMARY KEY` + +Run the following query +```sql +select user_id, count(*) from demo.demo_bhv_table group by user_id; +``` diff --git a/integration_tests/starrocks-sink/append-only-sql/create_mv.sql b/integration_tests/starrocks-sink/append-only-sql/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/starrocks-sink/append-only-sql/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/append-only-sql/create_sink.sql b/integration_tests/starrocks-sink/append-only-sql/create_sink.sql new file mode 100644 index 0000000000000..56d1b227512de --- /dev/null +++ b/integration_tests/starrocks-sink/append-only-sql/create_sink.sql @@ -0,0 +1,14 @@ +CREATE SINK bhv_starrocks_sink +FROM + bhv_mv WITH ( + connector = 'starrocks', + type = 'append-only', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'demo_bhv_table', + force_append_only='true' +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/append-only-sql/create_source.sql b/integration_tests/starrocks-sink/append-only-sql/create_source.sql new file mode 100644 index 0000000000000..c28c10f3616da --- /dev/null +++ b/integration_tests/starrocks-sink/append-only-sql/create_source.sql @@ -0,0 +1,18 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', + fields.user_name.kind = 'random', + fields.user_name.length = '10', + datagen.rows.per.second = '10' +) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml new file mode 100644 index 0000000000000..1933853c16915 --- /dev/null +++ b/integration_tests/starrocks-sink/docker-compose.yml @@ -0,0 +1,78 @@ +--- +version: "3" +services: + starrocks-fe: + image: starrocks/fe-ubuntu:latest + hostname: starrocks-fe + container_name: starrocks-fe + command: + /opt/starrocks/fe/bin/start_fe.sh + ports: + - 8030:8030 + - 9020:9020 + - 9030:9030 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9030"] + interval: 5s + timeout: 5s + retries: 30 + starrocks-be: + image: starrocks/be-ubuntu:latest + command: + - /bin/bash + - -c + - | + sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";" + /opt/starrocks/be/bin/start_be.sh + ports: + - 8040:8040 + hostname: starrocks-be + container_name: starrocks-be + depends_on: + - starrocks-fe + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_mv.sql b/integration_tests/starrocks-sink/upsert/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_sink.sql b/integration_tests/starrocks-sink/upsert/create_sink.sql new file mode 100644 index 0000000000000..d7557bc1bd4fc --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_sink.sql @@ -0,0 +1,14 @@ +CREATE SINK bhv_starrocks_sink +FROM + bhv_mv WITH ( + connector = 'starrocks', + type = 'upsert', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'demo_bhv_table', + primary_key = 'user_id' +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_table.sql b/integration_tests/starrocks-sink/upsert/create_table.sql new file mode 100644 index 0000000000000..6c98f88a0b510 --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_table.sql @@ -0,0 +1,10 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql new file mode 100644 index 0000000000000..73d5cda442258 --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql @@ -0,0 +1,8 @@ +INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'), +(2,'2','2','2020-01-01 01:01:02','2','2','2'), +(3,'3','3','2020-01-01 01:01:03','3','3','3'), +(4,'4','4','2020-01-01 01:01:04','4','4','4'); + +DELETE FROM user_behaviors WHERE user_id = 2; + +UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; \ No newline at end of file diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 6add09b2cb86e..3844c164fbc16 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -150,15 +150,16 @@ fn datum_to_json_object( CustomJsonType::Doris(map) => { if !matches!(v, Decimal::Normalized(_)) { return Err(ArrayError::internal( - "doris can't support decimal Inf, -Inf, Nan".to_string(), + "doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(), )); } let (p, s) = map.get(&field.name).unwrap(); v.rescale(*s as u32); let v_string = v.to_text(); - if v_string.len() > *p as usize { + let len = v_string.clone().replace(['.', '-'], "").len(); + if len > *p as usize { return Err(ArrayError::internal( - format!("rw Decimal's precision is large than doris max decimal len is {:?}, doris max is {:?}",v_string.len(),p))); + format!("rw Decimal's precision is large than doris/starrocks max decimal len is {:?}, doris max is {:?}",v_string.len(),p))); } json!(v_string) } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 74d7f048bdb1d..9fbcf143d724a 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -76,6 +76,7 @@ macro_rules! for_all_sinks { { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { Doris, $crate::sink::doris::DorisSink }, + { Starrocks, $crate::sink::starrocks::StarrocksSink }, { Test, $crate::sink::test_sink::TestSink } } $(,$arg)* diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index c879d24468bc5..8b6c94fe70232 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -318,7 +318,6 @@ impl StarrocksSinkWriter { config.common.table.clone(), header, ); - // let insert = Some(starrocks_insert_builder.build_starrocks().await?); Ok(Self { config, schema: schema.clone(), @@ -361,12 +360,13 @@ impl StarrocksSinkWriter { STARROCKS_DELETE_SIGN.to_string(), Value::String("0".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Starrocks(format!("Json derialize error {:?}", e)) + })?; self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -377,12 +377,13 @@ impl StarrocksSinkWriter { STARROCKS_DELETE_SIGN.to_string(), Value::String("1".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Starrocks(format!("Json derialize error {:?}", e)) + })?; self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; @@ -394,12 +395,13 @@ impl StarrocksSinkWriter { STARROCKS_DELETE_SIGN.to_string(), Value::String("0".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Starrocks(format!("Json derialize error {:?}", e)) + })?; self.insert .as_mut() .ok_or_else(|| { - SinkError::Doris("Can't find doris sink insert".to_string()) + SinkError::Starrocks("Can't find starrocks sink insert".to_string()) })? .write(row_json_string.into()) .await?; From ed31578bbff3207078fd490b05212aad156cd4d5 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 9 Oct 2023 15:09:30 +0800 Subject: [PATCH 7/8] rename name --- src/connector/src/common.rs | 2 +- src/connector/src/sink/doris.rs | 2 +- .../sink/{doris_connector.rs => doris_starrocks_connector.rs} | 0 src/connector/src/sink/mod.rs | 2 +- src/connector/src/sink/starrocks.rs | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) rename src/connector/src/sink/{doris_connector.rs => doris_starrocks_connector.rs} (100%) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index c92f209c0fb60..ac8f60182ec10 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -37,7 +37,7 @@ use url::Url; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::load_file_descriptor_from_s3; use crate::deserialize_duration_from_string; -use crate::sink::doris_connector::DorisGet; +use crate::sink::doris_starrocks_connector::DorisGet; use crate::sink::SinkError; use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 075d0474f4c1b..cb73cf6cbe9d7 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -25,7 +25,7 @@ use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; -use super::doris_connector::{ +use super::doris_starrocks_connector::{ DorisField, DorisInsert, HeaderBuilder, InserterBuilder, DORIS_DELETE_SIGN, }; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; diff --git a/src/connector/src/sink/doris_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs similarity index 100% rename from src/connector/src/sink/doris_connector.rs rename to src/connector/src/sink/doris_starrocks_connector.rs diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 9fbcf143d724a..2e003621196e8 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -18,7 +18,7 @@ pub mod catalog; pub mod clickhouse; pub mod coordinate; pub mod doris; -pub mod doris_connector; +pub mod doris_starrocks_connector; pub mod encoder; pub mod formatter; pub mod iceberg; diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 8b6c94fe70232..1748a2a863834 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -26,7 +26,7 @@ use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; -use super::doris_connector::{ +use super::doris_starrocks_connector::{ HeaderBuilder, InserterBuilder, StarrocksInsert, StarrocksMysqlQuery, STARROCKS_DELETE_SIGN, }; use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; From 7f00ebcc339d42cbfa3cc011f2e66756b2d364eb Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 19 Oct 2023 12:24:00 +0800 Subject: [PATCH 8/8] fix --- src/connector/src/common.rs | 6 +- src/connector/src/sink/doris.rs | 218 ++++++++- .../src/sink/doris_starrocks_connector.rs | 421 ++---------------- src/connector/src/sink/starrocks.rs | 177 +++++++- 4 files changed, 404 insertions(+), 418 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index ac8f60182ec10..b7b61aefa8a91 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -37,7 +37,7 @@ use url::Url; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::load_file_descriptor_from_s3; use crate::deserialize_duration_from_string; -use crate::sink::doris_starrocks_connector::DorisGet; +use crate::sink::doris::DorisSchemaClient; use crate::sink::SinkError; use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and @@ -455,8 +455,8 @@ pub struct DorisCommon { } impl DorisCommon { - pub(crate) fn build_get_client(&self) -> DorisGet { - DorisGet::new( + pub(crate) fn build_get_client(&self) -> DorisSchemaClient { + DorisSchemaClient::new( self.url.clone(), self.table.clone(), self.database.clone(), diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index cb73cf6cbe9d7..d104eca87a86f 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -17,24 +17,30 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; +use base64::engine::general_purpose; +use base64::Engine; +use bytes::{BufMut, Bytes, BytesMut}; +use hyper::body::Body; +use hyper::{body, Client, Request}; +use hyper_tls::HttpsConnector; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use serde::Deserialize; +use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; use super::doris_starrocks_connector::{ - DorisField, DorisInsert, HeaderBuilder, InserterBuilder, DORIS_DELETE_SIGN, + HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, + POOL_IDLE_TIMEOUT, }; -use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::common::DorisCommon; use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; -use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam, -}; +use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam}; pub const DORIS_SINK: &str = "doris"; #[serde_as] @@ -196,9 +202,9 @@ pub struct DorisSinkWriter { pub config: DorisConfig, schema: Schema, pk_indices: Vec, - builder: InserterBuilder, + inseter_inner_builder: InserterInnerBuilder, is_append_only: bool, - insert: Option, + client: Option, row_encoder: JsonEncoder, } @@ -247,20 +253,19 @@ impl DorisSinkWriter { header_builder.build() }; - let mut doris_insert_builder = InserterBuilder::new( + let doris_insert_builder = InserterInnerBuilder::new( config.common.url.clone(), config.common.database.clone(), config.common.table.clone(), header, ); - let insert = Some(doris_insert_builder.build_doris().await?); Ok(Self { config, schema: schema.clone(), pk_indices, - builder: doris_insert_builder, + inseter_inner_builder: doris_insert_builder, is_append_only, - insert, + client: None, row_encoder: JsonEncoder::new_with_doris( schema, None, @@ -276,7 +281,7 @@ impl DorisSinkWriter { continue; } let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); - self.insert + self.client .as_mut() .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_string()))? .write(row_json_string.into()) @@ -296,7 +301,7 @@ impl DorisSinkWriter { ); let row_json_string = serde_json::to_string(&row_json_value) .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Doris("Can't find doris sink insert".to_string()) @@ -312,7 +317,7 @@ impl DorisSinkWriter { ); let row_json_string = serde_json::to_string(&row_json_value) .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Doris("Can't find doris sink insert".to_string()) @@ -329,7 +334,7 @@ impl DorisSinkWriter { ); let row_json_string = serde_json::to_string(&row_json_value) .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Doris("Can't find doris sink insert".to_string()) @@ -346,8 +351,8 @@ impl DorisSinkWriter { #[async_trait] impl SinkWriter for DorisSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - if self.insert.is_none() { - self.insert = Some(self.builder.build_doris().await?); + if self.client.is_none() { + self.client = Some(DorisClient::new(self.inseter_inner_builder.build().await?)); } if self.is_append_only { self.append_only(chunk).await @@ -365,12 +370,12 @@ impl SinkWriter for DorisSinkWriter { } async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - if self.insert.is_some() { - let insert = self - .insert + if self.client.is_some() { + let client = self + .client .take() .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_string()))?; - insert.finish().await?; + client.finish().await?; } Ok(()) } @@ -379,3 +384,174 @@ impl SinkWriter for DorisSinkWriter { Ok(()) } } + +pub struct DorisSchemaClient { + url: String, + table: String, + db: String, + user: String, + password: String, +} +impl DorisSchemaClient { + pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self { + Self { + url, + table, + db, + user, + password, + } + } + + pub async fn get_schema_from_doris(&self) -> Result { + let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table); + let builder = Request::get(uri); + + let connector = HttpsConnector::new(); + let client = Client::builder() + .pool_idle_timeout(POOL_IDLE_TIMEOUT) + .build(connector); + + let request = builder + .header( + "Authorization", + format!( + "Basic {}", + general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password)) + ), + ) + .body(Body::empty()) + .map_err(|err| SinkError::Http(err.into()))?; + + let response = client + .request(request) + .await + .map_err(|err| SinkError::Http(err.into()))?; + + let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await { + Ok(bytes) => bytes.to_vec(), + Err(err) => return Err(SinkError::Http(err.into())), + }) + .map_err(|err| SinkError::Http(err.into()))?; + + let json_map: HashMap = + serde_json::from_str(&raw_bytes).map_err(|err| SinkError::Http(err.into()))?; + let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") { + let data = json_map + .get("data") + .ok_or_else(|| SinkError::Http(anyhow::anyhow!("Can't find data")))?; + data.to_string() + } else { + raw_bytes + }; + let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| { + SinkError::Http(anyhow::anyhow!("Can't get schema from json {:?}", err)) + })?; + Ok(schema) + } +} +#[derive(Debug, Serialize, Deserialize)] +pub struct DorisSchema { + status: i32, + #[serde(rename = "keysType")] + pub keys_type: String, + pub properties: Vec, +} +#[derive(Debug, Serialize, Deserialize)] +pub struct DorisField { + pub name: String, + pub r#type: String, + comment: String, + pub precision: Option, + pub scale: Option, + aggregation_type: String, +} +impl DorisField { + pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> { + if self.r#type.contains("DECIMAL") { + let a = self.precision.clone().unwrap().parse::().unwrap(); + let b = self.scale.clone().unwrap().parse::().unwrap(); + Some((a, b)) + } else { + None + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DorisInsertResultResponse { + #[serde(rename = "TxnId")] + txn_id: i64, + #[serde(rename = "Label")] + label: String, + #[serde(rename = "Status")] + status: String, + #[serde(rename = "TwoPhaseCommit")] + two_phase_commit: String, + #[serde(rename = "Message")] + message: String, + #[serde(rename = "NumberTotalRows")] + number_total_rows: i64, + #[serde(rename = "NumberLoadedRows")] + number_loaded_rows: i64, + #[serde(rename = "NumberFilteredRows")] + number_filtered_rows: i32, + #[serde(rename = "NumberUnselectedRows")] + number_unselected_rows: i32, + #[serde(rename = "LoadBytes")] + load_bytes: i64, + #[serde(rename = "LoadTimeMs")] + load_time_ms: i32, + #[serde(rename = "BeginTxnTimeMs")] + begin_txn_time_ms: i32, + #[serde(rename = "StreamLoadPutTimeMs")] + stream_load_put_time_ms: i32, + #[serde(rename = "ReadDataTimeMs")] + read_data_time_ms: i32, + #[serde(rename = "WriteDataTimeMs")] + write_data_time_ms: i32, + #[serde(rename = "CommitAndPublishTimeMs")] + commit_and_publish_time_ms: i32, + #[serde(rename = "ErrorURL")] + err_url: Option, +} + +pub struct DorisClient { + insert: InserterInner, + is_first_record: bool, +} +impl DorisClient { + pub fn new(insert: InserterInner) -> Self { + Self { + insert, + is_first_record: true, + } + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + let mut data_build = BytesMut::new(); + if self.is_first_record { + self.is_first_record = false; + } else { + data_build.put_slice("\n".as_bytes()); + } + data_build.put_slice(&data); + self.insert.write(data_build.into()).await?; + Ok(()) + } + + pub async fn finish(self) -> Result { + let raw = self.insert.finish().await?; + let res: DorisInsertResultResponse = + serde_json::from_slice(&raw).map_err(|err| SinkError::Http(err.into()))?; + + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::Http(anyhow::anyhow!( + "Insert error: {:?}, error url: {:?}", + res.message, + res.err_url + ))); + }; + Ok(res) + } +} diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 0a31a87addfdb..e4a127d6b048d 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -24,25 +24,19 @@ use hyper::body::{Body, Sender}; use hyper::client::HttpConnector; use hyper::{body, Client, Request, StatusCode}; use hyper_tls::HttpsConnector; -use mysql_async::prelude::Queryable; -use mysql_async::Opts; -use serde::{Deserialize, Serialize}; -use serde_json::Value; use tokio::task::JoinHandle; use super::{Result, SinkError}; const BUFFER_SIZE: usize = 64 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; -const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; +pub(crate) const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; pub(crate) const STARROCKS_DELETE_SIGN: &str = "__op"; -const STARROCK_MYSQL_PREFER_SOCKET: &str = "false"; -const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; -const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 31536000; + const SEND_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); -const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); +pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); const DORIS: &str = "doris"; const STARROCKS: &str = "starrocks"; pub struct HeaderBuilder { @@ -151,12 +145,12 @@ impl HeaderBuilder { } } -pub struct InserterBuilder { +pub struct InserterInnerBuilder { url: String, header: HashMap, sender: Option, } -impl InserterBuilder { +impl InserterInnerBuilder { pub fn new(url: String, db: String, table: String, header: HashMap) -> Self { let url = format!("{}/api/{}/{}/_stream_load", url, db, table); @@ -184,10 +178,7 @@ impl InserterBuilder { (builder, client) } - async fn build_http_send_feature( - &self, - send_type: &'static str, - ) -> Result<(JoinHandle>, Sender)> { + pub async fn build(&self) -> Result { let (builder, client) = self.build_request_and_client(self.url.clone()); let request_get_url = builder .body(Body::empty()) @@ -220,142 +211,59 @@ impl InserterBuilder { .map_err(|err| SinkError::Http(err.into()))?; let feature = client.request(request); - let handle: JoinHandle> = tokio::spawn(async move { + let handle: JoinHandle>> = tokio::spawn(async move { let response = feature.await.map_err(|err| SinkError::Http(err.into()))?; let status = response.status(); - let raw_string = String::from_utf8( - body::to_bytes(response.into_body()) - .await - .map_err(|err| SinkError::Http(err.into()))? - .to_vec(), - ) - .map_err(|err| SinkError::Http(err.into()))?; - - if status == StatusCode::OK && !raw_string.is_empty() { - let response = if send_type.eq(DORIS) { - let response: DorisInsertResultResponse = serde_json::from_str(&raw_string) - .map_err(|err| SinkError::Http(err.into()))?; - InsertResultResponse::Doris(response) - } else if send_type.eq(STARROCKS) { - let response: StarrocksInsertResultResponse = serde_json::from_str(&raw_string) - .map_err(|err| SinkError::Http(err.into()))?; - InsertResultResponse::Starrocks(response) - } else { - return Err(SinkError::Http(anyhow::anyhow!( - "Can't convert {:?}'s http response to struct", - send_type - ))); - }; - Ok(response) + let raw = body::to_bytes(response.into_body()) + .await + .map_err(|err| SinkError::Http(err.into()))? + .to_vec(); + if status == StatusCode::OK && !raw.is_empty() { + Ok(raw) + + // let raw_string = String::from_utf8( + // body::to_bytes(response.into_body()) + // .await + // .map_err(|err| SinkError::Http(err.into()))? + // .to_vec(), + // ) + // .map_err(|err| SinkError::Http(err.into()))?; + + // if status == StatusCode::OK && !raw_string.is_empty() { + // let response = if send_type.eq(DORIS) { + // let response: DorisInsertResultResponse = serde_json::from_str(&raw_string) + // .map_err(|err| SinkError::Http(err.into()))?; + // InsertResultResponse::Doris(response) + // } else if send_type.eq(STARROCKS) { + // let response: StarrocksInsertResultResponse = serde_json::from_str(&raw_string) + // .map_err(|err| SinkError::Http(err.into()))?; + // InsertResultResponse::Starrocks(response) + // } else { + // return Err(SinkError::Http(anyhow::anyhow!( + // "Can't convert {:?}'s http response to struct", + // send_type + // ))); + // }; + // Ok(response) } else { Err(SinkError::Http(anyhow::anyhow!( "Failed connection {:?},{:?}", status, - raw_string + String::from_utf8(raw).map_err(|err| SinkError::Http(err.into()))? ))) } }); - Ok((handle, sender)) + Ok(InserterInner::new(sender, handle)) } - - pub async fn build_doris(&mut self) -> Result { - let (handle, sender) = self.build_http_send_feature(DORIS).await?; - - Ok(DorisInsert::new(sender, handle)) - } - - pub async fn build_starrocks(&mut self) -> Result { - let (handle, sender) = self.build_http_send_feature(STARROCKS).await?; - - Ok(StarrocksInsert::new(sender, handle)) - } -} - -pub struct DorisInsert { - insert: InserterInner, - is_first_record: bool, } -impl DorisInsert { - pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { - Self { - insert: InserterInner::new(sender, join_handle), - is_first_record: true, - } - } - pub async fn write(&mut self, data: Bytes) -> Result<()> { - let mut data_build = BytesMut::new(); - if self.is_first_record { - self.is_first_record = false; - } else { - data_build.put_slice("\n".as_bytes()); - } - data_build.put_slice(&data); - self.insert.write(data_build.into()).await?; - Ok(()) - } - - pub async fn finish(self) -> Result { - let res = match self.insert.finish().await? { - InsertResultResponse::Doris(doris_res) => doris_res, - InsertResultResponse::Starrocks(_) => { - return Err(SinkError::Http(anyhow::anyhow!("Response is not doris"))) - } - }; - - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { - return Err(SinkError::Http(anyhow::anyhow!( - "Insert error: {:?}, error url: {:?}", - res.message, - res.err_url - ))); - }; - Ok(res) - } -} - -pub struct StarrocksInsert { - insert: InserterInner, -} -impl StarrocksInsert { - pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { - Self { - insert: InserterInner::new(sender, join_handle), - } - } - - pub async fn write(&mut self, data: Bytes) -> Result<()> { - self.insert.write(data).await?; - Ok(()) - } - - pub async fn finish(self) -> Result { - let res = match self.insert.finish().await? { - InsertResultResponse::Doris(_) => { - return Err(SinkError::Http(anyhow::anyhow!( - "Response is not starrocks" - ))) - } - InsertResultResponse::Starrocks(res) => res, - }; - - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { - return Err(SinkError::Http(anyhow::anyhow!( - "Insert error: {:?}", - res.message, - ))); - }; - Ok(res) - } -} - -struct InserterInner { +pub struct InserterInner { sender: Option, - join_handle: Option>>, + join_handle: Option>>>, buffer: BytesMut, } impl InserterInner { - pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { + pub fn new(sender: Sender, join_handle: JoinHandle>>) -> Self { Self { sender: Some(sender), join_handle: Some(join_handle), @@ -407,7 +315,7 @@ impl InserterInner { Ok(()) } - async fn wait_handle(&mut self) -> Result { + async fn wait_handle(&mut self) -> Result> { let res = match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) .await @@ -418,7 +326,7 @@ impl InserterInner { Ok(res) } - pub async fn finish(mut self) -> Result { + pub async fn finish(mut self) -> Result> { if !self.buffer.is_empty() { self.send_chunk().await?; } @@ -426,242 +334,3 @@ impl InserterInner { self.wait_handle().await } } - -pub struct DorisGet { - url: String, - table: String, - db: String, - user: String, - password: String, -} -impl DorisGet { - pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self { - Self { - url, - table, - db, - user, - password, - } - } - - pub async fn get_schema_from_doris(&self) -> Result { - let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table); - let builder = Request::get(uri); - - let connector = HttpsConnector::new(); - let client = Client::builder() - .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); - - let request = builder - .header( - "Authorization", - format!( - "Basic {}", - general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password)) - ), - ) - .body(Body::empty()) - .map_err(|err| SinkError::Http(err.into()))?; - - let response = client - .request(request) - .await - .map_err(|err| SinkError::Http(err.into()))?; - - let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await { - Ok(bytes) => bytes.to_vec(), - Err(err) => return Err(SinkError::Http(err.into())), - }) - .map_err(|err| SinkError::Http(err.into()))?; - - let json_map: HashMap = - serde_json::from_str(&raw_bytes).map_err(|err| SinkError::Http(err.into()))?; - let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") { - let data = json_map - .get("data") - .ok_or_else(|| SinkError::Http(anyhow::anyhow!("Can't find data")))?; - data.to_string() - } else { - raw_bytes - }; - let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| { - SinkError::Http(anyhow::anyhow!("Can't get schema from json {:?}", err)) - })?; - Ok(schema) - } -} -#[derive(Debug, Serialize, Deserialize)] -pub struct DorisSchema { - status: i32, - #[serde(rename = "keysType")] - pub keys_type: String, - pub properties: Vec, -} -#[derive(Debug, Serialize, Deserialize)] -pub struct DorisField { - pub name: String, - pub r#type: String, - comment: String, - pub precision: Option, - pub scale: Option, - aggregation_type: String, -} -impl DorisField { - pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> { - if self.r#type.contains("DECIMAL") { - let a = self.precision.clone().unwrap().parse::().unwrap(); - let b = self.scale.clone().unwrap().parse::().unwrap(); - Some((a, b)) - } else { - None - } - } -} - -pub enum InsertResultResponse { - Doris(DorisInsertResultResponse), - Starrocks(StarrocksInsertResultResponse), -} -#[derive(Debug, Serialize, Deserialize)] -pub struct DorisInsertResultResponse { - #[serde(rename = "TxnId")] - txn_id: i64, - #[serde(rename = "Label")] - label: String, - #[serde(rename = "Status")] - status: String, - #[serde(rename = "TwoPhaseCommit")] - two_phase_commit: String, - #[serde(rename = "Message")] - message: String, - #[serde(rename = "NumberTotalRows")] - number_total_rows: i64, - #[serde(rename = "NumberLoadedRows")] - number_loaded_rows: i64, - #[serde(rename = "NumberFilteredRows")] - number_filtered_rows: i32, - #[serde(rename = "NumberUnselectedRows")] - number_unselected_rows: i32, - #[serde(rename = "LoadBytes")] - load_bytes: i64, - #[serde(rename = "LoadTimeMs")] - load_time_ms: i32, - #[serde(rename = "BeginTxnTimeMs")] - begin_txn_time_ms: i32, - #[serde(rename = "StreamLoadPutTimeMs")] - stream_load_put_time_ms: i32, - #[serde(rename = "ReadDataTimeMs")] - read_data_time_ms: i32, - #[serde(rename = "WriteDataTimeMs")] - write_data_time_ms: i32, - #[serde(rename = "CommitAndPublishTimeMs")] - commit_and_publish_time_ms: i32, - #[serde(rename = "ErrorURL")] - err_url: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct StarrocksInsertResultResponse { - #[serde(rename = "TxnId")] - txn_id: i64, - #[serde(rename = "Label")] - label: String, - #[serde(rename = "Status")] - status: String, - #[serde(rename = "Message")] - message: String, - #[serde(rename = "NumberTotalRows")] - number_total_rows: i64, - #[serde(rename = "NumberLoadedRows")] - number_loaded_rows: i64, - #[serde(rename = "NumberFilteredRows")] - number_filtered_rows: i32, - #[serde(rename = "NumberUnselectedRows")] - number_unselected_rows: i32, - #[serde(rename = "LoadBytes")] - load_bytes: i64, - #[serde(rename = "LoadTimeMs")] - load_time_ms: i32, - #[serde(rename = "BeginTxnTimeMs")] - begin_txn_time_ms: i32, - #[serde(rename = "ReadDataTimeMs")] - read_data_time_ms: i32, - #[serde(rename = "WriteDataTimeMs")] - write_data_time_ms: i32, - #[serde(rename = "CommitAndPublishTimeMs")] - commit_and_publish_time_ms: i32, - #[serde(rename = "StreamLoadPlanTimeMs")] - stream_load_plan_time_ms: Option, -} - -pub struct StarrocksMysqlQuery { - table: String, - db: String, - conn: mysql_async::Conn, -} - -impl StarrocksMysqlQuery { - pub async fn new( - host: String, - port: String, - table: String, - db: String, - user: String, - password: String, - ) -> Result { - let conn_uri = format!( - "mysql://{}:{}@{}:{}/{}?prefer_socket={}&max_allowed_packet={}&wait_timeout={}", - user, - password, - host, - port, - db, - STARROCK_MYSQL_PREFER_SOCKET, - STARROCK_MYSQL_MAX_ALLOWED_PACKET, - STARROCK_MYSQL_WAIT_TIMEOUT - ); - let pool = mysql_async::Pool::new( - Opts::from_url(&conn_uri).map_err(|err| SinkError::Http(err.into()))?, - ); - let conn = pool - .get_conn() - .await - .map_err(|err| SinkError::Http(err.into()))?; - - Ok(Self { table, db, conn }) - } - - pub async fn get_columns_from_starrocks(&mut self) -> Result> { - let query = format!("select column_name, column_type from information_schema.columns where table_name = {:?} and table_schema = {:?};",self.table,self.db); - let mut query_map: HashMap = HashMap::default(); - self.conn - .query_map(query, |(column_name, column_type)| { - query_map.insert(column_name, column_type) - }) - .await - .map_err(|err| SinkError::Http(err.into()))?; - Ok(query_map) - } - - pub async fn get_pk_from_starrocks(&mut self) -> Result<(String, String)> { - let query = format!("select table_model, primary_key from information_schema.tables_config where table_name = {:?} and table_schema = {:?};",self.table,self.db); - let table_mode_pk: (String, String) = self - .conn - .query_map(query, |(table_model, primary_key)| { - (table_model, primary_key) - }) - .await - .map_err(|err| SinkError::Http(err.into()))? - .get(0) - .ok_or_else(|| { - SinkError::Starrocks(format!( - "Can't find schema with table {:?} and database {:?}", - self.table, self.db - )) - })? - .clone(); - Ok(table_mode_pk) - } -} diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 1748a2a863834..8959a7aa0398f 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -17,17 +17,21 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; +use bytes::Bytes; use itertools::Itertools; +use mysql_async::prelude::Queryable; +use mysql_async::Opts; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use serde::Deserialize; +use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; use super::doris_starrocks_connector::{ - HeaderBuilder, InserterBuilder, StarrocksInsert, StarrocksMysqlQuery, STARROCKS_DELETE_SIGN, + HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN, }; use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; use super::writer::LogSinkerOf; @@ -37,6 +41,9 @@ use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const STARROCKS_SINK: &str = "starrocks"; +const STARROCK_MYSQL_PREFER_SOCKET: &str = "false"; +const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; +const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 28800; #[serde_as] #[derive(Clone, Debug, Deserialize)] pub struct StarrocksConfig { @@ -196,7 +203,7 @@ impl Sink for StarrocksSink { "Primary key not defined for upsert starrocks sink (please define in `primary_key` field)"))); } // check reachability - let mut client = StarrocksMysqlQuery::new( + let mut client = StarrocksSchemaClient::new( self.config.common.host.clone(), self.config.common.mysql_port.clone(), self.config.common.table.clone(), @@ -233,9 +240,9 @@ pub struct StarrocksSinkWriter { pub config: StarrocksConfig, schema: Schema, pk_indices: Vec, - builder: InserterBuilder, + inserter_innet_builder: InserterInnerBuilder, is_append_only: bool, - insert: Option, + client: Option, row_encoder: JsonEncoder, } @@ -262,7 +269,7 @@ impl StarrocksSinkWriter { is_append_only: bool, ) -> Result { let mut decimal_map = HashMap::default(); - let starrocks_columns = StarrocksMysqlQuery::new( + let starrocks_columns = StarrocksSchemaClient::new( config.common.host.clone(), config.common.mysql_port.clone(), config.common.table.clone(), @@ -312,7 +319,7 @@ impl StarrocksSinkWriter { builder.build() }; - let starrocks_insert_builder = InserterBuilder::new( + let starrocks_insert_builder = InserterInnerBuilder::new( format!("http://{}:{}", config.common.host, config.common.http_port), config.common.database.clone(), config.common.table.clone(), @@ -322,9 +329,9 @@ impl StarrocksSinkWriter { config, schema: schema.clone(), pk_indices, - builder: starrocks_insert_builder, + inserter_innet_builder: starrocks_insert_builder, is_append_only, - insert: None, + client: None, row_encoder: JsonEncoder::new_with_doris( schema, None, @@ -340,7 +347,7 @@ impl StarrocksSinkWriter { continue; } let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Starrocks("Can't find starrocks sink insert".to_string()) @@ -363,7 +370,7 @@ impl StarrocksSinkWriter { let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { SinkError::Starrocks(format!("Json derialize error {:?}", e)) })?; - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Starrocks("Can't find starrocks sink insert".to_string()) @@ -380,7 +387,7 @@ impl StarrocksSinkWriter { let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { SinkError::Starrocks(format!("Json derialize error {:?}", e)) })?; - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Starrocks("Can't find starrocks sink insert".to_string()) @@ -398,7 +405,7 @@ impl StarrocksSinkWriter { let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { SinkError::Starrocks(format!("Json derialize error {:?}", e)) })?; - self.insert + self.client .as_mut() .ok_or_else(|| { SinkError::Starrocks("Can't find starrocks sink insert".to_string()) @@ -415,8 +422,10 @@ impl StarrocksSinkWriter { #[async_trait] impl SinkWriter for StarrocksSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - if self.insert.is_none() { - self.insert = Some(self.builder.build_starrocks().await?); + if self.client.is_none() { + self.client = Some(StarrocksClient::new( + self.inserter_innet_builder.build().await?, + )); } if self.is_append_only { self.append_only(chunk).await @@ -434,12 +443,12 @@ impl SinkWriter for StarrocksSinkWriter { } async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - if self.insert.is_some() { - let insert = self - .insert + if self.client.is_some() { + let client = self + .client .take() .ok_or_else(|| SinkError::Starrocks("Can't find starrocks inserter".to_string()))?; - insert.finish().await?; + client.finish().await?; } Ok(()) } @@ -448,3 +457,135 @@ impl SinkWriter for StarrocksSinkWriter { Ok(()) } } + +pub struct StarrocksSchemaClient { + table: String, + db: String, + conn: mysql_async::Conn, +} + +impl StarrocksSchemaClient { + pub async fn new( + host: String, + port: String, + table: String, + db: String, + user: String, + password: String, + ) -> Result { + let conn_uri = format!( + "mysql://{}:{}@{}:{}/{}?prefer_socket={}&max_allowed_packet={}&wait_timeout={}", + user, + password, + host, + port, + db, + STARROCK_MYSQL_PREFER_SOCKET, + STARROCK_MYSQL_MAX_ALLOWED_PACKET, + STARROCK_MYSQL_WAIT_TIMEOUT + ); + let pool = mysql_async::Pool::new( + Opts::from_url(&conn_uri).map_err(|err| SinkError::Http(err.into()))?, + ); + let conn = pool + .get_conn() + .await + .map_err(|err| SinkError::Http(err.into()))?; + + Ok(Self { table, db, conn }) + } + + pub async fn get_columns_from_starrocks(&mut self) -> Result> { + let query = format!("select column_name, column_type from information_schema.columns where table_name = {:?} and table_schema = {:?};",self.table,self.db); + let mut query_map: HashMap = HashMap::default(); + self.conn + .query_map(query, |(column_name, column_type)| { + query_map.insert(column_name, column_type) + }) + .await + .map_err(|err| SinkError::Http(err.into()))?; + Ok(query_map) + } + + pub async fn get_pk_from_starrocks(&mut self) -> Result<(String, String)> { + let query = format!("select table_model, primary_key from information_schema.tables_config where table_name = {:?} and table_schema = {:?};",self.table,self.db); + let table_mode_pk: (String, String) = self + .conn + .query_map(query, |(table_model, primary_key)| { + (table_model, primary_key) + }) + .await + .map_err(|err| SinkError::Http(err.into()))? + .get(0) + .ok_or_else(|| { + SinkError::Starrocks(format!( + "Can't find schema with table {:?} and database {:?}", + self.table, self.db + )) + })? + .clone(); + Ok(table_mode_pk) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StarrocksInsertResultResponse { + #[serde(rename = "TxnId")] + txn_id: i64, + #[serde(rename = "Label")] + label: String, + #[serde(rename = "Status")] + status: String, + #[serde(rename = "Message")] + message: String, + #[serde(rename = "NumberTotalRows")] + number_total_rows: i64, + #[serde(rename = "NumberLoadedRows")] + number_loaded_rows: i64, + #[serde(rename = "NumberFilteredRows")] + number_filtered_rows: i32, + #[serde(rename = "NumberUnselectedRows")] + number_unselected_rows: i32, + #[serde(rename = "LoadBytes")] + load_bytes: i64, + #[serde(rename = "LoadTimeMs")] + load_time_ms: i32, + #[serde(rename = "BeginTxnTimeMs")] + begin_txn_time_ms: i32, + #[serde(rename = "ReadDataTimeMs")] + read_data_time_ms: i32, + #[serde(rename = "WriteDataTimeMs")] + write_data_time_ms: i32, + #[serde(rename = "CommitAndPublishTimeMs")] + commit_and_publish_time_ms: i32, + #[serde(rename = "StreamLoadPlanTimeMs")] + stream_load_plan_time_ms: Option, +} + +pub struct StarrocksClient { + insert: InserterInner, +} +impl StarrocksClient { + pub fn new(insert: InserterInner) -> Self { + Self { insert } + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + self.insert.write(data).await?; + Ok(()) + } + + pub async fn finish(self) -> Result { + let raw = self.insert.finish().await?; + let res: StarrocksInsertResultResponse = + serde_json::from_slice(&raw).map_err(|err| SinkError::Http(err.into()))?; + + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::Http(anyhow::anyhow!( + "Insert error: {:?}", + res.message, + ))); + }; + Ok(res) + } +}