diff --git a/Cargo.lock b/Cargo.lock index 2835cd00fcc34..01d0a064a8be5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1943,6 +1943,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" +[[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" + [[package]] name = "base64" version = "0.13.1" @@ -4261,6 +4267,26 @@ dependencies = [ "serde", ] +[[package]] +name = "elasticsearch" +version = "8.5.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40d9bd57d914cc66ce878f098f63ed7b5d5b64c30644a5adb950b008f874a6c6" +dependencies = [ + "base64 0.11.0", + "bytes", + "dyn-clone", + "lazy_static", + "percent-encoding", + "reqwest 0.11.20", + "rustc_version 0.2.3", + "serde", + "serde_json", + "serde_with 1.14.0", + "url", + "void", +] + [[package]] name = "elliptic-curve" version = "0.12.3" @@ -8053,6 +8079,26 @@ dependencies = [ "url", ] +[[package]] +name = "opensearch" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd2846759315751e04d8b45a0bdbd89ce442282ffb916cf54f6b0adf8df4b44c" +dependencies = [ + "base64 0.21.7", + "bytes", + "dyn-clone", + "lazy_static", + "percent-encoding", + "reqwest 0.11.20", + "rustc_version 0.4.0", + "serde", + "serde_json", + "serde_with 3.8.0", + "url", + "void", +] + [[package]] name = "openssl" version = "0.10.66" @@ -9885,6 +9931,7 @@ version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ + "async-compression", "base64 0.21.7", "bytes", "encoding_rs", @@ -10662,6 +10709,7 @@ dependencies = [ "deltalake", "duration-str", "easy-ext", + "elasticsearch", "enum-as-inner 0.6.0", "expect-test", "fs-err", @@ -10694,6 +10742,7 @@ dependencies = [ "nexmark", "num-bigint", "opendal 0.49.2", + "opensearch", "openssl", "parking_lot 0.12.1", "parquet 53.0.0", @@ -15092,6 +15141,12 @@ version = "0.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dcc60c0624df774c82a0ef104151231d37da4962957d691c011c852b2473314" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "vsimd" version = "0.8.0" diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result index 3e03f5a511c0a..6a979eaa4f8b7 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result @@ -1 +1 @@ -{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 
00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file +{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":7,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":1,"v2":2,"v3":"1-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index 73f0799e44d1d..4d274a2ef20b6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -39,8 +39,8 @@ public static SinkFactory getSinkFactory(String sinkName) { return new FileSinkFactory(); case "jdbc": return new JDBCSinkFactory(); - case "elasticsearch": - case "opensearch": + case "elasticsearch_v1": + case "opensearch_v1": return new EsSinkFactory(); case 
"cassandra": return new CassandraFactory(); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index d2873fac9d216..1c4392d0b7afd 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -59,7 +59,7 @@ public void testEsSink(ElasticsearchContainer container, String username, String .withDelimiter("$") .withUsername(username) .withPassword(password); - config.setConnector("elasticsearch"); + config.setConnector("elasticsearch_v1"); EsSink sink = new EsSink(config, getTestTableSchema()); sink.write( Iterators.forArray( diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 5f9c574cde6a2..ab86b6c86463c 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -152,12 +152,12 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { this.config = config; this.requestTracker = new RequestTracker(); // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. - if (config.getConnector().equals("elasticsearch")) { + if (config.getConnector().equals("elasticsearch_v1")) { ElasticRestHighLevelClientAdapter client = new ElasticRestHighLevelClientAdapter(host, config); this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client, config); - } else if (config.getConnector().equals("opensearch")) { + } else if (config.getConnector().equals("opensearch_v1")) { OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config); this.bulkProcessor = diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 5108f4eab4787..f601101b21aec 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -74,7 +74,7 @@ public void validate( // 2. 
check connection try { - if (config.getConnector().equals("elasticsearch")) { + if (config.getConnector().equals("elasticsearch_v1")) { ElasticRestHighLevelClientAdapter esClient = new ElasticRestHighLevelClientAdapter(host, config); if (!esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { @@ -83,7 +83,7 @@ public void validate( .asRuntimeException(); } esClient.close(); - } else if (config.getConnector().equals("opensearch")) { + } else if (config.getConnector().equals("opensearch_v1")) { OpensearchRestHighLevelClientAdapter opensearchClient = new OpensearchRestHighLevelClientAdapter(host, config); if (!opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { diff --git a/src/common/src/row/mod.rs b/src/common/src/row/mod.rs index 8114b96cb37e5..0b2181105352b 100644 --- a/src/common/src/row/mod.rs +++ b/src/common/src/row/mod.rs @@ -95,6 +95,12 @@ pub trait Row: Sized + std::fmt::Debug + PartialEq + Eq { buf.freeze() } + fn value_estimate_size(&self) -> usize { + self.iter() + .map(value_encoding::estimate_serialize_datum_size) + .sum() + } + /// Serializes the row with memcomparable encoding, into the given `buf`. As each datum may have /// different order type, a `serde` should be provided. #[inline] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 4b3e75188d26f..90433e41c8092 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -50,6 +50,7 @@ csv = "1.3" deltalake = { workspace = true } duration-str = "0.11.2" easy-ext = "1" +elasticsearch = { version = "8.5.0-alpha.1", features = ["rustls-tls"] } enum-as-inner = "0.6" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } @@ -89,6 +90,7 @@ opendal = { workspace = true, features = [ "services-s3", "services-webhdfs", ] } +opensearch = { version = "2.2.0", features = ["rustls-tls"] } openssl = "0.10" parking_lot = { workspace = true } parquet = { workspace = true } diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs new file mode 100644 index 0000000000000..bb0c93f8f774f --- /dev/null +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs @@ -0,0 +1,70 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_common::catalog::Schema; +use tonic::async_trait; + +use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt}; +use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam}; +use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter; +use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig; +use crate::sink::Result; + +pub const ES_SINK: &str = "elasticsearch"; + +#[derive(Debug)] +pub struct ElasticSearchSink { + config: ElasticSearchOpenSearchConfig, + schema: Schema, + pk_indices: Vec<usize>, +} + +#[async_trait] +impl TryFrom<SinkParam> for ElasticSearchSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> { + let schema = param.schema(); + let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?; + Ok(Self { + config, + schema, + pk_indices: param.downstream_pk, + }) + } +} + +impl Sink for ElasticSearchSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>; + + const SINK_NAME: &'static str = ES_SINK; + + async fn validate(&self) -> Result<()> { + self.config.validate_config(&self.schema)?; + let client = self.config.build_client(Self::SINK_NAME)?; + client.ping().await?; + Ok(()) + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> { + Ok(ElasticSearchOpenSearchSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + Self::SINK_NAME, + )? + .into_log_sinker(self.config.concurrent_requests)) + } +} diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs similarity index 53% rename from src/connector/src/sink/elasticsearch.rs rename to src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs index bdcd2e515bbb2..195c976ba2b76 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs @@ -16,19 +16,17 @@ use std::collections::BTreeMap; use anyhow::anyhow; use risingwave_common::array::{ - ArrayBuilder, ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, + ArrayBuilder, ArrayImpl, JsonbArrayBuilder, StreamChunk, Utf8ArrayBuilder, }; use risingwave_common::catalog::Schema; -use risingwave_common::row::Row; -use risingwave_common::types::{JsonbVal, Scalar, ToText}; +use risingwave_common::types::{JsonbVal, Scalar}; use serde_json::Value; -use super::encoder::{JsonEncoder, RowEncoder}; -use super::remote::{ElasticSearchSink, OpenSearchSink}; -use crate::sink::{Result, Sink}; -pub const ES_OPTION_DELIMITER: &str = "delimiter"; -pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; -pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column"; +use super::elasticsearch_opensearch_config::{ + ES_OPTION_DELIMITER, ES_OPTION_INDEX, ES_OPTION_INDEX_COLUMN, ES_OPTION_ROUTING_COLUMN, +}; +use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter}; +use crate::sink::Result; pub enum StreamChunkConverter { Es(EsStreamChunkConverter), @@ -41,7 +39,7 @@ impl StreamChunkConverter { pk_indices: &Vec<usize>, properties: &BTreeMap<String, String>, ) -> Result<Self> { - if is_es_sink(sink_name) { + if is_remote_es_sink(sink_name) { let index_column = properties .get(ES_OPTION_INDEX_COLUMN) .cloned() .map(|n| { schema .fields() .iter() .position(|s| s.name == n) .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN)) }) .transpose()?; + let index = 
properties.get(ES_OPTION_INDEX).cloned(); let routing_column = properties .get(ES_OPTION_ROUTING_COLUMN) .cloned() @@ -69,6 +68,7 @@ impl StreamChunkConverter { pk_indices.clone(), properties.get(ES_OPTION_DELIMITER).cloned(), index_column, + index, routing_column, )?)) } else { @@ -84,70 +84,30 @@ impl StreamChunkConverter { } } } pub struct EsStreamChunkConverter { - json_encoder: JsonEncoder, - fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>, - index_column: Option<usize>, - routing_column: Option<usize>, + formatter: ElasticSearchOpenSearchFormatter, } impl EsStreamChunkConverter { - fn new( + pub fn new( schema: Schema, pk_indices: Vec<usize>, delimiter: Option<String>, index_column: Option<usize>, + index: Option<String>, routing_column: Option<usize>, ) -> Result<Self> { - let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty() - { - Box::new(|row: RowRef<'_>| { - Ok(row - .datum_at(0) - .ok_or_else(|| anyhow!("No value found in row, index is 0"))? - .to_text()) - }) - } else if pk_indices.len() == 1 { - let index = *pk_indices.get(0).unwrap(); - Box::new(move |row: RowRef<'_>| { - Ok(row - .datum_at(index) - .ok_or_else(|| anyhow!("No value found in row, index is 0"))? - .to_text()) - }) - } else { - let delimiter = delimiter - .as_ref() - .ok_or_else(|| anyhow!("Please set delimiter in with option"))? - .clone(); - Box::new(move |row: RowRef<'_>| { - let mut keys = vec![]; - for index in &pk_indices { - keys.push( - row.datum_at(*index) - .ok_or_else(|| anyhow!("No value found in row, index is {}", index))? - .to_text(), - ); - } - Ok(keys.join(&delimiter)) - }) - }; - let col_indices = if let Some(index) = index_column { - let mut col_indices: Vec<usize> = (0..schema.len()).collect(); - col_indices.remove(index); - Some(col_indices) - } else { - None - }; - let json_encoder = JsonEncoder::new_with_es(schema, col_indices); - Ok(Self { - json_encoder, - fn_build_id, + let formatter = ElasticSearchOpenSearchFormatter::new( + pk_indices, + &schema, + delimiter, index_column, + index, routing_column, - }) + )?; + Ok(Self { formatter }) } fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { - let mut ops = vec![]; + let mut ops = Vec::with_capacity(chunk.capacity()); let mut id_string_builder = <Utf8ArrayBuilder as ArrayBuilder>::new(chunk.capacity()); let mut json_builder = @@ -156,29 +116,25 @@ impl EsStreamChunkConverter { <JsonbArrayBuilder as ArrayBuilder>::new(chunk.capacity()); let mut index_builder = <Utf8ArrayBuilder as ArrayBuilder>::new(chunk.capacity()); let mut routing_builder = <Utf8ArrayBuilder as ArrayBuilder>::new(chunk.capacity()); - for (op, row) in chunk.rows() { - ops.push(op); - id_string_builder.append(Some(&self.build_id(row)?)); - if let Some(index) = self.index_column { - index_builder.append(Some( - row.datum_at(index) - .ok_or_else(|| anyhow!("No value found in row, index is {}", index))? - .into_utf8(), - )); - } else { - index_builder.append_null(); - } - if let Some(index) = self.routing_column { - routing_builder.append(Some( - row.datum_at(index) - .ok_or_else(|| anyhow!("No value found in row, index is {}", index))? - .into_utf8(), - )); + for build_bulk_para in self.formatter.convert_chunk(chunk)? { + let BuildBulkPara { + key, + value, + index, + routing_column, + ..
+ } = build_bulk_para; + + id_string_builder.append(Some(&key)); + index_builder.append(Some(&index)); + routing_builder.append(routing_column.as_deref()); + if value.is_some() { + ops.push(risingwave_common::array::Op::Insert); } else { - routing_builder.append_null(); + ops.push(risingwave_common::array::Op::Delete); } - let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); - json_builder.append(Some(json.as_scalar_ref())); + let value = value.map(|json| JsonbVal::from(Value::Object(json))); + json_builder.append(value.as_ref().map(|json| json.as_scalar_ref())); } let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); @@ -194,12 +150,9 @@ impl EsStreamChunkConverter { ], )) } - - fn build_id(&self, row: RowRef<'_>) -> Result<String> { - (self.fn_build_id)(row) - } } -pub fn is_es_sink(sink_name: &str) -> bool { - sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME +pub fn is_remote_es_sink(_sink_name: &str) -> bool { + // sink_name == ElasticSearchJavaSink::SINK_NAME || sink_name == OpenSearchJavaSink::SINK_NAME + false } diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs new file mode 100644 index 0000000000000..b8c06eaadbf6a --- /dev/null +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs @@ -0,0 +1,262 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use anyhow::anyhow; +use futures::{FutureExt, TryFuture}; +use itertools::Itertools; +use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::Schema; +use serde_json::{json, Value}; + +use super::super::SinkError; +use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig; +use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::AsyncTruncateSinkWriter; +use crate::sink::Result; + +pub enum ElasticSearchOpenSearchClient { + ElasticSearch(elasticsearch::Elasticsearch), + OpenSearch(opensearch::OpenSearch), +} +enum ElasticSearchOpenSearchBulk { + ElasticSearch(elasticsearch::BulkOperation<serde_json::Value>), + OpenSearch(opensearch::BulkOperation<serde_json::Value>), +} + +impl ElasticSearchOpenSearchBulk { + pub fn into_elasticsearch_bulk(self) -> elasticsearch::BulkOperation<serde_json::Value> { + if let ElasticSearchOpenSearchBulk::ElasticSearch(bulk) = self { + bulk + } else { + panic!("not an elasticsearch bulk") + } + } + + pub fn into_opensearch_bulk(self) -> opensearch::BulkOperation<serde_json::Value> { + if let ElasticSearchOpenSearchBulk::OpenSearch(bulk) = self { + bulk + } else { + panic!("not an opensearch bulk") + } + } +} + +impl ElasticSearchOpenSearchClient { + async fn send(&self, bulks: Vec<ElasticSearchOpenSearchBulk>) -> Result<Value> { + match self { + ElasticSearchOpenSearchClient::ElasticSearch(client) => { + let bulks = bulks + .into_iter() + .map(ElasticSearchOpenSearchBulk::into_elasticsearch_bulk) + .collect_vec(); + let result = client + .bulk(elasticsearch::BulkParts::None) + .body(bulks) + .send() + .await?; + Ok(result.json::<Value>().await?) + } + ElasticSearchOpenSearchClient::OpenSearch(client) => { + let bulks = bulks + .into_iter() + .map(ElasticSearchOpenSearchBulk::into_opensearch_bulk) + .collect_vec(); + let result = client + .bulk(opensearch::BulkParts::None) + .body(bulks) + .send() + .await?; + Ok(result.json::<Value>().await?) 
+ } + } + } + + pub async fn ping(&self) -> Result<()> { + match self { + ElasticSearchOpenSearchClient::ElasticSearch(client) => { + client.ping().send().await?; + } + ElasticSearchOpenSearchClient::OpenSearch(client) => { + client.ping().send().await?; + } + } + Ok(()) + } + + fn new_update( + &self, + key: String, + index: String, + retry_on_conflict: i32, + routing_column: Option<String>, + value: serde_json::Value, + ) -> ElasticSearchOpenSearchBulk { + match self { + ElasticSearchOpenSearchClient::ElasticSearch(_) => { + let bulk = elasticsearch::BulkOperation::update(key, value) + .index(index) + .retry_on_conflict(retry_on_conflict); + if let Some(routing_column) = routing_column { + ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into()) + } else { + ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into()) + } + } + ElasticSearchOpenSearchClient::OpenSearch(_) => { + let bulk = opensearch::BulkOperation::update(key, value) + .index(index) + .retry_on_conflict(retry_on_conflict); + if let Some(routing_column) = routing_column { + ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into()) + } else { + ElasticSearchOpenSearchBulk::OpenSearch(bulk.into()) + } + } + } + } + + fn new_delete( + &self, + key: String, + index: String, + routing_column: Option<String>, + ) -> ElasticSearchOpenSearchBulk { + match self { + ElasticSearchOpenSearchClient::ElasticSearch(_) => { + let bulk = elasticsearch::BulkOperation::delete(key).index(index); + if let Some(routing_column) = routing_column { + ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into()) + } else { + ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into()) + } + } + ElasticSearchOpenSearchClient::OpenSearch(_) => { + let bulk = opensearch::BulkOperation::delete(key).index(index); + if let Some(routing_column) = routing_column { + ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into()) + } else { + ElasticSearchOpenSearchBulk::OpenSearch(bulk.into()) + } + } + } + } +} + +pub struct ElasticSearchOpenSearchSinkWriter { + client: Arc<ElasticSearchOpenSearchClient>, + formatter: ElasticSearchOpenSearchFormatter, + config: ElasticSearchOpenSearchConfig, +} + +impl ElasticSearchOpenSearchSinkWriter { + pub fn new( + config: ElasticSearchOpenSearchConfig, + schema: Schema, + pk_indices: Vec<usize>, + connector: &str, + ) -> Result<Self> { + let client = Arc::new(config.build_client(connector)?); + let formatter = ElasticSearchOpenSearchFormatter::new( + pk_indices, + &schema, + config.delimiter.clone(), + config.get_index_column_index(&schema)?, + config.index.clone(), + config.get_routing_column_index(&schema)?, + )?; + Ok(Self { + client, + formatter, + config, + }) + } +} + +pub type ElasticSearchOpenSearchSinkDeliveryFuture = + impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static; + +impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter { + type DeliveryFuture = ElasticSearchOpenSearchSinkDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + let chunk_capacity = chunk.capacity(); + let mut all_bulks: Vec<Vec<ElasticSearchOpenSearchBulk>> = vec![]; + let mut bulks: Vec<ElasticSearchOpenSearchBulk> = Vec::with_capacity(chunk_capacity); + + let mut bulks_size = 0; + for build_bulk_para in self.formatter.convert_chunk(chunk)? 
{ + let BuildBulkPara { + key, + value, + index, + mem_size_b, + routing_column, + } = build_bulk_para; + + bulks_size += mem_size_b; + if let Some(value) = value { + let value = json!({ + "doc": value, + "doc_as_upsert": true + }); + let bulk = self.client.new_update( + key, + index, + self.config.retry_on_conflict, + routing_column, + value, + ); + bulks.push(bulk); + } else { + let bulk = self.client.new_delete(key, index, routing_column); + bulks.push(bulk); + }; + + if bulks.len() >= self.config.batch_num_messages + || bulks_size >= self.config.batch_size_kb * 1024 + { + all_bulks.push(bulks); + bulks = Vec::with_capacity(chunk_capacity); + bulks_size = 0; + } + } + if !bulks.is_empty() { + all_bulks.push(bulks); + } + for bulks in all_bulks { + let client_clone = self.client.clone(); + let future = async move { + let result = client_clone.send(bulks).await?; + if result["errors"].as_bool().is_none() || result["errors"].as_bool().unwrap() { + Err(SinkError::ElasticSearchOpenSearch(anyhow!( + "send bulk to elasticsearch/opensearch failed: {:?}", + result + ))) + } else { + Ok(()) + } + } + .boxed(); + add_future.add_future_may_await(future).await?; + } + Ok(()) + } +} diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs new file mode 100644 index 0000000000000..82275dfebb1d8 --- /dev/null +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs @@ -0,0 +1,210 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use anyhow::anyhow; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; +use url::Url; +use with_options::WithOptions; + +use super::super::SinkError; +use super::elasticsearch::ES_SINK; +use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient; +use super::opensearch::OPENSEARCH_SINK; +use crate::sink::Result; + +pub const ES_OPTION_DELIMITER: &str = "delimiter"; +pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; +pub const ES_OPTION_INDEX: &str = "index"; +pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column"; + +#[serde_as] +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct ElasticSearchOpenSearchConfig { + #[serde(rename = "url")] + pub url: String, + /// The index name of elasticsearch or opensearch + #[serde(rename = "index")] + pub index: Option<String>, + /// If pk is set, then "pk1+delimiter+pk2+delimiter..." will be used as the key; if pk is not set, the first column will be used as the key. 
+ #[serde(rename = "delimiter")] + pub delimiter: Option<String>, + /// The username of elasticsearch or opensearch + #[serde(rename = "username")] + pub username: String, + /// The password of elasticsearch or opensearch + #[serde(rename = "password")] + pub password: String, + /// Used for dynamic index. If set, the value of this column will be used as the index. Only one of `index` and `index_column` can be set. + #[serde(rename = "index_column")] + pub index_column: Option<String>, + + /// Used for dynamic routing. If set, the value of this column will be used as the routing key. + #[serde(rename = "routing_column")] + pub routing_column: Option<String>, + + #[serde(rename = "retry_on_conflict")] + #[serde_as(as = "DisplayFromStr")] + #[serde(default = "default_retry_on_conflict")] + pub retry_on_conflict: i32, + + #[serde(rename = "batch_num_messages")] + #[serde_as(as = "DisplayFromStr")] + #[serde(default = "default_batch_num_messages")] + pub batch_num_messages: usize, + + #[serde(rename = "batch_size_kb")] + #[serde_as(as = "DisplayFromStr")] + #[serde(default = "default_batch_size_kb")] + pub batch_size_kb: usize, + + #[serde(rename = "concurrent_requests")] + #[serde_as(as = "DisplayFromStr")] + #[serde(default = "default_concurrent_requests")] + pub concurrent_requests: usize, +} + +fn default_retry_on_conflict() -> i32 { + 3 +} + +fn default_batch_num_messages() -> usize { + 512 +} + +fn default_batch_size_kb() -> usize { + 5 * 1024 +} + +fn default_concurrent_requests() -> usize { + 1024 +} + +impl ElasticSearchOpenSearchConfig { + pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> { + let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>( + serde_json::to_value(properties).unwrap(), + ) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + Ok(config) + } + + pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> { + let url = + Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; + if connector.eq(ES_SINK) { + let transport = elasticsearch::http::transport::TransportBuilder::new( + elasticsearch::http::transport::SingleNodeConnectionPool::new(url), + ) + .auth(elasticsearch::auth::Credentials::Basic( + self.username.clone(), + self.password.clone(), + )) + .build() + .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; + let client = elasticsearch::Elasticsearch::new(transport); + Ok(ElasticSearchOpenSearchClient::ElasticSearch(client)) + } else if connector.eq(OPENSEARCH_SINK) { + let transport = opensearch::http::transport::TransportBuilder::new( + opensearch::http::transport::SingleNodeConnectionPool::new(url), + ) + .auth(opensearch::auth::Credentials::Basic( + self.username.clone(), + self.password.clone(), + )) + .build() + .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; + let client = opensearch::OpenSearch::new(transport); + Ok(ElasticSearchOpenSearchClient::OpenSearch(client)) + } else { + panic!( + "connector type must be {} or {}, but got {}", + ES_SINK, OPENSEARCH_SINK, connector + ); + } + } + + pub fn validate_config(&self, schema: &Schema) -> Result<()> { + if self.index_column.is_some() && self.index.is_some() + || self.index_column.is_none() && self.index.is_none() + { + return Err(SinkError::Config(anyhow!( + "please set only one of the 'index_column' or 'index' properties." 
+ ))); + } + + if let Some(index_column) = &self.index_column { + let filed = schema + .fields() + .iter() + .find(|f| &f.name == index_column) + .unwrap(); + if filed.data_type() != DataType::Varchar { + return Err(SinkError::Config(anyhow!( + "please ensure the data type of {} is varchar.", + index_column + ))); + } + } + + if let Some(routing_column) = &self.routing_column { + let filed = schema + .fields() + .iter() + .find(|f| &f.name == routing_column) + .unwrap(); + if filed.data_type() != DataType::Varchar { + return Err(SinkError::Config(anyhow!( + "please ensure the data type of {} is varchar.", + routing_column + ))); + } + } + Ok(()) + } + + pub fn get_index_column_index(&self, schema: &Schema) -> Result> { + let index_column_idx = self + .index_column + .as_ref() + .map(|n| { + schema + .fields() + .iter() + .position(|s| &s.name == n) + .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN)) + }) + .transpose()?; + Ok(index_column_idx) + } + + pub fn get_routing_column_index(&self, schema: &Schema) -> Result> { + let routing_column_idx = self + .routing_column + .as_ref() + .map(|n| { + schema + .fields() + .iter() + .position(|s| &s.name == n) + .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN)) + }) + .transpose()?; + Ok(routing_column_idx) + } +} diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs new file mode 100644 index 0000000000000..47bc6b302a57c --- /dev/null +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs @@ -0,0 +1,175 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::anyhow; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::catalog::Schema; +use risingwave_common::row::Row; +use serde_json::{Map, Value}; + +use super::super::encoder::template::TemplateEncoder; +use super::super::encoder::{JsonEncoder, RowEncoder}; +use super::super::SinkError; +use crate::sink::Result; + +pub struct ElasticSearchOpenSearchFormatter { + key_encoder: TemplateEncoder, + value_encoder: JsonEncoder, + index_column: Option, + index: Option, + routing_column: Option, +} + +pub struct BuildBulkPara { + pub index: String, + pub key: String, + pub value: Option>, + pub mem_size_b: usize, + pub routing_column: Option, +} + +impl ElasticSearchOpenSearchFormatter { + pub fn new( + pk_indices: Vec, + schema: &Schema, + delimiter: Option, + index_column: Option, + index: Option, + routing_column: Option, + ) -> Result { + let key_format = if pk_indices.is_empty() { + let name = &schema + .fields() + .get(0) + .ok_or_else(|| { + SinkError::ElasticSearchOpenSearch(anyhow!( + "no value find in sink schema, index is 0" + )) + })? 
+ .name; + format!("{{{}}}", name) + } else if pk_indices.len() == 1 { + let index = *pk_indices.get(0).unwrap(); + let name = &schema + .fields() + .get(index) + .ok_or_else(|| { + SinkError::ElasticSearchOpenSearch(anyhow!( + "no value found in sink schema, index is {:?}", + index + )) + })? + .name; + format!("{{{}}}", name) + } else { + let delimiter = delimiter + .as_ref() + .ok_or_else(|| anyhow!("please set the delimiter in the WITH options when there are multiple primary keys"))? + .clone(); + let mut names = Vec::with_capacity(pk_indices.len()); + for index in &pk_indices { + names.push(format!( + "{{{}}}", + schema + .fields() + .get(*index) + .ok_or_else(|| { + SinkError::ElasticSearchOpenSearch(anyhow!( + "no value found in sink schema, index is {:?}", + index + )) + })? + .name + )); + } + names.join(&delimiter) + }; + let col_indices = if let Some(index) = index_column { + let mut col_indices: Vec<usize> = (0..schema.len()).collect(); + col_indices.remove(index); + Some(col_indices) + } else { + None + }; + let key_encoder = TemplateEncoder::new(schema.clone(), col_indices.clone(), key_format); + let value_encoder = JsonEncoder::new_with_es(schema.clone(), col_indices.clone()); + Ok(Self { + key_encoder, + value_encoder, + index_column, + index, + routing_column, + }) + } + + pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<Vec<BuildBulkPara>> { + let mut result_vec = Vec::with_capacity(chunk.capacity()); + for (op, rows) in chunk.rows() { + let index = if let Some(index_column) = self.index_column { + rows.datum_at(index_column) + .ok_or_else(|| { + SinkError::ElasticSearchOpenSearch(anyhow!( + "no value found in sink schema, index is {:?}", + index_column + )) + })? + .into_utf8() + } else { + self.index.as_ref().unwrap() + }; + let routing_column = self + .routing_column + .map(|routing_column| { + Ok::<String, SinkError>( + rows.datum_at(routing_column) + .ok_or_else(|| { + SinkError::ElasticSearchOpenSearch(anyhow!( + "no value found in sink schema, index is {:?}", + routing_column + )) + })? + .into_utf8() + .to_string(), + ) + }) + .transpose()?; + match op { + Op::Insert | Op::UpdateInsert => { + let key = self.key_encoder.encode(rows)?; + let value = self.value_encoder.encode(rows)?; + result_vec.push(BuildBulkPara { + index: index.to_string(), + key, + value: Some(value), + mem_size_b: rows.value_estimate_size(), + routing_column, + }); + } + Op::Delete => { + let key = self.key_encoder.encode(rows)?; + let mem_size_b = std::mem::size_of_val(&key); + result_vec.push(BuildBulkPara { + index: index.to_string(), + key, + value: None, + mem_size_b, + routing_column, + }); + } + Op::UpdateDelete => continue, + } + } + Ok(result_vec) + } +} diff --git a/src/connector/src/sink/elasticsearch_opensearch/mod.rs b/src/connector/src/sink/elasticsearch_opensearch/mod.rs new file mode 100644 index 0000000000000..7b88167c944d7 --- /dev/null +++ b/src/connector/src/sink/elasticsearch_opensearch/mod.rs @@ -0,0 +1,20 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod elasticsearch; +pub mod elasticsearch_converter; +pub mod elasticsearch_opensearch_client; +pub mod elasticsearch_opensearch_config; +pub mod elasticsearch_opensearch_formatter; +pub mod opensearch; diff --git a/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs b/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs new file mode 100644 index 0000000000000..ee1e4ceb38a44 --- /dev/null +++ b/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs @@ -0,0 +1,128 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::anyhow; +use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; +use tonic::async_trait; + +use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt}; +use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam}; +use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter; +use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig; +use crate::sink::Result; + +pub const OPENSEARCH_SINK: &str = "opensearch"; + +#[derive(Debug)] +pub struct OpenSearchSink { + config: ElasticSearchOpenSearchConfig, + schema: Schema, + pk_indices: Vec<usize>, +} + +#[async_trait] +impl TryFrom<SinkParam> for OpenSearchSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> { + let schema = param.schema(); + let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?; + Ok(Self { + config, + schema, + pk_indices: param.downstream_pk, + }) + } +} + +impl Sink for OpenSearchSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>; + + const SINK_NAME: &'static str = OPENSEARCH_SINK; + + async fn validate(&self) -> Result<()> { + risingwave_common::license::Feature::OpenSearchSink + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + self.config.validate_config(&self.schema)?; + let client = self.config.build_client(Self::SINK_NAME)?; + client.ping().await?; + Ok(()) + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> { + Ok(ElasticSearchOpenSearchSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + Self::SINK_NAME, + )? 
+ .into_log_sinker(self.config.concurrent_requests)) + } + + fn set_default_commit_checkpoint_interval( + desc: &mut crate::sink::catalog::desc::SinkDesc, + user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple, + ) -> Result<()> { + if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) { + match desc + .properties + .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL) + { + Some(commit_checkpoint_interval) => { + let commit_checkpoint_interval = commit_checkpoint_interval + .parse::<u64>() + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if std::matches!(user_specified, SinkDecouple::Disable) + && commit_checkpoint_interval > 1 + { + return Err(SinkError::Config(anyhow!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"))); + } + } + None => match user_specified { + risingwave_common::session_config::sink_decouple::SinkDecouple::Default + | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => { + desc.properties.insert( + crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_string(), + crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(), + ); + } + risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => { + desc.properties.insert( + crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_string(), + crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(), + ); + } + }, + } + } + Ok(()) + } + + fn is_sink_decouple( + user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple, + ) -> Result<bool> { + match user_specified { + risingwave_common::session_config::sink_decouple::SinkDecouple::Default + | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true), + risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false), + } + } + + async fn new_coordinator(&self) -> Result<Self::Coordinator> { + Err(SinkError::Coordinator(anyhow!("no coordinator"))) + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 0001ce4a58be2..f8dd8480c8532 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -21,7 +21,7 @@ pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; pub mod dynamodb; -pub mod elasticsearch; +pub mod elasticsearch_opensearch; pub mod encoder; pub mod file_sink; pub mod formatter; @@ -113,8 +113,10 @@ macro_rules! 
for_all_sinks { { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink }, { Nats, $crate::sink::nats::NatsSink }, { Jdbc, $crate::sink::remote::JdbcSink }, - { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, - { Opensearch, $crate::sink::remote::OpenSearchSink }, + // { ElasticSearchJava, $crate::sink::remote::ElasticSearchJavaSink }, + // { OpensearchJava, $crate::sink::remote::OpenSearchJavaSink }, + { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink }, + { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, @@ -794,6 +796,12 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), + #[error("ElasticSearch/OpenSearch error: {0}")] + ElasticSearchOpenSearch( + #[source] + #[backtrace] + anyhow::Error, + ), #[error("Starrocks error: {0}")] Starrocks(String), #[error("File error: {0}")] @@ -907,3 +915,15 @@ impl From for SinkError { SinkError::SqlServer(anyhow!(err)) } } + +impl From<::elasticsearch::Error> for SinkError { + fn from(err: ::elasticsearch::Error) -> Self { + SinkError::ElasticSearchOpenSearch(anyhow!(err)) + } +} + +impl From<::opensearch::Error> for SinkError { + fn from(err: ::opensearch::Error) -> Self { + SinkError::ElasticSearchOpenSearch(anyhow!(err)) + } +} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index d72991094f689..76e65418cdf18 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -57,7 +57,10 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER}; +use super::elasticsearch_opensearch::elasticsearch_converter::{ + is_remote_es_sink, StreamChunkConverter, +}; +use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER; use crate::error::ConnectorResult; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset}; @@ -70,8 +73,9 @@ use crate::sink::{ macro_rules! def_remote_sink { () => { def_remote_sink! 
{ - { ElasticSearch, ElasticSearchSink, "elasticsearch" } - { Opensearch, OpenSearchSink, "opensearch"} + // TODO: delete the Java implementations + // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" } + // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"} { Cassandra, CassandraSink, "cassandra" } { Jdbc, JdbcSink, "jdbc" } { DeltaLake, DeltaLakeSink, "deltalake" } @@ -161,12 +165,12 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { - if sink_name == OpenSearchSink::SINK_NAME { - risingwave_common::license::Feature::OpenSearchSink - .check_available() - .map_err(|e| anyhow::anyhow!(e))?; - } - if is_es_sink(sink_name) + // if sink_name == OpenSearchJavaSink::SINK_NAME { + // risingwave_common::license::Feature::OpenSearchSink + // .check_available() + // .map_err(|e| anyhow::anyhow!(e))?; + // } + if is_remote_es_sink(sink_name) && param.downstream_pk.len() > 1 && !param.properties.contains_key(ES_OPTION_DELIMITER) { @@ -191,7 +195,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow!( @@ -202,7 +206,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe } }, DataType::Struct(_) => { - if is_es_sink(sink_name){ + if is_remote_es_sink(sink_name){ Ok(()) }else{ Err(SinkError::Remote(anyhow!( @@ -266,7 +270,7 @@ impl RemoteLogSinker { sink_name: &str, ) -> Result<Self> { let sink_proto = sink_param.to_proto(); - let payload_schema = if is_es_sink(sink_name) { + let payload_schema = if is_remote_es_sink(sink_name) { let columns = vec![ ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(), diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index ba2fa8746a1da..a92c3eaf844dd 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -232,6 +232,47 @@ DynamoDbConfig: field_type: usize required: false default: '256' +ElasticSearchOpenSearchConfig: + fields: + - name: url + field_type: String + required: true + - name: index + field_type: String + comments: The index name of elasticsearch or opensearch + required: false + - name: delimiter + field_type: String + comments: If pk is set, then "pk1+delimiter+pk2+delimiter..." will be used as the key; if pk is not set, the first column will be used as the key. + required: false + - name: username + field_type: String + comments: The username of elasticsearch or opensearch + required: true + - name: password + field_type: String + comments: The password of elasticsearch or opensearch + required: true + - name: index_column + field_type: String + comments: Used for dynamic index. If set, the value of this column will be used as the index. 
Only one of `index` and `index_column` can be set. + required: false + - name: routing_column + field_type: String + comments: Used for dynamic routing. If set, the value of this column will be used as the routing key. + required: false + - name: retry_on_conflict + field_type: i32 + required: true + - name: batch_num_messages + field_type: usize + required: true + - name: batch_size_kb + field_type: usize + required: true + - name: concurrent_requests + field_type: usize + required: true FsConfig: fields: - name: fs.path
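
For readers of this diff, here is a minimal sketch of how the new native sink consumes these options, using only the `from_btreemap` constructor added above. The endpoint, credentials, and index name are placeholder values, and the snippet is assumed to live inside the connector crate where `ElasticSearchOpenSearchConfig` is in scope:

use std::collections::BTreeMap;

use crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
use crate::sink::Result;

// The WITH options of a CREATE SINK arrive as string key/value pairs;
// `from_btreemap` round-trips them through serde_json to fill the struct.
fn build_example_config() -> Result<ElasticSearchOpenSearchConfig> {
    let mut props: BTreeMap<String, String> = BTreeMap::new();
    props.insert("url".into(), "http://localhost:9200".into()); // placeholder endpoint
    props.insert("index".into(), "test1".into()); // static index; mutually exclusive with `index_column`
    props.insert("username".into(), "elastic".into()); // placeholder credentials
    props.insert("password".into(), "changeme".into());
    props.insert("delimiter".into(), "_".into()); // joins multiple pk values into one document id
    // Unset knobs fall back to the serde defaults above: retry_on_conflict = 3,
    // batch_num_messages = 512, batch_size_kb = 5120, concurrent_requests = 1024.
    ElasticSearchOpenSearchConfig::from_btreemap(props)
}

Note that `validate_config` will reject a configuration that sets both `index` and `index_column` (or neither), so a sketch like this must pick exactly one of the two.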