Skip to content

Commit

Permalink
Merge branch 'main' into feat/jsonb-2-byte-array
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Apr 11, 2024
2 parents f1db89e + bdde2a7 commit b4b6845
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 137 deletions.
23 changes: 18 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [
clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "d38c8b6391af098b724c114e5a4746aedab6ab8e", features = [
"time",
] }
csv = "1.3"
Expand All @@ -60,14 +60,6 @@ gcp-bigquery-client = "0.18.0"
glob = "0.3"
google-cloud-pubsub = "0.23"
http = "0.2"
hyper = { version = "0.14", features = [
"client",
"tcp",
"http1",
"http2",
"stream",
] } # required by clickhouse client
hyper-tls = "0.5"
icelake = { workspace = true }
indexmap = { version = "1.9.3", features = ["serde"] }
itertools = { workspace = true }
Expand Down Expand Up @@ -112,7 +104,7 @@ rdkafka = { workspace = true, features = [
] }
redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
regex = "1.4"
reqwest = { version = "0.12.2", features = ["json"] }
reqwest = { version = "0.12.2", features = ["json", "stream"] }
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
risingwave_jni_core = { workspace = true }
Expand Down
13 changes: 2 additions & 11 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use anyhow::anyhow;
use clickhouse::insert::Insert;
Expand Down Expand Up @@ -191,18 +191,9 @@ impl ClickHouseEngine {
}
}

const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);

impl ClickHouseCommon {
pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
use hyper_tls::HttpsConnector;

let https = HttpsConnector::new();
let client = hyper::Client::builder()
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.build::<_, hyper::Body>(https);

let client = ClickHouseClient::with_http_client(client)
let client = ClickHouseClient::default() // hyper(0.14) client inside
.with_url(&self.url)
.with_user(&self.user)
.with_password(&self.password)
Expand Down
46 changes: 17 additions & 29 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO: use hyper 1 or reqwest 0.12.2

use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -22,9 +20,6 @@ use async_trait::async_trait;
use base64::engine::general_purpose;
use base64::Engine;
use bytes::{BufMut, Bytes, BytesMut};
use hyper::body::Body;
use hyper::{body, Client, Request};
use hyper_tls::HttpsConnector;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -433,46 +428,39 @@ impl DorisSchemaClient {

pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
let builder = Request::get(uri);

let connector = HttpsConnector::new();
let client = Client::builder()
let client = reqwest::Client::builder()
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.build(connector);
.build()
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;

let request = builder
let response = client
.get(uri)
.header(
"Authorization",
format!(
"Basic {}",
general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
),
)
.body(Body::empty())
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;

let response = client
.request(request)
.send()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;

let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await {
Ok(bytes) => bytes.to_vec(),
Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())),
})
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;

let json_map: HashMap<String, Value> = serde_json::from_str(&raw_bytes)
let json: Value = response
.json()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") {
let data = json_map.get("data").ok_or_else(|| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
})?;
data.to_string()
let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
json.get("data")
.ok_or_else(|| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
})?
.clone()
} else {
raw_bytes
json
};
let schema: DorisSchema = serde_json::from_str(&json_data)
let schema: DorisSchema = serde_json::from_value(json_data)
.context("Can't get schema from json")
.map_err(SinkError::DorisStarrocksConnect)?;
Ok(schema)
Expand Down
94 changes: 36 additions & 58 deletions src/connector/src/sink/doris_starrocks_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
use core::mem;
use core::time::Duration;
use std::collections::HashMap;
use std::convert::Infallible;

use anyhow::Context;
use base64::engine::general_purpose;
use base64::Engine;
use bytes::{BufMut, Bytes, BytesMut};
use http::request::Builder;
use hyper::body::{Body, Sender};
use hyper::client::HttpConnector;
use hyper::{body, Client, Request, StatusCode};
use hyper_tls::HttpsConnector;
use futures::StreamExt;
use reqwest::{redirect, Body, Client, RequestBuilder, StatusCode};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use url::Url;

Expand Down Expand Up @@ -187,33 +186,27 @@ impl InserterInnerBuilder {
})
}

// TODO: use hyper 1 or reqwest 0.12.2
fn build_request_and_client(
&self,
uri: String,
) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
let mut builder = Request::put(uri);
for (k, v) in &self.header {
builder = builder.header(k, v);
}

let connector = HttpsConnector::new();
fn build_request(&self, uri: String) -> RequestBuilder {
let client = Client::builder()
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.build(connector);
.redirect(redirect::Policy::none()) // we handle redirect by ourselves
.build()
.unwrap();

(builder, client)
let mut builder = client.put(uri);
for (k, v) in &self.header {
builder = builder.header(k, v);
}
builder
}

pub async fn build(&self) -> Result<InserterInner> {
let (builder, client) = self.build_request_and_client(self.url.clone());
let request_get_url = builder
.body(Body::empty())
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let resp = client
.request(request_get_url)
let builder = self.build_request(self.url.clone());
let resp = builder
.send()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
// TODO: shall we let `reqwest` handle the redirect?
let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT {
resp.headers()
.get("location")
Expand Down Expand Up @@ -249,23 +242,25 @@ impl InserterInnerBuilder {
}
}

let (builder, client) = self.build_request_and_client(be_url);
let (sender, body) = Body::channel();
let request = builder
.body(body)
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let future = client.request(request);
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let body = Body::wrap_stream(
tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>),
);
let builder = self.build_request(be_url).body(body);

let handle: JoinHandle<Result<Vec<u8>>> = tokio::spawn(async move {
let response = future
let response = builder
.send()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let status = response.status();
let raw = body::to_bytes(response.into_body())
let raw = response
.bytes()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?
.to_vec();
if status == StatusCode::OK && !raw.is_empty() {
.into();

if status == StatusCode::OK {
Ok(raw)
} else {
Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
Expand All @@ -280,6 +275,8 @@ impl InserterInnerBuilder {
}
}

type Sender = UnboundedSender<Bytes>;

pub struct InserterInner {
sender: Option<Sender>,
join_handle: Option<JoinHandle<Result<Vec<u8>>>>,
Expand All @@ -301,37 +298,18 @@ impl InserterInner {

let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE));

let is_timed_out = match tokio::time::timeout(
SEND_CHUNK_TIMEOUT,
self.sender.as_mut().unwrap().send_data(chunk.into()),
)
.await
{
Ok(Ok(_)) => return Ok(()),
Ok(Err(_)) => false,
Err(_) => true,
};
self.abort()?;
if let Err(_e) = self.sender.as_mut().unwrap().send(chunk.freeze()) {
self.sender.take();
self.wait_handle().await?;

let res = self.wait_handle().await;

if is_timed_out {
Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout")))
} else {
res?;
Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
"channel closed"
)))
} else {
Ok(())
}
}

fn abort(&mut self) -> Result<()> {
if let Some(sender) = self.sender.take() {
sender.abort();
}
Ok(())
}

pub async fn write(&mut self, data: Bytes) -> Result<()> {
self.buffer.put_slice(&data);
if self.buffer.len() >= MIN_CHUNK_SIZE {
Expand Down
Loading

0 comments on commit b4b6845

Please sign in to comment.