feat(sink): support es sink struct and refactor es sink #14231

Merged: 17 commits, Jan 4, 2024
@@ -23,7 +23,10 @@
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.metrics.MonitoredRowIterable;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,6 +210,17 @@ private void bindSink(
String connectorName = getConnectorName(sinkParam);
SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
if (connectorName.equals("elasticsearch")) {
Contributor: It's unnecessary to set the special schema, since the schema is fixed and we should not access it in the es sink.

Contributor Author (@xxhZs), Dec 29, 2023: The schema here is for use by the StreamChunkDeserializer, which needs our mock schema, not the original one, to deserialize the StreamChunk.

Contributor: I see.

Previously we assumed the stream chunk schema was the same as the sink's logical schema; this PR breaks that assumption. If so, we should have a separate field, named something like payload_schema, in the StartSink proto, and the StreamChunkDeserializer should use the schema from that field instead of the one in the SinkParam.

On the Rust side, when we send the initial start-sink request, the es sink would fill in the special schema and other sinks would fill in their original sink schema, so the Java side would not need this special-case logic.
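For illustration, a minimal Java sketch of that suggestion, under stated assumptions: the payload_schema proto field, its accessors, and the fromProto conversion below are invented for this sketch and are not part of this PR or of the generated proto API.

    // Hypothetical: StartSink carries a separate payload schema describing the stream
    // chunk layout; the StreamChunkDeserializer works off that schema and falls back to
    // the sink's logical schema when it is absent, so no per-connector branch is needed.
    private static TableSchema resolvePayloadSchema(
            ConnectorServiceProto.SinkParam sinkParam, TableSchema logicalSchema) {
        if (sinkParam.hasPayloadSchema()) { // assumed field, not in the current proto
            return TableSchema.fromProto(sinkParam.getPayloadSchema());
        }
        return logicalSchema;
    }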

tableSchema =
new TableSchema(
List.of("id", "json_result"),
List.of(
Data.DataType.newBuilder()
.setTypeName(TypeName.VARCHAR)
.build(),
Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build()),
List.of());
}
switch (format) {
case FORMAT_UNSPECIFIED:
case UNRECOGNIZED:
@@ -61,8 +61,8 @@ public void testEsSink(ElasticsearchContainer container, String username, String
getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(Op.INSERT, 1, "Alice"),
new ArraySinkRow(Op.INSERT, 2, "Bob")));
new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
sink.sync();
// container is slow here, but our default flush time is 5s,
// so 3s is enough for sync test
@@ -16,8 +16,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
@@ -27,7 +25,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -47,6 +44,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,11 +65,6 @@ public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";

private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC");
private final SimpleDateFormat tDfm;
private final SimpleDateFormat tsDfm;
private final SimpleDateFormat tstzDfm;

private final EsSinkConfig config;
private BulkProcessor bulkProcessor;
private final RestHighLevelClient client;
@@ -198,13 +191,9 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
this.bulkProcessor = createBulkProcessor(this.requestTracker);

primaryKeyIndexes = new ArrayList<Integer>();
Contributor: The primaryKeyIndexes should no longer be used and should be removed.

Contributor: If we still need a schema from upstream, why don't we need the pk indexes, even though the schema is (varchar, jsonb) in most cases?

Contributor: In this PR, all processing is handled on the Rust side, including generating the doc id and building the content body. On the Java side we just build and send requests from the already-processed doc id and content body, so the pk indexes are not used on the Java side.
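As a standalone illustration of that split (a sketch, not code from this PR; the index name "test" and the sample values are invented), the Java side only needs the pre-built doc id and JSON body to construct the Elasticsearch requests:

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.xcontent.XContentType;

    public class EsRequestSketch {
        public static void main(String[] args) {
            // Values as the Rust side would hand them over: a ready-made doc id and an
            // already-encoded JSON body.
            String key = "1$Alice";
            String doc = "{\"id\":1,\"name\":\"Alice\"}";

            // Upsert: the JSON body is attached as-is, with no per-column conversion.
            UpdateRequest upsert =
                    new UpdateRequest("test", "doc", key).doc(doc, XContentType.JSON);
            upsert.docAsUpsert(true);

            // Delete: only the doc id is needed.
            DeleteRequest delete = new DeleteRequest("test", "doc", key);

            System.out.println(upsert);
            System.out.println(delete);
        }
    }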

for (String primaryKey : tableSchema.getPrimaryKeys()) {
primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey));
for (String primaryKey : getTableSchema().getPrimaryKeys()) {
primaryKeyIndexes.add(getTableSchema().getColumnIndex(primaryKey));
}

tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone);
tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone);
tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone);
}

private static RestClientBuilder configureRestClientBuilder(
@@ -297,116 +286,20 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}

/**
* The api accepts doc in map form.
*
* @param row
* @return Map from Field name to Value
* @throws JsonProcessingException
* @throws JsonMappingException
*/
private Map<String, Object> buildDoc(SinkRow row)
throws JsonMappingException, JsonProcessingException {
Map<String, Object> doc = new HashMap();
var tableSchema = getTableSchema();
var columnDescs = tableSchema.getColumnDescs();
for (int i = 0; i < row.size(); i++) {
var type = columnDescs.get(i).getDataType().getTypeName();
Object col = row.get(i);
switch (type) {
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date/Time/Timestamp type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
case DATE:
col = col.toString();
break;
// construct java.sql.Time/Timestamp with milliseconds time value.
// it will use system timezone by default, so we have to set timezone manually
case TIME:
col = tDfm.format(col);
break;
case TIMESTAMP:
col = tsDfm.format(col);
break;
case TIMESTAMPTZ:
col = tstzDfm.format(col);
break;
case JSONB:
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree((String) col);
col = convertJsonNode(jsonNode);
break;
default:
break;
}

doc.put(getTableSchema().getColumnDesc(i).getName(), col);
}
return doc;
}

private static Object convertJsonNode(JsonNode jsonNode) {
if (jsonNode.isObject()) {
Map<String, Object> resultMap = new HashMap<>();
jsonNode.fields()
.forEachRemaining(
entry -> {
resultMap.put(entry.getKey(), convertJsonNode(entry.getValue()));
});
return resultMap;
} else if (jsonNode.isArray()) {
List<Object> resultList = new ArrayList<>();
jsonNode.elements()
.forEachRemaining(
element -> {
resultList.add(convertJsonNode(element));
});
return resultList;
} else if (jsonNode.isNumber()) {
return jsonNode.numberValue();
} else if (jsonNode.isTextual()) {
return jsonNode.textValue();
} else if (jsonNode.isBoolean()) {
return jsonNode.booleanValue();
} else if (jsonNode.isNull()) {
return null;
} else {
throw new IllegalArgumentException("Unsupported JSON type");
}
}

/**
* use primary keys as id concatenated by a specific delimiter.
*
* @param row
* @return
*/
private String buildId(SinkRow row) {
String id;
if (primaryKeyIndexes.isEmpty()) {
id = row.get(0).toString();
} else {
List<String> keys =
primaryKeyIndexes.stream()
.map(index -> row.get(primaryKeyIndexes.get(index)).toString())
.collect(Collectors.toList());
id = String.join(config.getDelimiter(), keys);
}
return id;
}

private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
Map<String, Object> doc = buildDoc(row);
final String key = buildId(row);
final String key = (String) row.get(0);
String doc = (String) row.get(1);
Contributor, commenting on lines +282 to +283: Concern about forward compatibility.


UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "doc", key).doc(doc).upsert(doc);
new UpdateRequest(config.getIndex(), "doc", key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

private void processDelete(SinkRow row) {
final String key = buildId(row);
private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);

DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key);
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
src/connector/src/sink/big_query.rs (10 changes: 8 additions, 2 deletions)
@@ -33,7 +33,10 @@ use url::Url;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use super::encoder::{
DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use super::writer::LogSinkerOf;
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
@@ -306,10 +309,13 @@ impl BigQuerySinkWriter {
client,
is_append_only,
insert_request: TableDataInsertAllRequest::new(),
row_encoder: JsonEncoder::new_with_big_query(
row_encoder: JsonEncoder::new(
schema,
None,
DateHandlingMode::String,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
),
})
}