From 1b70059714d6e52cd50f6bed7be92fb1a09ec367 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Wed, 20 Sep 2023 22:13:14 +0800 Subject: [PATCH] feat: use Map to represent json field when building doc in es sink (#12464) --- .../java/com/risingwave/connector/EsSink.java | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 7c1727f4a82f3..178c76edf3e40 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -14,6 +14,10 @@ package com.risingwave.connector; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.connector.api.sink.SinkWriterBase; @@ -183,17 +187,35 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) * * @param row * @return Map from Field name to Value + * @throws JsonProcessingException + * @throws JsonMappingException */ - private Map buildDoc(SinkRow row) { + private Map buildDoc(SinkRow row) + throws JsonMappingException, JsonProcessingException { Map doc = new HashMap(); - for (int i = 0; i < getTableSchema().getNumColumns(); i++) { + var tableSchema = getTableSchema(); + var columnDescs = tableSchema.getColumnDescs(); + for (int i = 0; i < row.size(); i++) { + var type = columnDescs.get(i).getDataType().getTypeName(); Object col = row.get(i); - if (col instanceof Date) { - // es client doesn't natively support java.sql.Timestamp/Time/Date - // so we need to convert Date type into a string as suggested in - // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 - col = col.toString(); + switch (type) { + case DATE: + // es client doesn't natively support java.sql.Timestamp/Time/Date + // so we need to convert Date type into a string as suggested in + // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 + col = col.toString(); + break; + case JSONB: + ObjectMapper mapper = new ObjectMapper(); + col = + mapper.readValue( + (String) col, new TypeReference>() {}); + break; + default: + break; } + if (col instanceof Date) {} + doc.put(getTableSchema().getColumnDesc(i).getName(), col); } return doc; @@ -219,7 +241,7 @@ private String buildId(SinkRow row) { return id; } - private void processUpsert(SinkRow row) { + private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { Map doc = buildDoc(row); final String key = buildId(row); @@ -234,7 +256,7 @@ private void processDelete(SinkRow row) { bulkProcessor.add(deleteRequest); } - private void writeRow(SinkRow row) { + private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { switch (row.getOp()) { case INSERT: case UPDATE_INSERT: