Skip to content

Commit

Permalink
feat(Sink): support es dynamic index (#15835) (#16311)
Browse files Browse the repository at this point in the history
Co-authored-by: Xinhao Xu <[email protected]>
  • Loading branch information
github-actions[bot] and xxhZs authored Apr 15, 2024
1 parent b78d43c commit be1f1e2
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ public void testEsSink(ElasticsearchContainer container, String username, String
throws IOException {
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress(), "test")
new EsSinkConfig(container.getHttpHostAddress())
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password),
getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
new ArraySinkRow(
Op.INSERT, null, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
new ArraySinkRow(Op.INSERT, null, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
sink.sync();
// container is slow here, but our default flush time is 5s,
// so 3s is enough for sync test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,32 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}

private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);
String doc = (String) row.get(1);

UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
final String index = (String) row.get(0);
final String key = (String) row.get(1);
String doc = (String) row.get(2);

UpdateRequest updateRequest;
if (config.getIndex() != null) {
updateRequest =
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
} else {
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);

DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
final String index = (String) row.get(0);
final String key = (String) row.get(1);

DeleteRequest deleteRequest;
if (config.getIndex() != null) {
deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
} else {
deleteRequest = new DeleteRequest(index, "_doc", key);
}
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class EsSinkConfig extends CommonSinkConfig {
/** Required */
private String url;

/** Required */
/** Optional */
@JsonProperty(value = "index")
private String index;

/** Optional, delimiter for generating id */
Expand All @@ -37,11 +38,12 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "password")
private String password;

@JsonProperty(value = "index_column")
private String indexColumn;

@JsonCreator
public EsSinkConfig(
@JsonProperty(value = "url") String url, @JsonProperty(value = "index") String index) {
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
this.index = index;
}

public String getUrl() {
Expand All @@ -52,6 +54,11 @@ public String getIndex() {
return index;
}

/**
 * Sets the target index name and returns this config so that calls can be chained.
 *
 * <p>Sink validation rejects a config in which both {@code index} and {@code index_column}
 * are set, and one in which neither is set.
 *
 * @param index the index name to store in this config
 * @return this config instance
 */
public EsSinkConfig withIndex(String index) {
this.index = index;
return this;
}

/**
 * Returns the optional delimiter configured for generating ids.
 *
 * @return the stored delimiter value
 */
public String getDelimiter() {
return delimiter;
}
Expand All @@ -78,4 +85,13 @@ public EsSinkConfig withPassword(String password) {
this.password = password;
return this;
}

/**
 * Returns the column name configured through the {@code index_column} property.
 *
 * @return the configured column name, or {@code null} when the property was not set
 */
public String getIndexColumn() {
return indexColumn;
}

/**
 * Sets the name of the column that supplies the index and returns this config so that calls
 * can be chained.
 *
 * <p>Sink validation requires this column to exist in the table schema with type VARCHAR,
 * and rejects a config that also sets {@code index}.
 *
 * @param indexColumn the column name to store in this config
 * @return this config instance
 */
public EsSinkConfig withIndexColumn(String indexColumn) {
this.indexColumn = indexColumn;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog;
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,32 @@ public void validate(
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException();
}
if (config.getIndexColumn() != null) {
Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn());
if (typeName == null) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column " + config.getIndexColumn() + " not found in schema")
.asRuntimeException();
}
if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column must be of type String, but found " + typeName)
.asRuntimeException();
}
if (config.getIndex() != null) {
throw Status.INVALID_ARGUMENT
.withDescription("index and index_column cannot be set at the same time")
.asRuntimeException();
}
} else {
if (config.getIndex() == null) {
throw Status.INVALID_ARGUMENT
.withDescription("index or index_column must be set")
.asRuntimeException();
}
}

// 2. check connection
RestClientBuilder builder = RestClient.builder(host);
Expand Down
56 changes: 45 additions & 11 deletions src/connector/src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::array::{
ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder,
ArrayBuilder, ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder,
};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
Expand All @@ -27,6 +27,7 @@ use super::encoder::{JsonEncoder, RowEncoder};
use super::remote::ElasticSearchSink;
use crate::sink::{Result, Sink};
/// Sink property key for the delimiter option; its value, when present, is passed to the
/// Elasticsearch chunk converter as the delimiter.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Sink property key naming the schema column whose position is resolved and handed to the
/// Elasticsearch chunk converter as the index column.
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";

pub enum StreamChunkConverter {
Es(EsStreamChunkConverter),
Expand All @@ -40,10 +41,22 @@ impl StreamChunkConverter {
properties: &HashMap<String, String>,
) -> Result<Self> {
if sink_name == ElasticSearchSink::SINK_NAME {
let index_column = properties
.get(ES_OPTION_INDEX_COLUMN)
.cloned()
.map(|n| {
schema
.fields()
.iter()
.position(|s| s.name == n)
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
})
.transpose()?;
Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
schema,
pk_indices.clone(),
properties.get(ES_OPTION_DELIMITER).cloned(),
index_column,
)?))
} else {
Ok(StreamChunkConverter::Other)
Expand All @@ -60,9 +73,15 @@ impl StreamChunkConverter {
/// Converts the rows of a stream chunk into the three columns sent to the Elasticsearch
/// sink: index (varchar), id (varchar) and document (jsonb).
pub struct EsStreamChunkConverter {
/// Encodes each row into the JSON object used as the document column.
json_encoder: JsonEncoder,
/// Closure producing the id string for a row; which closure is used is chosen in `new`
/// depending on whether `pk_indices` is empty.
fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>,
/// Position of the schema column that supplies the per-row index value. When `None`,
/// a null is appended to the index column for every row.
index_column: Option<usize>,
}
impl EsStreamChunkConverter {
fn new(schema: Schema, pk_indices: Vec<usize>, delimiter: Option<String>) -> Result<Self> {
fn new(
schema: Schema,
pk_indices: Vec<usize>,
delimiter: Option<String>,
index_column: Option<usize>,
) -> Result<Self> {
let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty()
{
Box::new(|row: RowRef<'_>| {
Expand Down Expand Up @@ -96,10 +115,18 @@ impl EsStreamChunkConverter {
Ok(keys.join(&delimiter))
})
};
let json_encoder = JsonEncoder::new_with_es(schema, None);
let col_indices = if let Some(index) = index_column {
let mut col_indices: Vec<usize> = (0..schema.len()).collect();
col_indices.remove(index);
Some(col_indices)
} else {
None
};
let json_encoder = JsonEncoder::new_with_es(schema, col_indices);
Ok(Self {
json_encoder,
fn_build_id,
index_column,
})
}

Expand All @@ -109,23 +136,30 @@ impl EsStreamChunkConverter {
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut json_builder =
<JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut index_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
for (op, row) in chunk.rows() {
ops.push(op);
id_string_builder.append(Some(&self.build_id(row)?));
if let Some(index) = self.index_column {
index_builder.append(Some(
row.datum_at(index)
.ok_or_else(|| anyhow!("No value find in row, index is {}", index))?
.into_utf8(),
));
} else {
index_builder.append_null();
}
let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?));
risingwave_common::array::ArrayBuilder::append(
&mut id_string_builder,
Some(&self.build_id(row)?),
);
risingwave_common::array::ArrayBuilder::append(
&mut json_builder,
Some(json.as_scalar_ref()),
);
json_builder.append(Some(json.as_scalar_ref()));
}
let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder);
Ok(StreamChunk::new(
ops,
vec![
std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)),
std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)),
std::sync::Arc::new(ArrayImpl::Jsonb(json_array)),
],
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ impl RemoteLogSinker {
let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME {
let columns = vec![
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Jsonb).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
];
Some(TableSchema {
columns,
Expand Down

0 comments on commit be1f1e2

Please sign in to comment.