Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): add es retry_on_conflict and max_task_num #17867

Merged
merged 7 commits into from
Aug 29, 2024
Merged

Conversation

xxhZs
Copy link
Contributor

@xxhZs xxhZs commented Jul 30, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

max_task_num:Maximum number of lines written to es at the same time
retry_on_conflict:Number of retries for es ooc

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

We add two optional option for es sink
retry_on_conflict : default is 3. The num of retries after an optimistic lock failure for es
batch_size_kb : the max size of each request batch
batch_num_messages: the max rows of each request batch
(batch exceeds the limit of batch_size_kb or batch_num_messages, we will send a batch)
concurrent_requests :the max number of threads per degree of parallelism

@zwang28
Copy link
Contributor

zwang28 commented Jul 30, 2024

Is it possible these properties be made configurable, e.g. from the toml, rather than being fixed during sink creation?

@xxhZs xxhZs requested a review from hzxa21 July 30, 2024 09:55
@hzxa21 hzxa21 added the user-facing-changes Contains changes that are visible to users label Jul 31, 2024
checkEsWriteResultResp(esWriteResultResp);
}

if (taskCount >= maxTaskNum) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ES bulkprocessoer seems to be built-in support for batching and concurrency limit. Should we use setBulkActions(maxTaskNum) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like setBulkActions is used to set the batch size, and this is to set the parallelism of writing to es. They seem to control two things

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, setConcurrentRequests is used to control the concurrency and setBulkActions is used to control the batch size. We use setConcurrentRequests(1) in es sink so I guess the concurrency is always 1. That is why I thought the maxTaskNum is used to control batch size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

remove
@xxhZs xxhZs force-pushed the xxh/add-es-retry branch from d0032e4 to c01e4b0 Compare August 28, 2024 08:52
@xxhZs xxhZs requested a review from hzxa21 August 28, 2024 08:55
Copy link
Collaborator

@hzxa21 hzxa21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM

@JsonProperty(value = "bulk_actions")
private Integer bulkActions;

@JsonProperty(value = "bulk_size")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_size_kb

@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

@JsonProperty(value = "bulk_actions")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_num_messages

@xxhZs xxhZs enabled auto-merge August 29, 2024 02:52
@xxhZs xxhZs added this pull request to the merge queue Aug 29, 2024
Merged via the queue into main with commit 2529c56 Aug 29, 2024
32 of 33 checks passed
@xxhZs xxhZs deleted the xxh/add-es-retry branch August 29, 2024 03:54
github-merge-queue bot pushed a commit that referenced this pull request Sep 5, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants