-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): minor improvements for starrocks sink #17321
Conversation
about 1,We support changelog to implement the generic olap class upsert. |
I have recently noticed the changelog feature which should be the preferred approach for custom upserting. I can revert |
cc @hzxa21 |
#17132 has merged, can you restore the starrocks.stream_load.delete_sign.* code and try to use 17132 for the upsert? |
First of all, thanks for providing feedback and contributions. In terms of your question, I prefer leveraging #17132 to do upsert in starrocks aggregation and unique tables because:
Here is an example on top of my mind on how to leverage #17132 to solve the issue:
|
For these two improvements, feel free to open a separate PR for them. |
@hzxa21 Thanks for the detailed explanation👍. The |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR adds 3 minor improvements for StarRocks sink
1. Support upserting
Aggregation
andUnique
modelsWe currently only support upserting the
Primary Key
model, which is sufficient in most cases. However, thePrimary Key
model does not allow the table's primary key to exceed 128 bytes, which may be too restrictive in some cases. For example, in our environment, there are 10+ tables whose primary key exceed 128 bytes, a typical case is a MV's primary key is composed of 4 uuid strings, resulting in a primary key size of 144 bytes. As a result, we cannot sink these tables into StarRocks within RisingWave. The adhoc workaround is we sink these tables to kafka, and then we use flink to consume these topics and sink them to StarRocks.Below is a demonstration of a unique model table
demo_unique
, with its primary key consisting of (pk1, pk2, pk3, pk4). The op field signifies the record's current operation, for example,u
stands forupsert
andd
stands fordelete
.Under this PR, we can sink this MV to StarRocks directly, with no additional workaround needed:
Notes
delete_sign_field
is because:starrocks.stream_load.delete_sign.field
is__op
which is the reserved keyword in StarRocks, and StarRocks by default prohibits creating a table with an__op
field.delete_sign
field name or values. As mentioned above, thedelete_sign_field
of tabledemo_unique
isop
, where the valueu
stands forupsert
andd
stands fordelete
. With this ability, users can easily migrate their jobs to RisingWave without changing their queries to adapt to RisingWave.starrocks.stream_load.delete_sign.*
are only allowed when the StarRocks table's model isUnique
orAggregation
in upsert mode.2. Allow
starrocks.host
to be an FE followerThis may be a bug fix because the current implementation assumes that
stream load
requests made tostarrocks.host
will in turn redirect to a BE. This assumption is only makes sense if the address denoted bystarrocks.host
is an FE leader. However, if the address ofstarrocks.host
is an FE follower, the follower will redirect the request to the FE leader not a BE. In fact, the FE leader can change over time. For example, when the StarRocks cluster restarts, the previous leader may become a follower. we found this problem recently because after we upgraded the StarRocks cluster, and the StarRocks sink kept failing due to the FE leader change. Please see the comments ofsend_stream_load_request
for more details.3. quoting the column names in
stream load
http request headerIf the field name of the StarRocks table is a keyword, for example:
The
order
field is a keyword, if we sendstream load
request without quoting it, the StarRocks will respond with an error.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.