diff --git a/Cargo.lock b/Cargo.lock
index cd98f9ce0fab3..58a157173e81e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2232,7 +2232,7 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
 [[package]]
 name = "clickhouse"
 version = "0.11.5"
-source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=622501c1c98c80baaf578c716d6903dde947804e#622501c1c98c80baaf578c716d6903dde947804e"
+source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=d38c8b6391af098b724c114e5a4746aedab6ab8e#d38c8b6391af098b724c114e5a4746aedab6ab8e"
 dependencies = [
  "bstr",
  "bytes",
@@ -2254,7 +2254,7 @@ dependencies = [
 [[package]]
 name = "clickhouse-derive"
 version = "0.1.1"
-source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=622501c1c98c80baaf578c716d6903dde947804e#622501c1c98c80baaf578c716d6903dde947804e"
+source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=d38c8b6391af098b724c114e5a4746aedab6ab8e#d38c8b6391af098b724c114e5a4746aedab6ab8e"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -8889,7 +8889,7 @@ dependencies = [
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
- "wasm-streams",
+ "wasm-streams 0.3.0",
  "web-sys",
  "webpki-roots",
  "winreg",
@@ -8930,10 +8930,12 @@ dependencies = [
  "system-configuration",
  "tokio",
  "tokio-native-tls",
+ "tokio-util",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams 0.4.0",
  "web-sys",
  "winreg",
 ]
@@ -9579,8 +9581,6 @@ dependencies = [
  "glob",
  "google-cloud-pubsub",
  "http 0.2.9",
- "hyper 0.14.27",
- "hyper-tls 0.5.0",
  "icelake",
  "indexmap 1.9.3",
  "itertools 0.12.1",
@@ -13807,6 +13807,19 @@ dependencies = [
  "web-sys",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "wasmparser"
 version = "0.201.0"
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 0a1ba523a9ff6..51437f82e3ec5 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -46,7 +46,7 @@ chrono = { version = "0.4", default-features = false, features = [
     "clock",
     "std",
 ] }
-clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [
+clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "d38c8b6391af098b724c114e5a4746aedab6ab8e", features = [
     "time",
 ] }
 csv = "1.3"
@@ -60,14 +60,6 @@ gcp-bigquery-client = "0.18.0"
 glob = "0.3"
 google-cloud-pubsub = "0.23"
 http = "0.2"
-hyper = { version = "0.14", features = [
-    "client",
-    "tcp",
-    "http1",
-    "http2",
-    "stream",
-] } # required by clickhouse client
-hyper-tls = "0.5"
 icelake = { workspace = true }
 indexmap = { version = "1.9.3", features = ["serde"] }
 itertools = { workspace = true }
@@ -112,7 +104,7 @@ rdkafka = { workspace = true, features = [
 ] }
 redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
 regex = "1.4"
-reqwest = { version = "0.12.2", features = ["json"] }
+reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_common = { workspace = true }
 risingwave_common_estimate_size = { workspace = true }
 risingwave_jni_core = { workspace = true }
diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs
index cb4c1b64b04ed..5329118c73d61 100644
--- a/src/connector/src/sink/clickhouse.rs
+++ b/src/connector/src/sink/clickhouse.rs
@@ -11,9 +11,9 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 use core::fmt::Debug;
 use std::collections::{HashMap, HashSet};
-use std::time::Duration;
 
 use anyhow::anyhow;
 use clickhouse::insert::Insert;
@@ -191,18 +191,9 @@ impl ClickHouseEngine {
     }
 }
 
-const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
-
 impl ClickHouseCommon {
     pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
-        use hyper_tls::HttpsConnector;
-
-        let https = HttpsConnector::new();
-        let client = hyper::Client::builder()
-            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build::<_, hyper::Body>(https);
-
-        let client = ClickHouseClient::with_http_client(client)
+        let client = ClickHouseClient::default() // hyper(0.14) client inside
             .with_url(&self.url)
             .with_user(&self.user)
             .with_password(&self.password)
diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
index 55b499b652623..75283f200c254 100644
--- a/src/connector/src/sink/doris.rs
+++ b/src/connector/src/sink/doris.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// TODO: use hyper 1 or reqwest 0.12.2
-
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -22,9 +20,6 @@ use async_trait::async_trait;
 use base64::engine::general_purpose;
 use base64::Engine;
 use bytes::{BufMut, Bytes, BytesMut};
-use hyper::body::Body;
-use hyper::{body, Client, Request};
-use hyper_tls::HttpsConnector;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
@@ -433,14 +428,14 @@ impl DorisSchemaClient {
     pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
         let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
 
-        let builder = Request::get(uri);
-
-        let connector = HttpsConnector::new();
-        let client = Client::builder()
+        let client = reqwest::Client::builder()
             .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build(connector);
+            .build()
+            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
 
-        let request = builder
+        let response = client
+            .get(uri)
             .header(
                 "Authorization",
                 format!(
@@ -448,31 +443,24 @@ impl DorisSchemaClient {
                     general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
                 ),
             )
-            .body(Body::empty())
-            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-
-        let response = client
-            .request(request)
+            .send()
             .await
             .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
 
-        let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await {
-            Ok(bytes) => bytes.to_vec(),
-            Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())),
-        })
-        .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-
-        let json_map: HashMap<String, Value> = serde_json::from_str(&raw_bytes)
+        let json: Value = response
+            .json()
+            .await
             .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
 
-        let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") {
-            let data = json_map.get("data").ok_or_else(|| {
-                SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
-            })?;
-            data.to_string()
+        let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
+            json.get("data")
+                .ok_or_else(|| {
+                    SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
+                })?
+                .clone()
         } else {
-            raw_bytes
+            json
         };
-        let schema: DorisSchema = serde_json::from_str(&json_data)
+        let schema: DorisSchema = serde_json::from_value(json_data)
             .context("Can't get schema from json")
             .map_err(SinkError::DorisStarrocksConnect)?;
         Ok(schema)
diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs
index 04d591b820786..6c045c63beb47 100644
--- a/src/connector/src/sink/doris_starrocks_connector.rs
+++ b/src/connector/src/sink/doris_starrocks_connector.rs
@@ -15,16 +15,15 @@
 use core::mem;
 use core::time::Duration;
 use std::collections::HashMap;
+use std::convert::Infallible;
 
 use anyhow::Context;
 use base64::engine::general_purpose;
 use base64::Engine;
 use bytes::{BufMut, Bytes, BytesMut};
-use http::request::Builder;
-use hyper::body::{Body, Sender};
-use hyper::client::HttpConnector;
-use hyper::{body, Client, Request, StatusCode};
-use hyper_tls::HttpsConnector;
+use futures::StreamExt;
+use reqwest::{redirect, Body, Client, RequestBuilder, StatusCode};
+use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
 use url::Url;
 
@@ -187,33 +186,27 @@ impl InserterInnerBuilder {
         })
     }
 
-    // TODO: use hyper 1 or reqwest 0.12.2
-    fn build_request_and_client(
-        &self,
-        uri: String,
-    ) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
-        let mut builder = Request::put(uri);
-        for (k, v) in &self.header {
-            builder = builder.header(k, v);
-        }
-
-        let connector = HttpsConnector::new();
+    fn build_request(&self, uri: String) -> RequestBuilder {
         let client = Client::builder()
             .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build(connector);
+            .redirect(redirect::Policy::none()) // we handle redirect by ourselves
+            .build()
+            .unwrap();
 
-        (builder, client)
+        let mut builder = client.put(uri);
+        for (k, v) in &self.header {
+            builder = builder.header(k, v);
+        }
+        builder
     }
 
     pub async fn build(&self) -> Result<InserterInner> {
-        let (builder, client) = self.build_request_and_client(self.url.clone());
-        let request_get_url = builder
-            .body(Body::empty())
-            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-        let resp = client
-            .request(request_get_url)
+        let builder = self.build_request(self.url.clone());
+        let resp = builder
+            .send()
             .await
             .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
+        // TODO: shall we let `reqwest` handle the redirect?
         let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT {
             resp.headers()
                 .get("location")
@@ -249,23 +242,25 @@ impl InserterInnerBuilder {
             }
         }
 
-        let (builder, client) = self.build_request_and_client(be_url);
-        let (sender, body) = Body::channel();
-        let request = builder
-            .body(body)
-            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-        let future = client.request(request);
+        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+        let body = Body::wrap_stream(
+            tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>),
+        );
+        let builder = self.build_request(be_url).body(body);
 
         let handle: JoinHandle<Result<Vec<u8>>> = tokio::spawn(async move {
-            let response = future
+            let response = builder
+                .send()
                 .await
                 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
             let status = response.status();
-            let raw = body::to_bytes(response.into_body())
+            let raw = response
+                .bytes()
                 .await
                 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?
-                .to_vec();
-            if status == StatusCode::OK && !raw.is_empty() {
+                .into();
+
+            if status == StatusCode::OK {
                 Ok(raw)
             } else {
                 Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
@@ -280,6 +275,8 @@
     }
 }
 
+type Sender = UnboundedSender<Bytes>;
+
 pub struct InserterInner {
     sender: Option<Sender>,
     join_handle: Option<JoinHandle<Result<Vec<u8>>>>,
@@ -301,37 +298,18 @@ impl InserterInner {
 
         let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE));
 
-        let is_timed_out = match tokio::time::timeout(
-            SEND_CHUNK_TIMEOUT,
-            self.sender.as_mut().unwrap().send_data(chunk.into()),
-        )
-        .await
-        {
-            Ok(Ok(_)) => return Ok(()),
-            Ok(Err(_)) => false,
-            Err(_) => true,
-        };
-        self.abort()?;
+        if let Err(_e) = self.sender.as_mut().unwrap().send(chunk.freeze()) {
+            self.sender.take();
+            self.wait_handle().await?;
 
-        let res = self.wait_handle().await;
-
-        if is_timed_out {
-            Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout")))
-        } else {
-            res?;
             Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
                 "channel closed"
             )))
+        } else {
+            Ok(())
         }
     }
 
-    fn abort(&mut self) -> Result<()> {
-        if let Some(sender) = self.sender.take() {
-            sender.abort();
-        }
-        Ok(())
-    }
-
     pub async fn write(&mut self, data: Bytes) -> Result<()> {
         self.buffer.put_slice(&data);
         if self.buffer.len() >= MIN_CHUNK_SIZE {
diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs
index e5e37deb14652..30c74045441a2 100644
--- a/src/connector/src/sink/snowflake_connector.rs
+++ b/src/connector/src/sink/snowflake_connector.rs
@@ -22,14 +22,10 @@ use aws_sdk_s3::primitives::ByteStream;
 use aws_sdk_s3::Client as S3Client;
 use aws_types::region::Region;
 use bytes::Bytes;
-use http::header;
-use http::request::Builder;
-use hyper::body::Body;
-use hyper::client::HttpConnector;
-use hyper::{Client, Request, StatusCode};
-use hyper_tls::HttpsConnector;
 use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
+use reqwest::{header, Client, RequestBuilder, StatusCode};
 use serde::{Deserialize, Serialize};
+use thiserror_ext::AsReport;
 
 use super::doris_starrocks_connector::POOL_IDLE_TIMEOUT;
 use super::{Result, SinkError};
@@ -148,21 +144,19 @@ impl SnowflakeHttpClient {
         Ok(jwt_token)
     }
 
-    fn build_request_and_client(&self) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
-        let builder = Request::post(self.url.clone());
-
-        let connector = HttpsConnector::new();
+    fn build_request_and_client(&self) -> RequestBuilder {
         let client = Client::builder()
             .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build(connector);
+            .build()
+            .unwrap();
 
-        (builder, client)
+        client.post(&self.url)
     }
 
     /// NOTE: this function should ONLY be called *after*
    /// uploading files to remote external staged storage, i.e., AWS S3
     pub async fn send_request(&self, file_suffix: String) -> Result<()> {
-        let (builder, client) = self.build_request_and_client();
+        let builder = self.build_request_and_client();
 
         // Generate the jwt_token
         let jwt_token = self.generate_jwt_token()?;
@@ -172,19 +166,13 @@ impl SnowflakeHttpClient {
             .header(
                 "X-Snowflake-Authorization-Token-Type".to_string(),
                 "KEYPAIR_JWT",
-            );
-
-        let request = builder
-            .body(Body::from(generate_s3_file_name(
-                self.s3_path.clone(),
-                file_suffix,
-            )))
-            .map_err(|err| SinkError::Snowflake(err.to_string()))?;
+            )
+            .body(generate_s3_file_name(self.s3_path.clone(), file_suffix));
 
-        let response = client
-            .request(request)
+        let response = builder
+            .send()
             .await
-            .map_err(|err| SinkError::Snowflake(err.to_string()))?;
+            .map_err(|err| SinkError::Snowflake(err.to_report_string()))?;
 
         if response.status() != StatusCode::OK {
             return Err(SinkError::Snowflake(format!(
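
Note: the doris_starrocks_connector.rs change above replaces hyper's `Body::channel()` sender with an unbounded tokio mpsc channel whose receive side is wrapped into a streaming `reqwest::Body`; this is also why Cargo.toml enables the "stream" feature on reqwest (which in turn pulls `tokio-util` and `wasm-streams 0.4.0` into Cargo.lock). Below is a minimal, self-contained sketch of that pattern, assuming reqwest 0.12 with the "stream" feature plus the tokio, tokio-stream, futures, and bytes crates; the endpoint URL and the literal chunks are placeholders, not part of this PR:

use std::convert::Infallible;

use bytes::Bytes;
use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Producer side: every `Bytes` value sent here becomes one chunk of the body.
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();

    // `Body::wrap_stream` expects a `TryStream`, so wrap each chunk in `Ok`
    // with an error type that can never be constructed.
    let body = reqwest::Body::wrap_stream(
        UnboundedReceiverStream::new(rx).map(Ok::<_, Infallible>),
    );

    // Drive the request on its own task, mirroring the `tokio::spawn` +
    // `JoinHandle` structure in the sink; it resolves once the body stream
    // ends and the server replies.
    let handle = tokio::spawn(async move {
        reqwest::Client::new()
            .put("http://localhost:8030/upload") // placeholder endpoint
            .body(body)
            .send()
            .await
    });

    // Feed chunks into the in-flight request; a failed send means the
    // request task has already terminated (e.g. the connection was reset).
    for chunk in ["hello ", "streaming ", "world"] {
        if tx.send(Bytes::from(chunk)).is_err() {
            break;
        }
    }
    drop(tx); // closing the channel ends the request body

    let response = handle.await??;
    println!("status: {}", response.status());
    Ok(())
}

Dropping the sender is what terminates the chunked body, which is why the new sink code drops `self.sender` (via `self.sender.take()`) instead of calling the removed `abort()`.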