From a663b6958e71e6635cad1ad70cc9ceea8fcbbffe Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 23 Sep 2024 15:17:19 +0800
Subject: [PATCH] fix comm

---
 src/common/src/row/mod.rs                     |  6 ++
 src/connector/src/sink/elasticsearch.rs       | 97 ++++++-------
 .../sink/elasticsearch_opensearch_common.rs   | 57 +++++++++--
 src/connector/src/sink/elasticsearch_rust.rs  | 81 ++++++++++++----
 src/connector/src/sink/opensearch.rs          | 87 ++++++++++++-----
 5 files changed, 207 insertions(+), 121 deletions(-)

diff --git a/src/common/src/row/mod.rs b/src/common/src/row/mod.rs
index 8114b96cb37e5..0b2181105352b 100644
--- a/src/common/src/row/mod.rs
+++ b/src/common/src/row/mod.rs
@@ -95,6 +95,12 @@ pub trait Row: Sized + std::fmt::Debug + PartialEq + Eq {
         buf.freeze()
     }
 
+    fn value_estimate_size(&self) -> usize {
+        self.iter()
+            .map(value_encoding::estimate_serialize_datum_size)
+            .sum()
+    }
+
     /// Serializes the row with memcomparable encoding, into the given `buf`. As each datum may have
     /// different order type, a `serde` should be provided.
     #[inline]
diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs
index a5bb30f2590c2..7a3b61e0b4c06 100644
--- a/src/connector/src/sink/elasticsearch.rs
+++ b/src/connector/src/sink/elasticsearch.rs
@@ -16,18 +16,18 @@ use std::collections::BTreeMap;
 use anyhow::anyhow;
 use risingwave_common::array::{
-    ArrayBuilder, ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder,
+    ArrayBuilder, ArrayImpl, JsonbArrayBuilder, StreamChunk, Utf8ArrayBuilder,
 };
 use risingwave_common::catalog::Schema;
-use risingwave_common::row::Row;
-use risingwave_common::types::{JsonbVal, Scalar, ToText};
+use risingwave_common::types::{JsonbVal, Scalar};
 use serde_json::Value;
 
-use super::encoder::{JsonEncoder, RowEncoder};
+use super::elasticsearch_opensearch_common::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
 use super::remote::{ElasticSearchSink, OpenSearchSink};
 use crate::sink::{Result, Sink};
 
 pub const ES_OPTION_DELIMITER: &str = "delimiter";
 pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
+pub const ES_OPTION_INDEX: &str = "index";
 
 pub enum StreamChunkConverter {
     Es(EsStreamChunkConverter),
@@ -52,11 +52,13 @@ impl StreamChunkConverter {
                     .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
                 })
                 .transpose()?;
+            let index = properties.get(ES_OPTION_INDEX).cloned();
             Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
                 schema,
                 pk_indices.clone(),
                 properties.get(ES_OPTION_DELIMITER).cloned(),
                 index_column,
+                index,
             )?))
         } else {
             Ok(StreamChunkConverter::Other)
         }
@@ -71,9 +73,7 @@ impl StreamChunkConverter {
     }
 }
 pub struct EsStreamChunkConverter {
-    json_encoder: JsonEncoder,
-    fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>,
-    index_column: Option<usize>,
+    formatter: ElasticSearchOpenSearchFormatter,
 }
 impl EsStreamChunkConverter {
     pub fn new(
@@ -81,77 +81,40 @@ impl EsStreamChunkConverter {
         pk_indices: Vec<usize>,
         delimiter: Option<String>,
         index_column: Option<usize>,
+        index: Option<String>,
     ) -> Result<Self> {
-        let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty()
-        {
-            Box::new(|row: RowRef<'_>| {
-                Ok(row
-                    .datum_at(0)
-                    .ok_or_else(|| anyhow!("No value found in row, index is 0"))?
-                    .to_text())
-            })
-        } else if pk_indices.len() == 1 {
-            let index = *pk_indices.get(0).unwrap();
-            Box::new(move |row: RowRef<'_>| {
-                Ok(row
-                    .datum_at(index)
-                    .ok_or_else(|| anyhow!("No value found in row, index is 0"))?
-                    .to_text())
-            })
-        } else {
-            let delimiter = delimiter
-                .as_ref()
-                .ok_or_else(|| anyhow!("Please set delimiter in with option"))?
-                .clone();
-            Box::new(move |row: RowRef<'_>| {
-                let mut keys = vec![];
-                for index in &pk_indices {
-                    keys.push(
-                        row.datum_at(*index)
-                            .ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
-                            .to_text(),
-                    );
-                }
-                Ok(keys.join(&delimiter))
-            })
-        };
-        let col_indices = if let Some(index) = index_column {
-            let mut col_indices: Vec<usize> = (0..schema.len()).collect();
-            col_indices.remove(index);
-            Some(col_indices)
-        } else {
-            None
-        };
-        let json_encoder = JsonEncoder::new_with_es(schema, col_indices);
-        Ok(Self {
-            json_encoder,
-            fn_build_id,
+        let formatter = ElasticSearchOpenSearchFormatter::new(
+            pk_indices,
+            &schema,
+            delimiter,
             index_column,
-        })
+            index,
+        )?;
+        Ok(Self { formatter })
     }
 
     fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
-        let mut ops = vec![];
+        let mut ops = Vec::with_capacity(chunk.capacity());
         let mut id_string_builder =
             <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
         let mut json_builder =
             <JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
         let mut index_builder =
             <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
-        for (op, row) in chunk.rows() {
-            ops.push(op);
-            id_string_builder.append(Some(&self.build_id(row)?));
-            if let Some(index) = self.index_column {
-                index_builder.append(Some(
-                    row.datum_at(index)
-                        .ok_or_else(|| anyhow!("No value found in row, index is {}", index))?
-                        .into_utf8(),
-                ));
+        for build_bulk_para in self.formatter.covert_chunk(chunk)? {
+            let BuildBulkPara {
+                key, value, index, ..
+            } = build_bulk_para;
+
+            id_string_builder.append(Some(&key));
+            index_builder.append(Some(&index));
+            if value.is_some() {
+                ops.push(risingwave_common::array::Op::Insert);
             } else {
-                index_builder.append_null();
+                ops.push(risingwave_common::array::Op::Delete);
             }
-            let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?));
-            json_builder.append(Some(json.as_scalar_ref()));
+            let value = value.map(|json| JsonbVal::from(Value::Object(json)));
+            json_builder.append(value.as_ref().map(|json| json.as_scalar_ref()));
         }
         let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
         let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
@@ -165,10 +128,6 @@ impl EsStreamChunkConverter {
             ],
         ))
     }
-
-    fn build_id(&self, row: RowRef<'_>) -> Result<String> {
-        (self.fn_build_id)(row)
-    }
 }
 
 pub fn is_es_sink(sink_name: &str) -> bool {
diff --git a/src/connector/src/sink/elasticsearch_opensearch_common.rs b/src/connector/src/sink/elasticsearch_opensearch_common.rs
index 1c0f2ad4a8613..30e941cf473ae 100644
--- a/src/connector/src/sink/elasticsearch_opensearch_common.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch_common.rs
@@ -54,20 +54,41 @@ pub struct ElasticSearchOpenSearchConfig {
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub index_column: Option<usize>,
 
-    // #[serde(rename = "max_task_num")]
-    // #[serde_as(as = "Option<DisplayFromStr>")]
-    // pub max_task_num: Option<usize>,
     #[serde(rename = "retry_on_conflict")]
-    pub retry_on_conflict: Option<i32>,
+    #[serde_as(as = "DisplayFromStr")]
+    #[serde(default = "default_retry_on_conflict")]
+    pub retry_on_conflict: i32,
 
     #[serde(rename = "batch_num_messages")]
-    pub batch_num_messages: Option<usize>,
+    #[serde_as(as = "DisplayFromStr")]
+    #[serde(default = "default_batch_num_messages")]
+    pub batch_num_messages: usize,
 
     #[serde(rename = "batch_size_kb")]
-    pub batch_size_kb: Option<usize>,
+    #[serde_as(as = "DisplayFromStr")]
+    #[serde(default = "default_batch_size_kb")]
+    pub batch_size_kb: usize,
 
     #[serde(rename = "concurrent_requests")]
-    pub concurrent_requests: Option<usize>,
+    #[serde_as(as = "DisplayFromStr")]
+    #[serde(default = "default_concurrent_requests")]
+    pub concurrent_requests: usize,
+}
+
+fn default_retry_on_conflict() -> i32 {
+    3
+}
+
+fn default_batch_num_messages() -> usize {
+    512
+}
+
+fn default_batch_size_kb() -> usize {
+    5 * 1024
+}
+
+fn default_concurrent_requests() -> usize {
+    1024
 }
 
 impl ElasticSearchOpenSearchConfig {
@@ -136,7 +157,12 @@ pub struct ElasticSearchOpenSearchFormatter {
     index: Option<String>,
 }
 
-type BuildBulkPara = (String, String, Option<Map<String, Value>>);
+pub struct BuildBulkPara {
+    pub index: String,
+    pub key: String,
+    pub value: Option<Map<String, Value>>,
+    pub mem_size_b: usize,
+}
 
 impl ElasticSearchOpenSearchFormatter {
     pub fn new(
@@ -229,11 +255,22 @@ impl ElasticSearchOpenSearchFormatter {
                 Op::Insert | Op::UpdateInsert => {
                     let key = self.key_encoder.encode(rows)?;
                     let value = self.value_encoder.encode(rows)?;
-                    result_vec.push((index.to_string(), key, Some(value)));
+                    result_vec.push(BuildBulkPara {
+                        index: index.to_string(),
+                        key,
+                        value: Some(value),
+                        mem_size_b: rows.value_estimate_size(),
+                    });
                 }
                 Op::Delete => {
                     let key = self.key_encoder.encode(rows)?;
-                    result_vec.push((index.to_string(), key, None));
+                    let mem_size_b = std::mem::size_of_val(&key);
+                    result_vec.push(BuildBulkPara {
+                        index: index.to_string(),
+                        key,
+                        value: None,
+                        mem_size_b,
+                    });
                 }
                 Op::UpdateDelete => continue,
             }
diff --git a/src/connector/src/sink/elasticsearch_rust.rs b/src/connector/src/sink/elasticsearch_rust.rs
index 8863145cea2ff..7b8896116dcb7 100644
--- a/src/connector/src/sink/elasticsearch_rust.rs
+++ b/src/connector/src/sink/elasticsearch_rust.rs
@@ -20,11 +20,11 @@ use futures::prelude::TryFuture;
 use futures::FutureExt;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::Schema;
-use serde_json::Value;
+use serde_json::{json, Value};
 use tonic::async_trait;
 
 use super::elasticsearch_opensearch_common::{
-    validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
+    validate_config, BuildBulkPara, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
 };
 use super::log_store::DeliveryFutureManagerAddFuture;
 use super::writer::{
@@ -79,7 +79,7 @@ impl Sink for ElasticSearchSink {
             self.schema.clone(),
             self.pk_indices.clone(),
         )?
-        .into_log_sinker(usize::MAX))
+        .into_log_sinker(self.config.concurrent_requests))
     }
 }
 
@@ -103,7 +103,11 @@ impl ElasticSearchSinkWriter {
             config.index_column,
             config.index.clone(),
         )?;
-        Ok(Self { client, formatter, config})
+        Ok(Self {
+            client,
+            formatter,
+            config,
+        })
     }
 }
 
@@ -115,29 +119,66 @@ impl AsyncTruncateSinkWriter for ElasticSearchSinkWriter {
         chunk: StreamChunk,
         mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
     ) -> Result<()> {
-        let mut bulks: Vec<BulkOperation<Value>> = Vec::with_capacity(chunk.capacity());
-        for (index, key, value) in self.formatter.covert_chunk(chunk)? {
+        let chunk_capacity = chunk.capacity();
+        let mut all_bulks: Vec<Vec<BulkOperation<Value>>> = vec![];
+        let mut bulks: Vec<BulkOperation<Value>> = Vec::with_capacity(chunk_capacity);
+
+        let mut bulks_size = 0;
+        for build_bulk_para in self.formatter.covert_chunk(chunk)? {
+            let BuildBulkPara {
+                key,
+                value,
+                index,
+                mem_size_b,
+            } = build_bulk_para;
+
+            bulks_size += mem_size_b;
             if let Some(value) = value {
-                bulks.push(BulkOperation::update(key,value).index(index).retry_on_conflict(self.config.retry_on_conflict.unwrap()).into());
+                let value = json!({
+                    "doc": value,
+                    "doc_as_upsert": true
+                });
+                bulks.push(
+                    BulkOperation::update(key, value)
+                        .index(index)
+                        .retry_on_conflict(self.config.retry_on_conflict)
+                        .into(),
+                );
             } else {
                 bulks.push(BulkOperation::delete(key).index(index).into());
             }
+            if bulks.len() >= self.config.batch_num_messages
+                || bulks_size >= self.config.batch_size_kb * 1024
+            {
+                all_bulks.push(bulks);
+                bulks = Vec::with_capacity(chunk_capacity);
+                bulks_size = 0;
+            }
         }
-        let clent_clone = self.client.clone();
-        let future = async move {
-            let result = clent_clone.bulk(BulkParts::None).body(bulks).send().await?;
-            let json = result.json::<Value>().await?;
-            if json["errors"].as_bool().is_none() || json["errors"].as_bool().unwrap() {
-                Err(SinkError::ElasticSearchOpenSearch(anyhow!(
-                    "send bulk to elasticsearch failed: {:?}",
-                    json
-                )))
-            } else {
-                Ok(())
+        if !bulks.is_empty() {
+            all_bulks.push(bulks);
+        }
+        for bulks in all_bulks {
+            let client_clone = self.client.clone();
+            let future = async move {
+                let result = client_clone
+                    .bulk(BulkParts::None)
+                    .body(bulks)
+                    .send()
+                    .await?;
+                let json = result.json::<Value>().await?;
+                if json["errors"].as_bool().is_none() || json["errors"].as_bool().unwrap() {
+                    Err(SinkError::ElasticSearchOpenSearch(anyhow!(
+                        "send bulk to elasticsearch failed: {:?}",
+                        json
+                    )))
+                } else {
+                    Ok(())
+                }
             }
+            .boxed();
+            add_future.add_future_may_await(future).await?;
         }
-        .boxed();
-        add_future.add_future_may_await(future).await?;
         Ok(())
     }
 }
diff --git a/src/connector/src/sink/opensearch.rs b/src/connector/src/sink/opensearch.rs
index 437189fb82405..cdee195dc21a5 100644
--- a/src/connector/src/sink/opensearch.rs
+++ b/src/connector/src/sink/opensearch.rs
@@ -19,11 +19,11 @@ use futures::FutureExt;
 use opensearch::{BulkOperation, BulkParts, OpenSearch};
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::Schema;
-use serde_json::Value;
+use serde_json::{json, Value};
 use tonic::async_trait;
 
 use super::elasticsearch_opensearch_common::{
-    validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
+    validate_config, BuildBulkPara, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
 };
 use super::log_store::DeliveryFutureManagerAddFuture;
 use super::writer::{
@@ -78,13 +78,14 @@ impl Sink for OpenSearchSink {
             self.schema.clone(),
             self.pk_indices.clone(),
         )?
-        .into_log_sinker(usize::MAX))
+        .into_log_sinker(self.config.concurrent_requests))
     }
 }
 
 pub struct OpenSearchSinkWriter {
     client: Arc<OpenSearch>,
     formatter: ElasticSearchOpenSearchFormatter,
+    config: ElasticSearchOpenSearchConfig,
 }
 
 impl OpenSearchSinkWriter {
@@ -97,11 +98,15 @@ impl OpenSearchSinkWriter {
         let formatter = ElasticSearchOpenSearchFormatter::new(
             pk_indices,
             &schema,
-            config.delimiter,
+            config.delimiter.clone(),
             config.index_column,
-            config.index,
+            config.index.clone(),
         )?;
-        Ok(Self { client, formatter })
+        Ok(Self {
+            client,
+            formatter,
+            config,
+        })
     }
 }
 
@@ -113,29 +118,67 @@ impl AsyncTruncateSinkWriter for OpenSearchSinkWriter {
         chunk: StreamChunk,
         mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
     ) -> Result<()> {
-        let mut bulks: Vec<BulkOperation<Value>> = Vec::with_capacity(chunk.capacity());
-        for (index, key, value) in self.formatter.covert_chunk(chunk)? {
+        let chunk_capacity = chunk.capacity();
+        let mut all_bulks: Vec<Vec<BulkOperation<Value>>> = vec![];
+        let mut bulks: Vec<BulkOperation<Value>> = Vec::with_capacity(chunk_capacity);
+
+        let mut bulks_size = 0;
+        for build_bulk_para in self.formatter.covert_chunk(chunk)? {
+            let BuildBulkPara {
+                key,
+                value,
+                index,
+                mem_size_b,
+            } = build_bulk_para;
+
+            bulks_size += mem_size_b;
             if let Some(value) = value {
-                bulks.push(BulkOperation::index(value).index(index).id(key).into());
+                let value = json!({
+                    "doc": value,
+                    "doc_as_upsert": true
+                });
+                bulks.push(
+                    BulkOperation::update(key, value)
+                        .index(index)
+                        .retry_on_conflict(self.config.retry_on_conflict)
+                        .into(),
+                );
             } else {
                 bulks.push(BulkOperation::delete(key).index(index).into());
             }
+            if bulks.len() >= self.config.batch_num_messages
+                || bulks_size >= self.config.batch_size_kb * 1024
+            {
+                tracing::debug!("bulks_size: {}", bulks_size);
+                all_bulks.push(bulks);
+                bulks = Vec::with_capacity(chunk_capacity);
+                bulks_size = 0;
+            }
         }
-        let clent_clone = self.client.clone();
-        let future = async move {
-            let result = clent_clone.bulk(BulkParts::None).body(bulks).send().await?;
-            let json = result.json::<Value>().await?;
-            if json["errors"].as_bool().is_none() || json["errors"].as_bool().unwrap() {
-                Err(SinkError::ElasticSearchOpenSearch(anyhow!(
-                    "send bulk to opensearch failed: {:?}",
-                    json
-                )))
-            } else {
-                Ok(())
+        if !bulks.is_empty() {
+            all_bulks.push(bulks);
+        }
+        for bulks in all_bulks {
+            let client_clone = self.client.clone();
+            let future = async move {
+                let result = client_clone
+                    .bulk(BulkParts::None)
+                    .body(bulks)
+                    .send()
+                    .await?;
+                let json = result.json::<Value>().await?;
+                if json["errors"].as_bool().is_none() || json["errors"].as_bool().unwrap() {
+                    Err(SinkError::ElasticSearchOpenSearch(anyhow!(
+                        "send bulk to opensearch failed: {:?}",
+                        json
+                    )))
+                } else {
+                    Ok(())
+                }
            }
+            .boxed();
+            add_future.add_future_may_await(future).await?;
         }
-        .boxed();
-        add_future.add_future_may_await(future).await?;
         Ok(())
     }
 }
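
Note on the batching behavior this patch introduces: both writers now buffer bulk operations and flush them as a separate bulk request once either batch_num_messages operations (default 512) or batch_size_kb * 1024 estimated bytes (default 5 * 1024 KB) have accumulated, and concurrent_requests (default 1024) bounds how many of those requests the log sinker may keep in flight. The standalone sketch below only illustrates that splitting policy; the function name, the (operation, size) tuple input, and the example values are illustrative and are not part of the connector's API.

// Sketch of the bulk-splitting policy, assuming per-operation size estimates.
fn split_into_bulks<T>(
    items: Vec<(T, usize)>, // (operation, estimated payload size in bytes)
    batch_num_messages: usize,
    batch_size_kb: usize,
) -> Vec<Vec<T>> {
    let mut all_bulks = Vec::new();
    let mut bulk = Vec::new();
    let mut bulk_size = 0usize;
    for (op, size) in items {
        bulk.push(op);
        bulk_size += size;
        // Flush once either the message-count or the byte-size threshold is hit.
        if bulk.len() >= batch_num_messages || bulk_size >= batch_size_kb * 1024 {
            all_bulks.push(std::mem::take(&mut bulk));
            bulk_size = 0;
        }
    }
    // Any remainder becomes the final, smaller bulk request.
    if !bulk.is_empty() {
        all_bulks.push(bulk);
    }
    all_bulks
}

fn main() {
    // With a hypothetical cap of 2 messages per request, three small operations
    // are split into two bulk requests, mirroring how write_chunk slices a chunk.
    let bulks = split_into_bulks(vec![("a", 1), ("b", 1), ("c", 1)], 2, 1);
    assert_eq!(bulks, vec![vec!["a", "b"], vec!["c"]]);
}

Keeping the size accounting per request rather than per chunk is what allows a single large StreamChunk to be spread over several bulk calls instead of one unbounded request.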