From d8ec952fffa840e55a9c4a10c2f49e413f97e4eb Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Thu, 21 Sep 2023 20:24:48 +0800
Subject: [PATCH] feat(sink): support doris sink (#12336)

---
 Cargo.lock                                    |   2 +
 integration_tests/doris-sink/README.md        |  64 +++
 .../doris-sink/append-only-sql/create_mv.sql  |   7 +
 .../append-only-sql/create_sink.sql           |  12 +
 .../append-only-sql/create_source.sql         |  18 +
 .../doris-sink/docker-compose.yml             | 104 +++++
 .../doris-sink/upsert/create_mv.sql           |   7 +
 .../doris-sink/upsert/create_sink.sql         |  12 +
 .../doris-sink/upsert/create_table.sql        |  10 +
 .../upsert/insert_update_delete.sql           |   8 +
 src/common/src/types/decimal.rs               |   6 +
 src/connector/Cargo.toml                      |   4 +-
 src/connector/src/common.rs                   |  27 ++
 src/connector/src/sink/doris.rs               | 357 +++++++++++++++
 src/connector/src/sink/doris_connector.rs     | 423 ++++++++++++++++++
 src/connector/src/sink/encoder/json.rs        | 186 ++++++--
 src/connector/src/sink/encoder/mod.rs         |   8 +
 src/connector/src/sink/kinesis.rs             |   2 -
 src/connector/src/sink/mod.rs                 |  21 +
 src/connector/src/sink/utils.rs               |  22 +-
 src/workspace-hack/Cargo.toml                 |   2 +-
 21 files changed, 1270 insertions(+), 32 deletions(-)
 create mode 100644 integration_tests/doris-sink/README.md
 create mode 100644 integration_tests/doris-sink/append-only-sql/create_mv.sql
 create mode 100644 integration_tests/doris-sink/append-only-sql/create_sink.sql
 create mode 100644 integration_tests/doris-sink/append-only-sql/create_source.sql
 create mode 100644 integration_tests/doris-sink/docker-compose.yml
 create mode 100644 integration_tests/doris-sink/upsert/create_mv.sql
 create mode 100644 integration_tests/doris-sink/upsert/create_sink.sql
 create mode 100644 integration_tests/doris-sink/upsert/create_table.sql
 create mode 100644 integration_tests/doris-sink/upsert/insert_update_delete.sql
 create mode 100644 src/connector/src/sink/doris.rs
 create mode 100644 src/connector/src/sink/doris_connector.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4f8157bef3a8b..3cbeb7ef0c52b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6894,9 +6894,11 @@ dependencies = [
  "futures-async-stream",
  "glob",
  "google-cloud-pubsub",
+ "http",
  "hyper",
  "hyper-tls",
  "icelake",
+ "indexmap 1.9.3",
  "itertools 0.11.0",
  "jni",
  "jsonschema-transpiler",
diff --git a/integration_tests/doris-sink/README.md b/integration_tests/doris-sink/README.md
new file mode 100644
index 0000000000000..add7db0a0aaa8
--- /dev/null
+++ b/integration_tests/doris-sink/README.md
@@ -0,0 +1,64 @@
+# Demo: Sinking to Doris
+
+In this demo, we showcase how RisingWave sinks data to Doris.
+
+1. Modify `vm.max_map_count`:
+
+```sh
+sysctl -w vm.max_map_count=2000000
+```
+
+If Docker still encounters Doris startup errors after running this command, please refer to: https://doris.apache.org/docs/dev/install/construct-docker/run-docker-cluster
+
+
+2. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave cluster with its necessary dependencies, a datagen source that generates data, and a Doris FE and BE that serve as the sink target.
+
+3. Create the Doris table via MySQL:
+
+Log in to MySQL:
+```sh
+docker compose exec fe mysql -uroot -P9030 -h127.0.0.1
+```
+
+Run the following queries to create the database and table.
+```sql
+CREATE DATABASE demo;
+USE demo;
+CREATE TABLE demo_bhv_table(
+    user_id int,
+    target_id text,
+    event_timestamp datetime
+) UNIQUE KEY(`user_id`)
+DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+);
+CREATE USER 'users'@'%' IDENTIFIED BY '123456';
+GRANT ALL ON *.* TO 'users'@'%';
+```
+
+4. Execute the SQL queries in sequence:
+
+- append-only sql:
+    - append-only-sql/create_source.sql
+    - append-only-sql/create_mv.sql
+    - append-only-sql/create_sink.sql
+
+- upsert sql:
+    - upsert/create_table.sql
+    - upsert/create_mv.sql
+    - upsert/create_sink.sql
+    - upsert/insert_update_delete.sql
+
+Note that `upsert` is only supported with Doris' `UNIQUE KEY` table model.
+
+Run the following query to verify the result:
+```sql
+select user_id, count(*) from demo.demo_bhv_table group by user_id;
+```
diff --git a/integration_tests/doris-sink/append-only-sql/create_mv.sql b/integration_tests/doris-sink/append-only-sql/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/doris-sink/append-only-sql/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+    user_id,
+    target_id,
+    event_timestamp
+FROM
+    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/doris-sink/append-only-sql/create_sink.sql b/integration_tests/doris-sink/append-only-sql/create_sink.sql
new file mode 100644
index 0000000000000..fa0cfddf7bf16
--- /dev/null
+++ b/integration_tests/doris-sink/append-only-sql/create_sink.sql
@@ -0,0 +1,12 @@
+CREATE SINK bhv_doris_sink
+FROM
+    bhv_mv WITH (
+    connector = 'doris',
+    type = 'append-only',
+    doris.url = 'http://fe:8030',
+    doris.user = 'users',
+    doris.password = '123456',
+    doris.database = 'demo',
+    doris.table = 'demo_bhv_table',
+    force_append_only = 'true'
+);
\ No newline at end of file
diff --git a/integration_tests/doris-sink/append-only-sql/create_source.sql b/integration_tests/doris-sink/append-only-sql/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/doris-sink/append-only-sql/create_source.sql
@@ -0,0 +1,18 @@
+CREATE TABLE user_behaviors (
+    user_id int,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMP,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+) WITH (
+    connector = 'datagen',
+    fields.user_id.kind = 'sequence',
+    fields.user_id.start = '1',
+    fields.user_id.end = '1000',
+    fields.target_id.kind = 'random',
+    fields.target_id.length = '10',
+    datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml
new file mode 100644
index 0000000000000..697a6ac1880ea
--- /dev/null
+++ b/integration_tests/doris-sink/docker-compose.yml
@@ -0,0 +1,104 @@
+---
+version: "3"
+services:
+  fe:
+    image: apache/doris:2.0.0_alpha-fe-x86_64
+    hostname: fe
+    environment:
+      - FE_SERVERS=fe1:172.21.0.2:9010
+      - FE_ID=1
+    ports:
+      - "8030:8030"
+      - "9030:9030"
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.2
+  be:
+    image: apache/doris:2.0.0_alpha-be-x86_64
+    hostname: be
+    environment:
+      - FE_SERVERS=fe1:172.21.0.2:9010
+      - BE_ADDR=172.21.0.3:9050
+    depends_on:
+      - fe
+    ports:
+      - "9050:9050"
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.3
+  compactor-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compactor-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.4
+  compute-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compute-node-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.5
+  etcd-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: etcd-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.6
+  frontend-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: frontend-node-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.7
+  grafana-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: grafana-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.8
+  meta-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: meta-node-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.9
+  minio-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: minio-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.10
+  prometheus-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: prometheus-0
+    networks:
+      mynetwork:
+        ipv4_address: 172.21.0.11
+volumes:
+  compute-node-0:
+    external: false
+  etcd-0:
+    external: false
+  grafana-0:
+    external: false
+  minio-0:
+    external: false
+  prometheus-0:
+    external: false
+  message_queue:
+    external: false
+name: risingwave-compose
+networks:
+  mynetwork:
+    ipam:
+      config:
+        - subnet: 172.21.0.0/16
+  default:
\ No newline at end of file
diff --git a/integration_tests/doris-sink/upsert/create_mv.sql b/integration_tests/doris-sink/upsert/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/doris-sink/upsert/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+    user_id,
+    target_id,
+    event_timestamp
+FROM
+    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/doris-sink/upsert/create_sink.sql b/integration_tests/doris-sink/upsert/create_sink.sql
new file mode 100644
index 0000000000000..e7bd5445ba557
--- /dev/null
+++ b/integration_tests/doris-sink/upsert/create_sink.sql
@@ -0,0 +1,12 @@
+CREATE SINK bhv_doris_sink
+FROM
+    bhv_mv WITH (
+    connector = 'doris',
+    type = 'upsert',
+    doris.url = 'http://fe:8030',
+    doris.user = 'users',
+    doris.password = '123456',
+    doris.database = 'demo',
+    doris.table = 'demo_bhv_table',
+    primary_key = 'user_id'
+);
\ No newline at end of file
diff --git a/integration_tests/doris-sink/upsert/create_table.sql b/integration_tests/doris-sink/upsert/create_table.sql
new file mode 100644
index 0000000000000..6c98f88a0b510
--- /dev/null
+++ b/integration_tests/doris-sink/upsert/create_table.sql
@@ -0,0 +1,10 @@
+CREATE TABLE user_behaviors (
+    user_id int,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMP,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+);
\ No newline at end of file
diff --git a/integration_tests/doris-sink/upsert/insert_update_delete.sql b/integration_tests/doris-sink/upsert/insert_update_delete.sql
new file mode 100644
index 0000000000000..73d5cda442258
--- /dev/null
+++ b/integration_tests/doris-sink/upsert/insert_update_delete.sql
@@ -0,0 +1,8 @@
+INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'),
+(2,'2','2','2020-01-01 01:01:02','2','2','2'),
+(3,'3','3','2020-01-01 01:01:03','3','3','3'),
+(4,'4','4','2020-01-01 01:01:04','4','4','4');
+
+DELETE FROM user_behaviors WHERE user_id = 2;
+
+UPDATE user_behaviors SET target_id = '30' WHERE user_id = 3;
\ No newline at end of file
diff --git a/src/common/src/types/decimal.rs b/src/common/src/types/decimal.rs
index 0d4c1ab30ac00..3b63da076748d 100644
--- a/src/common/src/types/decimal.rs
+++ b/src/common/src/types/decimal.rs
@@ -416,6 +416,12 @@ impl Decimal {
         Some(d.scale() as _)
     }
 
+    pub fn rescale(&mut self, scale: u32) {
+        if let Normalized(a) = self {
+            a.rescale(scale);
+        }
+    }
+
     #[must_use]
     pub fn round_dp_ties_away(&self, dp: u32) -> Self {
         match self {
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index c8cb5a0204fa9..bca5acedbba40 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -51,9 +51,11 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
 glob = "0.3"
 google-cloud-pubsub = "0.20"
-hyper = "0.14"
+http = "0.2"
+hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2", "stream"] }
 hyper-tls = "0.5"
 icelake = { workspace = true }
+indexmap = { version = "1.9.3", features = ["serde"] }
 itertools = "0.11"
 jni = { version = "0.21.1", features = ["invocation"] }
 jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index a0be76d7a837c..2031cd8eb0b55 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -37,6 +37,7 @@ use url::Url;
 use crate::aws_auth::AwsAuthProps;
 use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::deserialize_duration_from_string;
+use crate::sink::doris_connector::DorisGet;
 use crate::sink::SinkError;
 use crate::source::nats::source::NatsOffset;
 // The file describes the common abstractions for each connector and can be used in both source and
@@ -439,6 +440,32 @@ impl ClickHouseCommon {
     }
 }
 
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct DorisCommon {
+    #[serde(rename = "doris.url")]
+    pub url: String,
+    #[serde(rename = "doris.user")]
+    pub user: String,
+    #[serde(rename = "doris.password")]
+    pub password: String,
+    #[serde(rename = "doris.database")]
+    pub database: String,
+    #[serde(rename = "doris.table")]
+    pub table: String,
+}
+
+impl DorisCommon {
+    pub(crate) fn build_get_client(&self) -> DorisGet {
+        DorisGet::new(
+            self.url.clone(),
+            self.table.clone(),
+            self.database.clone(),
+            self.user.clone(),
+            self.password.clone(),
+        )
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct UpsertMessage<'a> {
     #[serde(borrow)]
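The `#[serde(rename = "doris.*")]` attributes above are what let the flat `WITH (...)` option map of a `CREATE SINK` statement deserialize straight into `DorisCommon`. A minimal sketch of that round trip, with a helper name that is illustrative and not part of this patch:

```rust
use std::collections::HashMap;

// Illustrative helper, not in the patch: raw WITH options -> DorisCommon.
// Keys that DorisCommon doesn't declare (e.g. `connector`) are ignored,
// since the struct does not use `deny_unknown_fields`.
fn doris_common_from_options(
    options: HashMap<String, String>,
) -> serde_json::Result<DorisCommon> {
    serde_json::from_value(serde_json::to_value(options)?)
}
```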
diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
new file mode 100644
index 0000000000000..c19365fcc51c4
--- /dev/null
+++ b/src/connector/src/sink/doris.rs
@@ -0,0 +1,357 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use risingwave_common::array::{Op, StreamChunk};
+use risingwave_common::buffer::Bitmap;
+use risingwave_common::catalog::Schema;
+use risingwave_common::types::DataType;
+use serde::Deserialize;
+use serde_json::Value;
+use serde_with::serde_as;
+
+use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_DELETE_SIGN};
+use super::utils::doris_rows_to_json;
+use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
+use crate::common::DorisCommon;
+use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
+
+pub const DORIS_SINK: &str = "doris";
+#[serde_as]
+#[derive(Clone, Debug, Deserialize)]
+pub struct DorisConfig {
+    #[serde(flatten)]
+    pub common: DorisCommon,
+
+    pub r#type: String, // accept "append-only" or "upsert"
+}
+impl DorisConfig {
+    pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
+        let config =
+            serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
+                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
+            return Err(SinkError::Config(anyhow!(
+                "`{}` must be {} or {}",
+                SINK_TYPE_OPTION,
+                SINK_TYPE_APPEND_ONLY,
+                SINK_TYPE_UPSERT
+            )));
+        }
+        Ok(config)
+    }
+}
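Concretely, the options from the demo's `create_sink.sql` arrive here as a flat string map. A sketch of what `from_hashmap` accepts, with values copied from the demo and an illustrative wrapping function:

```rust
use std::collections::HashMap;

// Illustrative: parse the demo's WITH options into a DorisConfig.
fn demo_config() -> Result<DorisConfig> {
    let mut props = HashMap::new();
    props.insert("doris.url".to_string(), "http://fe:8030".to_string());
    props.insert("doris.user".to_string(), "users".to_string());
    props.insert("doris.password".to_string(), "123456".to_string());
    props.insert("doris.database".to_string(), "demo".to_string());
    props.insert("doris.table".to_string(), "demo_bhv_table".to_string());
    props.insert("type".to_string(), "upsert".to_string());
    // Any `type` other than "append-only" or "upsert" is rejected above.
    DorisConfig::from_hashmap(props)
}
```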
+
+#[derive(Debug)]
+pub struct DorisSink {
+    pub config: DorisConfig,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    is_append_only: bool,
+}
+
+impl DorisSink {
+    pub fn new(
+        config: DorisConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            config,
+            schema,
+            pk_indices,
+            is_append_only,
+        })
+    }
+}
+
+impl DorisSink {
+    fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
+        let doris_columns_desc: HashMap<String, String> = doris_column_fields
+            .iter()
+            .map(|s| (s.name.clone(), s.r#type.clone()))
+            .collect();
+
+        let rw_fields_name = self.schema.fields();
+        if rw_fields_name.len().ne(&doris_columns_desc.len()) {
+            return Err(SinkError::Doris("The number of RisingWave columns must equal the number of Doris columns".to_string()));
+        }
+
+        for i in rw_fields_name {
+            let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
+                SinkError::Doris(format!(
+                    "RisingWave column {:?} was not found in Doris",
+                    i.name
+                ))
+            })?;
+            if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
+                return Err(SinkError::Doris(format!(
+                    "Column type mismatch for {:?}: Doris type is {:?}, RisingWave type is {:?}", i.name, value, i.data_type
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn check_and_correct_column_type(
+        rw_data_type: &DataType,
+        doris_data_type: String,
+    ) -> Result<bool> {
+        match rw_data_type {
+            risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
+            risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
+            risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
+            risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
+            risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
+            risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
+            risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
+            risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
+            risingwave_common::types::DataType::Varchar => {
+                Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
+            }
+            risingwave_common::types::DataType::Time => {
+                Err(SinkError::Doris("Doris does not support Time".to_string()))
+            }
+            risingwave_common::types::DataType::Timestamp => {
+                Ok(doris_data_type.contains("DATETIME"))
+            }
+            risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
+                "Doris does not support Timestamptz".to_string(),
+            )),
+            risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
+                "Doris does not support Interval".to_string(),
+            )),
+            risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
+            risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
+            risingwave_common::types::DataType::Bytea => {
+                Err(SinkError::Doris("Doris does not support Bytea".to_string()))
+            }
+            risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSONB")),
+            risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
+            risingwave_common::types::DataType::Int256 => {
+                Err(SinkError::Doris("Doris does not support Int256".to_string()))
+            }
+        }
+    }
+}
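Note that the comparison is a substring check on the Doris type string, so parameterized spellings such as `DECIMALV3(10, 5)` or `VARCHAR(255)` pass. A test-style sketch of that behaviour (the test values are assumed, not taken from the patch):

```rust
#[cfg(test)]
mod doris_type_check_sketch {
    use risingwave_common::types::DataType;

    use super::DorisSink;

    #[test]
    fn matches_by_substring() {
        // "DECIMALV3(10, 5)" contains "DECIMAL", so it is accepted.
        assert!(DorisSink::check_and_correct_column_type(
            &DataType::Decimal,
            "DECIMALV3(10, 5)".to_string()
        )
        .unwrap());
        // Time has no Doris counterpart and returns an error.
        assert!(
            DorisSink::check_and_correct_column_type(&DataType::Time, "TIME".to_string()).is_err()
        );
    }
}
```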
+
+#[async_trait]
+impl Sink for DorisSink {
+    type Coordinator = DummySinkCommitCoordinator;
+    type Writer = DorisSinkWriter;
+
+    async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
+        Ok(DorisSinkWriter::new(
+            self.config.clone(),
+            self.schema.clone(),
+            self.pk_indices.clone(),
+            self.is_append_only,
+        )
+        .await?)
+    }
+
+    async fn validate(&self) -> Result<()> {
+        if !self.is_append_only && self.pk_indices.is_empty() {
+            return Err(SinkError::Config(anyhow!(
+                "Primary key not defined for upsert Doris sink (please define it in the `primary_key` field)")));
+        }
+        // check reachability
+        let client = self.config.common.build_get_client();
+        let doris_schema = client.get_schema_from_doris().await?;
+
+        if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
+            return Err(SinkError::Config(anyhow!(
+                "To use upsert, please set the keysType of the Doris table to UNIQUE_KEYS"
+            )));
+        }
+        self.check_column_name_and_type(doris_schema.properties)?;
+        Ok(())
+    }
+}
+
+pub struct DorisSinkWriter {
+    pub config: DorisConfig,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    client: DorisInsertClient,
+    is_append_only: bool,
+    insert: Option<DorisInsert>,
+    decimal_map: HashMap<String, (u8, u8)>,
+}
+
+impl DorisSinkWriter {
+    pub async fn new(
+        config: DorisConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        let mut decimal_map = HashMap::default();
+        let doris_schema = config
+            .common
+            .build_get_client()
+            .get_schema_from_doris()
+            .await?;
+        doris_schema.properties.iter().for_each(|s| {
+            if let Some(v) = s.get_decimal_pre_scale() {
+                decimal_map.insert(s.name.clone(), v);
+            }
+        });
+        let mut map = HashMap::new();
+        map.insert("format".to_string(), "json".to_string());
+        map.insert("read_json_by_line".to_string(), "true".to_string());
+        let doris_insert_client = DorisInsertClient::new(
+            config.common.url.clone(),
+            config.common.database.clone(),
+            config.common.table.clone(),
+        )
+        .add_common_header()
+        .set_user_password(config.common.user.clone(), config.common.password.clone())
+        .set_properties(map);
+        let mut doris_insert_client = if !is_append_only {
+            doris_insert_client.add_hidden_column()
+        } else {
+            doris_insert_client
+        };
+        let insert = Some(doris_insert_client.build().await?);
+        Ok(Self {
+            config,
+            schema,
+            pk_indices,
+            client: doris_insert_client,
+            is_append_only,
+            insert,
+            decimal_map,
+        })
+    }
+
+    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        for (op, row) in chunk.rows() {
+            if op != Op::Insert {
+                continue;
+            }
+            let row_json_string =
+                Value::Object(doris_rows_to_json(row, &self.schema, &self.decimal_map)?)
+                    .to_string();
+            self.insert
+                .as_mut()
+                .ok_or_else(|| SinkError::Doris("The Doris inserter is not initialized".to_string()))?
+                .write(row_json_string.into())
+                .await?;
+        }
+        Ok(())
+    }
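In upsert mode, every JSON line additionally carries the hidden `__DORIS_DELETE_SIGN__` column declared by `add_hidden_column`: `"0"` upserts the row, `"1"` deletes it. A sketch of the two payload shapes, with column values borrowed from the demo data for illustration:

```rust
use serde_json::json;

// What the upsert path emits for Insert/UpdateInsert ops...
let upsert_line = json!({"user_id": 1, "target_id": "1", "__DORIS_DELETE_SIGN__": "0"});
// ...and for Delete ops; Doris' UNIQUE KEY model applies them in arrival order.
let delete_line = json!({"user_id": 2, "target_id": "2", "__DORIS_DELETE_SIGN__": "1"});
```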
+
+    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
+        for (op, row) in chunk.rows() {
+            match op {
+                Op::Insert => {
+                    let mut row_json_value =
+                        doris_rows_to_json(row, &self.schema, &self.decimal_map)?;
+                    row_json_value.insert(
+                        DORIS_DELETE_SIGN.to_string(),
+                        Value::String("0".to_string()),
+                    );
+                    let row_json_string = serde_json::to_string(&row_json_value)
+                        .map_err(|e| SinkError::Doris(format!("JSON serialize error {:?}", e)))?;
+                    self.insert
+                        .as_mut()
+                        .ok_or_else(|| {
+                            SinkError::Doris("The Doris inserter is not initialized".to_string())
+                        })?
+                        .write(row_json_string.into())
+                        .await?;
+                }
+                Op::Delete => {
+                    let mut row_json_value =
+                        doris_rows_to_json(row, &self.schema, &self.decimal_map)?;
+                    row_json_value.insert(
+                        DORIS_DELETE_SIGN.to_string(),
+                        Value::String("1".to_string()),
+                    );
+                    let row_json_string = serde_json::to_string(&row_json_value)
+                        .map_err(|e| SinkError::Doris(format!("JSON serialize error {:?}", e)))?;
+                    self.insert
+                        .as_mut()
+                        .ok_or_else(|| {
+                            SinkError::Doris("The Doris inserter is not initialized".to_string())
+                        })?
+                        .write(row_json_string.into())
+                        .await?;
+                }
+                Op::UpdateDelete => {} // The paired `UpdateInsert` overwrites the same key, so no action is needed.
+                Op::UpdateInsert => {
+                    let mut row_json_value =
+                        doris_rows_to_json(row, &self.schema, &self.decimal_map)?;
+                    row_json_value.insert(
+                        DORIS_DELETE_SIGN.to_string(),
+                        Value::String("0".to_string()),
+                    );
+                    let row_json_string = serde_json::to_string(&row_json_value)
+                        .map_err(|e| SinkError::Doris(format!("JSON serialize error {:?}", e)))?;
+                    self.insert
+                        .as_mut()
+                        .ok_or_else(|| {
+                            SinkError::Doris("The Doris inserter is not initialized".to_string())
+                        })?
+                        .write(row_json_string.into())
+                        .await?;
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl SinkWriter for DorisSinkWriter {
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.insert.is_none() {
+            self.insert = Some(self.client.build().await?);
+        }
+        if self.is_append_only {
+            self.append_only(chunk).await
+        } else {
+            self.upsert(chunk).await
+        }
+    }
+
+    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
+        if self.insert.is_some() {
+            let insert = self
+                .insert
+                .take()
+                .ok_or_else(|| SinkError::Doris("The Doris inserter is not initialized".to_string()))?;
+            insert.finish().await?;
+        }
+        Ok(())
+    }
+
+    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
+        Ok(())
+    }
+}
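The writer thus scopes one stream load to one barrier interval: `write_batch` lazily (re)opens the load via `build()`, and the checkpoint barrier closes it with `finish()`. A sketch of the call order the sink runtime is expected to follow (the driver function itself is assumed, not part of the patch):

```rust
// Illustrative driver loop for a single epoch.
async fn drive_epoch(
    writer: &mut DorisSinkWriter,
    epoch: u64,
    chunks: Vec<StreamChunk>,
) -> Result<()> {
    writer.begin_epoch(epoch).await?;
    for chunk in chunks {
        writer.write_batch(chunk).await?; // reopens the stream load if needed
    }
    writer.barrier(true).await?; // checkpoint: finish() the in-flight load
    Ok(())
}
```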
diff --git a/src/connector/src/sink/doris_connector.rs b/src/connector/src/sink/doris_connector.rs
new file mode 100644
index 0000000000000..116cd91d86542
--- /dev/null
+++ b/src/connector/src/sink/doris_connector.rs
@@ -0,0 +1,423 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use core::mem;
+use core::time::Duration;
+use std::collections::HashMap;
+
+use base64::engine::general_purpose;
+use base64::Engine;
+use bytes::{BufMut, Bytes, BytesMut};
+use http::request::Builder;
+use hyper::body::{Body, Sender};
+use hyper::client::HttpConnector;
+use hyper::{body, Client, Request, StatusCode};
+use hyper_tls::HttpsConnector;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use tokio::task::JoinHandle;
+
+use super::{Result, SinkError};
+
+const BUFFER_SIZE: usize = 64 * 1024;
+const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024;
+const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"];
+pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__";
+const SEND_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);
+const WAIT_HANDLE_TIMEOUT: Duration = Duration::from_secs(10);
+const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
+pub struct DorisInsertClient {
+    url: String,
+    header: HashMap<String, String>,
+    sender: Option<Sender>,
+}
+impl DorisInsertClient {
+    pub fn new(url: String, db: String, table: String) -> Self {
+        let url = format!("{}/api/{}/{}/_stream_load", url, db, table);
+        Self {
+            url,
+            header: HashMap::default(),
+            sender: None,
+        }
+    }
+
+    pub fn set_url(mut self, url: String) -> Self {
+        self.url = url;
+        self
+    }
+
+    pub fn add_common_header(mut self) -> Self {
+        self.header
+            .insert("expect".to_string(), "100-continue".to_string());
+        self
+    }
+
+    /// The method is temporarily not in use, reserved for later use in 2PC.
+    /// Doris will generate a default, non-repeating label.
+    pub fn set_label(mut self, label: String) -> Self {
+        self.header.insert("label".to_string(), label);
+        self
+    }
+
+    /// This method is only called during upsert operations.
+    pub fn add_hidden_column(mut self) -> Self {
+        self.header
+            .insert("hidden_columns".to_string(), DORIS_DELETE_SIGN.to_string());
+        self
+    }
+
+    /// The method is temporarily not in use, reserved for later use in 2PC.
+    pub fn enable_2_pc(mut self) -> Self {
+        self.header
+            .insert("two_phase_commit".to_string(), "true".to_string());
+        self
+    }
+
+    pub fn set_user_password(mut self, user: String, password: String) -> Self {
+        let auth = format!(
+            "Basic {}",
+            general_purpose::STANDARD.encode(format!("{}:{}", user, password))
+        );
+        self.header.insert("Authorization".to_string(), auth);
+        self
+    }
+
+    /// The method is temporarily not in use, reserved for later use in 2PC.
+    pub fn set_txn_id(mut self, txn_id: i64) -> Self {
+        self.header
+            .insert("txn_id".to_string(), txn_id.to_string());
+        self
+    }
+
+    /// The method is temporarily not in use, reserved for later use in 2PC.
+    pub fn add_commit(mut self) -> Self {
+        self.header
+            .insert("txn_operation".to_string(), "commit".to_string());
+        self
+    }
+
+    /// The method is temporarily not in use, reserved for later use in 2PC.
+    pub fn add_abort(mut self) -> Self {
+        self.header
+            .insert("txn_operation".to_string(), "abort".to_string());
+        self
+    }
+
+    /// This method is used to add custom message headers, such as the data import format.
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Self {
+        self.header.extend(properties);
+        self
+    }
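`DorisSinkWriter::new` chains these builders as shown earlier; condensed here as a sketch with the demo's values:

```rust
use std::collections::HashMap;

// Illustrative: the header set the sink writer ends up sending.
let mut props = HashMap::new();
props.insert("format".to_string(), "json".to_string());
props.insert("read_json_by_line".to_string(), "true".to_string());
let client = DorisInsertClient::new(
    "http://fe:8030".to_string(),
    "demo".to_string(),
    "demo_bhv_table".to_string(),
)
.add_common_header() // expect: 100-continue
.set_user_password("users".to_string(), "123456".to_string())
.set_properties(props)
.add_hidden_column(); // upsert only: expose __DORIS_DELETE_SIGN__
```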
+
+    fn build_request_and_client(
+        &self,
+        uri: String,
+    ) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
+        let mut builder = Request::put(uri);
+        for (k, v) in &self.header {
+            builder = builder.header(k, v);
+        }
+
+        let connector = HttpsConnector::new();
+        let client = Client::builder()
+            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
+            .build(connector);
+
+        (builder, client)
+    }
+
+    pub async fn build(&mut self) -> Result<DorisInsert> {
+        let (builder, client) = self.build_request_and_client(self.url.clone());
+
+        let request_get_url = builder
+            .body(Body::empty())
+            .map_err(|err| SinkError::Http(err.into()))?;
+        let resp = client
+            .request(request_get_url)
+            .await
+            .map_err(|err| SinkError::Http(err.into()))?;
+        let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT {
+            resp.headers()
+                .get("location")
+                .ok_or_else(|| {
+                    SinkError::Http(anyhow::anyhow!("Can't get the Doris BE URL from the response header"))
+                })?
+                .to_str()
+                .map_err(|err| {
+                    SinkError::Http(anyhow::anyhow!(
+                        "Can't get the Doris BE URL from the response header {:?}",
+                        err
+                    ))
+                })?
+        } else {
+            return Err(SinkError::Http(anyhow::anyhow!("Can't get the Doris BE URL")));
+        };
+
+        let (builder, client) = self.build_request_and_client(be_url.to_string());
+        let (sender, body) = Body::channel();
+        let request = builder
+            .body(body)
+            .map_err(|err| SinkError::Http(err.into()))?;
+        let future = client.request(request);
+
+        let handle: JoinHandle<Result<DorisInsertResultResponse>> = tokio::spawn(async move {
+            let response = future.await.map_err(|err| SinkError::Http(err.into()))?;
+            let status = response.status();
+            let raw_string = String::from_utf8(
+                body::to_bytes(response.into_body())
+                    .await
+                    .map_err(|err| SinkError::Http(err.into()))?
+                    .to_vec(),
+            )
+            .map_err(|err| SinkError::Http(err.into()))?;
+
+            if status == StatusCode::OK && !raw_string.is_empty() {
+                let response: DorisInsertResultResponse =
+                    serde_json::from_str(&raw_string).map_err(|err| SinkError::Http(err.into()))?;
+                Ok(response)
+            } else {
+                Err(SinkError::Http(anyhow::anyhow!(
+                    "Failed connection {:?}, {:?}",
+                    status,
+                    raw_string
+                )))
+            }
+        });
+
+        Ok(DorisInsert::new(sender, handle))
+    }
+}
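`build()` performs the stream-load handshake: an empty `PUT` to the FE, a `307` redirect to a BE, then a second request whose body stays open for streaming. Using the resulting handle looks roughly like this, where `client` is the builder chain sketched above:

```rust
use bytes::Bytes;

// Illustrative: one complete load round trip.
let mut insert = client.build().await?;
insert.write(Bytes::from(r#"{"user_id":1,"target_id":"1"}"#)).await?;
insert.write(Bytes::from(r#"{"user_id":2,"target_id":"2"}"#)).await?;
// Errors unless Doris reports "Success" or "Publish Timeout".
let resp = insert.finish().await?;
```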
+
+pub struct DorisInsert {
+    sender: Option<Sender>,
+    join_handle: Option<JoinHandle<Result<DorisInsertResultResponse>>>,
+    buffer: BytesMut,
+    is_first_record: bool,
+}
+impl DorisInsert {
+    pub fn new(sender: Sender, join_handle: JoinHandle<Result<DorisInsertResultResponse>>) -> Self {
+        Self {
+            sender: Some(sender),
+            join_handle: Some(join_handle),
+            buffer: BytesMut::with_capacity(BUFFER_SIZE),
+            is_first_record: true,
+        }
+    }
+
+    async fn send_chunk(&mut self) -> Result<()> {
+        if self.sender.is_none() {
+            return Ok(());
+        }
+
+        let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE));
+
+        let is_timed_out = match tokio::time::timeout(
+            SEND_CHUNK_TIMEOUT,
+            self.sender.as_mut().unwrap().send_data(chunk.into()),
+        )
+        .await
+        {
+            Ok(Ok(_)) => return Ok(()),
+            Ok(Err(_)) => false,
+            Err(_) => true,
+        };
+        self.abort()?;
+
+        let res = self.wait_handle().await;
+
+        if is_timed_out {
+            Err(SinkError::Http(anyhow::anyhow!("timeout")))
+        } else {
+            res?;
+            Err(SinkError::Http(anyhow::anyhow!("channel closed")))
+        }
+    }
+
+    fn abort(&mut self) -> Result<()> {
+        if let Some(sender) = self.sender.take() {
+            sender.abort();
+        }
+        Ok(())
+    }
+
+    pub async fn write(&mut self, data: Bytes) -> Result<()> {
+        if self.is_first_record {
+            self.is_first_record = false;
+        } else {
+            self.buffer.put_slice("\n".as_bytes());
+        }
+        self.buffer.put_slice(&data);
+        if self.buffer.len() >= MIN_CHUNK_SIZE {
+            self.send_chunk().await?;
+        }
+        Ok(())
+    }
+
+    async fn wait_handle(&mut self) -> Result<DorisInsertResultResponse> {
+        let res =
+            match tokio::time::timeout(WAIT_HANDLE_TIMEOUT, self.join_handle.as_mut().unwrap())
+                .await
+            {
+                Ok(res) => res.map_err(|err| SinkError::Http(err.into()))??,
+                Err(err) => return Err(SinkError::Http(err.into())),
+            };
+        if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
+            return Err(SinkError::Http(anyhow::anyhow!(
+                "Insert error: {:?}, error url: {:?}",
+                res.message,
+                res.err_url
+            )));
+        };
+        Ok(res)
+    }
+
+    pub async fn finish(mut self) -> Result<DorisInsertResultResponse> {
+        if !self.buffer.is_empty() {
+            self.send_chunk().await?;
+        }
+        self.sender = None;
+        self.wait_handle().await
+    }
+}
+
+pub struct DorisGet {
+    url: String,
+    table: String,
+    db: String,
+    user: String,
+    password: String,
+}
+impl DorisGet {
+    pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
+        Self {
+            url,
+            table,
+            db,
+            user,
+            password,
+        }
+    }
+
+    pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
+        let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
+        let builder = Request::get(uri);
+
+        let connector = HttpsConnector::new();
+        let client = Client::builder()
+            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
+            .build(connector);
+
+        let request = builder
+            .header(
+                "Authorization",
+                format!(
+                    "Basic {}",
+                    general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
+                ),
+            )
+            .body(Body::empty())
+            .map_err(|err| SinkError::Http(err.into()))?;
+
+        let response = client
+            .request(request)
+            .await
+            .map_err(|err| SinkError::Http(err.into()))?;
+
+        let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await {
+            Ok(bytes) => bytes.to_vec(),
+            Err(err) => return Err(SinkError::Http(err.into())),
+        })
+        .map_err(|err| SinkError::Http(err.into()))?;
+
+        let json_map: HashMap<String, Value> =
+            serde_json::from_str(&raw_bytes).map_err(|err| SinkError::Http(err.into()))?;
+        let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") {
+            let data = json_map
+                .get("data")
+                .ok_or_else(|| SinkError::Http(anyhow::anyhow!("Can't find the `data` field in the response")))?;
+            data.to_string()
+        } else {
+            raw_bytes
+        };
+        let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| {
+            SinkError::Http(anyhow::anyhow!("Can't get schema from json {:?}", err))
+        })?;
+        Ok(schema)
+    }
+}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DorisSchema {
+    status: i32,
+    #[serde(rename = "keysType")]
+    pub keys_type: String,
+    pub properties: Vec<DorisField>,
+}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DorisField {
+    pub name: String,
+    pub r#type: String,
+    comment: String,
+    pub precision: Option<String>,
+    pub scale: Option<String>,
+    aggregation_type: String,
+}
+impl DorisField {
+    pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> {
+        if self.r#type.contains("DECIMAL") {
+            let a = self.precision.clone().unwrap().parse::<u8>().unwrap();
+            let b = self.scale.clone().unwrap().parse::<u8>().unwrap();
+            Some((a, b))
+        } else {
+            None
+        }
+    }
+}
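`DorisSinkWriter::new` uses this GET client to seed its per-column decimal map; condensed here as a sketch (the `url`/`table`/`db`/`user`/`password` variables are assumed to be in scope):

```rust
use std::collections::HashMap;

// Illustrative: fetch the Doris schema, keep (precision, scale) per decimal column.
let schema = DorisGet::new(url, table, db, user, password)
    .get_schema_from_doris()
    .await?;
let decimal_map: HashMap<String, (u8, u8)> = schema
    .properties
    .iter()
    .filter_map(|f| f.get_decimal_pre_scale().map(|ps| (f.name.clone(), ps)))
    .collect();
```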
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DorisInsertResultResponse {
+    #[serde(rename = "TxnId")]
+    txn_id: i64,
+    #[serde(rename = "Label")]
+    label: String,
+    #[serde(rename = "Status")]
+    status: String,
+    #[serde(rename = "TwoPhaseCommit")]
+    two_phase_commit: String,
+    #[serde(rename = "Message")]
+    message: String,
+    #[serde(rename = "NumberTotalRows")]
+    number_total_rows: i64,
+    #[serde(rename = "NumberLoadedRows")]
+    number_loaded_rows: i64,
+    #[serde(rename = "NumberFilteredRows")]
+    number_filtered_rows: i32,
+    #[serde(rename = "NumberUnselectedRows")]
+    number_unselected_rows: i32,
+    #[serde(rename = "LoadBytes")]
+    load_bytes: i64,
+    #[serde(rename = "LoadTimeMs")]
+    load_time_ms: i32,
+    #[serde(rename = "BeginTxnTimeMs")]
+    begin_txn_time_ms: i32,
+    #[serde(rename = "StreamLoadPutTimeMs")]
+    stream_load_put_time_ms: i32,
+    #[serde(rename = "ReadDataTimeMs")]
+    read_data_time_ms: i32,
+    #[serde(rename = "WriteDataTimeMs")]
+    write_data_time_ms: i32,
+    #[serde(rename = "CommitAndPublishTimeMs")]
+    commit_and_publish_time_ms: i32,
+    #[serde(rename = "ErrorURL")]
+    err_url: Option<String>,
+}
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index e4fe775f6306c..9264df0d12a97 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -12,23 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use base64::engine::general_purpose;
 use base64::Engine as _;
 use chrono::{Datelike, Timelike};
+use indexmap::IndexMap;
 use risingwave_common::array::{ArrayError, ArrayResult};
 use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::row::Row;
-use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, ToText};
+use risingwave_common::types::{DataType, DatumRef, Decimal, ScalarRefImpl, ToText};
 use risingwave_common::util::iter_util::ZipEqDebug;
 use serde_json::{json, Map, Value};
 
-use super::{Result, RowEncoder, SerTo, TimestampHandlingMode};
+use super::{CustomJsonType, Result, RowEncoder, SerTo, TimestampHandlingMode};
 use crate::sink::SinkError;
-
 pub struct JsonEncoder<'a> {
     schema: &'a Schema,
     col_indices: Option<&'a [usize]>,
     timestamp_handling_mode: TimestampHandlingMode,
+    custom_json_type: CustomJsonType,
 }
 
 impl<'a> JsonEncoder<'a> {
@@ -41,6 +44,21 @@ impl<'a> JsonEncoder<'a> {
             schema,
             col_indices,
             timestamp_handling_mode,
+            custom_json_type: CustomJsonType::None,
+        }
+    }
+
+    pub fn new_with_doris(
+        schema: &'a Schema,
+        col_indices: Option<&'a [usize]>,
+        timestamp_handling_mode: TimestampHandlingMode,
+        map: HashMap<String, (u8, u8)>,
+    ) -> Self {
+        Self {
+            schema,
+            col_indices,
+            timestamp_handling_mode,
+            custom_json_type: CustomJsonType::Doris(map),
         }
     }
 }
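Callers choose the flavour at construction time; the Doris variant threads a per-column `(precision, scale)` map through every datum. A sketch, where the schema and column name are assumed:

```rust
use std::collections::HashMap;

// Illustrative: a Doris-flavoured encoder for a hypothetical DECIMAL(10, 5) column.
let mut decimal_map = HashMap::new();
decimal_map.insert("amount".to_string(), (10u8, 5u8));
let encoder =
    JsonEncoder::new_with_doris(&schema, None, TimestampHandlingMode::String, decimal_map);
```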
@@ -65,9 +83,13 @@ impl<'a> RowEncoder for JsonEncoder<'a> {
         for idx in col_indices {
             let field = &self.schema[idx];
             let key = field.name.clone();
-            let value =
-                datum_to_json_object(field, row.datum_at(idx), self.timestamp_handling_mode)
-                    .map_err(|e| SinkError::JsonParse(e.to_string()))?;
+            let value = datum_to_json_object(
+                field,
+                row.datum_at(idx),
+                self.timestamp_handling_mode,
+                &self.custom_json_type,
+            )
+            .map_err(|e| SinkError::JsonParse(e.to_string()))?;
             mappings.insert(key, value);
         }
         Ok(mappings)
@@ -90,6 +112,7 @@ fn datum_to_json_object(
     field: &Field,
     datum: DatumRef<'_>,
     timestamp_handling_mode: TimestampHandlingMode,
+    custom_json_type: &CustomJsonType,
 ) -> ArrayResult<Value> {
     let scalar_ref = match datum {
         None => return Ok(Value::Null),
@@ -122,9 +145,26 @@ fn datum_to_json_object(
         (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
             json!(v)
         }
-        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
-            json!(v.to_text())
-        }
+        (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
+            CustomJsonType::Doris(map) => {
+                if !matches!(v, Decimal::Normalized(_)) {
+                    return Err(ArrayError::internal(
+                        "Doris does not support decimal Inf, -Inf, or NaN".to_string(),
+                    ));
+                }
+                let (p, s) = map.get(&field.name).unwrap();
+                v.rescale(*s as u32);
+                let v_string = v.to_text();
+                if v_string.len() > *p as usize {
+                    return Err(ArrayError::internal(
+                        format!("The decimal string's length {:?} exceeds the Doris column's precision {:?}", v_string.len(), p)));
+                }
+                json!(v_string)
+            }
+            CustomJsonType::None => {
+                json!(v.to_text())
+            }
+        },
         (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
             // risingwave's timestamp with timezone is stored in UTC and does not maintain the
             // timezone info and the time is in microsecond.
@@ -136,9 +176,13 @@ fn datum_to_json_object(
             // todo: just ignore the nanos part to avoid leap second complex
             json!(v.0.num_seconds_from_midnight() as i64 * 1000)
         }
-        (DataType::Date, ScalarRefImpl::Date(v)) => {
-            json!(v.0.num_days_from_ce())
-        }
+        (DataType::Date, ScalarRefImpl::Date(v)) => match custom_json_type {
+            CustomJsonType::None => json!(v.0.num_days_from_ce()),
+            CustomJsonType::Doris(_) => {
+                let a = v.0.format("%Y-%m-%d").to_string();
+                json!(a)
+            }
+        },
         (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode {
             TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()),
             TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()),
@@ -158,23 +202,54 @@ fn datum_to_json_object(
             let mut vec = Vec::with_capacity(elems.len());
             let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
             for sub_datum_ref in elems {
-                let value =
-                    datum_to_json_object(&inner_field, sub_datum_ref, timestamp_handling_mode)?;
+                let value = datum_to_json_object(
+                    &inner_field,
+                    sub_datum_ref,
+                    timestamp_handling_mode,
+                    custom_json_type,
+                )?;
                 vec.push(value);
             }
             json!(vec)
         }
         (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
-            let mut map = Map::with_capacity(st.len());
-            for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
-                st.iter()
-                    .map(|(name, dt)| Field::with_name(dt.clone(), name)),
-            ) {
-                let value =
-                    datum_to_json_object(&sub_field, sub_datum_ref, timestamp_handling_mode)?;
-                map.insert(sub_field.name.clone(), value);
+            match custom_json_type {
+                CustomJsonType::Doris(_) => {
+                    // We need to ensure that the order of elements in the json matches the insertion order.
+                    let mut map = IndexMap::with_capacity(st.len());
+                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
+                        st.iter()
+                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
+                    ) {
+                        let value = datum_to_json_object(
+                            &sub_field,
+                            sub_datum_ref,
+                            timestamp_handling_mode,
+                            custom_json_type,
+                        )?;
+                        map.insert(sub_field.name.clone(), value);
+                    }
+                    Value::String(serde_json::to_string(&map).map_err(|err| {
+                        ArrayError::internal(format!("JSON to string error: {:?}", err))
+                    })?)
+                }
+                CustomJsonType::None => {
+                    let mut map = Map::with_capacity(st.len());
+                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
+                        st.iter()
+                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
+                    ) {
+                        let value = datum_to_json_object(
+                            &sub_field,
+                            sub_datum_ref,
+                            timestamp_handling_mode,
+                            custom_json_type,
+                        )?;
+                        map.insert(sub_field.name.clone(), value);
+                    }
+                    json!(map)
+                }
             }
-            json!(map)
         }
         (data_type, scalar_ref) => {
             return Err(ArrayError::internal(
@@ -189,7 +264,10 @@ fn datum_to_json_object(
 
 #[cfg(test)]
 mod tests {
-    use risingwave_common::types::{DataType, Interval, ScalarImpl, Time, Timestamp};
+    use risingwave_common::types::{
+        DataType, Date, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
+        Timestamp,
+    };
 
     use super::*;
     #[test]
@@ -200,6 +278,7 @@ mod tests {
             sub_fields: Default::default(),
             type_name: Default::default(),
         };
+
         let boolean_value = datum_to_json_object(
             &Field {
                 data_type: DataType::Boolean,
@@ -207,6 +286,7 @@ mod tests {
             },
             Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(boolean_value, json!(false));
@@ -218,6 +298,7 @@ mod tests {
             },
             Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(int16_value, json!(16));
@@ -229,6 +310,7 @@ mod tests {
             },
            Some(ScalarImpl::Int64(std::i64::MAX).as_scalar_ref_impl()),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(
@@ -245,6 +327,7 @@ mod tests {
             },
             Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");
@@ -259,6 +342,7 @@ mod tests {
                 .as_scalar_ref_impl(),
             ),
             TimestampHandlingMode::Milli,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(ts_value, json!(1000 * 1000));
@@ -273,6 +357,7 @@ mod tests {
                 .as_scalar_ref_impl(),
             ),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_string()));
@@ -288,6 +373,7 @@ mod tests {
                 .as_scalar_ref_impl(),
             ),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(time_value, json!(1000 * 1000));
@@ -295,15 +381,65 @@ mod tests {
         let interval_value = datum_to_json_object(
             &Field {
                 data_type: DataType::Interval,
-                ..mock_field
+                ..mock_field.clone()
             },
             Some(
                 ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
                     .as_scalar_ref_impl(),
             ),
             TimestampHandlingMode::String,
+            &CustomJsonType::None,
         )
         .unwrap();
         assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
+
+        let mut map = HashMap::default();
+        map.insert("aaa".to_string(), (10_u8, 5_u8));
+        let decimal = datum_to_json_object(
+            &Field {
+                data_type: DataType::Decimal,
+                name: "aaa".to_string(),
+                ..mock_field.clone()
+            },
+            Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
+            TimestampHandlingMode::String,
+            &CustomJsonType::Doris(map),
+        )
+        .unwrap();
+        assert_eq!(decimal, json!("1.11111"));
+
+        let date_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Date,
+                ..mock_field.clone()
+            },
+            Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
+            TimestampHandlingMode::String,
+            &CustomJsonType::Doris(HashMap::default()),
+        )
+        .unwrap();
+        assert_eq!(date_value, json!("2010-10-10"));
+
+        let value = StructValue::new(vec![
+            Some(3_i32.to_scalar_value()),
+            Some(2_i32.to_scalar_value()),
+            Some(1_i32.to_scalar_value()),
+        ]);
+
+        let struct_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Struct(StructType::new(vec![
+                    ("v3", DataType::Int32),
+                    ("v2", DataType::Int32),
+                    ("v1", DataType::Int32),
+                ])),
+                ..mock_field.clone()
+            },
+            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
+            TimestampHandlingMode::String,
+            &CustomJsonType::Doris(HashMap::default()),
+        )
+        .unwrap();
+        assert_eq!(struct_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));
     }
 }
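The struct assertion above pins down the Doris behaviour: the struct is emitted as a JSON-encoded string whose keys keep declaration order. A standalone sketch of why `IndexMap` (whose `serde` feature this patch enables) guarantees that:

```rust
use indexmap::IndexMap;
use serde_json::{json, Value};

// Illustrative: serde serializes an IndexMap in insertion order,
// so "v3" stays first even though it sorts last alphabetically.
let mut map = IndexMap::new();
map.insert("v3", json!(3));
map.insert("v2", json!(2));
map.insert("v1", json!(1));
let encoded = Value::String(serde_json::to_string(&map).unwrap());
assert_eq!(encoded, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));
```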
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 83f185935a44e..1807fd1d421e8 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use risingwave_common::catalog::Schema;
 use risingwave_common::row::Row;
 
@@ -78,3 +80,9 @@ pub enum TimestampHandlingMode {
     Milli,
     String,
 }
+
+#[derive(Clone)]
+pub enum CustomJsonType {
+    Doris(HashMap<String, (u8, u8)>),
+    None,
+}
diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs
index bf52e5dcf5594..7c43abcbe82a7 100644
--- a/src/connector/src/sink/kinesis.rs
+++ b/src/connector/src/sink/kinesis.rs
@@ -208,7 +208,6 @@ impl KinesisSinkWriter {
         );
         let val_encoder = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::Milli);
         let f = UpsertFormatter::new(key_encoder, val_encoder);
-
         self.write_chunk(chunk, f).await
     }
 
@@ -220,7 +219,6 @@ impl KinesisSinkWriter {
         );
         let val_encoder = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::Milli);
         let f = AppendOnlyFormatter::new(key_encoder, val_encoder);
-
         self.write_chunk(chunk, f).await
     }
 
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 5aee898f7f284..f1f12ed323d7b 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -16,6 +16,8 @@ pub mod boxed;
 pub mod catalog;
 pub mod clickhouse;
 pub mod coordinate;
+pub mod doris;
+pub mod doris_connector;
 pub mod encoder;
 pub mod formatter;
 pub mod iceberg;
@@ -49,6 +51,7 @@ pub use tracing;
 
 use self::catalog::SinkType;
 use self::clickhouse::{ClickHouseConfig, ClickHouseSink};
+use self::doris::{DorisConfig, DorisSink};
 use self::encoder::SerTo;
 use self::formatter::SinkFormatter;
 use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK};
@@ -56,6 +59,7 @@ use self::pulsar::{PulsarConfig, PulsarSink};
 use crate::sink::boxed::BoxSink;
 use crate::sink::catalog::{SinkCatalog, SinkId};
 use crate::sink::clickhouse::CLICKHOUSE_SINK;
+use crate::sink::doris::DORIS_SINK;
 use crate::sink::iceberg::{IcebergConfig, RemoteIcebergConfig, RemoteIcebergSink};
 use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
 use crate::sink::kinesis::{KinesisSink, KinesisSinkConfig, KINESIS_SINK};
@@ -327,6 +331,7 @@ pub enum SinkConfig {
     Pulsar(PulsarConfig),
     BlackHole,
     ClickHouse(Box<ClickHouseConfig>),
+    Doris(Box<DorisConfig>),
     Nats(NatsConfig),
     #[cfg(any(test, madsim))]
     Test,
@@ -389,6 +394,9 @@ impl SinkConfig {
             CLICKHOUSE_SINK => Ok(SinkConfig::ClickHouse(Box::new(
                 ClickHouseConfig::from_hashmap(properties)?,
             ))),
+            DORIS_SINK => Ok(SinkConfig::Doris(Box::new(DorisConfig::from_hashmap(
+                properties,
+            )?))),
             BLACKHOLE_SINK => Ok(SinkConfig::BlackHole),
             PULSAR_SINK => Ok(SinkConfig::Pulsar(PulsarConfig::from_hashmap(properties)?)),
             REMOTE_ICEBERG_SINK => Ok(SinkConfig::RemoteIceberg(
@@ -420,6 +428,7 @@ pub enum SinkImpl {
     BlackHole(BlackHoleSink),
     Kinesis(KinesisSink),
     ClickHouse(ClickHouseSink),
+    Doris(DorisSink),
     Iceberg(IcebergSink),
     Nats(NatsSink),
     RemoteIceberg(RemoteIcebergSink),
@@ -440,6 +449,7 @@ impl SinkImpl {
             SinkImpl::Nats(_) => "nats",
             SinkImpl::RemoteIceberg(_) => "iceberg_java",
             SinkImpl::TestSink(_) => "test",
+            SinkImpl::Doris(_) => "doris",
         }
     }
 }
@@ -457,6 +467,7 @@ macro_rules! dispatch_sink {
             SinkImpl::BlackHole($sink) => $body,
             SinkImpl::Kinesis($sink) => $body,
             SinkImpl::ClickHouse($sink) => $body,
+            SinkImpl::Doris($sink) => $body,
             SinkImpl::Iceberg($sink) => $body,
             SinkImpl::Nats($sink) => $body,
             SinkImpl::RemoteIceberg($sink) => $body,
@@ -491,6 +502,12 @@ impl SinkImpl {
             }
             #[cfg(any(test, madsim))]
             SinkConfig::Test => SinkImpl::TestSink(build_test_sink(param)?),
+            SinkConfig::Doris(cfg) => SinkImpl::Doris(DorisSink::new(
+                *cfg,
+                param.schema(),
+                param.downstream_pk,
+                param.sink_type.is_append_only(),
+            )?),
         })
     }
 }
@@ -517,6 +534,10 @@ pub enum SinkError {
     ClickHouse(String),
     #[error("Nats error: {0}")]
     Nats(anyhow::Error),
+    #[error("Doris HTTP error: {0}")]
+    Http(anyhow::Error),
+    #[error("Doris error: {0}")]
+    Doris(String),
     #[error("Pulsar error: {0}")]
     Pulsar(anyhow::Error),
     #[error("Internal error: {0}")]
diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs
index fd25b24f8a7da..da9e92b6ae0aa 100644
--- a/src/connector/src/sink/utils.rs
+++ b/src/connector/src/sink/utils.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 // Copyright 2023 RisingWave Labs
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
@@ -11,10 +13,9 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
-use risingwave_common::array::StreamChunk;
+use risingwave_common::array::{RowRef, StreamChunk};
 use risingwave_common::catalog::Schema;
-use serde_json::Value;
+use serde_json::{Map, Value};
 
 use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use crate::sink::Result;
@@ -29,3 +30,18 @@ pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result<Vec<String>>
     Ok(records)
 }
+
+pub fn doris_rows_to_json(
+    row: RowRef<'_>,
+    schema: &Schema,
+    decimal_map: &HashMap<String, (u8, u8)>,
+) -> Result<Map<String, Value>> {
+    let encoder = JsonEncoder::new_with_doris(
+        schema,
+        None,
+        TimestampHandlingMode::String,
+        decimal_map.clone(),
+    );
+    let map = encoder.encode(row)?;
+    Ok(map)
+}
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index acbc053ff9983..c380c28c8217d 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -54,7 +54,7 @@ hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features
 hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] }
 heck = { version = "0.4", features = ["unicode"] }
 hyper = { version = "0.14", features = ["full"] }
-indexmap = { version = "1", default-features = false, features = ["std"] }
+indexmap = { version = "1", default-features = false, features = ["serde", "std"] }
 itertools = { version = "0.10" }
 jni = { version = "0.21", features = ["invocation"] }
 lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
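Taken together, `doris_rows_to_json` is the seam between the encoder and the stream-load client: the writer encodes each row through it and ships the result as one NDJSON line. A closing sketch, where the `row`, `schema`, `decimal_map`, and `insert` variables are assumed to be in scope:

```rust
// Illustrative: encode one RisingWave row into the line Doris receives.
let json_map = doris_rows_to_json(row, &schema, &decimal_map)?;
let line = serde_json::Value::Object(json_map).to_string();
insert.write(line.into()).await?; // DorisInsert newline-delimits records
```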