feat(sink): add license check for snowflake, dynamodb, opensearch sinks #17912

Merged: 7 commits, Aug 14, 2024 (diff below shows changes from 5 commits)
195 changes: 195 additions & 0 deletions e2e_test/sink/license.slt
@@ -0,0 +1,195 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
ALTER SYSTEM SET license_key TO '';

statement ok
CREATE TABLE t (k INT);

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'k',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK bigquery_sink
FROM
t
WITH
(
connector = 'bigquery',
type = 'append-only',
force_append_only='true',
bigquery.local.path= 'xx',
bigquery.project= 'xx',
bigquery.dataset= 'xx',
bigquery.table= 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature BigQuerySink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

statement ok
flush;

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'xx',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: Sink primary key column not found: xx. Please use ',' as the delimiter for different primary key columns.


statement ok
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
index = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: sink cannot pass validation: INTERNAL: Connection is closed


statement error
CREATE SINK bigquery_sink
FROM
t
WITH
(
connector = 'bigquery',
type = 'append-only',
force_append_only='true',
bigquery.local.path= 'xx',
bigquery.project= 'xx',
bigquery.dataset= 'xx',
bigquery.table= 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: BigQuery error
4: No such file or directory (os error 2)


statement ok
DROP SINK snowflake_sink;

statement ok
DROP TABLE t;
3 changes: 3 additions & 0 deletions src/connector/src/sink/big_query.rs
@@ -356,6 +356,9 @@ impl Sink for BigQuerySink {
}

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::BigQuerySink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
"Primary key not defined for upsert bigquery sink (please define in `primary_key` field)")));
3 changes: 3 additions & 0 deletions src/connector/src/sink/dynamodb.rs
@@ -88,6 +88,9 @@ impl Sink for DynamoDbSink {
const SINK_NAME: &'static str = DYNAMO_DB_SINK;

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::DynamoDbSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
let client = (self.config.build_client().await)
.context("validate DynamoDB sink error")
.map_err(SinkError::DynamoDb)?;
4 changes: 2 additions & 2 deletions src/connector/src/sink/elasticsearch.rs
@@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;

use super::encoder::{JsonEncoder, RowEncoder};
- use super::remote::{ElasticSearchSink, OpensearchSink};
+ use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
@@ -172,5 +172,5 @@ impl EsStreamChunkConverter {
}

pub fn is_es_sink(sink_name: &str) -> bool {
- sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
+ sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME
}
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
@@ -94,7 +94,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
- { Opensearch, $crate::sink::remote::OpensearchSink },
+ { Opensearch, $crate::sink::remote::OpenSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
7 changes: 6 additions & 1 deletion src/connector/src/sink/remote.rs
@@ -73,7 +73,7 @@ macro_rules! def_remote_sink {
() => {
def_remote_sink! {
{ ElasticSearch, ElasticSearchSink, "elasticsearch" }
- { Opensearch, OpensearchSink, "opensearch"}
+ { Opensearch, OpenSearchSink, "opensearch"}
{ Cassandra, CassandraSink, "cassandra" }
{ Jdbc, JdbcSink, "jdbc", |desc| {
desc.sink_type.is_append_only()
@@ -165,6 +165,11 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
}

async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
if sink_name == OpenSearchSink::SINK_NAME {
risingwave_common::license::Feature::OpenSearchSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
}
if is_es_sink(sink_name)
&& param.downstream_pk.len() > 1
&& !param.properties.contains_key(ES_OPTION_DELIMITER)
3 changes: 3 additions & 0 deletions src/connector/src/sink/snowflake.rs
@@ -118,6 +118,9 @@ impl Sink for SnowflakeSink {
}

async fn validate(&self) -> Result<()> {
risingwave_common::license::Feature::SnowflakeSink
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only {
return Err(SinkError::Config(
anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`")
4 changes: 4 additions & 0 deletions src/license/src/feature.rs
@@ -46,6 +46,10 @@ macro_rules! for_all_features {
{ TestPaid, Paid, "A dummy feature that's only available on paid tier for testing purposes." },
{ TimeTravel, Paid, "Query historical data within the retention period."},
{ GlueSchemaRegistry, Paid, "Use Schema Registry from AWS Glue rather than Confluent." },
{ SnowflakeSink, Paid, "Delivering data to Snowflake." },
{ DynamoDbSink, Paid, "Delivering data to DynamoDB." },
{ OpenSearchSink, Paid, "Delivering data to OpenSearch." },
{ BigQuerySink, Paid, "Delivering data to BigQuery." },
{ SecretManagement, Paid, "Secret management." },
{ SqlServerSink, Paid, "Sink data from RisingWave to SQL Server." },
}
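
All of the connector hunks above add the same guard at the top of their `validate()` implementations. The snippet below restates that pattern in isolation as a minimal sketch: it relies only on the `risingwave_common::license::Feature` variants and the `check_available()` call visible in this diff, while the standalone function name and its `anyhow::Result` return type are illustrative choices, not code from the PR.

use anyhow::anyhow;
use risingwave_common::license::Feature;

// Gating sketch: on the Free tier, `check_available()` yields the
// "only available for tier Paid and above" error asserted in
// e2e_test/sink/license.slt; otherwise the sink proceeds to its
// connector-specific validation.
async fn validate_paid_sink_sketch() -> anyhow::Result<()> {
    Feature::SnowflakeSink
        .check_available()
        .map_err(|e| anyhow!(e))?;
    // ...credential/schema checks for the concrete connector would follow here...
    Ok(())
}

Once the license key is configured (or restored with `ALTER SYSTEM SET license_key TO DEFAULT`, as the test does), the same CREATE SINK statements pass this guard and only ordinary connector errors remain, which is what the second half of license.slt asserts.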