diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 175bb60ae64e..6544b6e45267 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -56,7 +56,7 @@ csv = "1.3" deltalake = { workspace = true } duration-str = "0.11.2" easy-ext = "1" -elasticsearch = {version = "8.5.0-alpha.1", features = ["rustls-tls"]} +elasticsearch = { version = "8.5.0-alpha.1", features = ["rustls-tls"] } enum-as-inner = "0.6" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } @@ -93,7 +93,7 @@ opendal = { workspace = true, features = [ "services-memory", "services-s3", ] } -opensearch = {version = "2.2.0" , features = ["rustls-tls"]} +opensearch = { version = "2.2.0", features = ["rustls-tls"] } openssl = "0.10" parking_lot = { workspace = true } parquet = { workspace = true } diff --git a/src/connector/src/sink/elasticsearch_rust.rs b/src/connector/src/sink/elasticsearch_rust.rs index 70d1babdb570..e527f4975b01 100644 --- a/src/connector/src/sink/elasticsearch_rust.rs +++ b/src/connector/src/sink/elasticsearch_rust.rs @@ -20,9 +20,11 @@ use futures::prelude::TryFuture; use futures::FutureExt; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_json::Value; use tonic::async_trait; +use super::catalog::desc::SinkDesc; use super::elasticsearch_opensearch_common::{ validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter, }; @@ -66,6 +68,13 @@ impl Sink for ElasticSearchSink { const SINK_NAME: &'static str = ES_SINK; + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + match user_specified { + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), + SinkDecouple::Disable => Ok(false), + } + } + async fn validate(&self) -> Result<()> { validate_config(&self.config, &self.schema)?; let client = self.config.build_elasticsearch_client()?; diff --git a/src/connector/src/sink/opensearch.rs b/src/connector/src/sink/opensearch.rs index 437189fb8240..f5c33b0105be 100644 --- a/src/connector/src/sink/opensearch.rs +++ b/src/connector/src/sink/opensearch.rs @@ -19,9 +19,11 @@ use futures::FutureExt; use opensearch::{BulkOperation, BulkParts, OpenSearch}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_json::Value; use tonic::async_trait; +use super::catalog::desc::SinkDesc; use super::elasticsearch_opensearch_common::{ validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter, }; @@ -65,6 +67,13 @@ impl Sink for OpenSearchSink { const SINK_NAME: &'static str = OPENSEARCH_SINK; + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + match user_specified { + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), + SinkDecouple::Disable => Ok(false), + } + } + async fn validate(&self) -> Result<()> { validate_config(&self.config, &self.schema)?; let client = self.config.build_opensearch_client()?;