Skip to content

Commit

Permalink
feat(sink): add check license for snowflake, dynamodb opensearch sink (
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Aug 14, 2024
1 parent f09f195 commit ebc0e47
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 4 deletions.
195 changes: 195 additions & 0 deletions e2e_test/sink/license.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
ALTER SYSTEM SET license_key TO '';

statement ok
CREATE TABLE t (k INT);

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'k',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK bigquery_sink
FROM
t
WITH
(
connector = 'bigquery',
type = 'append-only',
force_append_only='true',
bigquery.local.path= 'xx',
bigquery.project= 'xx',
bigquery.dataset= 'xx',
bigquery.table= 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature BigQuerySink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

statement ok
flush;

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'xx',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: Sink primary key column not found: xx. Please use ',' as the delimiter for different primary key columns.


statement ok
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
index = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: sink cannot pass validation: INTERNAL: Connection is closed


statement error
CREATE SINK bigquery_sink
FROM
t
WITH
(
connector = 'bigquery',
type = 'append-only',
force_append_only='true',
bigquery.local.path= 'xx',
bigquery.project= 'xx',
bigquery.dataset= 'xx',
bigquery.table= 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: BigQuery error
4: No such file or directory (os error 2)


statement ok
DROP SINK snowflake_sink;

statement ok
DROP TABLE t;
3 changes: 3 additions & 0 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ impl Sink for BigQuerySink {
}

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::BigQuerySink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
"Primary key not defined for upsert bigquery sink (please define in `primary_key` field)")));
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/sink/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ impl Sink for DynamoDbSink {
const SINK_NAME: &'static str = DYNAMO_DB_SINK;

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::DynamoDbSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
let client = (self.config.build_client().await)
.context("validate DynamoDB sink error")
.map_err(SinkError::DynamoDb)?;
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;

use super::encoder::{JsonEncoder, RowEncoder};
use super::remote::{ElasticSearchSink, OpensearchSink};
use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
Expand Down Expand Up @@ -172,5 +172,5 @@ impl EsStreamChunkConverter {
}

pub fn is_es_sink(sink_name: &str) -> bool {
sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME
}
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
{ Opensearch, $crate::sink::remote::OpensearchSink },
{ Opensearch, $crate::sink::remote::OpenSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ macro_rules! def_remote_sink {
() => {
def_remote_sink! {
{ ElasticSearch, ElasticSearchSink, "elasticsearch" }
{ Opensearch, OpensearchSink, "opensearch"}
{ Opensearch, OpenSearchSink, "opensearch"}
{ Cassandra, CassandraSink, "cassandra" }
{ Jdbc, JdbcSink, "jdbc", |desc| {
desc.sink_type.is_append_only()
Expand Down Expand Up @@ -165,6 +165,11 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
}

async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
if sink_name == OpenSearchSink::SINK_NAME {
risingwave_common::license::Feature::OpenSearchSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
}
if is_es_sink(sink_name)
&& param.downstream_pk.len() > 1
&& !param.properties.contains_key(ES_OPTION_DELIMITER)
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ impl Sink for SnowflakeSink {
}

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::SnowflakeSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only {
return Err(SinkError::Config(
anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`")
Expand Down
4 changes: 4 additions & 0 deletions src/license/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ macro_rules! for_all_features {
{ TestPaid, Paid, "A dummy feature that's only available on paid tier for testing purposes." },
{ TimeTravel, Paid, "Query historical data within the retention period."},
{ GlueSchemaRegistry, Paid, "Use Schema Registry from AWS Glue rather than Confluent." },
{ SnowflakeSink, Paid, "Delivering data to SnowFlake." },
{ DynamoDbSink, Paid, "Delivering data to DynamoDb." },
{ OpenSearchSink, Paid, "Delivering data to OpenSearch." },
{ BigQuerySink, Paid, "Delivering data to BigQuery." },
{ ClickHouseSharedEngine,Paid, "Delivering data to Shared tree on clickhouse cloud"},
{ SecretManagement, Paid, "Secret management." },
{ CdcTableSchemaMap, Paid, "Automatically map upstream schema to CDC Table."},
Expand Down

0 comments on commit ebc0e47

Please sign in to comment.