-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support es sink struct and refactor es sink #14231
Changes from 7 commits
c931597
70c48a9
f7ebc49
dfa9b79
9f5ae31
116dff8
9365a08
12a164d
937858e
611ddd2
f24254c
f4f4689
2dcd7eb
db75d4e
61d0999
90d698a
bd4a8a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,6 @@ | |
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.JsonMappingException; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.risingwave.connector.api.TableSchema; | ||
import com.risingwave.connector.api.sink.SinkRow; | ||
import com.risingwave.connector.api.sink.SinkWriterBase; | ||
|
@@ -27,7 +25,6 @@ | |
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
import org.apache.http.HttpHost; | ||
import org.apache.http.auth.AuthScope; | ||
import org.apache.http.auth.UsernamePasswordCredentials; | ||
|
@@ -47,6 +44,7 @@ | |
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.xcontent.XContentType; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -67,11 +65,6 @@ public class EsSink extends SinkWriterBase { | |
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); | ||
private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; | ||
|
||
private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC"); | ||
private final SimpleDateFormat tDfm; | ||
private final SimpleDateFormat tsDfm; | ||
private final SimpleDateFormat tstzDfm; | ||
|
||
private final EsSinkConfig config; | ||
private BulkProcessor bulkProcessor; | ||
private final RestHighLevelClient client; | ||
|
@@ -198,13 +191,9 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { | |
this.bulkProcessor = createBulkProcessor(this.requestTracker); | ||
|
||
primaryKeyIndexes = new ArrayList<Integer>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we still need a schema from the upstream, why we don't need pk index, even though the schema is (varchar, jsonb) in most cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this PR, all processing is handled in rust side, including generating the doc id and processing the content body. On java side we just build and send request from the processed doc id and content body, so the pk index is not used on java side. |
||
for (String primaryKey : tableSchema.getPrimaryKeys()) { | ||
primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey)); | ||
for (String primaryKey : getTableSchema().getPrimaryKeys()) { | ||
primaryKeyIndexes.add(getTableSchema().getColumnIndex(primaryKey)); | ||
} | ||
|
||
tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone); | ||
tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone); | ||
tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone); | ||
} | ||
|
||
private static RestClientBuilder configureRestClientBuilder( | ||
|
@@ -297,116 +286,20 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) | |
} | ||
} | ||
|
||
/** | ||
* The api accepts doc in map form. | ||
* | ||
* @param row | ||
* @return Map from Field name to Value | ||
* @throws JsonProcessingException | ||
* @throws JsonMappingException | ||
*/ | ||
private Map<String, Object> buildDoc(SinkRow row) | ||
throws JsonMappingException, JsonProcessingException { | ||
Map<String, Object> doc = new HashMap(); | ||
var tableSchema = getTableSchema(); | ||
var columnDescs = tableSchema.getColumnDescs(); | ||
for (int i = 0; i < row.size(); i++) { | ||
var type = columnDescs.get(i).getDataType().getTypeName(); | ||
Object col = row.get(i); | ||
switch (type) { | ||
// es client doesn't natively support java.sql.Timestamp/Time/Date | ||
// so we need to convert Date/Time/Timestamp type into a string as suggested in | ||
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 | ||
case DATE: | ||
col = col.toString(); | ||
break; | ||
// construct java.sql.Time/Timestamp with milliseconds time value. | ||
// it will use system timezone by default, so we have to set timezone manually | ||
case TIME: | ||
col = tDfm.format(col); | ||
break; | ||
case TIMESTAMP: | ||
col = tsDfm.format(col); | ||
break; | ||
case TIMESTAMPTZ: | ||
col = tstzDfm.format(col); | ||
break; | ||
case JSONB: | ||
ObjectMapper mapper = new ObjectMapper(); | ||
JsonNode jsonNode = mapper.readTree((String) col); | ||
col = convertJsonNode(jsonNode); | ||
break; | ||
default: | ||
break; | ||
} | ||
|
||
doc.put(getTableSchema().getColumnDesc(i).getName(), col); | ||
} | ||
return doc; | ||
} | ||
|
||
private static Object convertJsonNode(JsonNode jsonNode) { | ||
if (jsonNode.isObject()) { | ||
Map<String, Object> resultMap = new HashMap<>(); | ||
jsonNode.fields() | ||
.forEachRemaining( | ||
entry -> { | ||
resultMap.put(entry.getKey(), convertJsonNode(entry.getValue())); | ||
}); | ||
return resultMap; | ||
} else if (jsonNode.isArray()) { | ||
List<Object> resultList = new ArrayList<>(); | ||
jsonNode.elements() | ||
.forEachRemaining( | ||
element -> { | ||
resultList.add(convertJsonNode(element)); | ||
}); | ||
return resultList; | ||
} else if (jsonNode.isNumber()) { | ||
return jsonNode.numberValue(); | ||
} else if (jsonNode.isTextual()) { | ||
return jsonNode.textValue(); | ||
} else if (jsonNode.isBoolean()) { | ||
return jsonNode.booleanValue(); | ||
} else if (jsonNode.isNull()) { | ||
return null; | ||
} else { | ||
throw new IllegalArgumentException("Unsupported JSON type"); | ||
} | ||
} | ||
|
||
/** | ||
* use primary keys as id concatenated by a specific delimiter. | ||
* | ||
* @param row | ||
* @return | ||
*/ | ||
private String buildId(SinkRow row) { | ||
String id; | ||
if (primaryKeyIndexes.isEmpty()) { | ||
id = row.get(0).toString(); | ||
} else { | ||
List<String> keys = | ||
primaryKeyIndexes.stream() | ||
.map(index -> row.get(primaryKeyIndexes.get(index)).toString()) | ||
.collect(Collectors.toList()); | ||
id = String.join(config.getDelimiter(), keys); | ||
} | ||
return id; | ||
} | ||
|
||
private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { | ||
Map<String, Object> doc = buildDoc(row); | ||
final String key = buildId(row); | ||
final String key = (String) row.get(0); | ||
String doc = (String) row.get(1); | ||
Comment on lines
+282
to
+283
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Concern about the forward compatibility |
||
|
||
UpdateRequest updateRequest = | ||
new UpdateRequest(config.getIndex(), "doc", key).doc(doc).upsert(doc); | ||
new UpdateRequest(config.getIndex(), "doc", key).doc(doc, XContentType.JSON); | ||
updateRequest.docAsUpsert(true); | ||
this.requestTracker.addWriteTask(); | ||
bulkProcessor.add(updateRequest); | ||
} | ||
|
||
private void processDelete(SinkRow row) { | ||
final String key = buildId(row); | ||
private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { | ||
final String key = (String) row.get(0); | ||
|
||
DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key); | ||
this.requestTracker.addWriteTask(); | ||
bulkProcessor.add(deleteRequest); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unnecessary to set the special schema, since the schema is fixed and we should not access it in es
sink.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The schema here is for use by the StreamChunkDeserializer, which needs to use our mock schema, not the original one deserialize StreamChunk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.
Previously we assume the stream chunk schema is the same as the sink logical schema. Now we have broken the assumption. If so, we think we should have a separated field named like
payload_schema
in theStartSink
proto. The StreamChunkDeserializer will now use the schema in the field instead of the field in theSinkParam
.On rust side, when we send the initial start sink request, for es sink we fill in the special schema, and for other sink we fill in the original sink schema, so that on java side we don't need this special logic.