Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jul 18, 2024
1 parent 92db3d4 commit 5ae7a0d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ csv = "1.3"
deltalake = { workspace = true }
duration-str = "0.11.2"
easy-ext = "1"
elasticsearch = {version = "8.5.0-alpha.1", features = ["rustls-tls"]}
elasticsearch = { version = "8.5.0-alpha.1", features = ["rustls-tls"] }
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
Expand Down Expand Up @@ -93,7 +93,7 @@ opendal = { workspace = true, features = [
"services-memory",
"services-s3",
] }
opensearch = {version = "2.2.0" , features = ["rustls-tls"]}
opensearch = { version = "2.2.0", features = ["rustls-tls"] }
openssl = "0.10"
parking_lot = { workspace = true }
parquet = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions src/connector/src/sink/elasticsearch_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use futures::prelude::TryFuture;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_json::Value;
use tonic::async_trait;

use super::catalog::desc::SinkDesc;
use super::elasticsearch_opensearch_common::{
validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
};
Expand Down Expand Up @@ -66,6 +68,13 @@ impl Sink for ElasticSearchSink {

const SINK_NAME: &'static str = ES_SINK;

fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
}
}

async fn validate(&self) -> Result<()> {
validate_config(&self.config, &self.schema)?;
let client = self.config.build_elasticsearch_client()?;
Expand Down
9 changes: 9 additions & 0 deletions src/connector/src/sink/opensearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use futures::FutureExt;
use opensearch::{BulkOperation, BulkParts, OpenSearch};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_json::Value;
use tonic::async_trait;

use super::catalog::desc::SinkDesc;
use super::elasticsearch_opensearch_common::{
validate_config, ElasticSearchOpenSearchConfig, ElasticSearchOpenSearchFormatter,
};
Expand Down Expand Up @@ -65,6 +67,13 @@ impl Sink for OpenSearchSink {

const SINK_NAME: &'static str = OPENSEARCH_SINK;

fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
}
}

async fn validate(&self) -> Result<()> {
validate_config(&self.config, &self.schema)?;
let client = self.config.build_opensearch_client()?;
Expand Down

0 comments on commit 5ae7a0d

Please sign in to comment.