Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): add check license for snowflake, dynamodb opensearch sink #17912

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions e2e_test/sink/license.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
ALTER SYSTEM SET license_key TO '';

statement ok
CREATE TABLE t (k INT);

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'xx',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

statement ok
flush;

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'xx',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: Sink primary key column not found: xx. Please use ',' as the delimiter for different primary key columns.


statement ok
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Java exception was thrown


statement ok
DROP SINK snowflake_sink;

statement ok
DROP TABLE t;
4 changes: 2 additions & 2 deletions src/connector/src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;

use super::encoder::{JsonEncoder, RowEncoder};
use super::remote::{ElasticSearchSink, OpensearchSink};
use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
Expand Down Expand Up @@ -172,5 +172,5 @@ impl EsStreamChunkConverter {
}

pub fn is_es_sink(sink_name: &str) -> bool {
sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME
}
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
{ Opensearch, $crate::sink::remote::OpensearchSink },
{ Opensearch, $crate::sink::remote::OpenSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ macro_rules! def_remote_sink {
() => {
def_remote_sink! {
{ ElasticSearch, ElasticSearchSink, "elasticsearch" }
{ Opensearch, OpensearchSink, "opensearch"}
{ Opensearch, OpenSearchSink, "opensearch"}
{ Cassandra, CassandraSink, "cassandra" }
{ Jdbc, JdbcSink, "jdbc", |desc| {
desc.sink_type.is_append_only()
Expand Down
24 changes: 23 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::dynamodb::DynamoDbSink;
use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
use risingwave_connector::sink::remote::OpenSearchSink;
use risingwave_connector::sink::snowflake::SnowflakeSink;
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
Sink, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
SINK_WITHOUT_BACKFILL,
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
Expand Down Expand Up @@ -166,6 +170,24 @@ pub async fn gen_sink_plan(
.cloned()
.ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

if connector == SnowflakeSink::SINK_NAME {
risingwave_common::license::Feature::SnowflakeSink
.check_available()
.map_err(|e| RwError::from(ErrorCode::SinkError(anyhow::Error::from(e).into())))?;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may consider adding the checker in

async fn validate(&self) -> Result<()> {

If I understand correctly, this is the right place for adding checker for sinks.


if connector == DynamoDbSink::SINK_NAME {
risingwave_common::license::Feature::DynamoDbSink
.check_available()
.map_err(|e| RwError::from(ErrorCode::SinkError(anyhow::Error::from(e).into())))?;
}

if connector == OpenSearchSink::SINK_NAME {
risingwave_common::license::Feature::OpenSearchSink
.check_available()
.map_err(|e| RwError::from(ErrorCode::SinkError(anyhow::Error::from(e).into())))?;
}

let format_desc = match stmt.sink_schema {
// Case A: new syntax `format ... encode ...`
Some(f) => {
Expand Down
3 changes: 3 additions & 0 deletions src/license/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ macro_rules! for_all_features {
{ TestPaid, Paid, "A dummy feature that's only available on paid tier for testing purposes." },
{ TimeTravel, Paid, "Query historical data within the retention period."},
{ GlueSchemaRegistry, Paid, "Use Schema Registry from AWS Glue rather than Confluent." },
{ SnowflakeSink, Paid, "Delivering data to SnowFlake." },
{ DynamoDbSink, Paid, "Delivering data to DynamoDb." },
{ OpenSearchSink, Paid, "Delivering data to OpenSearch." },
}
};
}
Expand Down