-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(sink): refactor es and opensearch to support async #17746
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After we change to use the rust implementation by default, we can also run the es e2e so that the new logic implemented in this PR can be covered in the test.
src/connector/src/sink/opensearch.rs
Outdated
use super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam}; | ||
use crate::sink::Result; | ||
|
||
pub const OPENSEARCH_SINK: &str = "opensearch_rust"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR we can just deprecate the original java implementation and always use the rust implementation when the connector is opensearch
or elastic-search
. As long as the with option is the same as the java implementation, we can switch to the rust implementation without any compatibility issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already made the parameters consistent, but not sure about migrating in this pr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling it _rust
doesn't look good because users shouldn't care about the implementation language.
We should either
- Call it
opensearch_v2
(Then replace like Deprecate the s3 source connector and rename s3_v2 to s3 #16234) - Directly replace it and deprecate old version like @wenym1 said. (Actually I also prefer this if possible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed with yim and decided to replace the previous implementation in this pr, so rust's will be renamed opensearch
, and java's will remain, or is it called opensearch_v1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the new impl is called opensearch
, and replaces the java impl, I think java impl will become dead code and we can directly remove it?
a663b69
to
88264e5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any test or CI to validate whether the current implementation works?
Yes, since we've changed our name to elasticsearch, the original tests can be used on the new implementation |
5787b79
to
0086ea9
Compare
0086ea9
to
66e902d
Compare
66e902d
to
202f0e4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM.
src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs
Outdated
Show resolved
Hide resolved
{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":7,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test2","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test2","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test2","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test2","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test2","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test2","_type":"_doc","_id":"1_1-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":1,"v2":2,"v3":"1-2"}},{"_index":"test2","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What causes the change in the result? Is the changes expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I noticed a missing column in the previous test
src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs
Outdated
Show resolved
Hide resolved
src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs
Outdated
Show resolved
Hide resolved
src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs
Outdated
Show resolved
Hide resolved
src/connector/src/sink/remote.rs
Outdated
{ ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" } | ||
{ OpensearchJava, OpenSearchJavaSink, "opensearch_v1"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask why to keep _v1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there are some users using java es, I'd like to be able to keep him for a while, in case something goes wrong with the migration to v2, so that they can use v1 first. Then delete v1 when v2 is more stable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify,
- For users already using
connector = elasticsearch
, when they upgrade, they will switch to v2 immediately. - If they meet problems, they can manually create new sinks with
elasticsearch_v1
as a temporary workaround.
Is this what you want?
The main concern for me is that when we offer elasticsearch_v1
, it might be hard to delete the legacy implementation in the future. (It might be possible if we only make it an undocumented feature and tell only some users about it...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes yes, I'm leaning towards,Only problems that are difficult to solve with v2 will make users try v1, and no updates will be made after v1, he may just exist for a version or two as a transition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For existing users, if they encounter any problem in this new rust implementation, instead of letting user recreate a sink with connector as _v1
, I think we can make a patch and build a separate image to change back to use the java implementation for that users, and meanwhile investigate the fix the rust implementation.
If this is feasible, we can exclude the _v1
from the sink list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the PR.
src/connector/src/sink/remote.rs
Outdated
{ ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" } | ||
{ OpensearchJava, OpenSearchJavaSink, "opensearch_v1"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For existing users, if they encounter any problem in this new rust implementation, instead of letting user recreate a sink with connector as _v1
, I think we can make a patch and build a separate image to change back to use the java implementation for that users, and meanwhile investigate the fix the rust implementation.
If this is feasible, we can exclude the _v1
from the sink list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for Cargo.lock
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
refactor es and opensearch with https://docs.rs/opensearch/latest/opensearch/ and https://docs.rs/elasticsearch/latest/elasticsearch/
they support async, and their sink_debuple defaults to true
#17073
resolve #17443
The new sink is exactly the same as the original creation parameters. For now, we keep two copies of the code (elasticsearch and elasticsearch_rust), after which we can delete the old one.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.