Skip to content

Commit

Permalink
Revert "remove"
Browse files Browse the repository at this point in the history
This reverts commit ae52738.
  • Loading branch information
WanYixian committed Dec 20, 2024
1 parent ae52738 commit d5bc2f0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cloud/create-a-connection.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ We aim to automate this process in the future to make it even easier.
## What's next

Now, you can create a source or sink with the PrivateLink connection using SQL.

For details on how to use the VPC endpoint to create a source with the PrivateLink connection, see [Create source with PrivateLink connection](/integrations/sources/kafka#create-source-with-privatelink-connection); for creating a sink, see [Create sink with PrivateLink connection](/integrations/destinations/apache-kafka#create-sink-with-privatelink-connection).
27 changes: 27 additions & 0 deletions integrations/destinations/apache-kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,33 @@ WITH (
FORMAT PLAIN ENCODE JSON;
```

## Create sink with PrivateLink connection

If your Kafka sink service is located in a different VPC from RisingWave, use AWS PrivateLink or GCP Private Service Connect to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see [Create an AWS PrivateLink connection](/sql/commands/sql-create-connection#create-an-aws-privatelink-connection).

To create a Kafka sink with a PrivateLink connection, in the WITH section of your `CREATE SINK` statement, specify the following parameters.

| Parameter | Notes |
| :------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. |
| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you created a connection. See details in [Create a VPC connection](/cloud/create-a-connection/#whats-next). |

Here is an example of creating a Kafka sink using a PrivateLink connection. Notice that `{"port": 8001}` corresponds to the broker `ip1:9092`, and `{"port": 8002}` corresponds to the broker `ip2:9092`.

```sql
CREATE SINK sink2 FROM mv2
WITH (
connector='kafka',
properties.bootstrap.server='b-1.xxx.amazonaws.com:9092,b-2.test.xxx.amazonaws.com:9092',
topic='msk_topic',
privatelink.endpoint='10.148.0.4',
privatelink.targets = '[{"port": 8001}, {"port": 8002}]'
)
FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);
```

## TLS/SSL encryption and SASL authentication
RisingWave can sink data to Kafka that is encrypted with [Transport Layer Security (TLS)](https://en.wikipedia.org/wiki/Transport%5FLayer%5FSecurity) and/or authenticated with SASL.

Expand Down
39 changes: 39 additions & 0 deletions integrations/sources/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,45 @@ ENCODE AVRO (

```

## Create source with PrivateLink connection

If your Kafka source service is located in a different VPC from RisingWave, use AWS PrivateLink to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see [Create an AWS PrivateLink connection](/sql/commands/sql-create-connection#create-an-aws-privatelink-connection).

To create a Kafka source with a PrivateLink connection, in the WITH section of your `CREATE SOURCE` or `CREATE TABLE` statement, specify the following parameters.

| Parameter | Notes |
| :------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. |
| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you created a connection. See details in [Create a PrivateLink connection](/cloud/create-a-connection/#whats-next). |
| connection.name | The name of the connection. This parameter should only be included if you are using a connection created with the [CREATE CONNECTION](/sql/commands/sql-create-connection) statement. Omit this parameter if you have provisioned a VPC endpoint using privatelink.endpoint (recommended). |

Here is an example of creating a Kafka source using a PrivateLink connection. Notice that `{"port": 9094}` corresponds to the broker `broker1-endpoint`, `{"port": 9095}` corresponds to the broker `broker2-endpoint`, and `{"port": 9096}` corresponds to the broker `broker3-endpoint`.

```sql
CREATE TABLE IF NOT EXISTS crypto_source (
product_id VARCHAR,
price NUMERIC,
open_24h NUMERIC,
volume_24h NUMERIC,
low_24h NUMERIC,
high_24h NUMERIC,
volume_30d NUMERIC,
best_bid NUMERIC,
best_ask NUMERIC,
side VARCHAR,
time timestamp,
trade_id bigint,
)
WITH (
connector='kafka',
topic='crypto',
privatelink.endpoint='10.148.0.4',
privatelink.targets='[{"port": 9094}, {"port": 9095}, {"port": 9096}]',
properties.bootstrap.server='broker1-endpoint,broker2-endpoint,broker3-endpoint',
scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;
```

## TLS/SSL encryption and SASL authentication

RisingWave can read Kafka data that is encrypted with [Transport Layer Security (TLS)](https://en.wikipedia.org/wiki/Transport%5FLayer%5FSecurity) and/or authenticated with SASL.
Expand Down

0 comments on commit d5bc2f0

Please sign in to comment.