Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): minor improvements for starrocks sink #17321

Closed
wants to merge 3 commits into from

Conversation

ly9chee
Copy link
Contributor

@ly9chee ly9chee commented Jun 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR adds 3 minor improvements for StarRocks sink

1. Support upserting Aggregation and Unique models

We currently only support upserting the Primary Key model, which is sufficient in most cases. However, the Primary Key model does not allow the table's primary key to exceed 128 bytes, which may be too restrictive in some cases. For example, in our environment, there are 10+ tables whose primary key exceed 128 bytes, a typical case is a MV's primary key is composed of 4 uuid strings, resulting in a primary key size of 144 bytes. As a result, we cannot sink these tables into StarRocks within RisingWave. The adhoc workaround is we sink these tables to kafka, and then we use flink to consume these topics and sink them to StarRocks.

Below is a demonstration of a unique model table demo_unique, with its primary key consisting of (pk1, pk2, pk3, pk4). The op field signifies the record's current operation, for example, u stands for upsert and d stands for delete.

pk1 pk2 pk3 pk4 op
7e672c80-6d31-45d4-84ce-5353f332d6e8 87202e74-b020-4676-b13f-2d87e06a114e 852045ec-3605-4adc-86a5-8f20a0d2e155 706cd645-ad43-42a6-9bae-5490a64472d7 u
9c5b79d6-49b6-4f10-ac5c-1f359c1a4be4 28556f11-7f26-49b2-a095-5b82326aef8c b2aedcca-122e-4b57-a1f9-bdea5b60d642 a1272ee7-5a90-4d8d-8b7c-c61fe4ec4f9c d

Under this PR, we can sink this MV to StarRocks directly, with no additional workaround needed:

CREATE SINK demo_unique_sink
FROM demo_upsert WITH (
    connector = 'starrocks',
    type = 'upsert',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'root',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'demo_unique',
    primary_key='pk1,pk2,pk3,pk4',
    starrocks.stream_load.delete_sign.field='op',
    starrocks.stream_load.delete_sign.upsert='u',
    starrocks.stream_load.delete_sign.delete='d',
    commit_checkpoint_interval='10'
);

Notes

  • The reason we allow users to specify the delete_sign_field is because:
    1. The default value of starrocks.stream_load.delete_sign.field is __op which is the reserved keyword in StarRocks, and StarRocks by default prohibits creating a table with an __op field.
    2. Users who want to migrate their ETL workflow to RisingWave may have their specific delete_sign field name or values. As mentioned above, the delete_sign_field of table demo_unique is op, where the value u stands for upsert and d stands for delete. With this ability, users can easily migrate their jobs to RisingWave without changing their queries to adapt to RisingWave.
  • Options related to starrocks.stream_load.delete_sign.* are only allowed when the StarRocks table's model is Unique or Aggregation in upsert mode.

2. Allow starrocks.host to be an FE follower

This may be a bug fix because the current implementation assumes that stream load requests made to starrocks.host will in turn redirect to a BE. This assumption is only makes sense if the address denoted by starrocks.host is an FE leader. However, if the address of starrocks.host is an FE follower, the follower will redirect the request to the FE leader not a BE. In fact, the FE leader can change over time. For example, when the StarRocks cluster restarts, the previous leader may become a follower. we found this problem recently because after we upgraded the StarRocks cluster, and the StarRocks sink kept failing due to the FE leader change. Please see the comments of send_stream_load_request for more details.

3. quoting the column names in stream load http request header

If the field name of the StarRocks table is a keyword, for example:

create table demo(
    `id` bigint not null,
    `order` int not null
)
unique key(`id`)
distributed by hash(`id`);

The order field is a keyword, if we send stream load request without quoting it, the StarRocks will respond with an error.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@xxhZs
Copy link
Contributor

xxhZs commented Jun 21, 2024

about 1,We support changelog to implement the generic olap class upsert.
#17132
Preference for changelog over customized implementation of upserts

@ly9chee
Copy link
Contributor Author

ly9chee commented Jun 24, 2024

about 1,We support changelog to implement the generic olap class upsert. #17132 Preference for changelog over customized implementation of upserts

I have recently noticed the changelog feature which should be the preferred approach for custom upserting. I can revert starrocks.stream_load.delete_sign.* related code or we can leave it until the changelog feature becomes stable and mark them deprecated. Either option is acceptable to me. What do you think?

@xxhZs
Copy link
Contributor

xxhZs commented Jun 24, 2024

about 1,We support changelog to implement the generic olap class upsert. #17132 Preference for changelog over customized implementation of upserts

I have recently noticed the changelog feature which should be the preferred approach for custom upserting. I can revert starrocks.stream_load.delete_sign.* related code or we can leave it until the changelog feature becomes stable and mark them deprecated. Either option is acceptable to me. What do you think?

cc @hzxa21

@xxhZs
Copy link
Contributor

xxhZs commented Jun 28, 2024

#17132 has merged, can you restore the starrocks.stream_load.delete_sign.* code and try to use 17132 for the upsert?

@hzxa21
Copy link
Collaborator

hzxa21 commented Jun 28, 2024

about 1,We support changelog to implement the generic olap class upsert. #17132 Preference for changelog over customized implementation of upserts

I have recently noticed the changelog feature which should be the preferred approach for custom upserting. I can revert starrocks.stream_load.delete_sign.* related code or we can leave it until the changelog feature becomes stable and mark them deprecated. Either option is acceptable to me. What do you think?

First of all, thanks for providing feedback and contributions. In terms of your question, I prefer leveraging #17132 to do upsert in starrocks aggregation and unique tables because:

  1. Based on the doc, strarocks only supports upsert/delete for primary key table. That means supporting upsert/delete for aggregation and unique table is a application level customization because it happens outside of starrocks and requires the read path to be modified accordingly to filter out the deletion via the user-defined op column. Given that the changelog feature also provides the flexibility for user to transform upsert stream into an append-only stream with a customized op column, I think it is better to leverge that instead of having logic invading into starrocks sink.
  2. One of the motivations of feat(steam): support stream change log  #17132 is exactly to solve the case you mentioned above for starrocks. Actually, your feedback from the community channel provides a concrete and valid use case that drives us from developing feat(steam): support stream change log  #17132.
  3. feat(steam): support stream change log  #17132 is merged and will be officially released in v1.11). Feel free to try it out and give us early feedback. We can continue improve that for sure.

Here is an example on top of my mind on how to leverage #17132 to solve the issue:

CREATE SINK demo_unique_sink AS
WITH demo_upsert_changelog AS CHANGELOG FROM demo_upsert
SELECT * EXCEPT(changelog_op ), 
            CASE changelog_op 
                WHEN 1 THEN NULL, 
                WHEN 2 THEN 'd',
                WHEN 3 THEN 'u'
                WHEN 4 THEN 'd'
                ELSE 'unknown' 
            END as op
FROM demo_upsert_changelog WITH (
    connector = 'starrocks',
    type = 'append-only',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'root',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'demo_unique',
    primary_key='pk1,pk2,pk3,pk4',
    commit_checkpoint_interval='10'
);

@hzxa21
Copy link
Collaborator

hzxa21 commented Jun 28, 2024

  1. Allow starrocks.host to be an FE follower
  2. quoting the column names in stream load http request header

For these two improvements, feel free to open a separate PR for them.

@ly9chee
Copy link
Contributor Author

ly9chee commented Jun 28, 2024

@hzxa21 Thanks for the detailed explanation👍. The changelog feature looks exciting, and I will raise a separated PR soon.

@hzxa21
Copy link
Collaborator

hzxa21 commented Jul 8, 2024

@hzxa21 Thanks for the detailed explanation👍. The changelog feature looks exciting, and I will raise a separated PR soon.

Np. #17132 is merged to main and will be released in v1.11. Feel free to try it out beforehand.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants