From 7b59b918369956643ec23bb0275542837efe0cf8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 17:23:51 +0800 Subject: [PATCH] doris starrocks connector Signed-off-by: Bugen Zhao --- Cargo.lock | 17 +++- src/connector/Cargo.toml | 3 +- .../src/sink/doris_starrocks_connector.rs | 92 +++++++------------ 3 files changed, 51 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4b1716d1a20d..9add4ee34a56c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8878,7 +8878,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.3.0", "web-sys", "webpki-roots", "winreg", @@ -8919,10 +8919,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams 0.4.0", "web-sys", "winreg", ] @@ -13793,6 +13795,19 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.201.0" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index e8b3b9c9d131f..840510ed4b20e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -65,7 +65,6 @@ hyper = { version = "0.14", features = [ "tcp", "http1", "http2", - "stream", ] } # required by clickhouse client hyper-tls = "0.5" icelake = { workspace = true } @@ -111,7 +110,7 @@ rdkafka = { workspace = true, features = [ ] } redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] } regex = "1.4" -reqwest = { version = "0.12.2", features = ["json"] } +reqwest = { version = "0.12.2", features = ["json", "stream"] } risingwave_common = { workspace = true } risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 04d591b820786..15b19bac8b89f 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -15,16 +15,15 @@ use core::mem; use core::time::Duration; use std::collections::HashMap; +use std::convert::Infallible; use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; -use http::request::Builder; -use hyper::body::{Body, Sender}; -use hyper::client::HttpConnector; -use hyper::{body, Client, Request, StatusCode}; -use hyper_tls::HttpsConnector; +use futures::StreamExt; +use reqwest::{Body, Client, RequestBuilder, StatusCode}; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use url::Url; @@ -187,31 +186,23 @@ impl InserterInnerBuilder { }) } - // TODO: use hyper 1 or reqwest 0.12.2 - fn build_request_and_client( - &self, - uri: String, - ) -> (Builder, Client>) { - let mut builder = Request::put(uri); - for (k, v) in &self.header { - builder = builder.header(k, v); - } - - let connector = HttpsConnector::new(); + fn build_request_and_client(&self, uri: String) -> RequestBuilder { let client = Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); + .build() + .unwrap(); - (builder, client) + let mut builder = client.put(uri); + for (k, v) in &self.header { + builder = builder.header(k, v); + } + builder } pub async fn build(&self) -> Result { - let (builder, client) = self.build_request_and_client(self.url.clone()); - let request_get_url = builder - .body(Body::empty()) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let resp = client - .request(request_get_url) + let builder = self.build_request_and_client(self.url.clone()); + let resp = builder + .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { @@ -249,23 +240,25 @@ impl InserterInnerBuilder { } } - let (builder, client) = self.build_request_and_client(be_url); - let (sender, body) = Body::channel(); - let request = builder - .body(body) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let future = client.request(request); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let body = Body::wrap_stream( + tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), + ); + let builder = self.build_request_and_client(be_url).body(body); let handle: JoinHandle>> = tokio::spawn(async move { - let response = future + let response = builder + .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; let status = response.status(); - let raw = body::to_bytes(response.into_body()) + let raw = response + .bytes() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? - .to_vec(); - if status == StatusCode::OK && !raw.is_empty() { + .into(); + + if status == StatusCode::OK { Ok(raw) } else { Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( @@ -280,6 +273,8 @@ impl InserterInnerBuilder { } } +type Sender = UnboundedSender; + pub struct InserterInner { sender: Option, join_handle: Option>>>, @@ -301,37 +296,18 @@ impl InserterInner { let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)); - let is_timed_out = match tokio::time::timeout( - SEND_CHUNK_TIMEOUT, - self.sender.as_mut().unwrap().send_data(chunk.into()), - ) - .await - { - Ok(Ok(_)) => return Ok(()), - Ok(Err(_)) => false, - Err(_) => true, - }; - self.abort()?; + if let Err(_e) = self.sender.as_mut().unwrap().send(chunk.freeze()) { + self.sender.take(); + self.wait_handle().await?; - let res = self.wait_handle().await; - - if is_timed_out { - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout"))) - } else { - res?; Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "channel closed" ))) + } else { + Ok(()) } } - fn abort(&mut self) -> Result<()> { - if let Some(sender) = self.sender.take() { - sender.abort(); - } - Ok(()) - } - pub async fn write(&mut self, data: Bytes) -> Result<()> { self.buffer.put_slice(&data); if self.buffer.len() >= MIN_CHUNK_SIZE {