
Commit

fix comm
remove print
xxhZs committed Sep 23, 2024
1 parent 535b0b6 commit 88264e5
Showing 5 changed files with 206 additions and 121 deletions.
6 changes: 6 additions & 0 deletions src/common/src/row/mod.rs
@@ -95,6 +95,12 @@ pub trait Row: Sized + std::fmt::Debug + PartialEq + Eq {
buf.freeze()
}

/// Estimates the serialized size of the row in bytes under value encoding,
/// by summing the per-datum size estimates.
fn value_estimate_size(&self) -> usize {
self.iter()
.map(value_encoding::estimate_serialize_datum_size)
.sum()
}

/// Serializes the row with memcomparable encoding, into the given `buf`. As each datum may have
/// different order type, a `serde` should be provided.
#[inline]
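
For illustration only (not part of this commit), a hypothetical snippet showing how the new trait method might be called, assuming `OwnedRow` and `ScalarImpl` from `risingwave_common` are available as dependencies:

use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::ScalarImpl;

fn main() {
    // A made-up row: an int32 key, a utf8 payload, and a NULL column.
    let row = OwnedRow::new(vec![
        Some(ScalarImpl::Int32(42)),
        Some(ScalarImpl::Utf8("hello".into())),
        None,
    ]);
    // Sums the per-datum value-encoding size estimates; the Elasticsearch sink
    // below uses this figure to size its write batches before serializing.
    let estimated = row.value_estimate_size();
    println!("estimated serialized size: {estimated} bytes");
}
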
97 changes: 28 additions & 69 deletions src/connector/src/sink/elasticsearch.rs
@@ -16,18 +16,18 @@ use std::collections::BTreeMap;

use anyhow::anyhow;
use risingwave_common::array::{
ArrayBuilder, ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder,
ArrayBuilder, ArrayImpl, JsonbArrayBuilder, StreamChunk, Utf8ArrayBuilder,
};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{JsonbVal, Scalar, ToText};
use risingwave_common::types::{JsonbVal, Scalar};
use serde_json::Value;

use super::encoder::{JsonEncoder, RowEncoder};
use super::elasticsearch_opensearch_common::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
pub const ES_OPTION_INDEX: &str = "index";

pub enum StreamChunkConverter {
Es(EsStreamChunkConverter),
@@ -52,11 +52,13 @@ impl StreamChunkConverter {
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
})
.transpose()?;
let index = properties.get(ES_OPTION_INDEX).cloned();
Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
schema,
pk_indices.clone(),
properties.get(ES_OPTION_DELIMITER).cloned(),
index_column,
index,
)?))
} else {
Ok(StreamChunkConverter::Other)
@@ -71,87 +73,48 @@ impl StreamChunkConverter {
}
}
pub struct EsStreamChunkConverter {
json_encoder: JsonEncoder,
fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>,
index_column: Option<usize>,
formatter: ElasticSearchOpenSearchFormatter,
}
impl EsStreamChunkConverter {
pub fn new(
schema: Schema,
pk_indices: Vec<usize>,
delimiter: Option<String>,
index_column: Option<usize>,
index: Option<String>,
) -> Result<Self> {
let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty()
{
Box::new(|row: RowRef<'_>| {
Ok(row
.datum_at(0)
.ok_or_else(|| anyhow!("No value found in row, index is 0"))?
.to_text())
})
} else if pk_indices.len() == 1 {
let index = *pk_indices.get(0).unwrap();
Box::new(move |row: RowRef<'_>| {
Ok(row
.datum_at(index)
.ok_or_else(|| anyhow!("No value found in row, index is 0"))?
.to_text())
})
} else {
let delimiter = delimiter
.as_ref()
.ok_or_else(|| anyhow!("Please set delimiter in with option"))?
.clone();
Box::new(move |row: RowRef<'_>| {
let mut keys = vec![];
for index in &pk_indices {
keys.push(
row.datum_at(*index)
.ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
.to_text(),
);
}
Ok(keys.join(&delimiter))
})
};
let col_indices = if let Some(index) = index_column {
let mut col_indices: Vec<usize> = (0..schema.len()).collect();
col_indices.remove(index);
Some(col_indices)
} else {
None
};
let json_encoder = JsonEncoder::new_with_es(schema, col_indices);
Ok(Self {
json_encoder,
fn_build_id,
let formatter = ElasticSearchOpenSearchFormatter::new(
pk_indices,
&schema,
delimiter,
index_column,
})
index,
)?;
Ok(Self { formatter })
}

fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
let mut ops = vec![];
let mut ops = Vec::with_capacity(chunk.capacity());
let mut id_string_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut json_builder =
<JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
let mut index_builder =
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
for (op, row) in chunk.rows() {
ops.push(op);
id_string_builder.append(Some(&self.build_id(row)?));
if let Some(index) = self.index_column {
index_builder.append(Some(
row.datum_at(index)
.ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
.into_utf8(),
));
for build_bulk_para in self.formatter.covert_chunk(chunk)? {
let BuildBulkPara {
key, value, index, ..
} = build_bulk_para;

id_string_builder.append(Some(&key));
index_builder.append(Some(&index));
if value.is_some() {
ops.push(risingwave_common::array::Op::Insert);
} else {
index_builder.append_null();
ops.push(risingwave_common::array::Op::Delete);
}
let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?));
json_builder.append(Some(json.as_scalar_ref()));
let value = value.map(|json| JsonbVal::from(Value::Object(json)));
json_builder.append(value.as_ref().map(|json| json.as_scalar_ref()));
}
let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
@@ -165,10 +128,6 @@ impl EsStreamChunkConverter {
],
))
}

fn build_id(&self, row: RowRef<'_>) -> Result<String> {
(self.fn_build_id)(row)
}
}

pub fn is_es_sink(sink_name: &str) -> bool {
57 changes: 47 additions & 10 deletions src/connector/src/sink/elasticsearch_opensearch_common.rs
@@ -54,20 +54,41 @@ pub struct ElasticSearchOpenSearchConfig {
#[serde_as(as = "Option<DisplayFromStr>")]
pub index_column: Option<usize>,

// #[serde(rename = "max_task_num")]
// #[serde_as(as = "Option<DisplayFromStr>")]
// pub max_task_num: Option<usize>,
#[serde(rename = "retry_on_conflict")]
pub retry_on_conflict: Option<i32>,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_retry_on_conflict")]
pub retry_on_conflict: i32,

#[serde(rename = "batch_num_messages")]
pub batch_num_messages: Option<usize>,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_batch_num_messages")]
pub batch_num_messages: usize,

#[serde(rename = "batch_size_kb")]
pub batch_size_kb: Option<usize>,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_batch_size_kb")]
pub batch_size_kb: usize,

#[serde(rename = "concurrent_requests")]
pub concurrent_requests: Option<usize>,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_concurrent_requests")]
pub concurrent_requests: usize,
}

fn default_retry_on_conflict() -> i32 {
3
}

fn default_batch_num_messages() -> usize {
512
}

fn default_batch_size_kb() -> usize {
5 * 1024
}

fn default_concurrent_requests() -> usize {
1024
}
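
As an aside, a minimal standalone sketch of the option-parsing pattern used above, with a hypothetical `BatchOptions` struct (assumes the `serde` crate with the derive feature, plus `serde_with` and `serde_json`): string-valued options are parsed through `DisplayFromStr`, and `#[serde(default = "...")]` supplies a value when the option is omitted.

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Mirrors `default_retry_on_conflict` above.
fn default_retry_on_conflict() -> i32 {
    3
}

#[serde_as]
#[derive(Debug, Deserialize)]
struct BatchOptions {
    // String-typed option parsed into an i32; falls back to 3 when absent.
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_retry_on_conflict")]
    retry_on_conflict: i32,
}

fn main() {
    let explicit: BatchOptions = serde_json::from_str(r#"{"retry_on_conflict": "5"}"#).unwrap();
    let defaulted: BatchOptions = serde_json::from_str("{}").unwrap();
    assert_eq!(explicit.retry_on_conflict, 5);
    assert_eq!(defaulted.retry_on_conflict, 3);
    println!("explicit = {explicit:?}, defaulted = {defaulted:?}");
}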

impl ElasticSearchOpenSearchConfig {
@@ -136,7 +157,12 @@ pub struct ElasticSearchOpenSearchFormatter {
index: Option<String>,
}

type BuildBulkPara = (String, String, Option<Map<String, Value>>);
pub struct BuildBulkPara {
/// Target index for this bulk operation.
pub index: String,
/// Document id, encoded from the key columns.
pub key: String,
/// Document body as JSON; `None` marks a delete.
pub value: Option<Map<String, Value>>,
/// Estimated payload size in bytes, used to flush a bulk batch once `batch_size_kb` is reached.
pub mem_size_b: usize,
}

impl ElasticSearchOpenSearchFormatter {
pub fn new(
@@ -229,11 +255,22 @@ impl ElasticSearchOpenSearchFormatter {
Op::Insert | Op::UpdateInsert => {
let key = self.key_encoder.encode(rows)?;
let value = self.value_encoder.encode(rows)?;
result_vec.push((index.to_string(), key, Some(value)));
result_vec.push(BuildBulkPara {
index: index.to_string(),
key,
value: Some(value),
mem_size_b: rows.value_estimate_size(),
});
}
Op::Delete => {
let key = self.key_encoder.encode(rows)?;
result_vec.push((index.to_string(), key, None));
let mem_size_b = std::mem::size_of_val(&key);
result_vec.push(BuildBulkPara {
index: index.to_string(),
key,
value: None,
mem_size_b,
});
}
Op::UpdateDelete => continue,
}
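
To summarize the mapping above in isolation, here is a self-contained sketch with hypothetical stand-in types (not the connector's own): inserts and update-inserts become upserts keyed by the encoded key, deletes become delete actions, and update-deletes are skipped because the paired update-insert already carries the new document.

// Hypothetical stand-ins for the stream op and the emitted bulk action.
enum Op {
    Insert,
    UpdateInsert,
    Delete,
    UpdateDelete,
}

enum BulkAction {
    Upsert { key: String, doc: String },
    Delete { key: String },
}

// Mirrors the match above: upsert for (Update)Insert, delete for Delete,
// skip UpdateDelete.
fn to_bulk_actions(rows: Vec<(Op, String, String)>) -> Vec<BulkAction> {
    let mut actions = Vec::with_capacity(rows.len());
    for (op, key, doc) in rows {
        match op {
            Op::Insert | Op::UpdateInsert => actions.push(BulkAction::Upsert { key, doc }),
            Op::Delete => actions.push(BulkAction::Delete { key }),
            Op::UpdateDelete => continue,
        }
    }
    actions
}

fn main() {
    let rows = vec![
        (Op::Insert, "1".to_string(), "doc-v1".to_string()),
        (Op::UpdateDelete, "1".to_string(), "doc-v1".to_string()),
        (Op::UpdateInsert, "1".to_string(), "doc-v2".to_string()),
        (Op::Delete, "1".to_string(), "doc-v2".to_string()),
    ];
    // Two upserts followed by one delete for key "1".
    assert_eq!(to_bulk_actions(rows).len(), 3);
}
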
81 changes: 61 additions & 20 deletions src/connector/src/sink/elasticsearch_rust.rs
@@ -20,11 +20,11 @@ use futures::prelude::TryFuture;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_json::Value;
use serde_json::{json, Value};
use tonic::async_trait;

use super::elasticsearch_opensearch_common::{
validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
validate_config, BuildBulkPara, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
};
use super::log_store::DeliveryFutureManagerAddFuture;
use super::writer::{
@@ -79,7 +79,7 @@ impl Sink for ElasticSearchSink {
self.schema.clone(),
self.pk_indices.clone(),
)?
.into_log_sinker(usize::MAX))
.into_log_sinker(self.config.concurrent_requests))
}
}

@@ -103,7 +103,11 @@ impl ElasticSearchSinkWriter {
config.index_column,
config.index.clone(),
)?;
Ok(Self { client, formatter, config})
Ok(Self {
client,
formatter,
config,
})
}
}

@@ -115,29 +119,66 @@ impl AsyncTruncateSinkWriter for ElasticSearchSinkWriter {
chunk: StreamChunk,
mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
let mut bulks: Vec<BulkOperation<_>> = Vec::with_capacity(chunk.capacity());
for (index, key, value) in self.formatter.covert_chunk(chunk)? {
let chunk_capacity = chunk.capacity();
let mut all_bulks: Vec<Vec<BulkOperation<_>>> = vec![];
let mut bulks: Vec<BulkOperation<_>> = Vec::with_capacity(chunk_capacity);

let mut bulks_size = 0;
for build_bulk_para in self.formatter.covert_chunk(chunk)? {
let BuildBulkPara {
key,
value,
index,
mem_size_b,
} = build_bulk_para;

bulks_size += mem_size_b;
if let Some(value) = value {
bulks.push(BulkOperation::update(key,value).index(index).retry_on_conflict(self.config.retry_on_conflict.unwrap()).into());
let value = json!({
"doc": value,
"doc_as_upsert": true
});
bulks.push(
BulkOperation::update(key, value)
.index(index)
.retry_on_conflict(self.config.retry_on_conflict)
.into(),
);
} else {
bulks.push(BulkOperation::delete(key).index(index).into());
}
if bulks.len() >= self.config.batch_num_messages
|| bulks_size >= self.config.batch_size_kb * 1024
{
all_bulks.push(bulks);
bulks = Vec::with_capacity(chunk_capacity);
bulks_size = 0;
}
}
let clent_clone = self.client.clone();
let future = async move {
let result = clent_clone.bulk(BulkParts::None).body(bulks).send().await?;
let json = result.json::<Value>().await?;
if json["errors"].as_bool().is_none() || json["errors"].as_bool().unwrap() {
Err(SinkError::ElasticSearchOpenSearch(anyhow!(
"send bulk to elasticsearch failed: {:?}",
json
)))
} else {
Ok(())
if !bulks.is_empty() {
all_bulks.push(bulks);
}
for bulks in all_bulks {
let client_clone = self.client.clone();
let future = async move {
let result = client_clone
.bulk(BulkParts::None)
.body(bulks)
.send()
.await?;
let json = result.json::<Value>().await?;
if json["errors"].as_bool().is_none() || json["errors"].as_bool().unwrap() {
Err(SinkError::ElasticSearchOpenSearch(anyhow!(
"send bulk to elasticsearch failed: {:?}",
json
)))
} else {
Ok(())
}
}
.boxed();
add_future.add_future_may_await(future).await?;
}
.boxed();
add_future.add_future_may_await(future).await?;
Ok(())
}
}
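
For reference, the flush policy above condensed into a standalone helper (a hypothetical function, not part of the sink): a new bulk batch starts once either the message count reaches `batch_num_messages` or the accumulated estimated size reaches `batch_size_kb * 1024` bytes.

fn split_into_batches<T>(
    items: Vec<(T, usize)>, // (operation, estimated size in bytes)
    max_messages: usize,
    max_bytes: usize,
) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0usize;
    for (item, size) in items {
        current.push(item);
        current_bytes += size;
        // Flush when either threshold is reached, matching the check above.
        if current.len() >= max_messages || current_bytes >= max_bytes {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    let items: Vec<(u32, usize)> = (0..10).map(|i| (i, 300)).collect();
    // With a 4-message / 1 KiB limit, 10 items of ~300 bytes split into 3 batches.
    let batches = split_into_batches(items, 4, 1024);
    assert_eq!(batches.len(), 3);
    println!("{} batches", batches.len());
}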