Skip to content

Commit

Permalink
feat: use Map to represent json field when building doc in es sink (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 authored Sep 20, 2023
1 parent 1985bc8 commit 1b70059
Showing 1 changed file with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package com.risingwave.connector;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
Expand Down Expand Up @@ -183,17 +187,35 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
*
* @param row
* @return Map from Field name to Value
* @throws JsonProcessingException
* @throws JsonMappingException
*/
private Map<String, Object> buildDoc(SinkRow row) {
private Map<String, Object> buildDoc(SinkRow row)
throws JsonMappingException, JsonProcessingException {
Map<String, Object> doc = new HashMap();
for (int i = 0; i < getTableSchema().getNumColumns(); i++) {
var tableSchema = getTableSchema();
var columnDescs = tableSchema.getColumnDescs();
for (int i = 0; i < row.size(); i++) {
var type = columnDescs.get(i).getDataType().getTypeName();
Object col = row.get(i);
if (col instanceof Date) {
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
col = col.toString();
switch (type) {
case DATE:
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
col = col.toString();
break;
case JSONB:
ObjectMapper mapper = new ObjectMapper();
col =
mapper.readValue(
(String) col, new TypeReference<Map<String, Object>>() {});
break;
default:
break;
}
if (col instanceof Date) {}

doc.put(getTableSchema().getColumnDesc(i).getName(), col);
}
return doc;
Expand All @@ -219,7 +241,7 @@ private String buildId(SinkRow row) {
return id;
}

private void processUpsert(SinkRow row) {
private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
Map<String, Object> doc = buildDoc(row);
final String key = buildId(row);

Expand All @@ -234,7 +256,7 @@ private void processDelete(SinkRow row) {
bulkProcessor.add(deleteRequest);
}

private void writeRow(SinkRow row) {
private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
Expand Down

0 comments on commit 1b70059

Please sign in to comment.