feat(sink): add es retry_on_conflict and max_task_num #17867
Is it possible to make these properties configurable, e.g. from the toml, rather than fixed at sink creation?
checkEsWriteResultResp(esWriteResultResp);
}

if (taskCount >= maxTaskNum) {
The ES BulkProcessor seems to have built-in support for batching and a concurrency limit. Should we use setBulkActions(maxTaskNum) instead?
It looks like setBulkActions sets the batch size, whereas this sets the parallelism of writes to ES. They seem to control two different things.
IIUC, setConcurrentRequests is used to control the concurrency and setBulkActions is used to control the batch size. We use setConcurrentRequests(1) in the ES sink, so I guess the concurrency is always 1. That is why I thought maxTaskNum was used to control the batch size.
Fixed.
d0032e4 to c01e4b0
Rest LGTM
...risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@JsonProperty(value = "bulk_actions")
private Integer bulkActions;

@JsonProperty(value = "bulk_size")
batch_size_kb
@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

@JsonProperty(value = "bulk_actions")
batch_num_messages
Co-authored-by: Xinhao Xu <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
max_task_num: maximum number of lines written to ES at the same time.
retry_on_conflict: number of retries on an ES version conflict (optimistic lock failure).
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
We add the following optional options for the ES sink:
retry_on_conflict: the number of retries after an optimistic lock failure in ES. Default is 3.
batch_size_kb: the max size of each request batch.
batch_num_messages: the max rows of each request batch. (When a batch exceeds the limit of batch_size_kb or batch_num_messages, we send the batch.)
concurrent_requests: the max number of threads per degree of parallelism.
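
To make the batching rule in the release note concrete, here is a minimal sketch of a batcher that sends a batch as soon as either limit is hit. It is an illustration only, not the sink's code and not the ES BulkProcessor: the class `BatchFlushSketch` and its methods are hypothetical, the size is measured as UTF-8 bytes of each row, and "exceeds the limit" is assumed to mean "reaches or exceeds".

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batcher that only illustrates the flush rule. */
class BatchFlushSketch {
    private final int batchNumMessages;  // max rows per batch (batch_num_messages)
    private final long batchSizeBytes;   // batch_size_kb converted to bytes
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private long pendingBytes = 0;

    BatchFlushSketch(int batchNumMessages, int batchSizeKb) {
        this.batchNumMessages = batchNumMessages;
        this.batchSizeBytes = batchSizeKb * 1024L;
    }

    /** Buffers one row and sends the batch once either limit is reached (assumed >=). */
    void add(String row) {
        pending.add(row);
        pendingBytes += row.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= batchNumMessages || pendingBytes >= batchSizeBytes) {
            flush();
        }
    }

    /** "Sends" the pending rows; here that just records them. No-op when empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sent.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    /** Batches sent so far, in order. */
    List<List<String>> sentBatches() {
        return sent;
    }
}
```

With `new BatchFlushSketch(3, 1)`, two small rows stay buffered, the third row triggers a send because of the row limit, and a single 2 KB row triggers a send on its own because of the size limit. How many such batches may be in flight at once is a separate setting (concurrent_requests) and is not modeled here.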