Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sink): refactor es and opensearch to support async #17746

Merged
merged 22 commits into from
Oct 10, 2024
Prev Previous commit
fix ci
  • Loading branch information
xxhZs committed Oct 9, 2024
commit 02560db2695078dfcf6c3c139cf3d95f01730b00
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use risingwave_common::catalog::Schema;
use risingwave_common::types::{JsonbVal, Scalar};
use serde_json::Value;

use super::super::remote::{ElasticSearchJavaSink, OpenSearchJavaSink};
use super::elasticsearch_opensearch_config::{
ES_OPTION_DELIMITER, ES_OPTION_INDEX, ES_OPTION_INDEX_COLUMN, ES_OPTION_ROUTING_COLUMN,
};
use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
use crate::sink::{Result, Sink};
use crate::sink::Result;

pub enum StreamChunkConverter {
Es(EsStreamChunkConverter),
Expand Down Expand Up @@ -153,6 +152,7 @@ impl EsStreamChunkConverter {
}
}

pub fn is_remote_es_sink(sink_name: &str) -> bool {
sink_name == ElasticSearchJavaSink::SINK_NAME || sink_name == OpenSearchJavaSink::SINK_NAME
pub fn is_remote_es_sink(_sink_name: &str) -> bool {
// sink_name == ElasticSearchJavaSink::SINK_NAME || sink_name == OpenSearchJavaSink::SINK_NAME
false
}
3 changes: 3 additions & 0 deletions src/connector/src/sink/elasticsearch_opensearch/opensearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl Sink for OpenSearchSink {
const SINK_NAME: &'static str = OPENSEARCH_SINK;

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::OpenSearchSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
self.config.validate_config(&self.schema)?;
let client = self.config.build_client(Self::SINK_NAME)?;
client.ping().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ macro_rules! for_all_sinks {
{ GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearchJava, $crate::sink::remote::ElasticSearchJavaSink },
{ OpensearchJava, $crate::sink::remote::OpenSearchJavaSink },
// { ElasticSearchJava, $crate::sink::remote::ElasticSearchJavaSink },
// { OpensearchJava, $crate::sink::remote::OpenSearchJavaSink },
{ ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
{ Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
}

async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
if sink_name == OpenSearchJavaSink::SINK_NAME {
risingwave_common::license::Feature::OpenSearchSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
}
// if sink_name == OpenSearchJavaSink::SINK_NAME {
// risingwave_common::license::Feature::OpenSearchSink
// .check_available()
// .map_err(|e| anyhow::anyhow!(e))?;
// }
if is_remote_es_sink(sink_name)
&& param.downstream_pk.len() > 1
&& !param.properties.contains_key(ES_OPTION_DELIMITER)
Expand Down
Loading