diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md
new file mode 100644
index 0000000000000..f65b4b9406685
--- /dev/null
+++ b/integration_tests/starrocks-sink/README.md
@@ -0,0 +1,56 @@
+# Demo: Sinking to StarRocks
+
+In this demo, we showcase how RisingWave sinks data to StarRocks.
+
+1. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave cluster with its necessary dependencies, a datagen service that generates the data, and a StarRocks FE and BE that serve as the sink target.
+
+2. Create the StarRocks table via the MySQL client:
+
+Log in to StarRocks:
+```sh
+docker compose exec starrocks-fe mysql -uroot -P9030 -h127.0.0.1
+```
+
+Run the following queries to create the database and table.
+```sql
+CREATE DATABASE demo;
+USE demo;
+
+CREATE TABLE demo_bhv_table(
+    user_id int,
+    target_id text,
+    event_timestamp datetime
+) ENGINE=OLAP
+PRIMARY KEY(`user_id`)
+DISTRIBUTED BY HASH(`user_id`) PROPERTIES("replication_num" = "1");
+
+CREATE USER 'users'@'%' IDENTIFIED BY '123456';
+GRANT ALL ON *.* TO 'users'@'%';
+```
+
+3. Execute the SQL queries in sequence:
+
+- append-only sql:
+    - append-only-sql/create_source.sql
+    - append-only-sql/create_mv.sql
+    - append-only-sql/create_sink.sql
+
+- upsert sql:
+    - upsert/create_table.sql
+    - upsert/create_mv.sql
+    - upsert/create_sink.sql
+    - upsert/insert_update_delete.sql
+
+Note that `upsert` mode is only supported with StarRocks tables created with the `PRIMARY KEY` model.
+
+Run the following query in the StarRocks MySQL client to verify the sink results:
+```sql
+select user_id, count(*) from demo.demo_bhv_table group by user_id;
+```
diff --git a/integration_tests/starrocks-sink/append-only-sql/create_mv.sql b/integration_tests/starrocks-sink/append-only-sql/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/starrocks-sink/append-only-sql/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+    user_id,
+    target_id,
+    event_timestamp
+FROM
+    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/starrocks-sink/append-only-sql/create_sink.sql b/integration_tests/starrocks-sink/append-only-sql/create_sink.sql
new file mode 100644
index 0000000000000..56d1b227512de
--- /dev/null
+++ b/integration_tests/starrocks-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,14 @@
+CREATE SINK bhv_starrocks_sink
+FROM
+    bhv_mv WITH (
+    connector = 'starrocks',
+    type = 'append-only',
+    starrocks.host = 'starrocks-fe',
+    starrocks.mysqlport = '9030',
+    starrocks.httpport = '8030',
+    starrocks.user = 'users',
+    starrocks.password = '123456',
+    starrocks.database = 'demo',
+    starrocks.table = 'demo_bhv_table',
+    force_append_only='true'
+);
\ No newline at end of file
diff --git a/integration_tests/starrocks-sink/append-only-sql/create_source.sql b/integration_tests/starrocks-sink/append-only-sql/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/starrocks-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
+CREATE TABLE user_behaviors (
+    user_id int,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMP,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+) WITH (
+    connector = 'datagen',
+    fields.user_id.kind = 'sequence',
+    fields.user_id.start = '1',
+    fields.user_id.end = '1000',
+    fields.target_id.kind = 'random',
+    fields.target_id.length = '10',
+    datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
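Reviewer note: once the cluster is up, the demo SQL can be submitted to RisingWave from the host. A minimal sketch, assuming RisingWave's default frontend endpoint (`localhost:4566`, database `dev`, user `root`); adjust if your compose setup maps ports differently:

```sh
# Submit the append-only demo SQL to RisingWave via psql.
# Assumed defaults: frontend at localhost:4566, database `dev`, user `root`.
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_source.sql
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_mv.sql
psql -h localhost -p 4566 -d dev -U root -f append-only-sql/create_sink.sql
```

diff --git 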
a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml new file mode 100644 index 0000000000000..1933853c16915 --- /dev/null +++ b/integration_tests/starrocks-sink/docker-compose.yml @@ -0,0 +1,78 @@ +--- +version: "3" +services: + starrocks-fe: + image: starrocks/fe-ubuntu:latest + hostname: starrocks-fe + container_name: starrocks-fe + command: + /opt/starrocks/fe/bin/start_fe.sh + ports: + - 8030:8030 + - 9020:9020 + - 9030:9030 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9030"] + interval: 5s + timeout: 5s + retries: 30 + starrocks-be: + image: starrocks/be-ubuntu:latest + command: + - /bin/bash + - -c + - | + sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";" + /opt/starrocks/be/bin/start_be.sh + ports: + - 8040:8040 + hostname: starrocks-be + container_name: starrocks-be + depends_on: + - starrocks-fe + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_mv.sql b/integration_tests/starrocks-sink/upsert/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_sink.sql b/integration_tests/starrocks-sink/upsert/create_sink.sql new file mode 100644 index 0000000000000..d7557bc1bd4fc --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_sink.sql @@ -0,0 +1,14 @@ +CREATE SINK bhv_starrocks_sink +FROM + bhv_mv WITH ( + connector = 'starrocks', + type = 'upsert', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'demo_bhv_table', + primary_key = 'user_id' +); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_table.sql b/integration_tests/starrocks-sink/upsert/create_table.sql new file mode 100644 index 0000000000000..6c98f88a0b510 --- /dev/null +++ b/integration_tests/starrocks-sink/upsert/create_table.sql @@ -0,0 +1,10 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); \ No newline at end of file diff --git 
a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql
new file mode 100644
index 0000000000000..73d5cda442258
--- /dev/null
+++ b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql
@@ -0,0 +1,8 @@
+INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'),
+(2,'2','2','2020-01-01 01:01:02','2','2','2'),
+(3,'3','3','2020-01-01 01:01:03','3','3','3'),
+(4,'4','4','2020-01-01 01:01:04','4','4','4');
+
+DELETE FROM user_behaviors WHERE user_id = 2;
+
+UPDATE user_behaviors SET target_id = '30' WHERE user_id = 3;
\ No newline at end of file
diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index cd8d99666c3e5..43f36527b54a4 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -404,7 +404,6 @@ impl KinesisCommon {
         Ok(KinesisClient::from_conf(builder.build()))
     }
 }
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct UpsertMessage<'a> {
     #[serde(borrow)]
diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
index c8e2ddd23af31..f7f0faba368c4 100644
--- a/src/connector/src/sink/doris.rs
+++ b/src/connector/src/sink/doris.rs
@@ -17,6 +17,12 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use async_trait::async_trait;
+use base64::engine::general_purpose;
+use base64::Engine;
+use bytes::{BufMut, Bytes, BytesMut};
+use hyper::body::Body;
+use hyper::{body, Client, Request};
+use hyper_tls::HttpsConnector;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
@@ -27,15 +33,14 @@ use serde_json::Value;
 use serde_with::serde_as;
 use with_options::WithOptions;
 
-use super::doris_connector::{
-    DorisField, DorisGet, DorisInsert, DorisInsertClient, DORIS_DELETE_SIGN,
+use super::doris_starrocks_connector::{
+    HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS,
+    POOL_IDLE_TIMEOUT,
 };
-use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
+use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
 use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
-use crate::sink::{
-    DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
-};
+use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
 
 pub const DORIS_SINK: &str = "doris";
@@ -54,8 +59,8 @@ pub struct DorisCommon {
 }
 
 impl DorisCommon {
-    pub(crate) fn build_get_client(&self) -> DorisGet {
-        DorisGet::new(
+    pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
+        DorisSchemaClient::new(
             self.url.clone(),
             self.table.clone(),
             self.database.clone(),
@@ -224,9 +229,9 @@ pub struct DorisSinkWriter {
     pub config: DorisConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
-    client: DorisInsertClient,
+    inserter_inner_builder: InserterInnerBuilder,
     is_append_only: bool,
-    insert: Option<DorisInsert>,
+    client: Option<DorisClient>,
     row_encoder: JsonEncoder,
 }
@@ -263,30 +268,31 @@
                 decimal_map.insert(s.name.clone(), v);
             }
         });
-        let mut map = HashMap::new();
-        map.insert("format".to_string(), "json".to_string());
-        map.insert("read_json_by_line".to_string(), "true".to_string());
-        let doris_insert_client = DorisInsertClient::new(
+
+        let header_builder = HeaderBuilder::new()
+            .add_common_header()
+            .set_user_password(config.common.user.clone(), config.common.password.clone())
+            .add_json_format()
+            .add_read_json_by_line();
+        let header = if !is_append_only {
+            header_builder.add_hidden_column().build()
+        } else {
+            header_builder.build()
+        };
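Reviewer note: the header construction above replaces the old `set_properties(map)` flow. For reference, a minimal sketch of how the new `HeaderBuilder` (from `doris_starrocks_connector.rs`) composes stream-load headers for the two sink modes, using only methods introduced in this patch and assuming `HeaderBuilder` is imported from that module:

```rust
use std::collections::HashMap;

// Sketch: stream-load header assembly, mirroring DorisSinkWriter::new above.
// Assumes: use super::doris_starrocks_connector::HeaderBuilder;
fn build_doris_headers(
    user: String,
    password: String,
    is_append_only: bool,
) -> HashMap<String, String> {
    let builder = HeaderBuilder::new()
        .add_common_header()               // "expect: 100-continue"
        .set_user_password(user, password) // "Authorization: Basic <base64(user:password)>"
        .add_json_format()                 // "format: json"
        .add_read_json_by_line();          // Doris-only: one JSON row per line
    if is_append_only {
        builder.build()
    } else {
        // Upsert mode relies on the hidden __DORIS_DELETE_SIGN__ column.
        builder.add_hidden_column().build()
    }
}
```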
+
+        let doris_insert_builder = InserterInnerBuilder::new(
             config.common.url.clone(),
             config.common.database.clone(),
             config.common.table.clone(),
-        )
-        .add_common_header()
-        .set_user_password(config.common.user.clone(), config.common.password.clone())
-        .set_properties(map);
-        let mut doris_insert_client = if !is_append_only {
-            doris_insert_client.add_hidden_column()
-        } else {
-            doris_insert_client
-        };
-        let insert = Some(doris_insert_client.build().await?);
+            header,
+        );
         Ok(Self {
             config,
             schema: schema.clone(),
             pk_indices,
-            client: doris_insert_client,
+            inserter_inner_builder: doris_insert_builder,
             is_append_only,
-            insert,
+            client: None,
             row_encoder: JsonEncoder::new_with_doris(
                 schema,
                 None,
@@ -302,7 +308,7 @@
                 continue;
             }
             let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
-            self.insert
+            self.client
                 .as_mut()
                 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_string()))?
                 .write(row_json_string.into())
@@ -322,7 +328,7 @@
                     );
                     let row_json_string = serde_json::to_string(&row_json_value)
                         .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
-                    self.insert
+                    self.client
                         .as_mut()
                         .ok_or_else(|| {
                             SinkError::Doris("Can't find doris sink insert".to_string())
@@ -338,7 +344,7 @@
                     );
                     let row_json_string = serde_json::to_string(&row_json_value)
                         .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
-                    self.insert
+                    self.client
                         .as_mut()
                         .ok_or_else(|| {
                             SinkError::Doris("Can't find doris sink insert".to_string())
@@ -355,7 +361,7 @@
                     );
                     let row_json_string = serde_json::to_string(&row_json_value)
                         .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
-                    self.insert
+                    self.client
                         .as_mut()
                         .ok_or_else(|| {
                             SinkError::Doris("Can't find doris sink insert".to_string())
@@ -372,8 +378,8 @@
 impl SinkWriter for DorisSinkWriter {
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
-        if self.insert.is_none() {
-            self.insert = Some(self.client.build().await?);
+        if self.client.is_none() {
+            self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
         }
         if self.is_append_only {
             self.append_only(chunk).await
@@ -391,12 +397,12 @@
     }
 
     async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
-        if self.insert.is_some() {
-            let insert = self
-                .insert
+        if self.client.is_some() {
+            let client = self
+                .client
                 .take()
                 .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_string()))?;
-            insert.finish().await?;
+            client.finish().await?;
         }
         Ok(())
     }
@@ -405,3 +411,177 @@
         Ok(())
     }
 }
+
+pub struct DorisSchemaClient {
+    url: String,
+    table: String,
+    db: String,
+    user: String,
+    password: String,
+}
+impl DorisSchemaClient {
+    pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
+        Self {
+            url,
+            table,
+            db,
+            user,
+            password,
+        }
+    }
+
+    pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
+        let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
+        let builder = Request::get(uri);
+
+        let connector = HttpsConnector::new();
+        let client = Client::builder()
+            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
+            .build(connector);
+
+        let request = 
builder + .header( + "Authorization", + format!( + "Basic {}", + general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password)) + ), + ) + .body(Body::empty()) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + let response = client + .request(request) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await { + Ok(bytes) => bytes.to_vec(), + Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())), + }) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + let json_map: HashMap = serde_json::from_str(&raw_bytes) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") { + let data = json_map.get("data").ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data")) + })?; + data.to_string() + } else { + raw_bytes + }; + let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Can't get schema from json {:?}", + err + )) + })?; + Ok(schema) + } +} +#[derive(Debug, Serialize, Deserialize)] +pub struct DorisSchema { + status: i32, + #[serde(rename = "keysType")] + pub keys_type: String, + pub properties: Vec, +} +#[derive(Debug, Serialize, Deserialize)] +pub struct DorisField { + pub name: String, + pub r#type: String, + comment: String, + pub precision: Option, + pub scale: Option, + aggregation_type: String, +} +impl DorisField { + pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> { + if self.r#type.contains("DECIMAL") { + let a = self.precision.clone().unwrap().parse::().unwrap(); + let b = self.scale.clone().unwrap().parse::().unwrap(); + Some((a, b)) + } else { + None + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DorisInsertResultResponse { + #[serde(rename = "TxnId")] + txn_id: i64, + #[serde(rename = "Label")] + label: String, + #[serde(rename = "Status")] + status: String, + #[serde(rename = "TwoPhaseCommit")] + two_phase_commit: String, + #[serde(rename = "Message")] + message: String, + #[serde(rename = "NumberTotalRows")] + number_total_rows: i64, + #[serde(rename = "NumberLoadedRows")] + number_loaded_rows: i64, + #[serde(rename = "NumberFilteredRows")] + number_filtered_rows: i32, + #[serde(rename = "NumberUnselectedRows")] + number_unselected_rows: i32, + #[serde(rename = "LoadBytes")] + load_bytes: i64, + #[serde(rename = "LoadTimeMs")] + load_time_ms: i32, + #[serde(rename = "BeginTxnTimeMs")] + begin_txn_time_ms: i32, + #[serde(rename = "StreamLoadPutTimeMs")] + stream_load_put_time_ms: i32, + #[serde(rename = "ReadDataTimeMs")] + read_data_time_ms: i32, + #[serde(rename = "WriteDataTimeMs")] + write_data_time_ms: i32, + #[serde(rename = "CommitAndPublishTimeMs")] + commit_and_publish_time_ms: i32, + #[serde(rename = "ErrorURL")] + err_url: Option, +} + +pub struct DorisClient { + insert: InserterInner, + is_first_record: bool, +} +impl DorisClient { + pub fn new(insert: InserterInner) -> Self { + Self { + insert, + is_first_record: true, + } + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + let mut data_build = BytesMut::new(); + if self.is_first_record { + self.is_first_record = false; + } else { + data_build.put_slice("\n".as_bytes()); + } + data_build.put_slice(&data); + self.insert.write(data_build.into()).await?; + Ok(()) + } + + pub async fn finish(self) -> Result { + let raw = 
self.insert.finish().await?; + let res: DorisInsertResultResponse = serde_json::from_slice(&raw) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Insert error: {:?}, error url: {:?}", + res.message, + res.err_url + ))); + }; + Ok(res) + } +} diff --git a/src/connector/src/sink/doris_connector.rs b/src/connector/src/sink/doris_connector.rs deleted file mode 100644 index 116cd91d86542..0000000000000 --- a/src/connector/src/sink/doris_connector.rs +++ /dev/null @@ -1,423 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::mem; -use core::time::Duration; -use std::collections::HashMap; - -use base64::engine::general_purpose; -use base64::Engine; -use bytes::{BufMut, Bytes, BytesMut}; -use http::request::Builder; -use hyper::body::{Body, Sender}; -use hyper::client::HttpConnector; -use hyper::{body, Client, Request, StatusCode}; -use hyper_tls::HttpsConnector; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use tokio::task::JoinHandle; - -use super::{Result, SinkError}; - -const BUFFER_SIZE: usize = 64 * 1024; -const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; -const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; -pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; -const SEND_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); -const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); -const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); -pub struct DorisInsertClient { - url: String, - header: HashMap, - sender: Option, -} -impl DorisInsertClient { - pub fn new(url: String, db: String, table: String) -> Self { - let url = format!("{}/api/{}/{}/_stream_load", url, db, table); - Self { - url, - header: HashMap::default(), - sender: None, - } - } - - pub fn set_url(mut self, url: String) -> Self { - self.url = url; - self - } - - pub fn add_common_header(mut self) -> Self { - self.header - .insert("expect".to_string(), "100-continue".to_string()); - self - } - - /// The method is temporarily not in use, reserved for later use in 2PC. - /// Doris will generate a default, non-repeating label. - pub fn set_label(mut self, label: String) -> Self { - self.header.insert("label".to_string(), label); - self - } - - /// This method is only called during upsert operations. - pub fn add_hidden_column(mut self) -> Self { - self.header - .insert("hidden_columns".to_string(), DORIS_DELETE_SIGN.to_string()); - self - } - - /// The method is temporarily not in use, reserved for later use in 2PC. 
- pub fn enable_2_pc(mut self) -> Self { - self.header - .insert("two_phase_commit".to_string(), "true".to_string()); - self - } - - pub fn set_user_password(mut self, user: String, password: String) -> Self { - let auth = format!( - "Basic {}", - general_purpose::STANDARD.encode(format!("{}:{}", user, password)) - ); - self.header.insert("Authorization".to_string(), auth); - self - } - - /// The method is temporarily not in use, reserved for later use in 2PC. - pub fn set_txn_id(mut self, txn_id: i64) -> Self { - self.header - .insert("txn_operation".to_string(), txn_id.to_string()); - self - } - - /// The method is temporarily not in use, reserved for later use in 2PC. - pub fn add_commit(mut self) -> Self { - self.header - .insert("txn_operation".to_string(), "commit".to_string()); - self - } - - /// The method is temporarily not in use, reserved for later use in 2PC. - pub fn add_abort(mut self) -> Self { - self.header - .insert("txn_operation".to_string(), "abort".to_string()); - self - } - - /// This method is used to add custom message headers, such as the data import format. - pub fn set_properties(mut self, properties: HashMap) -> Self { - self.header.extend(properties); - self - } - - fn build_request_and_client( - &self, - uri: String, - ) -> (Builder, Client>) { - let mut builder = Request::put(uri); - for (k, v) in &self.header { - builder = builder.header(k, v); - } - - let connector = HttpsConnector::new(); - let client = Client::builder() - .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); - - (builder, client) - } - - pub async fn build(&mut self) -> Result { - let (builder, client) = self.build_request_and_client(self.url.clone()); - - let request_get_url = builder - .body(Body::empty()) - .map_err(|err| SinkError::Http(err.into()))?; - let resp = client - .request(request_get_url) - .await - .map_err(|err| SinkError::Http(err.into()))?; - let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { - resp.headers() - .get("location") - .ok_or_else(|| { - SinkError::Http(anyhow::anyhow!("Can't get doris BE url in header",)) - })? - .to_str() - .map_err(|err| { - SinkError::Http(anyhow::anyhow!( - "Can't get doris BE url in header {:?}", - err - )) - })? - } else { - return Err(SinkError::Http(anyhow::anyhow!("Can't get doris BE url",))); - }; - - let (builder, client) = self.build_request_and_client(be_url.to_string()); - let (sender, body) = Body::channel(); - let request = builder - .body(body) - .map_err(|err| SinkError::Http(err.into()))?; - let feature = client.request(request); - - let handle: JoinHandle> = tokio::spawn(async move { - let response = feature.await.map_err(|err| SinkError::Http(err.into()))?; - let status = response.status(); - let raw_string = String::from_utf8( - body::to_bytes(response.into_body()) - .await - .map_err(|err| SinkError::Http(err.into()))? 
- .to_vec(), - ) - .map_err(|err| SinkError::Http(err.into()))?; - - if status == StatusCode::OK && !raw_string.is_empty() { - let response: DorisInsertResultResponse = - serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?; - Ok(response) - } else { - Err(SinkError::Http(anyhow::anyhow!( - "Failed connection {:?},{:?}", - status, - raw_string - ))) - } - }); - - Ok(DorisInsert::new(sender, handle)) - } -} - -pub struct DorisInsert { - sender: Option, - join_handle: Option>>, - buffer: BytesMut, - is_first_record: bool, -} -impl DorisInsert { - pub fn new(sender: Sender, join_handle: JoinHandle>) -> Self { - Self { - sender: Some(sender), - join_handle: Some(join_handle), - buffer: BytesMut::with_capacity(BUFFER_SIZE), - is_first_record: true, - } - } - - async fn send_chunk(&mut self) -> Result<()> { - if self.sender.is_none() { - return Ok(()); - } - - let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)); - - let is_timed_out = match tokio::time::timeout( - SEND_CHUNK_TIMEOUT, - self.sender.as_mut().unwrap().send_data(chunk.into()), - ) - .await - { - Ok(Ok(_)) => return Ok(()), - Ok(Err(_)) => false, - Err(_) => true, - }; - self.abort()?; - - let res = self.wait_handle().await; - - if is_timed_out { - Err(SinkError::Http(anyhow::anyhow!("timeout"))) - } else { - res?; - Err(SinkError::Http(anyhow::anyhow!("channel closed"))) - } - } - - fn abort(&mut self) -> Result<()> { - if let Some(sender) = self.sender.take() { - sender.abort(); - } - Ok(()) - } - - pub async fn write(&mut self, data: Bytes) -> Result<()> { - if self.is_first_record { - self.is_first_record = false; - } else { - self.buffer.put_slice("\n".as_bytes()); - } - self.buffer.put_slice(&data); - if self.buffer.len() >= MIN_CHUNK_SIZE { - self.send_chunk().await?; - } - Ok(()) - } - - async fn wait_handle(&mut self) -> Result { - let res = - match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) - .await - { - Ok(res) => res.map_err(|err| SinkError::Http(err.into()))??, - Err(err) => return Err(SinkError::Http(err.into())), - }; - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { - return Err(SinkError::Http(anyhow::anyhow!( - "Insert error: {:?}, error url: {:?}", - res.message, - res.err_url - ))); - }; - Ok(res) - } - - pub async fn finish(mut self) -> Result { - if !self.buffer.is_empty() { - self.send_chunk().await?; - } - self.sender = None; - self.wait_handle().await - } -} - -pub struct DorisGet { - url: String, - table: String, - db: String, - user: String, - password: String, -} -impl DorisGet { - pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self { - Self { - url, - table, - db, - user, - password, - } - } - - pub async fn get_schema_from_doris(&self) -> Result { - let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table); - let builder = Request::get(uri); - - let connector = HttpsConnector::new(); - let client = Client::builder() - .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); - - let request = builder - .header( - "Authorization", - format!( - "Basic {}", - general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password)) - ), - ) - .body(Body::empty()) - .map_err(|err| SinkError::Http(err.into()))?; - - let response = client - .request(request) - .await - .map_err(|err| SinkError::Http(err.into()))?; - - let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await { - Ok(bytes) => bytes.to_vec(), - Err(err) => return 
Err(SinkError::Http(err.into())), - }) - .map_err(|err| SinkError::Http(err.into()))?; - - let json_map: HashMap = - serde_json::from_str(&raw_bytes).map_err(|err| SinkError::Http(err.into()))?; - let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") { - let data = json_map - .get("data") - .ok_or_else(|| SinkError::Http(anyhow::anyhow!("Can't find data")))?; - data.to_string() - } else { - raw_bytes - }; - let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| { - SinkError::Http(anyhow::anyhow!("Can't get schema from json {:?}", err)) - })?; - Ok(schema) - } -} -#[derive(Debug, Serialize, Deserialize)] -pub struct DorisSchema { - status: i32, - #[serde(rename = "keysType")] - pub keys_type: String, - pub properties: Vec, -} -#[derive(Debug, Serialize, Deserialize)] -pub struct DorisField { - pub name: String, - pub r#type: String, - comment: String, - pub precision: Option, - pub scale: Option, - aggregation_type: String, -} -impl DorisField { - pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> { - if self.r#type.contains("DECIMAL") { - let a = self.precision.clone().unwrap().parse::().unwrap(); - let b = self.scale.clone().unwrap().parse::().unwrap(); - Some((a, b)) - } else { - None - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct DorisInsertResultResponse { - #[serde(rename = "TxnId")] - txn_id: i64, - #[serde(rename = "Label")] - label: String, - #[serde(rename = "Status")] - status: String, - #[serde(rename = "TwoPhaseCommit")] - two_phase_commit: String, - #[serde(rename = "Message")] - message: String, - #[serde(rename = "NumberTotalRows")] - number_total_rows: i64, - #[serde(rename = "NumberLoadedRows")] - number_loaded_rows: i64, - #[serde(rename = "NumberFilteredRows")] - number_filtered_rows: i32, - #[serde(rename = "NumberUnselectedRows")] - number_unselected_rows: i32, - #[serde(rename = "LoadBytes")] - load_bytes: i64, - #[serde(rename = "LoadTimeMs")] - load_time_ms: i32, - #[serde(rename = "BeginTxnTimeMs")] - begin_txn_time_ms: i32, - #[serde(rename = "StreamLoadPutTimeMs")] - stream_load_put_time_ms: i32, - #[serde(rename = "ReadDataTimeMs")] - read_data_time_ms: i32, - #[serde(rename = "WriteDataTimeMs")] - write_data_time_ms: i32, - #[serde(rename = "CommitAndPublishTimeMs")] - commit_and_publish_time_ms: i32, - #[serde(rename = "ErrorURL")] - err_url: Option, -} diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs new file mode 100644 index 0000000000000..ca5dd8696d434 --- /dev/null +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -0,0 +1,320 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use core::mem; +use core::time::Duration; +use std::collections::HashMap; + +use base64::engine::general_purpose; +use base64::Engine; +use bytes::{BufMut, Bytes, BytesMut}; +use http::request::Builder; +use hyper::body::{Body, Sender}; +use hyper::client::HttpConnector; +use hyper::{body, Client, Request, StatusCode}; +use hyper_tls::HttpsConnector; +use tokio::task::JoinHandle; + +use super::{Result, SinkError}; + +const BUFFER_SIZE: usize = 64 * 1024; +const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; +pub(crate) const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; +pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; +pub(crate) const STARROCKS_DELETE_SIGN: &str = "__op"; + +const SEND_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); +const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); +pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); +const DORIS: &str = "doris"; +const STARROCKS: &str = "starrocks"; +pub struct HeaderBuilder { + header: HashMap, +} +impl Default for HeaderBuilder { + fn default() -> Self { + Self::new() + } +} +impl HeaderBuilder { + pub fn new() -> Self { + Self { + header: HashMap::default(), + } + } + + pub fn add_common_header(mut self) -> Self { + self.header + .insert("expect".to_string(), "100-continue".to_string()); + self + } + + /// The method is temporarily not in use, reserved for later use in 2PC. + /// Doris will generate a default, non-repeating label. + pub fn set_label(mut self, label: String) -> Self { + self.header.insert("label".to_string(), label); + self + } + + pub fn set_columns_name(mut self, columns_name: Vec<&str>) -> Self { + let columns_name_str = columns_name.join(","); + self.header.insert("columns".to_string(), columns_name_str); + self + } + + /// This method is only called during upsert operations. + pub fn add_hidden_column(mut self) -> Self { + self.header + .insert("hidden_columns".to_string(), DORIS_DELETE_SIGN.to_string()); + self + } + + /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris + pub fn enable_2_pc(mut self) -> Self { + self.header + .insert("two_phase_commit".to_string(), "true".to_string()); + self + } + + pub fn set_user_password(mut self, user: String, password: String) -> Self { + let auth = format!( + "Basic {}", + general_purpose::STANDARD.encode(format!("{}:{}", user, password)) + ); + self.header.insert("Authorization".to_string(), auth); + self + } + + /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris + pub fn set_txn_id(mut self, txn_id: i64) -> Self { + self.header + .insert("txn_operation".to_string(), txn_id.to_string()); + self + } + + /// The method is temporarily not in use, reserved for later use in 2PC. + /// Only use in Doris + pub fn add_commit(mut self) -> Self { + self.header + .insert("txn_operation".to_string(), "commit".to_string()); + self + } + + /// The method is temporarily not in use, reserved for later use in 2PC. 
+ /// Only use in Doris + pub fn add_abort(mut self) -> Self { + self.header + .insert("txn_operation".to_string(), "abort".to_string()); + self + } + + pub fn add_json_format(mut self) -> Self { + self.header.insert("format".to_string(), "json".to_string()); + self + } + + /// Only use in Doris + pub fn add_read_json_by_line(mut self) -> Self { + self.header + .insert("read_json_by_line".to_string(), "true".to_string()); + self + } + + /// Only use in Starrocks + pub fn add_strip_outer_array(mut self) -> Self { + self.header + .insert("strip_outer_array".to_string(), "true".to_string()); + self + } + + pub fn build(self) -> HashMap { + self.header + } +} + +pub struct InserterInnerBuilder { + url: String, + header: HashMap, + sender: Option, +} +impl InserterInnerBuilder { + pub fn new(url: String, db: String, table: String, header: HashMap) -> Self { + let url = format!("{}/api/{}/{}/_stream_load", url, db, table); + + Self { + url, + sender: None, + header, + } + } + + fn build_request_and_client( + &self, + uri: String, + ) -> (Builder, Client>) { + let mut builder = Request::put(uri); + for (k, v) in &self.header { + builder = builder.header(k, v); + } + + let connector = HttpsConnector::new(); + let client = Client::builder() + .pool_idle_timeout(POOL_IDLE_TIMEOUT) + .build(connector); + + (builder, client) + } + + pub async fn build(&self) -> Result { + let (builder, client) = self.build_request_and_client(self.url.clone()); + let request_get_url = builder + .body(Body::empty()) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let resp = client + .request(request_get_url) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { + resp.headers() + .get("location") + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Can't get doris BE url in header", + )) + })? + .to_str() + .map_err(|err| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Can't get doris BE url in header {:?}", + err + )) + })? + } else { + return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Can't get doris BE url", + ))); + }; + + let (builder, client) = self.build_request_and_client(be_url.to_string()); + let (sender, body) = Body::channel(); + let request = builder + .body(body) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let feature = client.request(request); + + let handle: JoinHandle>> = tokio::spawn(async move { + let response = feature + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let status = response.status(); + let raw = body::to_bytes(response.into_body()) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .to_vec(); + if status == StatusCode::OK && !raw.is_empty() { + Ok(raw) + } else { + Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Failed connection {:?},{:?}", + status, + String::from_utf8(raw) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? 
+ ))) + } + }); + Ok(InserterInner::new(sender, handle)) + } +} + +pub struct InserterInner { + sender: Option, + join_handle: Option>>>, + buffer: BytesMut, +} +impl InserterInner { + pub fn new(sender: Sender, join_handle: JoinHandle>>) -> Self { + Self { + sender: Some(sender), + join_handle: Some(join_handle), + buffer: BytesMut::with_capacity(BUFFER_SIZE), + } + } + + async fn send_chunk(&mut self) -> Result<()> { + if self.sender.is_none() { + return Ok(()); + } + + let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)); + + let is_timed_out = match tokio::time::timeout( + SEND_CHUNK_TIMEOUT, + self.sender.as_mut().unwrap().send_data(chunk.into()), + ) + .await + { + Ok(Ok(_)) => return Ok(()), + Ok(Err(_)) => false, + Err(_) => true, + }; + self.abort()?; + + let res = self.wait_handle().await; + + if is_timed_out { + Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout"))) + } else { + res?; + Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "channel closed" + ))) + } + } + + fn abort(&mut self) -> Result<()> { + if let Some(sender) = self.sender.take() { + sender.abort(); + } + Ok(()) + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + self.buffer.put_slice(&data); + if self.buffer.len() >= MIN_CHUNK_SIZE { + self.send_chunk().await?; + } + Ok(()) + } + + async fn wait_handle(&mut self) -> Result> { + let res = + match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) + .await + { + Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))??, + Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())), + }; + Ok(res) + } + + pub async fn finish(mut self) -> Result> { + if !self.buffer.is_empty() { + self.send_chunk().await?; + } + self.sender = None; + self.wait_handle().await + } +} diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 7da859b8e9045..f85eb88acc233 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -196,15 +196,16 @@ fn datum_to_json_object( CustomJsonType::Doris(map) => { if !matches!(v, Decimal::Normalized(_)) { return Err(ArrayError::internal( - "doris can't support decimal Inf, -Inf, Nan".to_string(), + "doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(), )); } let (p, s) = map.get(&field.name).unwrap(); v.rescale(*s as u32); let v_string = v.to_text(); - if v_string.len() > *p as usize { + let len = v_string.clone().replace(['.', '-'], "").len(); + if len > *p as usize { return Err(ArrayError::internal( - format!("rw Decimal's precision is large than doris max decimal len is {:?}, doris max is {:?}",v_string.len(),p))); + format!("rw Decimal's precision is large than doris/starrocks max decimal len is {:?}, doris max is {:?}",v_string.len(),p))); } json!(v_string) } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 0ac365a51c950..c7748bceedca5 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -19,7 +19,7 @@ pub mod catalog; pub mod clickhouse; pub mod coordinate; pub mod doris; -pub mod doris_connector; +pub mod doris_starrocks_connector; pub mod encoder; pub mod formatter; pub mod iceberg; @@ -30,6 +30,7 @@ pub mod nats; pub mod pulsar; pub mod redis; pub mod remote; +pub mod starrocks; pub mod test_sink; pub mod utils; pub mod writer; @@ -79,6 +80,7 @@ macro_rules! 
for_all_sinks { { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { Doris, $crate::sink::doris::DorisSink }, + { Starrocks, $crate::sink::starrocks::StarrocksSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink } } @@ -411,14 +413,16 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), - #[error("Doris http error: {0}")] - Http( + #[error("Doris/Starrocks connect error: {0}")] + DorisStarrocksConnect( #[source] #[backtrace] anyhow::Error, ), #[error("Doris error: {0}")] Doris(String), + #[error("Starrocks error: {0}")] + Starrocks(String), #[error("Pulsar error: {0}")] Pulsar( #[source] diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs new file mode 100644 index 0000000000000..22597a80211eb --- /dev/null +++ b/src/connector/src/sink/starrocks.rs @@ -0,0 +1,610 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use itertools::Itertools; +use mysql_async::prelude::Queryable; +use mysql_async::Opts; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde::Deserialize; +use serde_derive::Serialize; +use serde_json::Value; +use serde_with::serde_as; + +use super::doris_starrocks_connector::{ + HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN, +}; +use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use super::writer::LogSinkerOf; +use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::sink::writer::SinkWriterExt; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; + +pub const STARROCKS_SINK: &str = "starrocks"; +const STARROCK_MYSQL_PREFER_SOCKET: &str = "false"; +const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; +const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 28800; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct StarrocksCommon { + #[serde(rename = "starrocks.host")] + pub host: String, + #[serde(rename = "starrocks.mysqlport")] + pub mysql_port: String, + #[serde(rename = "starrocks.httpport")] + pub http_port: String, + #[serde(rename = "starrocks.user")] + pub user: String, + #[serde(rename = "starrocks.password")] + pub password: String, + #[serde(rename = "starrocks.database")] + pub database: String, + #[serde(rename = "starrocks.table")] + pub table: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize)] +pub struct StarrocksConfig { + #[serde(flatten)] + pub common: StarrocksCommon, + + pub r#type: String, // accept "append-only" or "upsert" +} +impl StarrocksConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = + 
serde_json::from_value::<StarrocksConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
+            return Err(SinkError::Config(anyhow!(
+                "`{}` must be {}, or {}",
+                SINK_TYPE_OPTION,
+                SINK_TYPE_APPEND_ONLY,
+                SINK_TYPE_UPSERT
+            )));
+        }
+        Ok(config)
+    }
+}
+
+#[derive(Debug)]
+pub struct StarrocksSink {
+    pub config: StarrocksConfig,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    is_append_only: bool,
+}
+
+impl StarrocksSink {
+    pub fn new(
+        config: StarrocksConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            config,
+            schema,
+            pk_indices,
+            is_append_only,
+        })
+    }
+}
+
+impl StarrocksSink {
+    fn check_column_name_and_type(
+        &self,
+        starrocks_columns_desc: HashMap<String, String>,
+    ) -> Result<()> {
+        let rw_fields_name = self.schema.fields();
+        if rw_fields_name.len().ne(&starrocks_columns_desc.len()) {
+            return Err(SinkError::Starrocks("The number of columns in the RisingWave sink schema must equal the number of columns in the StarRocks table".to_string()));
+        }
+
+        for i in rw_fields_name {
+            let value = starrocks_columns_desc.get(&i.name).ok_or_else(|| {
+                SinkError::Starrocks(format!(
+                    "Column {:?} not found in StarRocks",
+                    i.name
+                ))
+            })?;
+            if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
+                return Err(SinkError::Starrocks(format!(
+                    "Column type mismatch for column {:?}: StarRocks type is {:?}, RisingWave type is {:?}", i.name, value, i.data_type
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn check_and_correct_column_type(
+        rw_data_type: &DataType,
+        starrocks_data_type: String,
+    ) -> Result<bool> {
+        match rw_data_type {
+            risingwave_common::types::DataType::Boolean => {
+                Ok(starrocks_data_type.contains("tinyint"))
+            }
+            risingwave_common::types::DataType::Int16 => {
+                Ok(starrocks_data_type.contains("smallint"))
+            }
+            risingwave_common::types::DataType::Int32 => Ok(starrocks_data_type.contains("int")),
+            risingwave_common::types::DataType::Int64 => {
+                Ok(starrocks_data_type.contains("bigint"))
+            }
+            risingwave_common::types::DataType::Float32 => {
+                Ok(starrocks_data_type.contains("float"))
+            }
+            risingwave_common::types::DataType::Float64 => {
+                Ok(starrocks_data_type.contains("double"))
+            }
+            risingwave_common::types::DataType::Decimal => {
+                Ok(starrocks_data_type.contains("decimal"))
+            }
+            risingwave_common::types::DataType::Date => Ok(starrocks_data_type.contains("date")),
+            risingwave_common::types::DataType::Varchar => {
+                Ok(starrocks_data_type.contains("varchar"))
+            }
+            risingwave_common::types::DataType::Time => Err(SinkError::Starrocks(
+                "starrocks cannot support Time".to_string(),
+            )),
+            risingwave_common::types::DataType::Timestamp => {
+                Ok(starrocks_data_type.contains("datetime"))
+            }
+            risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks(
+                "starrocks cannot support Timestamptz".to_string(),
+            )),
+            risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks(
+                "starrocks cannot support Interval".to_string(),
+            )),
+            // todo! Validate the struct and list types
+            risingwave_common::types::DataType::Struct(_) => Err(SinkError::Starrocks(
+                "starrocks cannot support import struct".to_string(),
+            )),
+            risingwave_common::types::DataType::List(_) => {
+                Ok(starrocks_data_type.contains("unknown"))
+            }
+            risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks(
+                "starrocks cannot support Bytea".to_string(),
+            )),
+            risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks(
+                "starrocks cannot support import json".to_string(),
+            )),
+            risingwave_common::types::DataType::Serial => {
+                Ok(starrocks_data_type.contains("bigint"))
+            }
+            risingwave_common::types::DataType::Int256 => Err(SinkError::Starrocks(
+                "starrocks cannot support Int256".to_string(),
+            )),
+        }
+    }
+}
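Reviewer note: to make the type mapping concrete, a small sketch of the checks `check_and_correct_column_type` performs (illustrative only; the function is a private associated fn, and the `column_type` strings shown here are examples of what `information_schema.columns` might return):

```rust
use risingwave_common::types::DataType;

// Sketch: RisingWave-to-StarRocks type checks as performed above.
// The second argument is the column_type string read back from StarRocks.
fn demo_type_checks() {
    assert!(StarrocksSink::check_and_correct_column_type(&DataType::Int64, "bigint(20)".to_string()).unwrap());
    assert!(StarrocksSink::check_and_correct_column_type(&DataType::Varchar, "varchar(65533)".to_string()).unwrap());
    assert!(StarrocksSink::check_and_correct_column_type(&DataType::Timestamp, "datetime".to_string()).unwrap());
    // Types without a StarRocks counterpart surface as SinkError::Starrocks:
    assert!(StarrocksSink::check_and_correct_column_type(&DataType::Time, "time".to_string()).is_err());
}
```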
+
+impl Sink for StarrocksSink {
+    type Coordinator = DummySinkCommitCoordinator;
+    type LogSinker = LogSinkerOf<StarrocksSinkWriter>;
+
+    const SINK_NAME: &'static str = STARROCKS_SINK;
+
+    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
+        Ok(StarrocksSinkWriter::new(
+            self.config.clone(),
+            self.schema.clone(),
+            self.pk_indices.clone(),
+            self.is_append_only,
+        )
+        .await?
+        .into_log_sinker(writer_param.sink_metrics))
+    }
+
+    async fn validate(&self) -> Result<()> {
+        if !self.is_append_only && self.pk_indices.is_empty() {
+            return Err(SinkError::Config(anyhow!(
+                "Primary key not defined for upsert starrocks sink (please define in `primary_key` field)")));
+        }
+        // check reachability
+        let mut client = StarrocksSchemaClient::new(
+            self.config.common.host.clone(),
+            self.config.common.mysql_port.clone(),
+            self.config.common.table.clone(),
+            self.config.common.database.clone(),
+            self.config.common.user.clone(),
+            self.config.common.password.clone(),
+        )
+        .await?;
+        let (read_model, pks) = client.get_pk_from_starrocks().await?;
+
+        if !self.is_append_only && read_model.ne("PRIMARY_KEYS") {
+            return Err(SinkError::Config(anyhow!(
+                "To use upsert, please create the StarRocks table with the PRIMARY KEY model"
+            )));
+        }
+
+        for (index, field) in self.schema.fields().iter().enumerate() {
+            if self.pk_indices.contains(&index) && !pks.contains(&field.name) {
+                return Err(SinkError::Starrocks(format!(
+                    "Can't find primary key {:?} in StarRocks",
+                    field.name
+                )));
+            }
+        }
+
+        let starrocks_columns_desc = client.get_columns_from_starrocks().await?;
+
+        self.check_column_name_and_type(starrocks_columns_desc)?;
+        Ok(())
+    }
+}
+
+pub struct StarrocksSinkWriter {
+    pub config: StarrocksConfig,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    inserter_inner_builder: InserterInnerBuilder,
+    is_append_only: bool,
+    client: Option<StarrocksClient>,
+    row_encoder: JsonEncoder,
+}
+
+impl TryFrom<SinkParam> for StarrocksSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = StarrocksConfig::from_hashmap(param.properties)?;
+        StarrocksSink::new(
+            config,
+            schema,
+            param.downstream_pk,
+            param.sink_type.is_append_only(),
+        )
+    }
+}
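Reviewer note: for reference, the `WITH` options from `upsert/create_sink.sql` reach `StarrocksConfig::from_hashmap` as a flat string map. A sketch with values mirroring the demo; it assumes serde's `rename`/`flatten` attributes map the `starrocks.*` keys onto `StarrocksCommon` and that keys not declared on the structs are ignored by the flattened deserializer:

```rust
use std::collections::HashMap;

// Sketch: properties produced by the demo's CREATE SINK ... WITH (...) clause.
fn demo_config() -> StarrocksConfig {
    let mut props: HashMap<String, String> = HashMap::new();
    props.insert("type".into(), "upsert".into());
    props.insert("starrocks.host".into(), "starrocks-fe".into());
    props.insert("starrocks.mysqlport".into(), "9030".into());
    props.insert("starrocks.httpport".into(), "8030".into());
    props.insert("starrocks.user".into(), "users".into());
    props.insert("starrocks.password".into(), "123456".into());
    props.insert("starrocks.database".into(), "demo".into());
    props.insert("starrocks.table".into(), "demo_bhv_table".into());
    StarrocksConfig::from_hashmap(props).expect("valid starrocks sink config")
}
```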
+
+impl StarrocksSinkWriter {
+    pub async fn new(
+        config: StarrocksConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        let mut decimal_map = HashMap::default();
+        let starrocks_columns = StarrocksSchemaClient::new(
+            config.common.host.clone(),
+            config.common.mysql_port.clone(),
+            config.common.table.clone(),
+            config.common.database.clone(),
+            config.common.user.clone(),
+            config.common.password.clone(),
+        )
+        .await?
+        .get_columns_from_starrocks()
+        .await?;
+
+        for (name, column_type) in &starrocks_columns {
+            if column_type.contains("decimal") {
+                let decimal_all = column_type
+                    .split("decimal(")
+                    .last()
+                    .ok_or_else(|| SinkError::Starrocks("invalid decimal type".to_string()))?
+                    .split(')')
+                    .next()
+                    .ok_or_else(|| SinkError::Starrocks("invalid decimal type".to_string()))?
+                    .split(',')
+                    .collect_vec();
+                let length = decimal_all
+                    .first()
+                    .ok_or_else(|| SinkError::Starrocks("decimal precision not found".to_string()))?
+                    .parse::<u8>()
+                    .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?;
+
+                let scale = decimal_all
+                    .last()
+                    .ok_or_else(|| SinkError::Starrocks("decimal scale not found".to_string()))?
+                    .parse::<u8>()
+                    .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?;
+                decimal_map.insert(name.to_string(), (length, scale));
+            }
+        }
+
+        let builder = HeaderBuilder::new()
+            .add_common_header()
+            .set_user_password(config.common.user.clone(), config.common.password.clone())
+            .add_json_format();
+        let header = if !is_append_only {
+            let mut fields_name = schema.names_str();
+            fields_name.push(STARROCKS_DELETE_SIGN);
+            builder.set_columns_name(fields_name).build()
+        } else {
+            builder.build()
+        };
+
+        let starrocks_insert_builder = InserterInnerBuilder::new(
+            format!("http://{}:{}", config.common.host, config.common.http_port),
+            config.common.database.clone(),
+            config.common.table.clone(),
+            header,
+        );
+        Ok(Self {
+            config,
+            schema: schema.clone(),
+            pk_indices,
+            inserter_inner_builder: starrocks_insert_builder,
+            is_append_only,
+            client: None,
+            row_encoder: JsonEncoder::new_with_doris(
+                schema,
+                None,
+                TimestampHandlingMode::String,
+                decimal_map,
+            ),
+        })
+    }
+
+    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        for (op, row) in chunk.rows() {
+            if op != Op::Insert {
+                continue;
+            }
+            let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
+            self.client
+                .as_mut()
+                .ok_or_else(|| {
+                    SinkError::Starrocks("Can't find starrocks sink insert".to_string())
+                })?
+                .write(row_json_string.into())
+                .await?;
+        }
+        Ok(())
+    }
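Reviewer note: the upsert path below differs from `append_only` only in the extra `__op` field (`STARROCKS_DELETE_SIGN`). A sketch of the JSON rows it would emit for the demo's data — `"0"` upserts the row, `"1"` deletes the row with that primary key:

```rust
use serde_json::json;

// Sketch: stream-load payload rows in upsert mode (values from the demo).
fn demo_upsert_payload() -> (serde_json::Value, serde_json::Value) {
    let upsert_row = json!({
        "user_id": 3,
        "target_id": "30",
        "event_timestamp": "2020-01-01 01:01:03",
        "__op": "0" // insert or update the row with this primary key
    });
    let delete_row = json!({
        "user_id": 2,
        "target_id": "2",
        "event_timestamp": "2020-01-01 01:01:02",
        "__op": "1" // delete the row with this primary key
    });
    (upsert_row, delete_row)
}
```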
+
+    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
+        for (op, row) in chunk.rows() {
+            match op {
+                Op::Insert => {
+                    let mut row_json_value = self.row_encoder.encode(row)?;
+                    row_json_value.insert(
+                        STARROCKS_DELETE_SIGN.to_string(),
+                        Value::String("0".to_string()),
+                    );
+                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
+                        SinkError::Starrocks(format!("JSON serialize error {:?}", e))
+                    })?;
+                    self.client
+                        .as_mut()
+                        .ok_or_else(|| {
+                            SinkError::Starrocks("Can't find starrocks sink insert".to_string())
+                        })?
+                        .write(row_json_string.into())
+                        .await?;
+                }
+                Op::Delete => {
+                    let mut row_json_value = self.row_encoder.encode(row)?;
+                    row_json_value.insert(
+                        STARROCKS_DELETE_SIGN.to_string(),
+                        Value::String("1".to_string()),
+                    );
+                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
+                        SinkError::Starrocks(format!("JSON serialize error {:?}", e))
+                    })?;
+                    self.client
+                        .as_mut()
+                        .ok_or_else(|| {
+                            SinkError::Starrocks("Can't find starrocks sink insert".to_string())
+                        })?
+                        .write(row_json_string.into())
+                        .await?;
+                }
+                Op::UpdateDelete => {}
+                Op::UpdateInsert => {
+                    let mut row_json_value = self.row_encoder.encode(row)?;
+                    row_json_value.insert(
+                        STARROCKS_DELETE_SIGN.to_string(),
+                        Value::String("0".to_string()),
+                    );
+                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
+                        SinkError::Starrocks(format!("JSON serialize error {:?}", e))
+                    })?;
+                    self.client
+                        .as_mut()
+                        .ok_or_else(|| {
+                            SinkError::Starrocks("Can't find starrocks sink insert".to_string())
+                        })?
+                        .write(row_json_string.into())
+                        .await?;
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl SinkWriter for StarrocksSinkWriter {
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.client.is_none() {
+            self.client = Some(StarrocksClient::new(
+                self.inserter_inner_builder.build().await?,
+            ));
+        }
+        if self.is_append_only {
+            self.append_only(chunk).await
+        } else {
+            self.upsert(chunk).await
+        }
+    }
+
+    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
+        if self.client.is_some() {
+            let client = self
+                .client
+                .take()
+                .ok_or_else(|| SinkError::Starrocks("Can't find starrocks inserter".to_string()))?;
+            client.finish().await?;
+        }
+        Ok(())
+    }
+
+    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
+        Ok(())
+    }
+}
+
+pub struct StarrocksSchemaClient {
+    table: String,
+    db: String,
+    conn: mysql_async::Conn,
+}
+
+impl StarrocksSchemaClient {
+    pub async fn new(
+        host: String,
+        port: String,
+        table: String,
+        db: String,
+        user: String,
+        password: String,
+    ) -> Result<Self> {
+        let conn_uri = format!(
+            "mysql://{}:{}@{}:{}/{}?prefer_socket={}&max_allowed_packet={}&wait_timeout={}",
+            user,
+            password,
+            host,
+            port,
+            db,
+            STARROCK_MYSQL_PREFER_SOCKET,
+            STARROCK_MYSQL_MAX_ALLOWED_PACKET,
+            STARROCK_MYSQL_WAIT_TIMEOUT
+        );
+        let pool = mysql_async::Pool::new(
+            Opts::from_url(&conn_uri)
+                .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?,
+        );
+        let conn = pool
+            .get_conn()
+            .await
+            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
+
+        Ok(Self { table, db, conn })
+    }
+
+    pub async fn get_columns_from_starrocks(&mut self) -> Result<HashMap<String, String>> {
+        let query = format!("select column_name, column_type from information_schema.columns where table_name = {:?} and table_schema = {:?};", self.table, self.db);
+        let mut query_map: HashMap<String, String> = HashMap::default();
+        self.conn
+            .query_map(query, |(column_name, column_type)| {
+                query_map.insert(column_name, column_type)
+            })
+            .await
+            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
+        Ok(query_map)
+    }
+
+    pub async fn get_pk_from_starrocks(&mut self) -> Result<(String, String)> {
+        let query = format!("select table_model, primary_key from information_schema.tables_config where table_name = {:?} and table_schema = {:?};", self.table, self.db);
+        let table_mode_pk: (String, String) = self
+            .conn
+            .query_map(query, |(table_model, primary_key)| {
+                (table_model, primary_key)
+            })
+            .await
+            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?
+            .get(0)
+            .ok_or_else(|| {
+                SinkError::Starrocks(format!(
+                    "Can't find schema with table {:?} and database {:?}",
+                    self.table, self.db
+                ))
+            })?
+ .clone(); + Ok(table_mode_pk) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StarrocksInsertResultResponse { + #[serde(rename = "TxnId")] + txn_id: i64, + #[serde(rename = "Label")] + label: String, + #[serde(rename = "Status")] + status: String, + #[serde(rename = "Message")] + message: String, + #[serde(rename = "NumberTotalRows")] + number_total_rows: i64, + #[serde(rename = "NumberLoadedRows")] + number_loaded_rows: i64, + #[serde(rename = "NumberFilteredRows")] + number_filtered_rows: i32, + #[serde(rename = "NumberUnselectedRows")] + number_unselected_rows: i32, + #[serde(rename = "LoadBytes")] + load_bytes: i64, + #[serde(rename = "LoadTimeMs")] + load_time_ms: i32, + #[serde(rename = "BeginTxnTimeMs")] + begin_txn_time_ms: i32, + #[serde(rename = "ReadDataTimeMs")] + read_data_time_ms: i32, + #[serde(rename = "WriteDataTimeMs")] + write_data_time_ms: i32, + #[serde(rename = "CommitAndPublishTimeMs")] + commit_and_publish_time_ms: i32, + #[serde(rename = "StreamLoadPlanTimeMs")] + stream_load_plan_time_ms: Option, +} + +pub struct StarrocksClient { + insert: InserterInner, +} +impl StarrocksClient { + pub fn new(insert: InserterInner) -> Self { + Self { insert } + } + + pub async fn write(&mut self, data: Bytes) -> Result<()> { + self.insert.write(data).await?; + Ok(()) + } + + pub async fn finish(self) -> Result { + let raw = self.insert.finish().await?; + let res: StarrocksInsertResultResponse = serde_json::from_slice(&raw) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Insert error: {:?}", + res.message, + ))); + }; + Ok(res) + } +}
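Reviewer note: to close the loop on the upsert demo, after `upsert/insert_update_delete.sql` runs and the sink flushes on a checkpoint barrier, the StarRocks table should reflect the delete of user 2 and the update of user 3. A sketch of the verification via the StarRocks MySQL client (expected rows shown as comments):

```sql
-- Expected state of demo.demo_bhv_table after the demo's DML:
SELECT user_id, target_id FROM demo.demo_bhv_table ORDER BY user_id;
-- user_id | target_id
-- 1       | 1
-- 3       | 30
-- 4       | 4
```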