From 23796a21bfa5071627d64e8dc6574739c0282012 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 16 May 2024 15:52:36 +0800 Subject: [PATCH 01/15] add --- ...ink.rs => decouple_checkpoint_log_sink.rs} | 8 +-- src/connector/src/sink/deltalake.rs | 53 ++++++++++++++++--- src/connector/src/sink/iceberg/mod.rs | 7 ++- src/connector/src/sink/mod.rs | 1 + 4 files changed, 55 insertions(+), 14 deletions(-) rename src/connector/src/sink/{iceberg/log_sink.rs => decouple_checkpoint_log_sink.rs} (96%) diff --git a/src/connector/src/sink/iceberg/log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs similarity index 96% rename from src/connector/src/sink/iceberg/log_sink.rs rename to src/connector/src/sink/decouple_checkpoint_log_sink.rs index dc9ee34f7dda..ed30d83eae69 100644 --- a/src/connector/src/sink/iceberg/log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -21,13 +21,13 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; -pub struct IcebergLogSinkerOf { +pub struct DecoupleCheckpointLogSinkerOf { writer: W, sink_metrics: SinkMetrics, commit_checkpoint_interval: NonZeroU64, } -impl IcebergLogSinkerOf { +impl DecoupleCheckpointLogSinkerOf { /// Create a log sinker with a commit checkpoint interval. The sinker should be used with a /// decouple log reader `KvLogStoreReader`. pub fn new( @@ -35,7 +35,7 @@ impl IcebergLogSinkerOf { sink_metrics: SinkMetrics, commit_checkpoint_interval: NonZeroU64, ) -> Self { - IcebergLogSinkerOf { + DecoupleCheckpointLogSinkerOf { writer, sink_metrics, commit_checkpoint_interval, @@ -44,7 +44,7 @@ impl IcebergLogSinkerOf { } #[async_trait] -impl> LogSinker for IcebergLogSinkerOf { +impl> LogSinker for DecoupleCheckpointLogSinkerOf { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index e81bb141a4c3..f5940df637ca 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::num::NonZeroU64; use std::collections::HashMap; use std::sync::Arc; @@ -30,6 +31,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; @@ -39,13 +41,15 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; use with_options::WithOptions; +use super::catalog::desc::SinkDesc; use super::coordinate::CoordinatedSinkWriter; -use super::writer::{LogSinkerOf, SinkWriter}; +use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use super::writer::SinkWriter; use super::{ Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; -use crate::sink::writer::SinkWriterExt; +use crate::deserialize_optional_u64_from_string; pub const DELTALAKE_SINK: &str = "deltalake"; pub const DEFAULT_REGION: &str = "us-east-1"; @@ -65,6 +69,9 @@ pub struct DeltaLakeCommon { pub s3_endpoint: Option, #[serde(rename = "gcs.service.account")] pub gcs_service_account: Option, + // Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] + pub commit_checkpoint_interval: Option, } impl DeltaLakeCommon { pub async fn create_deltalake_client(&self) -> Result { @@ -269,10 +276,34 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) - impl Sink for DeltaLakeSink { type Coordinator = DeltaLakeSinkCommitter; - type LogSinker = LogSinkerOf>; + type LogSinker = DecoupleCheckpointLogSinkerOf>; const SINK_NAME: &'static str = DELTALAKE_SINK; + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + let config_decouple = if let Some(interval) = + desc.properties.get("commit_checkpoint_interval") + && interval.parse::().unwrap_or(0) > 1 + { + true + } else { + false + }; + + match user_specified { + SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Disable => { + if config_decouple { + return Err(SinkError::Config(anyhow!( + "config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + ))); + } + Ok(false) + } + SinkDecouple::Enable => Ok(true), + } + } + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { let inner = DeltaLakeSinkWriter::new( self.config.clone(), @@ -280,7 +311,7 @@ impl Sink for DeltaLakeSink { self.param.downstream_pk.clone(), ) .await?; - Ok(CoordinatedSinkWriter::new( + let writer = CoordinatedSinkWriter::new( writer_param .meta_client .expect("should have meta client") @@ -294,8 +325,18 @@ impl Sink for DeltaLakeSink { })?, inner, ) - .await? - .into_log_sinker(writer_param.sink_metrics)) + .await?; + + let commit_checkpoint_interval = + NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect( + "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", + ); + + Ok(DecoupleCheckpointLogSinkerOf::new( + writer, + writer_param.sink_metrics, + commit_checkpoint_interval, + )) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index f2e3b45d5915..deceeee249b5 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -13,7 +13,6 @@ // limitations under the License. mod jni_catalog; -mod log_sink; mod mock_catalog; mod prometheus; @@ -54,11 +53,11 @@ use thiserror_ext::AsReport; use url::Url; use with_options::WithOptions; -use self::log_sink::IcebergLogSinkerOf; use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::catalog::desc::SinkDesc; +use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -516,7 +515,7 @@ impl IcebergSink { impl Sink for IcebergSink { type Coordinator = IcebergSinkCommitter; - type LogSinker = IcebergLogSinkerOf>; + type LogSinker = DecoupleCheckpointLogSinkerOf>; const SINK_NAME: &'static str = ICEBERG_SINK; @@ -577,7 +576,7 @@ impl Sink for IcebergSink { "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); - Ok(IcebergLogSinkerOf::new( + Ok(DecoupleCheckpointLogSinkerOf::new( writer, writer_param.sink_metrics, commit_checkpoint_interval, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2ef4bb953b67..699398870979 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -17,6 +17,7 @@ pub mod boxed; pub mod catalog; pub mod clickhouse; pub mod coordinate; +pub mod decouple_checkpoint_log_sink; pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; From 11305fc877e0251295b7004d8866ca77abbf5723 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 16 May 2024 15:53:44 +0800 Subject: [PATCH 02/15] fix ut --- src/connector/with_options_sink.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 3dbe5d394a8e..8607ffe7fd3f 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -110,6 +110,10 @@ DeltaLakeConfig: - name: r#type field_type: String required: true + - name: commit_checkpoint_interval + field_type: u64 + required: false + default: Default::default DorisConfig: fields: - name: doris.url From 902de7422abbd89041d0a27d18a31f5e8be86d31 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 16 May 2024 16:14:11 +0800 Subject: [PATCH 03/15] fix ci --- src/connector/with_options_sink.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 8607ffe7fd3f..83f7d3bba3cd 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -107,13 +107,13 @@ DeltaLakeConfig: - name: gcs.service.account field_type: String required: false - - name: r#type - field_type: String - required: true - name: commit_checkpoint_interval field_type: u64 required: false default: Default::default + - name: r#type + field_type: String + required: true DorisConfig: fields: - name: doris.url From 268365135161fd25aebcd73058662c57e480cd99 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Sat, 18 May 2024 18:02:36 +0800 Subject: [PATCH 04/15] implement decouping logic --- .../src/sink/doris_starrocks_connector.rs | 350 +++++++++++++-- src/connector/src/sink/starrocks.rs | 425 +++++++++++++++--- 2 files changed, 673 insertions(+), 102 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 6c045c63beb4..86ef7b070bfb 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -16,13 +16,15 @@ use core::mem; use core::time::Duration; use std::collections::HashMap; use std::convert::Infallible; +use std::sync::Arc; use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; use futures::StreamExt; -use reqwest::{redirect, Body, Client, RequestBuilder, StatusCode}; +use reqwest::header::{HeaderName, HeaderValue}; +use reqwest::{redirect, Body, Client, Method, Request, RequestBuilder, Response, StatusCode}; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use url::Url; @@ -31,7 +33,7 @@ use super::{Result, SinkError}; const BUFFER_SIZE: usize = 64 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; -pub(crate) const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; +pub(crate) const DORIS_SUCCESS_STATUS: [&str; 1] = ["OK"]; pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; pub(crate) const STARROCKS_DELETE_SIGN: &str = "__op"; @@ -151,11 +153,75 @@ impl HeaderBuilder { self } + /// Only used in Starrocks Transaction API + pub fn set_db(mut self, db: String) -> Self { + self.header.insert("db".to_string(), db); + self + } + + /// Only used in Starrocks Transaction API + pub fn set_table(mut self, table: String) -> Self { + self.header.insert("table".to_string(), table); + self + } + pub fn build(self) -> HashMap { self.header } } +/// Try getting BE url from a redirected response, returning `Ok(None)` indicates this request does +/// not redirect. +/// +/// The reason we handle the redirection manually is that if we let `reqwest` handle the redirection +/// automatically, it will remove sensitive headers (such as Authorization) during the redirection, +/// and there's no way to prevent this behavior. +fn try_get_be_url(resp: &Response, fe_host: String) -> Result> { + match resp.status() { + StatusCode::TEMPORARY_REDIRECT => { + let be_url = resp + .headers() + .get("location") + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Can't get doris BE url in header", + )) + })? + .to_str() + .context("Can't get doris BE url in header") + .map_err(SinkError::DorisStarrocksConnect)? + .to_string(); + + let mut parsed_be_url = + Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + if fe_host != LOCALHOST && fe_host != LOCALHOST_IP { + let be_host = parsed_be_url.host_str().ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get be host from url")) + })?; + + if be_host == LOCALHOST || be_host == LOCALHOST_IP { + // if be host is 127.0.0.1, we may can't connect to it directly, + // so replace it with fe host + parsed_be_url + .set_host(Some(fe_host.as_str())) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + } + } + Ok(Some(parsed_be_url)) + } + StatusCode::OK => { + // Some of the `StarRocks` transactional APIs will respond directly from FE. For example, + // the request to `/api/transaction/commit` endpoint does not seem to redirect to BE. + // In this case, the request should be treated as finished. + Ok(None) + } + _ => Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Can't get doris BE url", + ))), + } +} + pub struct InserterInnerBuilder { url: String, header: HashMap, @@ -206,47 +272,16 @@ impl InserterInnerBuilder { .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - // TODO: shall we let `reqwest` handle the redirect? - let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { - resp.headers() - .get("location") - .ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url in header", - )) - })? - .to_str() - .context("Can't get doris BE url in header") - .map_err(SinkError::DorisStarrocksConnect)? - .to_string() - } else { - return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url", - ))); - }; - if self.fe_host != LOCALHOST && self.fe_host != LOCALHOST_IP { - let mut parsed_be_url = - Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let be_host = parsed_be_url.host_str().ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get be host from url")) - })?; - - if be_host == LOCALHOST || be_host == LOCALHOST_IP { - // if be host is 127.0.0.1, we may can't connect to it directly, - // so replace it with fe host - parsed_be_url - .set_host(Some(self.fe_host.as_str())) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - be_url = parsed_be_url.as_str().into(); - } - } + let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) + })?; let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let body = Body::wrap_stream( tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), ); - let builder = self.build_request(be_url).body(body); + let builder = self.build_request(be_url.into()).body(body); let handle: JoinHandle>> = tokio::spawn(async move { let response = builder @@ -271,7 +306,7 @@ impl InserterInnerBuilder { ))) } }); - Ok(InserterInner::new(sender, handle)) + Ok(InserterInner::new(sender, handle, WAIT_HANDDLE_TIMEOUT)) } } @@ -281,13 +316,19 @@ pub struct InserterInner { sender: Option, join_handle: Option>>>, buffer: BytesMut, + stream_load_http_timeout: Duration, } impl InserterInner { - pub fn new(sender: Sender, join_handle: JoinHandle>>) -> Self { + pub fn new( + sender: Sender, + join_handle: JoinHandle>>, + stream_load_http_timeout: Duration, + ) -> Self { Self { sender: Some(sender), join_handle: Some(join_handle), buffer: BytesMut::with_capacity(BUFFER_SIZE), + stream_load_http_timeout, } } @@ -312,6 +353,7 @@ impl InserterInner { pub async fn write(&mut self, data: Bytes) -> Result<()> { self.buffer.put_slice(&data); + // Should we check if self.buffer.len() >= MIN_CHUNK_SIZE { self.send_chunk().await?; } @@ -337,3 +379,233 @@ impl InserterInner { self.wait_handle().await } } + +pub struct MetaRequestSender { + client: Arc, + request: Request, + fe_host: String, +} + +impl MetaRequestSender { + pub fn new(client: Arc, request: Request, fe_host: String) -> Self { + Self { + client, + request, + fe_host, + } + } + + /// Send the request and handle redirection if any. + /// The reason we handle the redirection manually is that if we let `reqwest` handle the redirection + /// automatically, it will remove sensitive headers (such as Authorization) during the redirection, + /// and there's no way to prevent this behavior. + /// + /// Another interesting point is that some of the `StarRocks` transactional APIs will respond directly from FE. + /// For example, the request to `/api/transaction/commit` endpoint does not seem to redirect to BE. + pub async fn send(self) -> Result { + let request = self.request; + let mut request_for_redirection = request.try_clone().ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't clone request")) + })?; + + let resp = self + .client + .execute(request) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + let be_url = try_get_be_url(&resp, self.fe_host)?; + + match be_url { + Some(be_url) => { + *request_for_redirection.url_mut() = be_url; + + self.client + .execute(request_for_redirection) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .bytes() + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into())) + } + None => resp + .bytes() + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into())), + } + } +} + +pub struct StarrocksTxnRequestBuilder { + url_begin: String, + url_load: String, + url_prepare: String, + url_commit: String, + url_rollback: String, + header: HashMap, + fe_host: String, + stream_load_http_timeout: Duration, + // `client` needs to be shared with `MetaRequestSender` and `TxnInserterInner`, There's no need to + // build an HTTP client every time a request is made, so we use `Arc`. + client: Arc, +} + +impl StarrocksTxnRequestBuilder { + pub fn new( + url: String, + header: HashMap, + stream_load_http_timeout_ms: u64, + ) -> Result { + let fe_host = Url::parse(&url) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .host_str() + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get fe host from url")) + })? + .to_string(); + + let url_begin = format!("{}/api/transaction/begin", url); + let url_load = format!("{}/api/transaction/load", url); + let url_prepare = format!("{}/api/transaction/prepare", url); + let url_commit = format!("{}/api/transaction/commit", url); + let url_rollback = format!("{}/api/transaction/rollback", url); + + let stream_load_http_timeout = Duration::from_millis(stream_load_http_timeout_ms); + + let client = Client::builder() + .pool_idle_timeout(POOL_IDLE_TIMEOUT) + .redirect(redirect::Policy::none()) + .build() + .unwrap(); + + Ok(Self { + url_begin, + url_load, + url_prepare, + url_commit, + url_rollback, + header, + fe_host, + stream_load_http_timeout, + client: Arc::new(client), + }) + } + + fn build_request(&self, uri: String, method: Method, label: String) -> Result { + let parsed_url = + Url::parse(&uri).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let mut request = Request::new(method, parsed_url); + + if uri != self.url_load { + // Set timeout for non-load requests; load requests' timeout is controlled by `tokio::timeout` + *request.timeout_mut() = Some(self.stream_load_http_timeout); + } + + let header = request.headers_mut(); + for (k, v) in &self.header { + header.insert( + HeaderName::try_from(k) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + HeaderValue::try_from(v) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + ); + } + header.insert( + "label", + HeaderValue::try_from(label) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + ); + + Ok(request) + } + + pub fn build_begin_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_begin.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub fn build_prepare_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_prepare.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub fn build_commit_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_commit.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub fn build_rollback_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_rollback.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub async fn build_txn_inserter(&self, label: String) -> Result { + let request = self.build_request(self.url_load.clone(), Method::PUT, label)?; + let mut request_for_redirection = request.try_clone().ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't clone request")) + })?; + + let resp = self + .client + .execute(request) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) + })?; + *request_for_redirection.url_mut() = be_url; + + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let body = Body::wrap_stream( + tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), + ); + *request_for_redirection.body_mut() = Some(body); + + let client = self.client.clone(); + let handle: JoinHandle>> = tokio::spawn(async move { + let response = client + .execute(request_for_redirection) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let status = response.status(); + let raw = response + .bytes() + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .into(); + + if status == StatusCode::OK { + Ok(raw) + } else { + Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "Failed connection {:?},{:?}", + status, + String::from_utf8(raw) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + ))) + } + }); + Ok(InserterInner::new( + sender, + handle, + self.stream_load_http_timeout, + )) + } +} diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 994e4e43d817..da0dd4459209 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::num::NonZeroU64; use std::sync::Arc; use anyhow::anyhow; @@ -23,28 +24,44 @@ use mysql_async::Opts; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; +use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; +use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; +use risingwave_pb::connector_service::SinkMetadata; use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; use thiserror_ext::AsReport; +use tokio::task::JoinHandle; +use url::form_urlencoded; use with_options::WithOptions; use super::doris_starrocks_connector::{ - HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN, + HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, DORIS_SUCCESS_STATUS, + STARROCKS_DELETE_SIGN, }; use super::encoder::{JsonEncoder, RowEncoder}; -use super::writer::LogSinkerOf; -use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; -use crate::sink::writer::SinkWriterExt; -use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; +use super::{ + SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, + SINK_TYPE_UPSERT, +}; +use crate::deserialize_optional_u64_from_string; +use crate::sink::catalog::desc::SinkDesc; +use crate::sink::coordinate::CoordinatedSinkWriter; +use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use crate::sink::{Result, Sink, SinkWriter, SinkWriterParam}; pub const STARROCKS_SINK: &str = "starrocks"; const STARROCK_MYSQL_PREFER_SOCKET: &str = "false"; const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 28800; +const fn _default_stream_load_http_timeout_ms() -> u64 { + 10 * 1000 +} + #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct StarrocksCommon { /// The `StarRocks` host address. @@ -68,8 +85,6 @@ pub struct StarrocksCommon { /// The `StarRocks` table you want to sink data to. #[serde(rename = "starrocks.table")] pub table: String, - #[serde(rename = "starrocks.partial_update")] - pub partial_update: Option, } #[serde_as] @@ -78,6 +93,25 @@ pub struct StarrocksConfig { #[serde(flatten)] pub common: StarrocksCommon, + /// The timeout in milliseconds for stream load http request, defaults to 10 seconds. + #[serde( + rename = "starrocks.stream_load.http.timeout.ms", + default = "_default_stream_load_http_timeout_ms" + )] + pub stream_load_http_timeout_ms: u64, + + /// Set this option to a positive integer n, RisingWave will try to commit data + /// to Starrocks at every n checkpoints by leveraging the + /// [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), + /// also, in this time, the `sink_decouple` option should be enabled as well. + /// Defaults to 1 if commit_checkpoint_interval <= 0 + #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] + pub commit_checkpoint_interval: Option, + + /// Enable partial update + #[serde(rename = "starrocks.partial_update")] + pub partial_update: Option, + pub r#type: String, // accept "append-only" or "upsert" } impl StarrocksConfig { @@ -93,12 +127,18 @@ impl StarrocksConfig { SINK_TYPE_UPSERT ))); } + if config.commit_checkpoint_interval == Some(0) { + return Err(SinkError::Config(anyhow!( + "commit_checkpoint_interval must be greater than 0" + ))); + } Ok(config) } } #[derive(Debug)] pub struct StarrocksSink { + param: SinkParam, pub config: StarrocksConfig, schema: Schema, pk_indices: Vec, @@ -106,13 +146,11 @@ pub struct StarrocksSink { } impl StarrocksSink { - pub fn new( - config: StarrocksConfig, - schema: Schema, - pk_indices: Vec, - is_append_only: bool, - ) -> Result { + pub fn new(param: SinkParam, config: StarrocksConfig, schema: Schema) -> Result { + let pk_indices = param.downstream_pk.clone(); + let is_append_only = param.sink_type.is_append_only(); Ok(Self { + param, config, schema, pk_indices, @@ -211,19 +249,33 @@ impl StarrocksSink { } impl Sink for StarrocksSink { - type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type Coordinator = StarrocksSinkCommitter; + type LogSinker = DecoupleCheckpointLogSinkerOf>; const SINK_NAME: &'static str = STARROCKS_SINK; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - Ok(StarrocksSinkWriter::new( - self.config.clone(), - self.schema.clone(), - self.pk_indices.clone(), - self.is_append_only, - )? - .into_log_sinker(writer_param.sink_metrics)) + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + let config_decouple = if let Some(interval) = + desc.properties.get("commit_checkpoint_interval") + && interval.parse::().unwrap_or(0) > 1 + { + true + } else { + false + }; + + match user_specified { + SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Disable => { + if config_decouple { + return Err(SinkError::Config(anyhow!( + "config conflict: Starrocks config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + ))); + } + Ok(false) + } + SinkDecouple::Enable => Ok(true), + } } async fn validate(&self) -> Result<()> { @@ -263,16 +315,78 @@ impl Sink for StarrocksSink { self.check_column_name_and_type(starrocks_columns_desc)?; Ok(()) } + + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + let commit_checkpoint_interval = + NonZeroU64::new(self.config.commit_checkpoint_interval.unwrap_or(1)).expect( + "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", + ); + + let inner = StarrocksSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + writer_param.executor_id, + )?; + let writer = CoordinatedSinkWriter::new( + writer_param + .meta_client + .expect("should have meta client") + .sink_coordinate_client() + .await, + self.param.clone(), + writer_param.vnode_bitmap.ok_or_else(|| { + SinkError::Remote(anyhow!( + "sink needs coordination should not have singleton input" + )) + })?, + inner, + ) + .await?; + + Ok(DecoupleCheckpointLogSinkerOf::new( + writer, + writer_param.sink_metrics, + commit_checkpoint_interval, + )) + } + + async fn new_coordinator(&self) -> Result { + let header = HeaderBuilder::new() + .add_common_header() + .set_user_password( + self.config.common.user.clone(), + self.config.common.password.clone(), + ) + .set_db(self.config.common.database.clone()) + .set_table(self.config.common.table.clone()) + .build(); + + let txn_request_builder = StarrocksTxnRequestBuilder::new( + format!( + "http://{}:{}", + self.config.common.host, self.config.common.http_port + ), + header, + self.config.stream_load_http_timeout_ms, + )?; + Ok(StarrocksSinkCommitter { + client: Arc::new(StarrocksTxnClient::new(txn_request_builder)), + }) + } } pub struct StarrocksSinkWriter { pub config: StarrocksConfig, schema: Schema, pk_indices: Vec, - inserter_innet_builder: InserterInnerBuilder, is_append_only: bool, client: Option, + txn_client: StarrocksTxnClient, row_encoder: JsonEncoder, + executor_id: u64, + curr_txn_label: Option, } impl TryFrom for StarrocksSink { @@ -280,13 +394,8 @@ impl TryFrom for StarrocksSink { fn try_from(param: SinkParam) -> std::result::Result { let schema = param.schema(); - let config = StarrocksConfig::from_hashmap(param.properties)?; - StarrocksSink::new( - config, - schema, - param.downstream_pk, - param.sink_type.is_append_only(), - ) + let config = StarrocksConfig::from_hashmap(param.properties.clone())?; + StarrocksSink::new(param, config, schema) } } @@ -296,6 +405,7 @@ impl StarrocksSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, + executor_id: u64, ) -> Result { let mut fields_name = schema.names_str(); if !is_append_only { @@ -306,24 +416,28 @@ impl StarrocksSinkWriter { .add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) .add_json_format() - .set_partial_update(config.common.partial_update.clone()) + .set_partial_update(config.partial_update.clone()) .set_columns_name(fields_name) + .set_db(config.common.database.clone()) + .set_table(config.common.table.clone()) .build(); - let starrocks_insert_builder = InserterInnerBuilder::new( + let txn_request_builder = StarrocksTxnRequestBuilder::new( format!("http://{}:{}", config.common.host, config.common.http_port), - config.common.database.clone(), - config.common.table.clone(), header, + config.stream_load_http_timeout_ms, )?; + Ok(Self { config, schema: schema.clone(), pk_indices, - inserter_innet_builder: starrocks_insert_builder, is_append_only, client: None, + txn_client: StarrocksTxnClient::new(txn_request_builder), row_encoder: JsonEncoder::new_with_starrocks(schema, None), + executor_id, + curr_txn_label: None, }) } @@ -403,15 +517,39 @@ impl StarrocksSinkWriter { } Ok(()) } + + /// Generating a new transaction label, should be unique across all `SinkWriters` even under rewinding. + #[inline(always)] + fn new_txn_label(&self) -> String { + format!( + "rw-txn-{}-{}", + self.executor_id, + chrono::Utc::now().timestamp_micros() + ) + } } #[async_trait] impl SinkWriter for StarrocksSinkWriter { + type CommitMetadata = Option; + + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + Ok(()) + } + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { if self.client.is_none() { - self.client = Some(StarrocksClient::new( - self.inserter_innet_builder.build().await?, - )); + let txn_label = self.new_txn_label(); + tracing::debug!(?txn_label, "begin transaction"); + let txn_label_res = self.txn_client.begin(txn_label.clone()).await?; + assert_eq!( + txn_label, txn_label_res, + "label responding from StarRocks: {} differ from generated one: {}", + txn_label, txn_label_res + ); + + self.curr_txn_label = Some(txn_label.clone()); + self.client = Some(StarrocksClient::new(self.txn_client.load(txn_label).await?)); } if self.is_append_only { self.append_only(chunk).await @@ -420,21 +558,40 @@ impl SinkWriter for StarrocksSinkWriter { } } - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { + async fn barrier(&mut self, is_checkpoint: bool) -> Result> { if self.client.is_some() { + // Here we finish the `/api/transaction/load` request when a barrier is received. Therefore, + // one or more load requests should be made within one commit_checkpoint_interval period. + // StarRocks will take care of merging those splits into a larger one during prepare transaction. + // Thus, only one version will be produced when the transaction is committed. See Stream Load + // transaction interface for more information. let client = self .client .take() .ok_or_else(|| SinkError::Starrocks("Can't find starrocks inserter".to_string()))?; client.finish().await?; + + if is_checkpoint { + assert!(self.curr_txn_label.is_some(), "no txn label during prepare"); + let txn_label = self.curr_txn_label.take().unwrap(); + tracing::debug!(?txn_label, "prepare transaction"); + let txn_label_res = self.txn_client.prepare(txn_label.clone()).await?; + assert_eq!( + txn_label, txn_label_res, + "label responding from StarRocks differs from the current one" + ); + return Ok(Some(StarrocksWriteResult(txn_label).try_into()?)); + } + } + Ok(None) + } + + async fn abort(&mut self) -> Result<()> { + if self.client.is_some() && self.curr_txn_label.is_some() { + self.client.take(); + let txn_label = self.curr_txn_label.take().unwrap(); + tracing::debug!(?txn_label, "rollback transaction"); + self.txn_client.rollback(txn_label).await?; } Ok(()) } @@ -459,6 +616,11 @@ impl StarrocksSchemaClient { user: String, password: String, ) -> Result { + // username & password may contain special chars, so we need to do URL encoding on them. + // Otherwise, Opts::from_url may report a `Parse error` + let user = form_urlencoded::byte_serialize(user.as_bytes()).collect::(); + let password = form_urlencoded::byte_serialize(password.as_bytes()).collect::(); + let conn_uri = format!( "mysql://{}:{}@{}:{}/{}?prefer_socket={}&max_allowed_packet={}&wait_timeout={}", user, @@ -518,35 +680,39 @@ impl StarrocksSchemaClient { #[derive(Debug, Serialize, Deserialize)] pub struct StarrocksInsertResultResponse { #[serde(rename = "TxnId")] - txn_id: i64, + pub txn_id: Option, + #[serde(rename = "Seq")] + pub seq: Option, #[serde(rename = "Label")] - label: String, + pub label: Option, #[serde(rename = "Status")] - status: String, + pub status: String, #[serde(rename = "Message")] - message: String, + pub message: String, #[serde(rename = "NumberTotalRows")] - number_total_rows: i64, + pub number_total_rows: Option, #[serde(rename = "NumberLoadedRows")] - number_loaded_rows: i64, + pub number_loaded_rows: Option, #[serde(rename = "NumberFilteredRows")] - number_filtered_rows: i32, + pub number_filtered_rows: Option, #[serde(rename = "NumberUnselectedRows")] - number_unselected_rows: i32, + pub number_unselected_rows: Option, #[serde(rename = "LoadBytes")] - load_bytes: i64, + pub load_bytes: Option, #[serde(rename = "LoadTimeMs")] - load_time_ms: i32, + pub load_time_ms: Option, #[serde(rename = "BeginTxnTimeMs")] - begin_txn_time_ms: i32, + pub begin_txn_time_ms: Option, #[serde(rename = "ReadDataTimeMs")] - read_data_time_ms: i32, + pub read_data_time_ms: Option, #[serde(rename = "WriteDataTimeMs")] - write_data_time_ms: i32, + pub write_data_time_ms: Option, #[serde(rename = "CommitAndPublishTimeMs")] - commit_and_publish_time_ms: i32, + pub commit_and_publish_time_ms: Option, #[serde(rename = "StreamLoadPlanTimeMs")] - stream_load_plan_time_ms: Option, + pub stream_load_plan_time_ms: Option, + #[serde(rename = "ExistingJobStatus")] + pub existing_job_status: Option, } pub struct StarrocksClient { @@ -576,3 +742,136 @@ impl StarrocksClient { Ok(res) } } + +pub struct StarrocksSinkCommitter { + client: Arc, +} + +pub struct StarrocksTxnClient { + request_builder: StarrocksTxnRequestBuilder, +} + +impl StarrocksTxnClient { + pub fn new(request_builder: StarrocksTxnRequestBuilder) -> Self { + Self { request_builder } + } + + fn check_response_and_extract_label(&self, res: Bytes) -> Result { + let res: StarrocksInsertResultResponse = serde_json::from_slice(&res) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "transaction error: {:?}", + res.message, + ))); + } + res.label.ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get label from response")) + }) + } + + pub async fn begin(&self, label: String) -> Result { + let res = self + .request_builder + .build_begin_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn prepare(&self, label: String) -> Result { + let res = self + .request_builder + .build_prepare_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn commit(&self, label: String) -> Result { + let res = self + .request_builder + .build_commit_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn rollback(&self, label: String) -> Result { + let res = self + .request_builder + .build_rollback_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn load(&self, label: String) -> Result { + self.request_builder.build_txn_inserter(label).await + } +} + +struct StarrocksWriteResult(String); + +impl TryFrom for SinkMetadata { + type Error = SinkError; + + fn try_from(value: StarrocksWriteResult) -> std::result::Result { + if value.0.is_empty() { + return Err(SinkError::DorisStarrocksConnect(anyhow!( + "txn label is empty during serialization" + ))); + } + let metadata = value.0.into_bytes(); + Ok(SinkMetadata { + metadata: Some(Serialized(SerializedMetadata { metadata })), + }) + } +} + +impl TryFrom for StarrocksWriteResult { + type Error = SinkError; + + fn try_from(value: SinkMetadata) -> std::result::Result { + if let Some(Serialized(v)) = value.metadata { + Ok(StarrocksWriteResult( + String::from_utf8(v.metadata) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + )) + } else { + Err(SinkError::DorisStarrocksConnect(anyhow!( + "no metadata found during deserialization" + ))) + } + } +} + +#[async_trait::async_trait] +impl SinkCommitCoordinator for StarrocksSinkCommitter { + async fn init(&mut self) -> Result<()> { + tracing::info!("Starrocks commit coordinator inited."); + Ok(()) + } + + async fn commit(&mut self, epoch: u64, metadata: Vec) -> Result<()> { + let txn_labels = metadata + .into_iter() + .map(TryFrom::try_from) + .map(|r| r.map(|v: StarrocksWriteResult| v.0)) + .collect::>>()?; + tracing::debug!(?epoch, ?txn_labels, "commit transaction"); + + let join_handles = txn_labels + .into_iter() + .map(|txn_label| { + let client = self.client.clone(); + tokio::spawn(async move { client.commit(txn_label).await }) + }) + .collect::>>>(); + futures::future::try_join_all(join_handles) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + + Ok(()) + } +} From 64da9a4ead8c32706c0df1819eeadcd8c0d7b55f Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Sat, 18 May 2024 23:41:36 +0800 Subject: [PATCH 05/15] fix commit logic --- .../src/sink/doris_starrocks_connector.rs | 1 + src/connector/src/sink/starrocks.rs | 86 +++++++++++-------- 2 files changed, 51 insertions(+), 36 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 86ef7b070bfb..b974c40250d8 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -584,6 +584,7 @@ impl StarrocksTxnRequestBuilder { .execute(request_for_redirection) .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let status = response.status(); let raw = response .bytes() diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index da0dd4459209..0262b17be4e7 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -538,7 +538,10 @@ impl SinkWriter for StarrocksSinkWriter { } async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - if self.client.is_none() { + // We check whether start a new transaction in `write_batch`. Therefore, if no data has been written + // within the `commit_checkpoint_interval` period, no meta requests will be made. Otherwise if we request + // `prepare` against an empty transaction, the `StarRocks` will report a `hasn't send any data yet` error. + if self.curr_txn_label.is_none() { let txn_label = self.new_txn_label(); tracing::debug!(?txn_label, "begin transaction"); let txn_label_res = self.txn_client.begin(txn_label.clone()).await?; @@ -547,9 +550,12 @@ impl SinkWriter for StarrocksSinkWriter { "label responding from StarRocks: {} differ from generated one: {}", txn_label, txn_label_res ); - self.curr_txn_label = Some(txn_label.clone()); - self.client = Some(StarrocksClient::new(self.txn_client.load(txn_label).await?)); + } + if self.client.is_none() { + let txn_label = self.curr_txn_label.clone(); + assert!(txn_label.is_some(), "transaction label is none during load"); + self.client = Some(StarrocksClient::new(self.txn_client.load(txn_label.unwrap()).await?)); } if self.is_append_only { self.append_only(chunk).await @@ -570,9 +576,10 @@ impl SinkWriter for StarrocksSinkWriter { .take() .ok_or_else(|| SinkError::Starrocks("Can't find starrocks inserter".to_string()))?; client.finish().await?; + } - if is_checkpoint { - assert!(self.curr_txn_label.is_some(), "no txn label during prepare"); + if is_checkpoint { + if self.curr_txn_label.is_some() { let txn_label = self.curr_txn_label.take().unwrap(); tracing::debug!(?txn_label, "prepare transaction"); let txn_label_res = self.txn_client.prepare(txn_label.clone()).await?; @@ -580,15 +587,18 @@ impl SinkWriter for StarrocksSinkWriter { txn_label, txn_label_res, "label responding from StarRocks differs from the current one" ); - return Ok(Some(StarrocksWriteResult(txn_label).try_into()?)); + Ok(Some(StarrocksWriteResult(Some(txn_label)).try_into()?)) + } else { + // no data was written within previous epoch + Ok(Some(StarrocksWriteResult(None).try_into()?)) } + } else { + Ok(None) } - Ok(None) } async fn abort(&mut self) -> Result<()> { - if self.client.is_some() && self.curr_txn_label.is_some() { - self.client.take(); + if self.curr_txn_label.is_some() { let txn_label = self.curr_txn_label.take().unwrap(); tracing::debug!(?txn_label, "rollback transaction"); self.txn_client.rollback(txn_label).await?; @@ -811,21 +821,21 @@ impl StarrocksTxnClient { } } -struct StarrocksWriteResult(String); +struct StarrocksWriteResult(Option); impl TryFrom for SinkMetadata { type Error = SinkError; fn try_from(value: StarrocksWriteResult) -> std::result::Result { - if value.0.is_empty() { - return Err(SinkError::DorisStarrocksConnect(anyhow!( - "txn label is empty during serialization" - ))); + match value.0 { + Some(label) => { + let metadata = label.into_bytes(); + Ok(SinkMetadata { + metadata: Some(Serialized(SerializedMetadata { metadata })), + }) + } + None => Ok(SinkMetadata { metadata: None }), } - let metadata = value.0.into_bytes(); - Ok(SinkMetadata { - metadata: Some(Serialized(SerializedMetadata { metadata })), - }) } } @@ -834,14 +844,12 @@ impl TryFrom for StarrocksWriteResult { fn try_from(value: SinkMetadata) -> std::result::Result { if let Some(Serialized(v)) = value.metadata { - Ok(StarrocksWriteResult( + Ok(StarrocksWriteResult(Some( String::from_utf8(v.metadata) .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, - )) - } else { - Err(SinkError::DorisStarrocksConnect(anyhow!( - "no metadata found during deserialization" ))) + } else { + Ok(StarrocksWriteResult(None)) } } } @@ -854,24 +862,30 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { } async fn commit(&mut self, epoch: u64, metadata: Vec) -> Result<()> { - let txn_labels = metadata + let write_results = metadata .into_iter() .map(TryFrom::try_from) - .map(|r| r.map(|v: StarrocksWriteResult| v.0)) - .collect::>>()?; - tracing::debug!(?epoch, ?txn_labels, "commit transaction"); + .collect::>>()?; - let join_handles = txn_labels + let txn_labels = write_results .into_iter() - .map(|txn_label| { - let client = self.client.clone(); - tokio::spawn(async move { client.commit(txn_label).await }) - }) - .collect::>>>(); - futures::future::try_join_all(join_handles) - .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .filter_map(|v| v.0) + .collect::>(); + + tracing::debug!(?epoch, ?txn_labels, "commit transaction"); + if !txn_labels.is_empty() { + let join_handles = txn_labels + .into_iter() + .map(|txn_label| { + let client = self.client.clone(); + tokio::spawn(async move { client.commit(txn_label).await }) + }) + .collect::>>>(); + futures::future::try_join_all(join_handles) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + } Ok(()) } } From 752a77017d767c8c14fc6274bd27d57686950af5 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Sun, 19 May 2024 19:01:32 +0800 Subject: [PATCH 06/15] fix rustfmt --- src/connector/src/sink/starrocks.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 0262b17be4e7..16ec983d3282 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -539,7 +539,7 @@ impl SinkWriter for StarrocksSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { // We check whether start a new transaction in `write_batch`. Therefore, if no data has been written - // within the `commit_checkpoint_interval` period, no meta requests will be made. Otherwise if we request + // within the `commit_checkpoint_interval` period, no meta requests will be made. Otherwise if we request // `prepare` against an empty transaction, the `StarRocks` will report a `hasn't send any data yet` error. if self.curr_txn_label.is_none() { let txn_label = self.new_txn_label(); @@ -555,7 +555,9 @@ impl SinkWriter for StarrocksSinkWriter { if self.client.is_none() { let txn_label = self.curr_txn_label.clone(); assert!(txn_label.is_some(), "transaction label is none during load"); - self.client = Some(StarrocksClient::new(self.txn_client.load(txn_label.unwrap()).await?)); + self.client = Some(StarrocksClient::new( + self.txn_client.load(txn_label.unwrap()).await?, + )); } if self.is_append_only { self.append_only(chunk).await From a5601b4641ec903d2c319f07d1ad02b856c6a893 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Sun, 19 May 2024 23:04:26 +0800 Subject: [PATCH 07/15] add with_options --- src/connector/with_options_sink.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 83f7d3bba3cd..0bcecb970b97 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -733,8 +733,19 @@ StarrocksConfig: field_type: String comments: The `StarRocks` table you want to sink data to. required: true + - name: starrocks.stream_load.http.timeout.ms + field_type: u64 + comments: The timeout in milliseconds for stream load http request, defaults to 10 seconds. + required: false + default: 10 * 1000 + - name: commit_checkpoint_interval + field_type: u64 + comments: Set this option to a positive integer n, RisingWave will try to commit data to Starrocks at every n checkpoints by leveraging the [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), also, in this time, the `sink_decouple` option should be enabled as well. Defaults to 1 if commit_checkpoint_interval <= 0 + required: false + default: Default::default - name: starrocks.partial_update field_type: String + comments: Enable partial update required: false - name: r#type field_type: String From cfc46420ec7f3a3f546c819c9e2b0401d1fdef60 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Mon, 20 May 2024 11:18:10 +0800 Subject: [PATCH 08/15] separate doris/starrocks success status --- src/connector/src/sink/doris_starrocks_connector.rs | 3 ++- src/connector/src/sink/starrocks.rs | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index b974c40250d8..9874ccaaec42 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -33,7 +33,8 @@ use super::{Result, SinkError}; const BUFFER_SIZE: usize = 64 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; -pub(crate) const DORIS_SUCCESS_STATUS: [&str; 1] = ["OK"]; +pub(crate) const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; +pub(crate) const STARROCKS_SUCCESS_STATUS: [&str; 1] = ["OK"]; pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; pub(crate) const STARROCKS_DELETE_SIGN: &str = "__op"; diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 16ec983d3282..dd7d3b6c6917 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -39,8 +39,8 @@ use url::form_urlencoded; use with_options::WithOptions; use super::doris_starrocks_connector::{ - HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, DORIS_SUCCESS_STATUS, - STARROCKS_DELETE_SIGN, + HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, STARROCKS_DELETE_SIGN, + STARROCKS_SUCCESS_STATUS, }; use super::encoder::{JsonEncoder, RowEncoder}; use super::{ @@ -745,7 +745,7 @@ impl StarrocksClient { let res: StarrocksInsertResultResponse = serde_json::from_slice(&raw) .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + if !STARROCKS_SUCCESS_STATUS.contains(&res.status.as_str()) { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "Insert error: {:?}", res.message, @@ -771,7 +771,7 @@ impl StarrocksTxnClient { fn check_response_and_extract_label(&self, res: Bytes) -> Result { let res: StarrocksInsertResultResponse = serde_json::from_slice(&res) .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + if !STARROCKS_SUCCESS_STATUS.contains(&res.status.as_str()) { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "transaction error: {:?}", res.message, From 5daac28b9525173918765b89ac3a9988b01d38b5 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Mon, 20 May 2024 11:37:03 +0800 Subject: [PATCH 09/15] reorganize code --- src/connector/src/sink/doris_starrocks_connector.rs | 1 - src/connector/src/sink/starrocks.rs | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 9874ccaaec42..1a38d5310b53 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -354,7 +354,6 @@ impl InserterInner { pub async fn write(&mut self, data: Bytes) -> Result<()> { self.buffer.put_slice(&data); - // Should we check if self.buffer.len() >= MIN_CHUNK_SIZE { self.send_chunk().await?; } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index dd7d3b6c6917..654fce89dc97 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -755,10 +755,6 @@ impl StarrocksClient { } } -pub struct StarrocksSinkCommitter { - client: Arc, -} - pub struct StarrocksTxnClient { request_builder: StarrocksTxnRequestBuilder, } @@ -856,6 +852,10 @@ impl TryFrom for StarrocksWriteResult { } } +pub struct StarrocksSinkCommitter { + client: Arc, +} + #[async_trait::async_trait] impl SinkCommitCoordinator for StarrocksSinkCommitter { async fn init(&mut self) -> Result<()> { From 3168941cc1f1fac3e7c8d5a3c42c482943d726c8 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Mon, 20 May 2024 15:31:35 +0800 Subject: [PATCH 10/15] strip Arc from reqwest::Client --- src/connector/src/sink/doris_starrocks_connector.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 1a38d5310b53..85021179e331 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -16,7 +16,6 @@ use core::mem; use core::time::Duration; use std::collections::HashMap; use std::convert::Infallible; -use std::sync::Arc; use anyhow::Context; use base64::engine::general_purpose; @@ -381,13 +380,13 @@ impl InserterInner { } pub struct MetaRequestSender { - client: Arc, + client: Client, request: Request, fe_host: String, } impl MetaRequestSender { - pub fn new(client: Arc, request: Request, fe_host: String) -> Self { + pub fn new(client: Client, request: Request, fe_host: String) -> Self { Self { client, request, @@ -445,9 +444,9 @@ pub struct StarrocksTxnRequestBuilder { header: HashMap, fe_host: String, stream_load_http_timeout: Duration, - // `client` needs to be shared with `MetaRequestSender` and `TxnInserterInner`, There's no need to - // build an HTTP client every time a request is made, so we use `Arc`. - client: Arc, + // The `reqwest` crate suggests us reuse the Client, and we don't need make it Arc, because it + // already uses an Arc internally. + client: Client, } impl StarrocksTxnRequestBuilder { @@ -487,7 +486,7 @@ impl StarrocksTxnRequestBuilder { header, fe_host, stream_load_http_timeout, - client: Arc::new(client), + client, }) } From 73cc5dd37f350d67d5390a5857845100cad7fe77 Mon Sep 17 00:00:00 2001 From: rex <54972801+ly9chee@users.noreply.github.com> Date: Mon, 20 May 2024 15:56:29 +0800 Subject: [PATCH 11/15] fix comment Co-authored-by: TennyZhuang --- src/connector/src/sink/deltalake.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index f5940df637ca..ea5cb9dafb99 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -69,7 +69,7 @@ pub struct DeltaLakeCommon { pub s3_endpoint: Option, #[serde(rename = "gcs.service.account")] pub gcs_service_account: Option, - // Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + /// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] pub commit_checkpoint_interval: Option, } From 42d5b6d83193d5dffe365c00b3b78222ad25b267 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Tue, 21 May 2024 15:32:32 +0800 Subject: [PATCH 12/15] add rustdoc for DecoupleCheckpointLogSinkerOf --- src/connector/src/sink/decouple_checkpoint_log_sink.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index ed30d83eae69..0c923385dbe8 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -21,6 +21,9 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; +/// The `LogSinker` implementation used for commit-decoupled sinks (such as Iceberg DeltaLake and StarRocks). +/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader, +/// we delay the checkpoint barrier to make commits less frequent. pub struct DecoupleCheckpointLogSinkerOf { writer: W, sink_metrics: SinkMetrics, From e35d6c1b1a679281b015a9b41549af138387ade6 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Tue, 21 May 2024 16:31:41 +0800 Subject: [PATCH 13/15] refine error handling --- .../src/sink/decouple_checkpoint_log_sink.rs | 2 +- src/connector/src/sink/deltalake.rs | 2 +- .../src/sink/doris_starrocks_connector.rs | 106 +++++++++--------- src/connector/src/sink/starrocks.rs | 18 +-- 4 files changed, 63 insertions(+), 65 deletions(-) diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 0c923385dbe8..9eaba2a10f12 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -21,7 +21,7 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; -/// The `LogSinker` implementation used for commit-decoupled sinks (such as Iceberg DeltaLake and StarRocks). +/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`). /// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader, /// we delay the checkpoint barrier to make commits less frequent. pub struct DecoupleCheckpointLogSinkerOf { diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index ea5cb9dafb99..c16143aa6ac5 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -295,7 +295,7 @@ impl Sink for DeltaLakeSink { SinkDecouple::Disable => { if config_decouple { return Err(SinkError::Config(anyhow!( - "config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + "config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 85021179e331..2bc4f7019b6b 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -17,7 +17,7 @@ use core::time::Duration; use std::collections::HashMap; use std::convert::Infallible; -use anyhow::Context; +use anyhow::{anyhow, Context}; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; @@ -183,21 +183,19 @@ fn try_get_be_url(resp: &Response, fe_host: String) -> Result> { .headers() .get("location") .ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url in header", - )) + SinkError::DorisStarrocksConnect(anyhow!("Can't get doris BE url in header",)) })? .to_str() .context("Can't get doris BE url in header") .map_err(SinkError::DorisStarrocksConnect)? .to_string(); - let mut parsed_be_url = - Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let mut parsed_be_url = Url::parse(&be_url) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; if fe_host != LOCALHOST && fe_host != LOCALHOST_IP { let be_host = parsed_be_url.host_str().ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get be host from url")) + SinkError::DorisStarrocksConnect(anyhow!("Can't get be host from url")) })?; if be_host == LOCALHOST || be_host == LOCALHOST_IP { @@ -205,7 +203,7 @@ fn try_get_be_url(resp: &Response, fe_host: String) -> Result> { // so replace it with fe host parsed_be_url .set_host(Some(fe_host.as_str())) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; } } Ok(Some(parsed_be_url)) @@ -216,7 +214,7 @@ fn try_get_be_url(resp: &Response, fe_host: String) -> Result> { // In this case, the request should be treated as finished. Ok(None) } - _ => Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + _ => Err(SinkError::DorisStarrocksConnect(anyhow!( "Can't get doris BE url", ))), } @@ -236,11 +234,9 @@ impl InserterInnerBuilder { header: HashMap, ) -> Result { let fe_host = Url::parse(&url) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .host_str() - .ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get fe host from url")) - })? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get fe host from url")))? .to_string(); let url = format!("{}/api/{}/{}/_stream_load", url, db, table); @@ -271,11 +267,10 @@ impl InserterInnerBuilder { let resp = builder .send() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; - let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) - })?; + let be_url = try_get_be_url(&resp, self.fe_host.clone())? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get doris BE url",)))?; let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let body = Body::wrap_stream( @@ -287,22 +282,26 @@ impl InserterInnerBuilder { let response = builder .send() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; let status = response.status(); let raw = response .bytes() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .into(); if status == StatusCode::OK { Ok(raw) } else { - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + let response_body = String::from_utf8(raw).map_err(|err| { + SinkError::DorisStarrocksConnect( + anyhow!(err).context("failed to parse response body"), + ) + })?; + Err(SinkError::DorisStarrocksConnect(anyhow!( "Failed connection {:?},{:?}", status, - String::from_utf8(raw) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + response_body ))) } }); @@ -343,9 +342,7 @@ impl InserterInner { self.sender.take(); self.wait_handle().await?; - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "channel closed" - ))) + Err(SinkError::DorisStarrocksConnect(anyhow!("channel closed"))) } else { Ok(()) } @@ -364,8 +361,8 @@ impl InserterInner { match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) .await { - Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))??, - Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())), + Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))??, + Err(err) => return Err(SinkError::DorisStarrocksConnect(anyhow!(err))), }; Ok(res) } @@ -403,15 +400,15 @@ impl MetaRequestSender { /// For example, the request to `/api/transaction/commit` endpoint does not seem to redirect to BE. pub async fn send(self) -> Result { let request = self.request; - let mut request_for_redirection = request.try_clone().ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't clone request")) - })?; + let mut request_for_redirection = request + .try_clone() + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't clone request")))?; let resp = self .client .execute(request) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; let be_url = try_get_be_url(&resp, self.fe_host)?; @@ -422,15 +419,15 @@ impl MetaRequestSender { self.client .execute(request_for_redirection) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .bytes() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into())) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err))) } None => resp .bytes() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into())), + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err))), } } } @@ -456,11 +453,9 @@ impl StarrocksTxnRequestBuilder { stream_load_http_timeout_ms: u64, ) -> Result { let fe_host = Url::parse(&url) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .host_str() - .ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get fe host from url")) - })? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get fe host from url")))? .to_string(); let url_begin = format!("{}/api/transaction/begin", url); @@ -492,7 +487,7 @@ impl StarrocksTxnRequestBuilder { fn build_request(&self, uri: String, method: Method, label: String) -> Result { let parsed_url = - Url::parse(&uri).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + Url::parse(&uri).map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; let mut request = Request::new(method, parsed_url); if uri != self.url_load { @@ -504,15 +499,15 @@ impl StarrocksTxnRequestBuilder { for (k, v) in &self.header { header.insert( HeaderName::try_from(k) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, HeaderValue::try_from(v) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, ); } header.insert( "label", HeaderValue::try_from(label) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, ); Ok(request) @@ -556,19 +551,18 @@ impl StarrocksTxnRequestBuilder { pub async fn build_txn_inserter(&self, label: String) -> Result { let request = self.build_request(self.url_load.clone(), Method::PUT, label)?; - let mut request_for_redirection = request.try_clone().ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't clone request")) - })?; + let mut request_for_redirection = request + .try_clone() + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't clone request")))?; let resp = self .client .execute(request) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; - let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) - })?; + let be_url = try_get_be_url(&resp, self.fe_host.clone())? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get doris BE url",)))?; *request_for_redirection.url_mut() = be_url; let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -582,23 +576,27 @@ impl StarrocksTxnRequestBuilder { let response = client .execute(request_for_redirection) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; let status = response.status(); let raw = response .bytes() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .into(); if status == StatusCode::OK { Ok(raw) } else { - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + let response_body = String::from_utf8(raw).map_err(|err| { + SinkError::DorisStarrocksConnect( + anyhow!(err).context("failed to parse response body"), + ) + })?; + Err(SinkError::DorisStarrocksConnect(anyhow!( "Failed connection {:?},{:?}", status, - String::from_utf8(raw) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + response_body ))) } }); diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 654fce89dc97..27ecf8ab5d03 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -269,7 +269,7 @@ impl Sink for StarrocksSink { SinkDecouple::Disable => { if config_decouple { return Err(SinkError::Config(anyhow!( - "config conflict: Starrocks config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + "config conflict: StarRocks config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) @@ -646,12 +646,12 @@ impl StarrocksSchemaClient { ); let pool = mysql_async::Pool::new( Opts::from_url(&conn_uri) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, ); let conn = pool .get_conn() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; Ok(Self { table, db, conn }) } @@ -664,7 +664,7 @@ impl StarrocksSchemaClient { query_map.insert(column_name, column_type) }) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; Ok(query_map) } @@ -676,7 +676,7 @@ impl StarrocksSchemaClient { (table_model, primary_key) }) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .first() .ok_or_else(|| { SinkError::Starrocks(format!( @@ -743,7 +743,7 @@ impl StarrocksClient { pub async fn finish(self) -> Result { let raw = self.insert.finish().await?; let res: StarrocksInsertResultResponse = serde_json::from_slice(&raw) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; if !STARROCKS_SUCCESS_STATUS.contains(&res.status.as_str()) { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( @@ -766,7 +766,7 @@ impl StarrocksTxnClient { fn check_response_and_extract_label(&self, res: Bytes) -> Result { let res: StarrocksInsertResultResponse = serde_json::from_slice(&res) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; if !STARROCKS_SUCCESS_STATUS.contains(&res.status.as_str()) { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "transaction error: {:?}", @@ -844,7 +844,7 @@ impl TryFrom for StarrocksWriteResult { if let Some(Serialized(v)) = value.metadata { Ok(StarrocksWriteResult(Some( String::from_utf8(v.metadata) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, ))) } else { Ok(StarrocksWriteResult(None)) @@ -886,7 +886,7 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { .collect::>>>(); futures::future::try_join_all(join_handles) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; } Ok(()) } From d48599bd68b437d45c2ca40b0eac05958e0b9e01 Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Fri, 24 May 2024 19:28:39 +0800 Subject: [PATCH 14/15] add missing with_options comment and fix stream_load_timeout deserialization issue --- src/connector/src/sink/starrocks.rs | 6 ++++-- src/connector/with_options_sink.yaml | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 27ecf8ab5d03..18a89d09b392 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -32,7 +32,7 @@ use risingwave_pb::connector_service::SinkMetadata; use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; use tokio::task::JoinHandle; use url::form_urlencoded; @@ -59,7 +59,7 @@ const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 28800; const fn _default_stream_load_http_timeout_ms() -> u64 { - 10 * 1000 + 30 * 1000 } #[derive(Deserialize, Debug, Clone, WithOptions)] @@ -98,6 +98,7 @@ pub struct StarrocksConfig { rename = "starrocks.stream_load.http.timeout.ms", default = "_default_stream_load_http_timeout_ms" )] + #[serde_as(as = "DisplayFromStr")] pub stream_load_http_timeout_ms: u64, /// Set this option to a positive integer n, RisingWave will try to commit data @@ -114,6 +115,7 @@ pub struct StarrocksConfig { pub r#type: String, // accept "append-only" or "upsert" } + impl StarrocksConfig { pub fn from_hashmap(properties: HashMap) -> Result { let config = diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 0bcecb970b97..ec67b978d42e 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -109,6 +109,7 @@ DeltaLakeConfig: required: false - name: commit_checkpoint_interval field_type: u64 + comments: Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. required: false default: Default::default - name: r#type @@ -737,7 +738,7 @@ StarrocksConfig: field_type: u64 comments: The timeout in milliseconds for stream load http request, defaults to 10 seconds. required: false - default: 10 * 1000 + default: 30 * 1000 - name: commit_checkpoint_interval field_type: u64 comments: Set this option to a positive integer n, RisingWave will try to commit data to Starrocks at every n checkpoints by leveraging the [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), also, in this time, the `sink_decouple` option should be enabled as well. Defaults to 1 if commit_checkpoint_interval <= 0 From 82c8779ada8230ce7c95fa05b056466a71aa5d3a Mon Sep 17 00:00:00 2001 From: ly9chee <54972801+ly9chee@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:38:31 +0800 Subject: [PATCH 15/15] fix forget to use stream_load_http_timeout --- .../src/sink/doris_starrocks_connector.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 2bc4f7019b6b..26f3e863bb23 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -357,13 +357,15 @@ impl InserterInner { } async fn wait_handle(&mut self) -> Result> { - let res = - match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) - .await - { - Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))??, - Err(err) => return Err(SinkError::DorisStarrocksConnect(anyhow!(err))), - }; + let res = match tokio::time::timeout( + self.stream_load_http_timeout, + self.join_handle.as_mut().unwrap(), + ) + .await + { + Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))??, + Err(err) => return Err(SinkError::DorisStarrocksConnect(anyhow!(err))), + }; Ok(res) }