-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): decouple starrocks commit from risingwave commit #16816
feat(sink): decouple starrocks commit from risingwave commit #16816
Conversation
Co-authored-by: TennyZhuang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM
let mut parsed_be_url = | ||
Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; | ||
|
||
if fe_host != LOCALHOST && fe_host != LOCALHOST_IP { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, actually any IP of 127.
refers to localhost i.e. loop-back. Does SR guarantee to return 127.0.0.1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This snippet is actually borrowed from previous version 😅。I think it may related to solve some starrocks cluster network issues that deployed for testing, which may redirect BE to a localhost address.
risingwave/src/connector/src/sink/doris_starrocks_connector.rs
Lines 235 to 241 in 5dc3861
if be_host == LOCALHOST || be_host == LOCALHOST_IP { | |
// if be host is 127.0.0.1, we may can't connect to it directly, | |
// so replace it with fe host | |
parsed_be_url | |
.set_host(Some(self.fe_host.as_str())) | |
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; | |
be_url = parsed_be_url.as_str().into(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related PR #16018
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { | ||
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use the error returned by try_get_be_url
, rather than anyhow!
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { | |
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) | |
})?; | |
let be_url = try_get_be_url(&resp, self.fe_host.clone())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, the returned error should not be wrapped twice.
try_get_be_url
will return a Result<Option<Url>>
, returning Ok(None)
indicates this request does not redirect. For exmaple, /api/transaction/commit
interface sometimes will not redirect to BE. In this case, commit
request should be treated as finished. But other interfaces /api/transaction/begin
, /api/transaction/load
and /api/db/table/_stream_load
(now only used in Doris sink) are expected redirect to BE. ok_or_else
is used to handle this case where no redirection is considered an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I don't find any documents in starrocks that describe BE redirection behavior. we cannot make sure in which situation the FE should redirect to BE or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, if reqwest
supports auto redirection without removing sensitive headers like Authorization
, we may make our code more elegant.
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { | ||
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| { | |
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",)) | |
})?; | |
let be_url = try_get_be_url(&resp, self.fe_host.clone())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, why not do try_get_be_url()
when the connector initialize and use the URL in later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because we send requests to FE, FE acts like a balancer and will redirect the request to a BE based on its internal balance algorithm, so we need to get the BE url for each request.
String::from_utf8(raw) | ||
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Internal err
was lost. Please either pass the err as source
(recommended) or use err.as_report()
to get the proper error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just send an access request for this doc, please check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The owner of the doc is vacationing. Let me export it.
A_Guide_to_Error_Handling_that_Just_Works_(Part_I).pdf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have replaced SinkError::DorisStarrocksConnect(err.into())
with SinkError::DorisStarrocksConnect(anyhow!(err))
to keep the uniformity with other sinks.
Should we run doris sink e2e test as well?, There're some refactors made to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
…chee/risingwave into decouple-starrocks-commit-v2
@ly9chee Hi, shall we merge this PR? |
@StrikeW |
e01eb2e
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
The current StarRocks sink implementation using
StreamLoad
API writes data to StarRocks. It works well when the upstream does not change frequently. However, if the upstream changes frequently, many versions of the table being written to are generated, which will impact the query performance and bring significant pressure to theBE Compactor
.Implementation details
This PR attempts to solve this problem by leveraging the Starrocks Stream Load Transaction Interface. The idea is inspired by RFC Support sink coordinator and RFC Decouple iceberg sink commit from risingwave checkpoint. Despite these two RFCs being designed for data lakes, but after doing some research, I found OLAP System like StarRocks can apply the same pattern as described in these RFCs as well, i.e., Changing the current
StreamLoad
to theStream Load Transaction Interface
.As the doc says, after a transaction is created, we can write multiple mini-batches to the transaction through
/api/transaction/load
interface. StarRocks will take care of merging these batches into a larger one when the/api/transaction/commit
is called. So inSinkWriter
implementation, we can write data to the transaction at every barrier, but only commit the transaction when a checkpoint barrier is received, which is delayed bycommit_checkpoint_interval
. In fact, thecommit
is requested after the coordinator collects the metadata from eachSinkWriter
.Assume the barrier interval is 1 second and the checkpoint frequency is 1. The current sink implementation will commit data to StarRocks at every barrier, thus producing a version every second. But this PR will commit data to StarRocks only when the user-specified
commit_checkpoint_interval
is reached. If we setcommit_checkpoint_interval
to 10, a version is created roughly every 10 seconds.Minor refactors
mysql_conn_uri
to allow special characters to be included.StarrocksCommon.partial_update
option has been moved up toStarrocksConfig
, which is a sink-only option.Simple benchmark on my local environment
Sinking 5 million rows to StarRocks
Current implementation, parallelism=4
This PR, parallelism=4, commit_checkpoint_interval=10
We can see that the
Version
has reduced from 352 to 45Notes
commit_checkpoint_interval
period due to thebegin
,prepare
andcommit
transaction meta requests being made.commit_checkpoint_interval
should not exceed FE's config parameterstream_load_default_timeout_second
; otherwise, the transaction will timeout and the sink will fail repeatedly.commit_checkpoint_interval
to 1, which is the default.information_schema.tables_config
, which was introduced in version 2.5.commit_checkpoint_interval
to a higher value, we sacrifice the freshness. BTW, we can further reduce the versions by lowering the sink parallelism as well.This PR's code is base on #16777, because we all used
DecoupleCheckpointLogSinkerOf
.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.