diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index ed30d83eae69..9eaba2a10f12 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -21,6 +21,9 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; +/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`). +/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader, +/// we delay the checkpoint barrier to make commits less frequent. pub struct DecoupleCheckpointLogSinkerOf { writer: W, sink_metrics: SinkMetrics, diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index f5940df637ca..c16143aa6ac5 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -69,7 +69,7 @@ pub struct DeltaLakeCommon { pub s3_endpoint: Option, #[serde(rename = "gcs.service.account")] pub gcs_service_account: Option, - // Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + /// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] pub commit_checkpoint_interval: Option, } @@ -295,7 +295,7 @@ impl Sink for DeltaLakeSink { SinkDecouple::Disable => { if config_decouple { return Err(SinkError::Config(anyhow!( - "config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + "config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 6c045c63beb4..26f3e863bb23 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -17,12 +17,13 @@ use core::time::Duration; use std::collections::HashMap; use std::convert::Infallible; -use anyhow::Context; +use anyhow::{anyhow, Context}; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; use futures::StreamExt; -use reqwest::{redirect, Body, Client, RequestBuilder, StatusCode}; +use reqwest::header::{HeaderName, HeaderValue}; +use reqwest::{redirect, Body, Client, Method, Request, RequestBuilder, Response, StatusCode}; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use url::Url; @@ -32,6 +33,7 @@ use super::{Result, SinkError}; const BUFFER_SIZE: usize = 64 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; pub(crate) const DORIS_SUCCESS_STATUS: [&str; 2] = ["Success", "Publish Timeout"]; +pub(crate) const STARROCKS_SUCCESS_STATUS: [&str; 1] = ["OK"]; pub(crate) const DORIS_DELETE_SIGN: &str = "__DORIS_DELETE_SIGN__"; pub(crate) const STARROCKS_DELETE_SIGN: &str = "__op"; @@ -151,11 +153,73 @@ impl HeaderBuilder { self } + /// Only used in Starrocks Transaction API + pub fn set_db(mut self, db: String) -> Self { + self.header.insert("db".to_string(), db); + self + } + + /// Only used in Starrocks Transaction API + pub fn set_table(mut self, table: String) -> Self { + self.header.insert("table".to_string(), table); + self + } + pub fn build(self) -> HashMap { self.header } } +/// Try getting BE url from a redirected response, returning `Ok(None)` indicates this request does +/// not redirect. +/// +/// The reason we handle the redirection manually is that if we let `reqwest` handle the redirection +/// automatically, it will remove sensitive headers (such as Authorization) during the redirection, +/// and there's no way to prevent this behavior. +fn try_get_be_url(resp: &Response, fe_host: String) -> Result> { + match resp.status() { + StatusCode::TEMPORARY_REDIRECT => { + let be_url = resp + .headers() + .get("location") + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow!("Can't get doris BE url in header",)) + })? + .to_str() + .context("Can't get doris BE url in header") + .map_err(SinkError::DorisStarrocksConnect)? + .to_string(); + + let mut parsed_be_url = Url::parse(&be_url) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + + if fe_host != LOCALHOST && fe_host != LOCALHOST_IP { + let be_host = parsed_be_url.host_str().ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow!("Can't get be host from url")) + })?; + + if be_host == LOCALHOST || be_host == LOCALHOST_IP { + // if be host is 127.0.0.1, we may can't connect to it directly, + // so replace it with fe host + parsed_be_url + .set_host(Some(fe_host.as_str())) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + } + } + Ok(Some(parsed_be_url)) + } + StatusCode::OK => { + // Some of the `StarRocks` transactional APIs will respond directly from FE. For example, + // the request to `/api/transaction/commit` endpoint does not seem to redirect to BE. + // In this case, the request should be treated as finished. + Ok(None) + } + _ => Err(SinkError::DorisStarrocksConnect(anyhow!( + "Can't get doris BE url", + ))), + } +} + pub struct InserterInnerBuilder { url: String, header: HashMap, @@ -170,11 +234,9 @@ impl InserterInnerBuilder { header: HashMap, ) -> Result { let fe_host = Url::parse(&url) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .host_str() - .ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get fe host from url")) - })? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get fe host from url")))? .to_string(); let url = format!("{}/api/{}/{}/_stream_load", url, db, table); @@ -205,73 +267,45 @@ impl InserterInnerBuilder { let resp = builder .send() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - // TODO: shall we let `reqwest` handle the redirect? - let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { - resp.headers() - .get("location") - .ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url in header", - )) - })? - .to_str() - .context("Can't get doris BE url in header") - .map_err(SinkError::DorisStarrocksConnect)? - .to_string() - } else { - return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url", - ))); - }; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; - if self.fe_host != LOCALHOST && self.fe_host != LOCALHOST_IP { - let mut parsed_be_url = - Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let be_host = parsed_be_url.host_str().ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get be host from url")) - })?; - - if be_host == LOCALHOST || be_host == LOCALHOST_IP { - // if be host is 127.0.0.1, we may can't connect to it directly, - // so replace it with fe host - parsed_be_url - .set_host(Some(self.fe_host.as_str())) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - be_url = parsed_be_url.as_str().into(); - } - } + let be_url = try_get_be_url(&resp, self.fe_host.clone())? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get doris BE url",)))?; let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let body = Body::wrap_stream( tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), ); - let builder = self.build_request(be_url).body(body); + let builder = self.build_request(be_url.into()).body(body); let handle: JoinHandle>> = tokio::spawn(async move { let response = builder .send() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; let status = response.status(); let raw = response .bytes() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .into(); if status == StatusCode::OK { Ok(raw) } else { - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + let response_body = String::from_utf8(raw).map_err(|err| { + SinkError::DorisStarrocksConnect( + anyhow!(err).context("failed to parse response body"), + ) + })?; + Err(SinkError::DorisStarrocksConnect(anyhow!( "Failed connection {:?},{:?}", status, - String::from_utf8(raw) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + response_body ))) } }); - Ok(InserterInner::new(sender, handle)) + Ok(InserterInner::new(sender, handle, WAIT_HANDDLE_TIMEOUT)) } } @@ -281,13 +315,19 @@ pub struct InserterInner { sender: Option, join_handle: Option>>>, buffer: BytesMut, + stream_load_http_timeout: Duration, } impl InserterInner { - pub fn new(sender: Sender, join_handle: JoinHandle>>) -> Self { + pub fn new( + sender: Sender, + join_handle: JoinHandle>>, + stream_load_http_timeout: Duration, + ) -> Self { Self { sender: Some(sender), join_handle: Some(join_handle), buffer: BytesMut::with_capacity(BUFFER_SIZE), + stream_load_http_timeout, } } @@ -302,9 +342,7 @@ impl InserterInner { self.sender.take(); self.wait_handle().await?; - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "channel closed" - ))) + Err(SinkError::DorisStarrocksConnect(anyhow!("channel closed"))) } else { Ok(()) } @@ -319,13 +357,15 @@ impl InserterInner { } async fn wait_handle(&mut self) -> Result> { - let res = - match tokio::time::timeout(WAIT_HANDDLE_TIMEOUT, self.join_handle.as_mut().unwrap()) - .await - { - Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))??, - Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())), - }; + let res = match tokio::time::timeout( + self.stream_load_http_timeout, + self.join_handle.as_mut().unwrap(), + ) + .await + { + Ok(res) => res.map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))??, + Err(err) => return Err(SinkError::DorisStarrocksConnect(anyhow!(err))), + }; Ok(res) } @@ -337,3 +377,235 @@ impl InserterInner { self.wait_handle().await } } + +pub struct MetaRequestSender { + client: Client, + request: Request, + fe_host: String, +} + +impl MetaRequestSender { + pub fn new(client: Client, request: Request, fe_host: String) -> Self { + Self { + client, + request, + fe_host, + } + } + + /// Send the request and handle redirection if any. + /// The reason we handle the redirection manually is that if we let `reqwest` handle the redirection + /// automatically, it will remove sensitive headers (such as Authorization) during the redirection, + /// and there's no way to prevent this behavior. + /// + /// Another interesting point is that some of the `StarRocks` transactional APIs will respond directly from FE. + /// For example, the request to `/api/transaction/commit` endpoint does not seem to redirect to BE. + pub async fn send(self) -> Result { + let request = self.request; + let mut request_for_redirection = request + .try_clone() + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't clone request")))?; + + let resp = self + .client + .execute(request) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + + let be_url = try_get_be_url(&resp, self.fe_host)?; + + match be_url { + Some(be_url) => { + *request_for_redirection.url_mut() = be_url; + + self.client + .execute(request_for_redirection) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? + .bytes() + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err))) + } + None => resp + .bytes() + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err))), + } + } +} + +pub struct StarrocksTxnRequestBuilder { + url_begin: String, + url_load: String, + url_prepare: String, + url_commit: String, + url_rollback: String, + header: HashMap, + fe_host: String, + stream_load_http_timeout: Duration, + // The `reqwest` crate suggests us reuse the Client, and we don't need make it Arc, because it + // already uses an Arc internally. + client: Client, +} + +impl StarrocksTxnRequestBuilder { + pub fn new( + url: String, + header: HashMap, + stream_load_http_timeout_ms: u64, + ) -> Result { + let fe_host = Url::parse(&url) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? + .host_str() + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get fe host from url")))? + .to_string(); + + let url_begin = format!("{}/api/transaction/begin", url); + let url_load = format!("{}/api/transaction/load", url); + let url_prepare = format!("{}/api/transaction/prepare", url); + let url_commit = format!("{}/api/transaction/commit", url); + let url_rollback = format!("{}/api/transaction/rollback", url); + + let stream_load_http_timeout = Duration::from_millis(stream_load_http_timeout_ms); + + let client = Client::builder() + .pool_idle_timeout(POOL_IDLE_TIMEOUT) + .redirect(redirect::Policy::none()) + .build() + .unwrap(); + + Ok(Self { + url_begin, + url_load, + url_prepare, + url_commit, + url_rollback, + header, + fe_host, + stream_load_http_timeout, + client, + }) + } + + fn build_request(&self, uri: String, method: Method, label: String) -> Result { + let parsed_url = + Url::parse(&uri).map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + let mut request = Request::new(method, parsed_url); + + if uri != self.url_load { + // Set timeout for non-load requests; load requests' timeout is controlled by `tokio::timeout` + *request.timeout_mut() = Some(self.stream_load_http_timeout); + } + + let header = request.headers_mut(); + for (k, v) in &self.header { + header.insert( + HeaderName::try_from(k) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, + HeaderValue::try_from(v) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, + ); + } + header.insert( + "label", + HeaderValue::try_from(label) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, + ); + + Ok(request) + } + + pub fn build_begin_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_begin.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub fn build_prepare_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_prepare.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub fn build_commit_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_commit.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub fn build_rollback_request_sender(&self, label: String) -> Result { + let request = self.build_request(self.url_rollback.clone(), Method::POST, label)?; + Ok(MetaRequestSender::new( + self.client.clone(), + request, + self.fe_host.clone(), + )) + } + + pub async fn build_txn_inserter(&self, label: String) -> Result { + let request = self.build_request(self.url_load.clone(), Method::PUT, label)?; + let mut request_for_redirection = request + .try_clone() + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't clone request")))?; + + let resp = self + .client + .execute(request) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + + let be_url = try_get_be_url(&resp, self.fe_host.clone())? + .ok_or_else(|| SinkError::DorisStarrocksConnect(anyhow!("Can't get doris BE url",)))?; + *request_for_redirection.url_mut() = be_url; + + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let body = Body::wrap_stream( + tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), + ); + *request_for_redirection.body_mut() = Some(body); + + let client = self.client.clone(); + let handle: JoinHandle>> = tokio::spawn(async move { + let response = client + .execute(request_for_redirection) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + + let status = response.status(); + let raw = response + .bytes() + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? + .into(); + + if status == StatusCode::OK { + Ok(raw) + } else { + let response_body = String::from_utf8(raw).map_err(|err| { + SinkError::DorisStarrocksConnect( + anyhow!(err).context("failed to parse response body"), + ) + })?; + Err(SinkError::DorisStarrocksConnect(anyhow!( + "Failed connection {:?},{:?}", + status, + response_body + ))) + } + }); + Ok(InserterInner::new( + sender, + handle, + self.stream_load_http_timeout, + )) + } +} diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 994e4e43d817..18a89d09b392 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::num::NonZeroU64; use std::sync::Arc; use anyhow::anyhow; @@ -23,28 +24,44 @@ use mysql_async::Opts; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; +use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; +use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; +use risingwave_pb::connector_service::SinkMetadata; use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; +use tokio::task::JoinHandle; +use url::form_urlencoded; use with_options::WithOptions; use super::doris_starrocks_connector::{ - HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN, + HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, STARROCKS_DELETE_SIGN, + STARROCKS_SUCCESS_STATUS, }; use super::encoder::{JsonEncoder, RowEncoder}; -use super::writer::LogSinkerOf; -use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; -use crate::sink::writer::SinkWriterExt; -use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; +use super::{ + SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, + SINK_TYPE_UPSERT, +}; +use crate::deserialize_optional_u64_from_string; +use crate::sink::catalog::desc::SinkDesc; +use crate::sink::coordinate::CoordinatedSinkWriter; +use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use crate::sink::{Result, Sink, SinkWriter, SinkWriterParam}; pub const STARROCKS_SINK: &str = "starrocks"; const STARROCK_MYSQL_PREFER_SOCKET: &str = "false"; const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024; const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 28800; +const fn _default_stream_load_http_timeout_ms() -> u64 { + 30 * 1000 +} + #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct StarrocksCommon { /// The `StarRocks` host address. @@ -68,8 +85,6 @@ pub struct StarrocksCommon { /// The `StarRocks` table you want to sink data to. #[serde(rename = "starrocks.table")] pub table: String, - #[serde(rename = "starrocks.partial_update")] - pub partial_update: Option, } #[serde_as] @@ -78,8 +93,29 @@ pub struct StarrocksConfig { #[serde(flatten)] pub common: StarrocksCommon, + /// The timeout in milliseconds for stream load http request, defaults to 10 seconds. + #[serde( + rename = "starrocks.stream_load.http.timeout.ms", + default = "_default_stream_load_http_timeout_ms" + )] + #[serde_as(as = "DisplayFromStr")] + pub stream_load_http_timeout_ms: u64, + + /// Set this option to a positive integer n, RisingWave will try to commit data + /// to Starrocks at every n checkpoints by leveraging the + /// [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), + /// also, in this time, the `sink_decouple` option should be enabled as well. + /// Defaults to 1 if commit_checkpoint_interval <= 0 + #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] + pub commit_checkpoint_interval: Option, + + /// Enable partial update + #[serde(rename = "starrocks.partial_update")] + pub partial_update: Option, + pub r#type: String, // accept "append-only" or "upsert" } + impl StarrocksConfig { pub fn from_hashmap(properties: HashMap) -> Result { let config = @@ -93,12 +129,18 @@ impl StarrocksConfig { SINK_TYPE_UPSERT ))); } + if config.commit_checkpoint_interval == Some(0) { + return Err(SinkError::Config(anyhow!( + "commit_checkpoint_interval must be greater than 0" + ))); + } Ok(config) } } #[derive(Debug)] pub struct StarrocksSink { + param: SinkParam, pub config: StarrocksConfig, schema: Schema, pk_indices: Vec, @@ -106,13 +148,11 @@ pub struct StarrocksSink { } impl StarrocksSink { - pub fn new( - config: StarrocksConfig, - schema: Schema, - pk_indices: Vec, - is_append_only: bool, - ) -> Result { + pub fn new(param: SinkParam, config: StarrocksConfig, schema: Schema) -> Result { + let pk_indices = param.downstream_pk.clone(); + let is_append_only = param.sink_type.is_append_only(); Ok(Self { + param, config, schema, pk_indices, @@ -211,19 +251,33 @@ impl StarrocksSink { } impl Sink for StarrocksSink { - type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type Coordinator = StarrocksSinkCommitter; + type LogSinker = DecoupleCheckpointLogSinkerOf>; const SINK_NAME: &'static str = STARROCKS_SINK; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - Ok(StarrocksSinkWriter::new( - self.config.clone(), - self.schema.clone(), - self.pk_indices.clone(), - self.is_append_only, - )? - .into_log_sinker(writer_param.sink_metrics)) + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + let config_decouple = if let Some(interval) = + desc.properties.get("commit_checkpoint_interval") + && interval.parse::().unwrap_or(0) > 1 + { + true + } else { + false + }; + + match user_specified { + SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Disable => { + if config_decouple { + return Err(SinkError::Config(anyhow!( + "config conflict: StarRocks config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" + ))); + } + Ok(false) + } + SinkDecouple::Enable => Ok(true), + } } async fn validate(&self) -> Result<()> { @@ -263,16 +317,78 @@ impl Sink for StarrocksSink { self.check_column_name_and_type(starrocks_columns_desc)?; Ok(()) } + + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + let commit_checkpoint_interval = + NonZeroU64::new(self.config.commit_checkpoint_interval.unwrap_or(1)).expect( + "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", + ); + + let inner = StarrocksSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + writer_param.executor_id, + )?; + let writer = CoordinatedSinkWriter::new( + writer_param + .meta_client + .expect("should have meta client") + .sink_coordinate_client() + .await, + self.param.clone(), + writer_param.vnode_bitmap.ok_or_else(|| { + SinkError::Remote(anyhow!( + "sink needs coordination should not have singleton input" + )) + })?, + inner, + ) + .await?; + + Ok(DecoupleCheckpointLogSinkerOf::new( + writer, + writer_param.sink_metrics, + commit_checkpoint_interval, + )) + } + + async fn new_coordinator(&self) -> Result { + let header = HeaderBuilder::new() + .add_common_header() + .set_user_password( + self.config.common.user.clone(), + self.config.common.password.clone(), + ) + .set_db(self.config.common.database.clone()) + .set_table(self.config.common.table.clone()) + .build(); + + let txn_request_builder = StarrocksTxnRequestBuilder::new( + format!( + "http://{}:{}", + self.config.common.host, self.config.common.http_port + ), + header, + self.config.stream_load_http_timeout_ms, + )?; + Ok(StarrocksSinkCommitter { + client: Arc::new(StarrocksTxnClient::new(txn_request_builder)), + }) + } } pub struct StarrocksSinkWriter { pub config: StarrocksConfig, schema: Schema, pk_indices: Vec, - inserter_innet_builder: InserterInnerBuilder, is_append_only: bool, client: Option, + txn_client: StarrocksTxnClient, row_encoder: JsonEncoder, + executor_id: u64, + curr_txn_label: Option, } impl TryFrom for StarrocksSink { @@ -280,13 +396,8 @@ impl TryFrom for StarrocksSink { fn try_from(param: SinkParam) -> std::result::Result { let schema = param.schema(); - let config = StarrocksConfig::from_hashmap(param.properties)?; - StarrocksSink::new( - config, - schema, - param.downstream_pk, - param.sink_type.is_append_only(), - ) + let config = StarrocksConfig::from_hashmap(param.properties.clone())?; + StarrocksSink::new(param, config, schema) } } @@ -296,6 +407,7 @@ impl StarrocksSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, + executor_id: u64, ) -> Result { let mut fields_name = schema.names_str(); if !is_append_only { @@ -306,24 +418,28 @@ impl StarrocksSinkWriter { .add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) .add_json_format() - .set_partial_update(config.common.partial_update.clone()) + .set_partial_update(config.partial_update.clone()) .set_columns_name(fields_name) + .set_db(config.common.database.clone()) + .set_table(config.common.table.clone()) .build(); - let starrocks_insert_builder = InserterInnerBuilder::new( + let txn_request_builder = StarrocksTxnRequestBuilder::new( format!("http://{}:{}", config.common.host, config.common.http_port), - config.common.database.clone(), - config.common.table.clone(), header, + config.stream_load_http_timeout_ms, )?; + Ok(Self { config, schema: schema.clone(), pk_indices, - inserter_innet_builder: starrocks_insert_builder, is_append_only, client: None, + txn_client: StarrocksTxnClient::new(txn_request_builder), row_encoder: JsonEncoder::new_with_starrocks(schema, None), + executor_id, + curr_txn_label: None, }) } @@ -403,14 +519,46 @@ impl StarrocksSinkWriter { } Ok(()) } + + /// Generating a new transaction label, should be unique across all `SinkWriters` even under rewinding. + #[inline(always)] + fn new_txn_label(&self) -> String { + format!( + "rw-txn-{}-{}", + self.executor_id, + chrono::Utc::now().timestamp_micros() + ) + } } #[async_trait] impl SinkWriter for StarrocksSinkWriter { + type CommitMetadata = Option; + + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + Ok(()) + } + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + // We check whether start a new transaction in `write_batch`. Therefore, if no data has been written + // within the `commit_checkpoint_interval` period, no meta requests will be made. Otherwise if we request + // `prepare` against an empty transaction, the `StarRocks` will report a `hasn't send any data yet` error. + if self.curr_txn_label.is_none() { + let txn_label = self.new_txn_label(); + tracing::debug!(?txn_label, "begin transaction"); + let txn_label_res = self.txn_client.begin(txn_label.clone()).await?; + assert_eq!( + txn_label, txn_label_res, + "label responding from StarRocks: {} differ from generated one: {}", + txn_label, txn_label_res + ); + self.curr_txn_label = Some(txn_label.clone()); + } if self.client.is_none() { + let txn_label = self.curr_txn_label.clone(); + assert!(txn_label.is_some(), "transaction label is none during load"); self.client = Some(StarrocksClient::new( - self.inserter_innet_builder.build().await?, + self.txn_client.load(txn_label.unwrap()).await?, )); } if self.is_append_only { @@ -420,22 +568,45 @@ impl SinkWriter for StarrocksSinkWriter { } } - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { + async fn barrier(&mut self, is_checkpoint: bool) -> Result> { if self.client.is_some() { + // Here we finish the `/api/transaction/load` request when a barrier is received. Therefore, + // one or more load requests should be made within one commit_checkpoint_interval period. + // StarRocks will take care of merging those splits into a larger one during prepare transaction. + // Thus, only one version will be produced when the transaction is committed. See Stream Load + // transaction interface for more information. let client = self .client .take() .ok_or_else(|| SinkError::Starrocks("Can't find starrocks inserter".to_string()))?; client.finish().await?; } + + if is_checkpoint { + if self.curr_txn_label.is_some() { + let txn_label = self.curr_txn_label.take().unwrap(); + tracing::debug!(?txn_label, "prepare transaction"); + let txn_label_res = self.txn_client.prepare(txn_label.clone()).await?; + assert_eq!( + txn_label, txn_label_res, + "label responding from StarRocks differs from the current one" + ); + Ok(Some(StarrocksWriteResult(Some(txn_label)).try_into()?)) + } else { + // no data was written within previous epoch + Ok(Some(StarrocksWriteResult(None).try_into()?)) + } + } else { + Ok(None) + } + } + + async fn abort(&mut self) -> Result<()> { + if self.curr_txn_label.is_some() { + let txn_label = self.curr_txn_label.take().unwrap(); + tracing::debug!(?txn_label, "rollback transaction"); + self.txn_client.rollback(txn_label).await?; + } Ok(()) } @@ -459,6 +630,11 @@ impl StarrocksSchemaClient { user: String, password: String, ) -> Result { + // username & password may contain special chars, so we need to do URL encoding on them. + // Otherwise, Opts::from_url may report a `Parse error` + let user = form_urlencoded::byte_serialize(user.as_bytes()).collect::(); + let password = form_urlencoded::byte_serialize(password.as_bytes()).collect::(); + let conn_uri = format!( "mysql://{}:{}@{}:{}/{}?prefer_socket={}&max_allowed_packet={}&wait_timeout={}", user, @@ -472,12 +648,12 @@ impl StarrocksSchemaClient { ); let pool = mysql_async::Pool::new( Opts::from_url(&conn_uri) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?, + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, ); let conn = pool .get_conn() .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; Ok(Self { table, db, conn }) } @@ -490,7 +666,7 @@ impl StarrocksSchemaClient { query_map.insert(column_name, column_type) }) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; Ok(query_map) } @@ -502,7 +678,7 @@ impl StarrocksSchemaClient { (table_model, primary_key) }) .await - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))? .first() .ok_or_else(|| { SinkError::Starrocks(format!( @@ -518,35 +694,39 @@ impl StarrocksSchemaClient { #[derive(Debug, Serialize, Deserialize)] pub struct StarrocksInsertResultResponse { #[serde(rename = "TxnId")] - txn_id: i64, + pub txn_id: Option, + #[serde(rename = "Seq")] + pub seq: Option, #[serde(rename = "Label")] - label: String, + pub label: Option, #[serde(rename = "Status")] - status: String, + pub status: String, #[serde(rename = "Message")] - message: String, + pub message: String, #[serde(rename = "NumberTotalRows")] - number_total_rows: i64, + pub number_total_rows: Option, #[serde(rename = "NumberLoadedRows")] - number_loaded_rows: i64, + pub number_loaded_rows: Option, #[serde(rename = "NumberFilteredRows")] - number_filtered_rows: i32, + pub number_filtered_rows: Option, #[serde(rename = "NumberUnselectedRows")] - number_unselected_rows: i32, + pub number_unselected_rows: Option, #[serde(rename = "LoadBytes")] - load_bytes: i64, + pub load_bytes: Option, #[serde(rename = "LoadTimeMs")] - load_time_ms: i32, + pub load_time_ms: Option, #[serde(rename = "BeginTxnTimeMs")] - begin_txn_time_ms: i32, + pub begin_txn_time_ms: Option, #[serde(rename = "ReadDataTimeMs")] - read_data_time_ms: i32, + pub read_data_time_ms: Option, #[serde(rename = "WriteDataTimeMs")] - write_data_time_ms: i32, + pub write_data_time_ms: Option, #[serde(rename = "CommitAndPublishTimeMs")] - commit_and_publish_time_ms: i32, + pub commit_and_publish_time_ms: Option, #[serde(rename = "StreamLoadPlanTimeMs")] - stream_load_plan_time_ms: Option, + pub stream_load_plan_time_ms: Option, + #[serde(rename = "ExistingJobStatus")] + pub existing_job_status: Option, } pub struct StarrocksClient { @@ -565,9 +745,9 @@ impl StarrocksClient { pub async fn finish(self) -> Result { let raw = self.insert.finish().await?; let res: StarrocksInsertResultResponse = serde_json::from_slice(&raw) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; - if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) { + if !STARROCKS_SUCCESS_STATUS.contains(&res.status.as_str()) { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "Insert error: {:?}", res.message, @@ -576,3 +756,140 @@ impl StarrocksClient { Ok(res) } } + +pub struct StarrocksTxnClient { + request_builder: StarrocksTxnRequestBuilder, +} + +impl StarrocksTxnClient { + pub fn new(request_builder: StarrocksTxnRequestBuilder) -> Self { + Self { request_builder } + } + + fn check_response_and_extract_label(&self, res: Bytes) -> Result { + let res: StarrocksInsertResultResponse = serde_json::from_slice(&res) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + if !STARROCKS_SUCCESS_STATUS.contains(&res.status.as_str()) { + return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( + "transaction error: {:?}", + res.message, + ))); + } + res.label.ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get label from response")) + }) + } + + pub async fn begin(&self, label: String) -> Result { + let res = self + .request_builder + .build_begin_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn prepare(&self, label: String) -> Result { + let res = self + .request_builder + .build_prepare_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn commit(&self, label: String) -> Result { + let res = self + .request_builder + .build_commit_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn rollback(&self, label: String) -> Result { + let res = self + .request_builder + .build_rollback_request_sender(label)? + .send() + .await?; + self.check_response_and_extract_label(res) + } + + pub async fn load(&self, label: String) -> Result { + self.request_builder.build_txn_inserter(label).await + } +} + +struct StarrocksWriteResult(Option); + +impl TryFrom for SinkMetadata { + type Error = SinkError; + + fn try_from(value: StarrocksWriteResult) -> std::result::Result { + match value.0 { + Some(label) => { + let metadata = label.into_bytes(); + Ok(SinkMetadata { + metadata: Some(Serialized(SerializedMetadata { metadata })), + }) + } + None => Ok(SinkMetadata { metadata: None }), + } + } +} + +impl TryFrom for StarrocksWriteResult { + type Error = SinkError; + + fn try_from(value: SinkMetadata) -> std::result::Result { + if let Some(Serialized(v)) = value.metadata { + Ok(StarrocksWriteResult(Some( + String::from_utf8(v.metadata) + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?, + ))) + } else { + Ok(StarrocksWriteResult(None)) + } + } +} + +pub struct StarrocksSinkCommitter { + client: Arc, +} + +#[async_trait::async_trait] +impl SinkCommitCoordinator for StarrocksSinkCommitter { + async fn init(&mut self) -> Result<()> { + tracing::info!("Starrocks commit coordinator inited."); + Ok(()) + } + + async fn commit(&mut self, epoch: u64, metadata: Vec) -> Result<()> { + let write_results = metadata + .into_iter() + .map(TryFrom::try_from) + .collect::>>()?; + + let txn_labels = write_results + .into_iter() + .filter_map(|v| v.0) + .collect::>(); + + tracing::debug!(?epoch, ?txn_labels, "commit transaction"); + + if !txn_labels.is_empty() { + let join_handles = txn_labels + .into_iter() + .map(|txn_label| { + let client = self.client.clone(); + tokio::spawn(async move { client.commit(txn_label).await }) + }) + .collect::>>>(); + futures::future::try_join_all(join_handles) + .await + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + } + Ok(()) + } +} diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index a879cd74f1c4..2324d03a0e1d 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -109,6 +109,7 @@ DeltaLakeConfig: required: false - name: commit_checkpoint_interval field_type: u64 + comments: Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. required: false default: Default::default - name: r#type @@ -730,8 +731,19 @@ StarrocksConfig: field_type: String comments: The `StarRocks` table you want to sink data to. required: true + - name: starrocks.stream_load.http.timeout.ms + field_type: u64 + comments: The timeout in milliseconds for stream load http request, defaults to 10 seconds. + required: false + default: 30 * 1000 + - name: commit_checkpoint_interval + field_type: u64 + comments: Set this option to a positive integer n, RisingWave will try to commit data to Starrocks at every n checkpoints by leveraging the [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), also, in this time, the `sink_decouple` option should be enabled as well. Defaults to 1 if commit_checkpoint_interval <= 0 + required: false + default: Default::default - name: starrocks.partial_update field_type: String + comments: Enable partial update required: false - name: r#type field_type: String