diff --git a/cloud/create-a-connection.mdx b/cloud/create-a-connection.mdx index 374a255a..500ecd1e 100644 --- a/cloud/create-a-connection.mdx +++ b/cloud/create-a-connection.mdx @@ -62,5 +62,3 @@ We aim to automate this process in the future to make it even easier. ## What's next Now, you can create a source or sink with the PrivateLink connection using SQL. - -For details on how to use the VPC endpoint to create a source with the PrivateLink connection, see [Create source with PrivateLink connection](/integrations/sources/kafka#create-source-with-privatelink-connection); for creating a sink, see [Create sink with PrivateLink connection](/integrations/destinations/apache-kafka#create-sink-with-privatelink-connection). diff --git a/integrations/destinations/apache-kafka.mdx b/integrations/destinations/apache-kafka.mdx index 0e8281b7..5f9cda83 100644 --- a/integrations/destinations/apache-kafka.mdx +++ b/integrations/destinations/apache-kafka.mdx @@ -231,33 +231,6 @@ WITH ( FORMAT PLAIN ENCODE JSON; ``` -## Create sink with PrivateLink connection - -If your Kafka sink service is located in a different VPC from RisingWave, use AWS PrivateLink or GCP Private Service Connect to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see [Create an AWS PrivateLink connection](/sql/commands/sql-create-connection#create-an-aws-privatelink-connection). - -To create a Kafka sink with a PrivateLink connection, in the WITH section of your `CREATE SINK` statement, specify the following parameters. - -| Parameter | Notes | -| :------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. | -| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you created a connection. See details in [Create a VPC connection](/cloud/create-a-connection/#whats-next). | - -Here is an example of creating a Kafka sink using a PrivateLink connection. Notice that `{"port": 8001}` corresponds to the broker `ip1:9092`, and `{"port": 8002}` corresponds to the broker `ip2:9092`. - -```sql -CREATE SINK sink2 FROM mv2 -WITH ( - connector='kafka', - properties.bootstrap.server='b-1.xxx.amazonaws.com:9092,b-2.test.xxx.amazonaws.com:9092', - topic='msk_topic', - privatelink.endpoint='10.148.0.4', - privatelink.targets = '[{"port": 8001}, {"port": 8002}]' -) -FORMAT PLAIN ENCODE JSON ( - force_append_only='true' -); -``` - ## TLS/SSL encryption and SASL authentication RisingWave can sink data to Kafka that is encrypted with [Transport Layer Security (TLS)](https://en.wikipedia.org/wiki/Transport%5FLayer%5FSecurity) and/or authenticated with SASL. diff --git a/integrations/sources/kafka.mdx b/integrations/sources/kafka.mdx index 24a423e6..0c1e3709 100644 --- a/integrations/sources/kafka.mdx +++ b/integrations/sources/kafka.mdx @@ -362,45 +362,6 @@ ENCODE AVRO ( ``` -## Create source with PrivateLink connection - -If your Kafka source service is located in a different VPC from RisingWave, use AWS PrivateLink to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see [Create an AWS PrivateLink connection](/sql/commands/sql-create-connection#create-an-aws-privatelink-connection). - -To create a Kafka source with a PrivateLink connection, in the WITH section of your `CREATE SOURCE` or `CREATE TABLE` statement, specify the following parameters. - -| Parameter | Notes | -| :------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. | -| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you created a connection. See details in [Create a PrivateLink connection](/cloud/create-a-connection/#whats-next). | -| connection.name | The name of the connection. This parameter should only be included if you are using a connection created with the [CREATE CONNECTION](/sql/commands/sql-create-connection) statement. Omit this parameter if you have provisioned a VPC endpoint using privatelink.endpoint (recommended). | - -Here is an example of creating a Kafka source using a PrivateLink connection. Notice that `{"port": 9094}` corresponds to the broker `broker1-endpoint`, `{"port": 9095}` corresponds to the broker `broker2-endpoint`, and `{"port": 9096}` corresponds to the broker `broker3-endpoint`. - -```sql -CREATE TABLE IF NOT EXISTS crypto_source ( - product_id VARCHAR, - price NUMERIC, - open_24h NUMERIC, - volume_24h NUMERIC, - low_24h NUMERIC, - high_24h NUMERIC, - volume_30d NUMERIC, - best_bid NUMERIC, - best_ask NUMERIC, - side VARCHAR, - time timestamp, - trade_id bigint, -) -WITH ( - connector='kafka', - topic='crypto', - privatelink.endpoint='10.148.0.4', - privatelink.targets='[{"port": 9094}, {"port": 9095}, {"port": 9096}]', - properties.bootstrap.server='broker1-endpoint,broker2-endpoint,broker3-endpoint', - scan.startup.mode='latest' -) FORMAT PLAIN ENCODE JSON; -``` - ## TLS/SSL encryption and SASL authentication RisingWave can read Kafka data that is encrypted with [Transport Layer Security (TLS)](https://en.wikipedia.org/wiki/Transport%5FLayer%5FSecurity) and/or authenticated with SASL.