From ebc0e47102eeab6533991a06277b3fe4b9a46a3f Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Wed, 14 Aug 2024 21:28:57 +0800 Subject: [PATCH] feat(sink): add check license for snowflake, dynamodb opensearch sink (#17912) --- e2e_test/sink/license.slt | 195 ++++++++++++++++++++++++ src/connector/src/sink/big_query.rs | 3 + src/connector/src/sink/dynamodb.rs | 3 + src/connector/src/sink/elasticsearch.rs | 4 +- src/connector/src/sink/mod.rs | 2 +- src/connector/src/sink/remote.rs | 7 +- src/connector/src/sink/snowflake.rs | 3 + src/license/src/feature.rs | 4 + 8 files changed, 217 insertions(+), 4 deletions(-) create mode 100644 e2e_test/sink/license.slt diff --git a/e2e_test/sink/license.slt b/e2e_test/sink/license.slt new file mode 100644 index 0000000000000..852d7c0fe7bfc --- /dev/null +++ b/e2e_test/sink/license.slt @@ -0,0 +1,195 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +ALTER SYSTEM SET license_key TO ''; + +statement ok +CREATE TABLE t (k INT); + +statement error +CREATE SINK dynamodb_sink +FROM + t +WITH +( + connector = 'dynamodb', + table = 'xx', + primary_key = 'k', + region = 'xx', + access_key = 'xx', + secret_key = 'xx' +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: Internal error + 4: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free + +Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command. + + +statement error +CREATE SINK snowflake_sink +FROM t +WITH ( + connector = 'snowflake', + type = 'append-only', + force_append_only = 'true', + s3.bucket_name = 'xx', + s3.credentials.access = 'xx', + s3.credentials.secret = 'xx', + s3.region_name = 'xx', + s3.path = 'xx', +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: Internal error + 4: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free + +Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command. + + +statement error +CREATE SINK opensearch_sink +FROM t +WITH ( + connector = 'opensearch', + url = 'xx', + username = 'xx', + password = 'xx', +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free + +Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command. + + +statement error +CREATE SINK bigquery_sink +FROM + t +WITH +( + connector = 'bigquery', + type = 'append-only', + force_append_only='true', + bigquery.local.path= 'xx', + bigquery.project= 'xx', + bigquery.dataset= 'xx', + bigquery.table= 'xx' +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: Internal error + 4: feature BigQuerySink is only available for tier Paid and above, while the current tier is Free + +Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command. + + +statement ok +ALTER SYSTEM SET license_key TO DEFAULT; + +statement ok +flush; + +statement error +CREATE SINK dynamodb_sink +FROM + t +WITH +( + connector = 'dynamodb', + table = 'xx', + primary_key = 'xx', + region = 'xx', + access_key = 'xx', + secret_key = 'xx' +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: Sink error + 2: Sink primary key column not found: xx. Please use ',' as the delimiter for different primary key columns. + + +statement ok +CREATE SINK snowflake_sink +FROM t +WITH ( + connector = 'snowflake', + type = 'append-only', + force_append_only = 'true', + s3.bucket_name = 'xx', + s3.credentials.access = 'xx', + s3.credentials.secret = 'xx', + s3.region_name = 'xx', + s3.path = 'xx', +); + + +statement error +CREATE SINK opensearch_sink +FROM t +WITH ( + connector = 'opensearch', + url = 'xx', + username = 'xx', + password = 'xx', + index = 'xx', +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: sink cannot pass validation: INTERNAL: Connection is closed + + +statement error +CREATE SINK bigquery_sink +FROM + t +WITH +( + connector = 'bigquery', + type = 'append-only', + force_append_only='true', + bigquery.local.path= 'xx', + bigquery.project= 'xx', + bigquery.dataset= 'xx', + bigquery.table= 'xx' +); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: BigQuery error + 4: No such file or directory (os error 2) + + +statement ok +DROP SINK snowflake_sink; + +statement ok +DROP TABLE t; \ No newline at end of file diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index ebd18fed2063b..22146e86d0d1d 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -358,6 +358,9 @@ impl Sink for BigQuerySink { } async fn validate(&self) -> Result<()> { + risingwave_common::license::Feature::BigQuerySink + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; if !self.is_append_only && self.pk_indices.is_empty() { return Err(SinkError::Config(anyhow!( "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"))); diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 2df15f517ca0b..6d73bf2d478c8 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -88,6 +88,9 @@ impl Sink for DynamoDbSink { const SINK_NAME: &'static str = DYNAMO_DB_SINK; async fn validate(&self) -> Result<()> { + risingwave_common::license::Feature::DynamoDbSink + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; let client = (self.config.build_client().await) .context("validate DynamoDB sink error") .map_err(SinkError::DynamoDb)?; diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index 31dde5c52509e..5e45c2b8c74aa 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText}; use serde_json::Value; use super::encoder::{JsonEncoder, RowEncoder}; -use super::remote::{ElasticSearchSink, OpensearchSink}; +use super::remote::{ElasticSearchSink, OpenSearchSink}; use crate::sink::{Result, Sink}; pub const ES_OPTION_DELIMITER: &str = "delimiter"; pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; @@ -172,5 +172,5 @@ impl EsStreamChunkConverter { } pub fn is_es_sink(sink_name: &str) -> bool { - sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME + sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index cad35a278edb0..3391520ed0c23 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -94,7 +94,7 @@ macro_rules! for_all_sinks { { Nats, $crate::sink::nats::NatsSink }, { Jdbc, $crate::sink::remote::JdbcSink }, { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, - { Opensearch, $crate::sink::remote::OpensearchSink }, + { Opensearch, $crate::sink::remote::OpenSearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index eb62b7fbc8cd8..606965a8424d7 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -73,7 +73,7 @@ macro_rules! def_remote_sink { () => { def_remote_sink! { { ElasticSearch, ElasticSearchSink, "elasticsearch" } - { Opensearch, OpensearchSink, "opensearch"} + { Opensearch, OpenSearchSink, "opensearch"} { Cassandra, CassandraSink, "cassandra" } { Jdbc, JdbcSink, "jdbc", |desc| { desc.sink_type.is_append_only() @@ -165,6 +165,11 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { + if sink_name == OpenSearchSink::SINK_NAME { + risingwave_common::license::Feature::OpenSearchSink + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + } if is_es_sink(sink_name) && param.downstream_pk.len() > 1 && !param.properties.contains_key(ES_OPTION_DELIMITER) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 6c3cc291f58e2..d87072a2502e9 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -118,6 +118,9 @@ impl Sink for SnowflakeSink { } async fn validate(&self) -> Result<()> { + risingwave_common::license::Feature::SnowflakeSink + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; if !self.is_append_only { return Err(SinkError::Config( anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`") diff --git a/src/license/src/feature.rs b/src/license/src/feature.rs index 00c6d200aa0a3..e434e3709ac26 100644 --- a/src/license/src/feature.rs +++ b/src/license/src/feature.rs @@ -46,6 +46,10 @@ macro_rules! for_all_features { { TestPaid, Paid, "A dummy feature that's only available on paid tier for testing purposes." }, { TimeTravel, Paid, "Query historical data within the retention period."}, { GlueSchemaRegistry, Paid, "Use Schema Registry from AWS Glue rather than Confluent." }, + { SnowflakeSink, Paid, "Delivering data to SnowFlake." }, + { DynamoDbSink, Paid, "Delivering data to DynamoDb." }, + { OpenSearchSink, Paid, "Delivering data to OpenSearch." }, + { BigQuerySink, Paid, "Delivering data to BigQuery." }, { ClickHouseSharedEngine,Paid, "Delivering data to Shared tree on clickhouse cloud"}, { SecretManagement, Paid, "Secret management." }, { CdcTableSchemaMap, Paid, "Automatically map upstream schema to CDC Table."},