feat(sink): support es sink struct and refactor es sink #14231
Conversation
How about adding a test case for decimal to the integration tests?
Decimal will be converted to text.
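To illustrate the behavior discussed here, a minimal sketch of carrying a decimal through JSON as a string rather than a number (the function name and field layout are illustrative assumptions, not the actual sink code):

```rust
// Hypothetical sketch: encode a decimal as a JSON string so precision is
// not lost (most JSON parsers treat bare numbers as f64).
fn decimal_to_json_field(name: &str, decimal: &str) -> String {
    // The decimal is kept as text, e.g. {"price":"123.4500000001"}
    format!("{{\"{}\":\"{}\"}}", name, decimal)
}

fn main() {
    let doc = decimal_to_json_field("price", "123.4500000001");
    println!("{doc}");
}
```

This is one common way sinks serialize high-precision decimals; an integration test would assert that the emitted document contains the quoted text form.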
Rest LGTM.
@wenym1 PTAL as well.
Shall we have some e2e tests to cover the struct case of es?
@@ -134,8 +140,8 @@ pub enum CustomJsonType {
// The internal order of the struct should follow the insertion order.
// The decimal needs verification and calibration.
Doris(HashMap<String, (u8, u8)>),
// Bigquery's json need date is string.
Bigquery,
// Es's json need jsonb is struct
Why is the `Bigquery` case removed?
Because I found that after some refactoring of our code to include the data types, Bigquery is no longer a special option and can use the common creation logic shared with the other sinks.
src/connector/src/sink/remote.rs
@@ -174,11 +188,12 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> {
| DataType::Jsonb
| DataType::Bytea
| DataType::List(_)
| DataType::Struct(_)
The `Struct` type is only valid for the es sink. We should reject the struct type for other sinks.
`array` only supports the `int16`, `int32`, `int64`, `float`, `double`, and `varchar` element types. How about rejecting the other types for other sinks as well?
#13866
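The two validation suggestions above can be sketched together. This is a hedged, std-only sketch with illustrative names (a mock `DataType` and `validate_column`), not the actual `validate_remote_sink` code in remote.rs:

```rust
// Mock of the sink column validation proposed in the review: allow Struct
// only for the elasticsearch sink, and restrict array element types.
#[derive(Debug, PartialEq)]
enum DataType {
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Varchar,
    Struct(Vec<DataType>),
    List(Box<DataType>),
}

fn validate_column(sink: &str, ty: &DataType) -> Result<(), String> {
    match ty {
        // Struct is rejected unless the sink is elasticsearch.
        DataType::Struct(_) if sink != "elasticsearch" => {
            Err(format!("struct type is not supported by the {sink} sink"))
        }
        // Arrays only accept a small set of element types.
        DataType::List(elem) => match **elem {
            DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Float32
            | DataType::Float64
            | DataType::Varchar => Ok(()),
            _ => Err(format!("unsupported array element type: {elem:?}")),
        },
        _ => Ok(()),
    }
}

fn main() {
    assert!(validate_column("elasticsearch", &DataType::Struct(vec![])).is_ok());
    assert!(validate_column("jdbc", &DataType::Struct(vec![])).is_err());
    assert!(validate_column("jdbc", &DataType::List(Box::new(DataType::Int32))).is_ok());
    println!("validation sketch ok");
}
```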
@@ -207,6 +210,17 @@ private void bindSink(
String connectorName = getConnectorName(sinkParam);
SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
if (connectorName.equals("elasticsearch")) {
It's unnecessary to set the special schema here, since the schema is fixed and we should not access it in the es sink.
The schema here is for use by the StreamChunkDeserializer, which needs our mock schema, not the original one, to deserialize the StreamChunk.
I see.
Previously we assumed the stream chunk schema is the same as the sink's logical schema. Now we have broken that assumption. If so, I think we should have a separate field, named something like `payload_schema`, in the `StartSink` proto. The StreamChunkDeserializer will then use the schema in that field instead of the one in the `SinkParam`.
On the Rust side, when we send the initial start sink request, for the es sink we fill in the special schema, and for other sinks we fill in the original sink schema, so that on the Java side we don't need this special logic.
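A minimal sketch of the Rust-side choice proposed here. The `payload_schema` function, the `Field` type, and the fixed (doc id, doc) schema for es are all assumptions for illustration, not the actual proto or connector code:

```rust
// Hypothetical: pick the payload schema to send in the StartSink request.
// For es, the payload is always the processed (doc id, doc body) pair;
// other sinks keep their original logical schema.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    ty: String,
}

fn payload_schema(connector: &str, sink_schema: Vec<Field>) -> Vec<Field> {
    if connector == "elasticsearch" {
        vec![
            Field { name: "doc_id".into(), ty: "varchar".into() },
            Field { name: "doc".into(), ty: "jsonb".into() },
        ]
    } else {
        sink_schema
    }
}

fn main() {
    let orig = vec![Field { name: "v".into(), ty: "int32".into() }];
    // Non-es sinks pass their schema through unchanged.
    assert_eq!(payload_schema("jdbc", orig.clone()), orig);
    // The es sink always gets the fixed two-column payload schema.
    assert_eq!(payload_schema("elasticsearch", orig).len(), 2);
}
```

With this shape, the Java-side StreamChunkDeserializer can always read the schema from the same field, and the `connectorName.equals("elasticsearch")` special case disappears.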
@@ -198,13 +191,9 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
this.bulkProcessor = createBulkProcessor(this.requestTracker);

primaryKeyIndexes = new ArrayList<Integer>();
The `primaryKeyIndexes` field should not be used any more and should be removed.
If we still need a schema from the upstream, why don't we need the pk indexes, even though the schema is (varchar, jsonb) in most cases?
In this PR, all processing is handled on the Rust side, including generating the doc id and processing the content body. On the Java side we just build and send the request from the processed doc id and content body, so the pk indexes are not used on the Java side.
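The division of work described here can be sketched as follows. This is an illustrative simplification (the function name, the `_`-joined doc id, and the row layout are assumptions), not the actual converter code:

```rust
// Sketch: Rust produces the (doc id, json body) pair per row; Java only
// forwards it to elasticsearch.
fn build_es_request(pk_values: &[&str], json_body: &str) -> (String, String) {
    // Join the primary key values to form the document id, e.g. "1_foo".
    let doc_id = pk_values.join("_");
    (doc_id, json_body.to_string())
}

fn main() {
    let (id, doc) = build_es_request(&["1", "foo"], "{\"v\":1}");
    assert_eq!(id, "1_foo");
    assert_eq!(doc, "{\"v\":1}");
}
```

Since the pair is fully formed before it crosses the language boundary, the Java side no longer needs the pk indexes at all.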
final String key = (String) row.get(0);
String doc = (String) row.get(1);
Concerned about the forward compatibility.
src/connector/src/sink/remote.rs
enum StreamChunkConverter {
    Es(EsStreamChunkConverter),
    Other,
}
I don't quite get the design. If the logic is specific to es, maybe we can implement it inside the es sink.
Like:
> In this PR, all processing is handled on the Rust side, including generating the doc id and processing the content body. On the Java side we just build and send the request from the processed doc id and content body, so the pk indexes are not used on the Java side.

The goal is to make it easy to implement the struct support of the es sink.
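To make the dispatch concrete, here is a hedged, self-contained sketch of how such a converter could behave. Types are simplified stand-ins (the real `EsStreamChunkConverter` operates on `StreamChunk`, and the json encoding here is a placeholder):

```rust
// Sketch: the Es variant rewrites each row into the (doc id, json doc)
// shape before sending; other sinks pass the chunk through unchanged.
type Row = Vec<String>;

enum StreamChunkConverter {
    Es { pk_indices: Vec<usize> },
    Other,
}

impl StreamChunkConverter {
    fn convert(&self, rows: Vec<Row>) -> Vec<Row> {
        match self {
            StreamChunkConverter::Es { pk_indices } => rows
                .into_iter()
                .map(|row| {
                    // Build the doc id from the pk columns, e.g. "1_foo".
                    let doc_id = pk_indices
                        .iter()
                        .map(|i| row[*i].clone())
                        .collect::<Vec<_>>()
                        .join("_");
                    // Placeholder json encoding of the whole row.
                    let doc = format!("{{\"fields\":{:?}}}", row);
                    vec![doc_id, doc]
                })
                .collect(),
            StreamChunkConverter::Other => rows,
        }
    }
}

fn main() {
    let conv = StreamChunkConverter::Es { pk_indices: vec![0] };
    let out = conv.convert(vec![vec!["1".into(), "a".into()]]);
    assert_eq!(out[0][0], "1");
    assert_eq!(out[0].len(), 2);
}
```

Keeping the enum in remote.rs (rather than inside the es sink) lets the shared remote-sink writer apply the conversion uniformly before chunks are handed to the Java side.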
Rest LGTM. Thanks for the PR!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Currently, rows are converted to an id + json row on the Rust side and packaged into a chunk; afterwards Java's es sink directly forwards our json.
This also fixes the issue that decimal will be converted to text.
complete #14110
fix #13992
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.