Skip to content

Commit

Permalink
doris starrocks connector
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Apr 9, 2024
1 parent 5b2825b commit 7b59b91
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 61 deletions.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ hyper = { version = "0.14", features = [
"tcp",
"http1",
"http2",
"stream",
] } # required by clickhouse client
hyper-tls = "0.5"
icelake = { workspace = true }
Expand Down Expand Up @@ -111,7 +110,7 @@ rdkafka = { workspace = true, features = [
] }
redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] }
regex = "1.4"
reqwest = { version = "0.12.2", features = ["json"] }
reqwest = { version = "0.12.2", features = ["json", "stream"] }
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
risingwave_jni_core = { workspace = true }
Expand Down
92 changes: 34 additions & 58 deletions src/connector/src/sink/doris_starrocks_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
use core::mem;
use core::time::Duration;
use std::collections::HashMap;
use std::convert::Infallible;

use anyhow::Context;
use base64::engine::general_purpose;
use base64::Engine;
use bytes::{BufMut, Bytes, BytesMut};
use http::request::Builder;
use hyper::body::{Body, Sender};
use hyper::client::HttpConnector;
use hyper::{body, Client, Request, StatusCode};
use hyper_tls::HttpsConnector;
use futures::StreamExt;
use reqwest::{Body, Client, RequestBuilder, StatusCode};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use url::Url;

Expand Down Expand Up @@ -187,31 +186,23 @@ impl InserterInnerBuilder {
})
}

// TODO: use hyper 1 or reqwest 0.12.2
fn build_request_and_client(
&self,
uri: String,
) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
let mut builder = Request::put(uri);
for (k, v) in &self.header {
builder = builder.header(k, v);
}

let connector = HttpsConnector::new();
fn build_request_and_client(&self, uri: String) -> RequestBuilder {
let client = Client::builder()
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.build(connector);
.build()
.unwrap();

(builder, client)
let mut builder = client.put(uri);
for (k, v) in &self.header {
builder = builder.header(k, v);
}
builder
}

pub async fn build(&self) -> Result<InserterInner> {
let (builder, client) = self.build_request_and_client(self.url.clone());
let request_get_url = builder
.body(Body::empty())
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let resp = client
.request(request_get_url)
let builder = self.build_request_and_client(self.url.clone());
let resp = builder
.send()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT {
Expand Down Expand Up @@ -249,23 +240,25 @@ impl InserterInnerBuilder {
}
}

let (builder, client) = self.build_request_and_client(be_url);
let (sender, body) = Body::channel();
let request = builder
.body(body)
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let future = client.request(request);
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let body = Body::wrap_stream(
tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>),
);
let builder = self.build_request_and_client(be_url).body(body);

let handle: JoinHandle<Result<Vec<u8>>> = tokio::spawn(async move {
let response = future
let response = builder
.send()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
let status = response.status();
let raw = body::to_bytes(response.into_body())
let raw = response
.bytes()
.await
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?
.to_vec();
if status == StatusCode::OK && !raw.is_empty() {
.into();

if status == StatusCode::OK {
Ok(raw)
} else {
Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
Expand All @@ -280,6 +273,8 @@ impl InserterInnerBuilder {
}
}

type Sender = UnboundedSender<Bytes>;

pub struct InserterInner {
sender: Option<Sender>,
join_handle: Option<JoinHandle<Result<Vec<u8>>>>,
Expand All @@ -301,37 +296,18 @@ impl InserterInner {

let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE));

let is_timed_out = match tokio::time::timeout(
SEND_CHUNK_TIMEOUT,
self.sender.as_mut().unwrap().send_data(chunk.into()),
)
.await
{
Ok(Ok(_)) => return Ok(()),
Ok(Err(_)) => false,
Err(_) => true,
};
self.abort()?;
if let Err(_e) = self.sender.as_mut().unwrap().send(chunk.freeze()) {
self.sender.take();
self.wait_handle().await?;

let res = self.wait_handle().await;

if is_timed_out {
Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout")))
} else {
res?;
Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
"channel closed"
)))
} else {
Ok(())
}
}

fn abort(&mut self) -> Result<()> {
if let Some(sender) = self.sender.take() {
sender.abort();
}
Ok(())
}

pub async fn write(&mut self, data: Bytes) -> Result<()> {
self.buffer.put_slice(&data);
if self.buffer.len() >= MIN_CHUNK_SIZE {
Expand Down

0 comments on commit 7b59b91

Please sign in to comment.