feat(sink): support es sink struct and refactor es sink #14231

Merged: 17 commits, Jan 4, 2024

Conversation

xxhZs
Contributor

@xxhZs xxhZs commented Dec 27, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

With this PR, each row is converted to an id + JSON row on the Rust side and then packaged into a chunk; the Java ES sink then forwards our JSON directly.
It also fixes this issue (decimals will be converted to text).

complete #14110

fix #13992
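
A minimal sketch of the "id + JSON row" conversion described above, assuming a simplified value type and a doc id built by joining the primary-key cells with a delimiter. `Value`, `escape_json`, `to_json`, and `row_to_id_and_json` are hypothetical names for illustration, not the functions added in this PR.

```rust
// Hypothetical, simplified cell type; RisingWave's real datum types differ.
#[derive(Debug, Clone)]
enum Value {
    Int(i64),
    Text(String),
    // A nested struct: ordered (field name, value) pairs.
    Struct(Vec<(String, Value)>),
}

// Escapes the characters that must not appear raw inside a JSON string.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

// Serializes a value; structs become nested JSON objects.
fn to_json(v: &Value) -> String {
    match v {
        Value::Int(i) => i.to_string(),
        Value::Text(s) => format!("\"{}\"", escape_json(s)),
        Value::Struct(fields) => {
            let members: Vec<String> = fields
                .iter()
                .map(|(k, v)| format!("\"{}\":{}", escape_json(k), to_json(v)))
                .collect();
            format!("{{{}}}", members.join(","))
        }
    }
}

// Returns (doc id, JSON body). Assumption: the id is the primary-key
// cells joined with `delimiter`; the real id scheme may differ.
fn row_to_id_and_json(
    columns: &[(String, Value)],
    pk_indices: &[usize],
    delimiter: &str,
) -> (String, String) {
    let id = pk_indices
        .iter()
        .map(|&i| match &columns[i].1 {
            Value::Int(n) => n.to_string(),
            Value::Text(s) => s.clone(),
            other => to_json(other),
        })
        .collect::<Vec<_>>()
        .join(delimiter);
    let body = to_json(&Value::Struct(columns.to_vec()));
    (id, body)
}
```

The receiving side then only needs the two resulting strings per row, which matches the "Java forwards our JSON" split described above.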

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

remove sout
@xxhZs xxhZs force-pushed the xxh/add-es-struct branch from b2c8006 to f7ebc49 Compare December 27, 2023 07:53
@lmatz
Contributor

lmatz commented Dec 27, 2023

and fix this issue (decimal will be converted to text)
#13992

How about adding a decimal test case to the integration tests?

@xxhZs
Contributor Author

xxhZs commented Dec 27, 2023

and fix this issue (decimal will be converted to text)
#13992

How about adding a decimal test case to the integration tests?

Decimals will be converted to text.
Example:
1.23213213 => "1.23213213"
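
The mapping in the example can be sketched as follows. This is an illustration only: `decimal_to_json_text` is a hypothetical helper that takes the decimal's text form and handles plain literals (optional sign, digits, at most one dot), not the encoder used in the PR.

```rust
// Emits a decimal as a JSON string value, e.g. 1.23213213 => "1.23213213".
// Returns None when the input is not a plain decimal literal.
fn decimal_to_json_text(decimal: &str) -> Option<String> {
    let digits = decimal.strip_prefix('-').unwrap_or(decimal);
    let well_formed = digits.chars().all(|c| c.is_ascii_digit() || c == '.')
        && digits.chars().filter(|&c| c == '.').count() <= 1
        && digits.chars().any(|c| c.is_ascii_digit());
    if well_formed {
        // Digits, '-' and '.' need no JSON escaping, so quoting is enough.
        Some(format!("\"{}\"", decimal))
    } else {
        None
    }
}
```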

@xxhZs xxhZs marked this pull request as ready for review December 27, 2023 08:46
@xxhZs xxhZs requested a review from hzxa21 December 27, 2023 08:46
Collaborator

@hzxa21 hzxa21 left a comment

Rest LGTM.

@wenym1 PTAL as well.

Contributor

@wenym1 wenym1 left a comment

Shall we add an e2e test to cover the struct case for ES?

@@ -134,8 +140,8 @@ pub enum CustomJsonType {
// The internal order of the struct should follow the insertion order.
// The decimal needs verification and calibration.
Doris(HashMap<String, (u8, u8)>),
// Bigquery's json need date is string.
Bigquery,
// Es's json need jsonb is struct
Contributor

Why was the Bigquery case removed?

Contributor Author

Because I found that our code has since been refactored to include the data types, so Bigquery is no longer a special case and can share the common creation logic with the other sinks.

@@ -174,11 +188,12 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> {
| DataType::Jsonb
| DataType::Bytea
| DataType::List(_)
| DataType::Struct(_)
Contributor

Struct is only valid for the ES sink. We should reject the struct type for other sinks.

Contributor

@xuefengze xuefengze Dec 29, 2023

Arrays only support the int16, int32, int64, float, double, and varchar types. How about rejecting other element types as well for other sinks?
#13866
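
The two suggestions in this thread could be combined into a per-column check along these lines. This is a sketch under assumptions, not the actual `validate_remote_sink`: the `DataType` enum is a cut-down stand-in, `float` and `double` are mapped to `Float32` and `Float64`, and letting the ES sink accept any list element type is an assumption the thread does not settle.

```rust
// Cut-down stand-in for the connector's data type enum (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Varchar,
    Decimal,
    List(Box<DataType>),
    Struct(Vec<DataType>),
}

fn is_es(connector: &str) -> bool {
    connector == "elasticsearch"
}

// Rejects struct columns, and lists with unsupported element types,
// for every sink except Elasticsearch.
fn validate_column(connector: &str, ty: &DataType) -> Result<(), String> {
    match ty {
        DataType::Struct(_) if !is_es(connector) => {
            Err(format!("struct is not supported by the {} sink", connector))
        }
        DataType::List(elem) if !is_es(connector) => match elem.as_ref() {
            DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Float32
            | DataType::Float64
            | DataType::Varchar => Ok(()),
            other => Err(format!(
                "list of {:?} is not supported by the {} sink",
                other, connector
            )),
        },
        _ => Ok(()),
    }
}
```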

@@ -207,6 +210,17 @@ private void bindSink(
String connectorName = getConnectorName(sinkParam);
SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
if (connectorName.equals("elasticsearch")) {
Contributor

It's unnecessary to set the special schema, since the schema is fixed and we should not access it in the ES sink.

Contributor Author

@xxhZs xxhZs Dec 29, 2023

The schema here is used by the StreamChunkDeserializer, which needs our mock schema, not the original one, to deserialize the StreamChunk.

Contributor

I see.

Previously we assumed the stream chunk schema is the same as the sink logical schema. Now we have broken that assumption. If so, I think we should have a separate field, named something like payload_schema, in the StartSink proto. The StreamChunkDeserializer will then use the schema in that field instead of the one in the SinkParam.

On the Rust side, when we send the initial start sink request, we fill in the special schema for the ES sink and the original sink schema for other sinks, so that the Java side doesn't need this special logic.
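
A rough sketch of that proposal, assuming plain Rust structs in place of the generated proto types. `StartSink`, `Column`, `ColumnType`, and `build_start_sink` here are illustrative stand-ins, and the ES column names `id` and `doc` are made up; only the `payload_schema` field name and the (varchar, jsonb) shape come from this thread.

```rust
#[derive(Debug, Clone, PartialEq)]
enum ColumnType {
    Decimal,
    Varchar,
    Jsonb,
}

#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    ty: ColumnType,
}

// Stand-in for the start-sink request: the logical sink schema plus the
// schema the chunk payload is actually encoded with.
#[derive(Debug, Clone, PartialEq)]
struct StartSink {
    sink_schema: Vec<Column>,
    payload_schema: Vec<Column>,
}

// For ES the payload is always (varchar, jsonb); every other sink sends
// chunks in the original sink schema.
fn build_start_sink(connector: &str, sink_schema: Vec<Column>) -> StartSink {
    let payload_schema = if connector == "elasticsearch" {
        vec![
            Column { name: "id".to_string(), ty: ColumnType::Varchar },
            Column { name: "doc".to_string(), ty: ColumnType::Jsonb },
        ]
    } else {
        sink_schema.clone()
    };
    StartSink { sink_schema, payload_schema }
}
```

With this shape the deserializer can read `payload_schema` unconditionally, so no connector-name check is needed on the receiving side.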

@@ -198,13 +191,9 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
this.bulkProcessor = createBulkProcessor(this.requestTracker);

primaryKeyIndexes = new ArrayList<Integer>();
Contributor

The primaryKeyIndexes should not be used any more and should be removed.

Contributor

If we still need a schema from the upstream, why don't we need the pk index, even though the schema is (varchar, jsonb) in most cases?

Contributor

In this PR, all processing is handled on the Rust side, including generating the doc id and processing the content body. On the Java side we just build and send the request from the processed doc id and content body, so the pk index is not used on the Java side.

Comment on lines +290 to +291
final String key = (String) row.get(0);
String doc = (String) row.get(1);
Contributor

I have a concern about forward compatibility here.

Comment on lines 246 to 249
enum StreamChunkConverter {
Es(EsStreamChunkConverter),
Other,
}
Contributor

I don't quite get the design. If the logic is specific to ES, maybe we can implement it inside the ES sink.

Contributor Author

As noted in the earlier reply:

In this PR, all processing is handled on the Rust side, including generating the doc id and processing the content body. On the Java side we just build and send the request from the processed doc id and content body, so the pk index is not used on the Java side.

The goal is to make it easy to implement struct support for ES.
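
To make the dispatch concrete, here is a small sketch of how a `StreamChunkConverter` like the one in the hunk above might be used. Only the enum and its two variants come from the diff; the `Row` alias, the `pk_indices` and `encode_body` fields, the `_` id delimiter, and `placeholder_body` are assumptions for illustration.

```rust
// A row is a list of already-stringified cells (a simplification).
type Row = Vec<String>;

struct EsStreamChunkConverter {
    pk_indices: Vec<usize>,
    // Caller-supplied body encoder; the real one would emit JSON.
    encode_body: fn(&Row) -> String,
}

impl EsStreamChunkConverter {
    // Turns every input row into a two-cell row: [doc id, body].
    fn convert(&self, chunk: &[Row]) -> Vec<Row> {
        chunk
            .iter()
            .map(|row| {
                let id = self
                    .pk_indices
                    .iter()
                    .map(|&i| row[i].as_str())
                    .collect::<Vec<_>>()
                    .join("_");
                vec![id, (self.encode_body)(row)]
            })
            .collect()
    }
}

enum StreamChunkConverter {
    Es(EsStreamChunkConverter),
    Other,
}

impl StreamChunkConverter {
    // ES chunks are rewritten; all other sinks pass chunks through as-is.
    fn convert(&self, chunk: Vec<Row>) -> Vec<Row> {
        match self {
            StreamChunkConverter::Es(es) => es.convert(&chunk),
            StreamChunkConverter::Other => chunk,
        }
    }
}

// Placeholder encoder for the sketch: joins cells with '|', not JSON.
fn placeholder_body(row: &Row) -> String {
    row.join("|")
}
```

Keeping the conversion in one enum means the rest of the remote sink path stays identical for every connector, which is one way to read the stated goal.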

Contributor

@wenym1 wenym1 left a comment

Rest LGTM. Thanks for the PR!

@xxhZs xxhZs enabled auto-merge January 4, 2024 06:52
@xxhZs xxhZs disabled auto-merge January 4, 2024 06:52
fix ci
@xxhZs xxhZs force-pushed the xxh/add-es-struct branch from 887d002 to bd4a8a9 Compare January 4, 2024 08:03
@xxhZs xxhZs added this pull request to the merge queue Jan 4, 2024
Merged via the queue into main with commit f47a892 Jan 4, 2024
28 of 29 checks passed
@xxhZs xxhZs deleted the xxh/add-es-struct branch January 4, 2024 08:57
Development

Successfully merging this pull request may close these issues.

Sink: elasticsearch sink get error with decimal type
6 participants