feat(sink): support es sink struct and refactor es sink #14231

Merged
17 commits merged on Jan 4, 2024
@@ -23,10 +23,7 @@
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.metrics.MonitoredRowIterable;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,7 +101,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
.asRuntimeException();
}
sinkId = sinkTask.getStart().getSinkParam().getSinkId();
bindSink(sinkTask.getStart().getSinkParam(), sinkTask.getStart().getFormat());
bindSink(sinkTask.getStart());
currentEpoch = null;
currentBatchId = null;
epochStarted = false;
@@ -204,35 +201,35 @@ private void cleanup() {
}

private void bindSink(
ConnectorServiceProto.SinkParam sinkParam,
ConnectorServiceProto.SinkPayloadFormat format) {
com.risingwave.proto.ConnectorServiceProto.SinkWriterStreamRequest.StartSink
startSink) {
var sinkParam = startSink.getSinkParam();
var format = startSink.getSinkPayloadFormatCase();
tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
String connectorName = getConnectorName(sinkParam);
SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
if (connectorName.equals("elasticsearch")) {
tableSchema =
new TableSchema(
List.of("id", "json_result"),
List.of(
Data.DataType.newBuilder()
.setTypeName(TypeName.VARCHAR)
.build(),
Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build()),
List.of());
}

switch (format) {
case FORMAT_UNSPECIFIED:
case UNRECOGNIZED:
case UNSPECIFIED_FORMAT:
case SINKPAYLOADFORMAT_NOT_SET:
throw INVALID_ARGUMENT
.withDescription("should specify payload format in request")
.asRuntimeException();
case JSON:
case JSON_FORMAT:
deserializer = new JsonDeserializer(tableSchema);
break;
case STREAM_CHUNK:
case STREAM_CHUNK_FORMAT:
deserializer = new StreamChunkDeserializer(tableSchema);
break;
case STREAM_CHUNK_WITH_SCHEMA_FORMAT:
deserializer =
new StreamChunkDeserializer(
TableSchema.fromProto(
startSink
.getStreamChunkWithSchemaFormat()
.getTableSchema()));
break;
}
this.connectorName = connectorName.toUpperCase();
ConnectorNodeMetrics.incActiveSinkConnections(connectorName, "node1");
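The format now travels inside StartSink as a oneof, and the new STREAM_CHUNK_WITH_SCHEMA_FORMAT variant carries a table schema of its own, which bindSink feeds to the StreamChunkDeserializer. A minimal sketch of what a sender of that variant could look like, assuming standard protobuf-java codegen for this proto (sinkParam and tableSchemaProto are hypothetical placeholders):

    ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink =
            ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
                    .setSinkParam(sinkParam) // hypothetical SinkParam for this sink
                    .setStreamChunkWithSchemaFormat(
                            ConnectorServiceProto.StreamChunkWithSchemaFormat.newBuilder()
                                    .setTableSchema(tableSchemaProto)) // hypothetical TableSchema proto
                    .build();

On receipt, the switch above picks the deserializer from getSinkPayloadFormatCase(), so for this variant the schema used for decoding comes from the request itself rather than from the sink parameter.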
@@ -72,9 +72,6 @@ public class EsSink extends SinkWriterBase {
// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;

// For bulk listener
private final List<Integer> primaryKeyIndexes;

class RequestTracker {
// Used to save the return results of es asynchronous writes. The capacity is Integer.Max
private final BlockingQueue<EsWriteResultResp> blockingQueue = new LinkedBlockingQueue<>();
@@ -189,11 +186,6 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
this.bulkProcessor = createBulkProcessor(this.requestTracker);

primaryKeyIndexes = new ArrayList<Integer>();
for (String primaryKey : getTableSchema().getPrimaryKeys()) {
primaryKeyIndexes.add(getTableSchema().getColumnIndex(primaryKey));
}
}

private static RestClientBuilder configureRestClientBuilder(
@@ -291,7 +283,7 @@ private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException
String doc = (String) row.get(1);

UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "doc", key).doc(doc, XContentType.JSON);
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
@@ -300,7 +292,7 @@
private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);

DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key);
DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
}
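Two things change in this write path: the sink no longer tracks primary-key indexes itself (processUpsert and processDelete read the document id at index 0 and the pre-encoded JSON document at index 1), and the mapping type passed to the typed bulk APIs changes from "doc" to "_doc", the only type Elasticsearch 7 accepts. A self-contained sketch of the two request shapes against the ES 7.x high-level REST client (index name, id, and document body are made-up values; import paths are the 7.x ones):

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.common.xcontent.XContentType;

    public class EsRequestSketch {
        public static void main(String[] args) {
            // Upsert: id from the row's key column, body is the pre-encoded JSON column.
            UpdateRequest upsert =
                    new UpdateRequest("test_index", "_doc", "pk-1")
                            .doc("{\"v1\": 10}", XContentType.JSON)
                            .docAsUpsert(true);
            // Delete: same id, no body.
            DeleteRequest delete = new DeleteRequest("test_index", "_doc", "pk-1");
            System.out.println(upsert.id() + " " + delete.id());
        }
    }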
19 changes: 14 additions & 5 deletions proto/connector_service.proto
@@ -30,16 +30,25 @@
optional uint32 target_table = 8;
}

enum SinkPayloadFormat {
FORMAT_UNSPECIFIED = 0;
JSON = 1;
STREAM_CHUNK = 2;
message JsonFormat{}

message StreamChunkFormat{}

message UnspecifiedFormat{}

message StreamChunkWithSchemaFormat{
TableSchema table_schema = 1;
}

message SinkWriterStreamRequest {
message StartSink {

Check failure on line 44 in proto/connector_service.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "2" with name "format" on message "StartSink" was deleted without reserving the number "2".
SinkParam sink_param = 1;
SinkPayloadFormat format = 2;
oneof sink_payload_format {
JsonFormat json_format = 11;
StreamChunkFormat stream_chunk_format = 12;
UnspecifiedFormat unspecified_format = 13;
StreamChunkWithSchemaFormat stream_chunk_with_schema_format = 14;
}
}

message WriteBatch {
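Replacing the SinkPayloadFormat enum with per-format messages in a oneof lets a variant carry payload (StreamChunkWithSchemaFormat embeds a TableSchema), at the cost of the wire-compatibility break flagged above, since field 2 was deleted without a reserved entry. A short sketch of the generated-Java oneof semantics the observer relies on, assuming standard protobuf-java codegen (the values here are purely illustrative):

    var start =
            ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
                    .setJsonFormat(ConnectorServiceProto.JsonFormat.getDefaultInstance())
                    // Setting another member of the same oneof clears json_format.
                    .setStreamChunkFormat(ConnectorServiceProto.StreamChunkFormat.getDefaultInstance())
                    .build();
    assert !start.hasJsonFormat();
    assert start.getSinkPayloadFormatCase()
            == ConnectorServiceProto.SinkWriterStreamRequest.StartSink.SinkPayloadFormatCase
                    .STREAM_CHUNK_FORMAT;
    // A request that never sets the oneof reports SINKPAYLOADFORMAT_NOT_SET,
    // which the Java observer rejects with INVALID_ARGUMENT.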
9 changes: 6 additions & 3 deletions src/compute/src/server.rs
@@ -40,7 +40,8 @@ use risingwave_common_service::tracing::TracingExtractLayer;
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat;
use risingwave_pb::connector_service::{JsonFormat, StreamChunkFormat};
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
@@ -332,8 +333,10 @@ pub async fn compute_node_serve(
let connector_params = risingwave_connector::ConnectorParams {
connector_client,
sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() {
None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk,
Some("json") => SinkPayloadFormat::Json,
None | Some("stream_chunk") => {
SinkPayloadFormat::StreamChunkFormat(StreamChunkFormat {})
}
Some("json") => SinkPayloadFormat::JsonFormat(JsonFormat {}),
_ => {
unreachable!(
"invalid sink payload format: {:?}. Should be either json or stream_chunk",
13 changes: 11 additions & 2 deletions src/connector/src/lib.rs
@@ -38,7 +38,8 @@
use std::time::Duration;

use duration_str::parse_std;
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat;
use risingwave_pb::connector_service::UnspecifiedFormat;
use risingwave_rpc_client::ConnectorClient;
use serde::de;

@@ -60,11 +61,19 @@ mod with_options;
#[cfg(test)]
mod with_options_test;

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct ConnectorParams {
pub connector_client: Option<ConnectorClient>,
pub sink_payload_format: SinkPayloadFormat,
}
impl Default for ConnectorParams {
fn default() -> Self {
Self {
connector_client: Default::default(),
sink_payload_format: SinkPayloadFormat::UnspecifiedFormat(UnspecifiedFormat {}),
}
}
}

impl ConnectorParams {
pub fn new(
6 changes: 3 additions & 3 deletions src/connector/src/sink/encoder/json.rs
@@ -219,7 +219,7 @@ fn datum_to_json_object(
}
json!(v_string)
}
_ => {
CustomJsonType::Es | CustomJsonType::None => {
json!(v.to_text())
}
},
@@ -271,7 +271,7 @@ fn datum_to_json_object(
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(),
_ => json!(jsonb_ref.to_string()),
CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()),
},
(DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
let elems = list_ref.iter();
@@ -315,7 +315,7 @@ fn datum_to_json_object(
ArrayError::internal(format!("Json to string err{:?}", err))
})?)
}
_ => {
CustomJsonType::Es | CustomJsonType::None => {
let mut map = Map::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
st.iter()