Skip to content

Commit

Permalink
fix: minor change to kafka examples #104
Browse files Browse the repository at this point in the history
Signed-off-by: Bohan Zhang <[email protected]>
  • Loading branch information
tabVersion authored Dec 4, 2024
1 parent 508dabd commit e83a29e
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions integrations/sources/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ When creating a source in RisingWave, you can specify the following Kafka parame
| receive.message.max.bytes | properties.receive.message.max.bytes | int |
| ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |

The additional Kafka parameters `queued.min.messages` and `queued.max.messages.kbytes` are specified with `properties.queued.min.messages` and `properties.queued.max.messages.kbytes`, respectively, when creating the source.

```sql
CREATE SOURCE s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
properties.queued.min.messages = 10000,
properties.queued.max.messages.kbytes = 65536
) FORMAT PLAIN ENCODE JSON;
```

<Note>
Set `properties.ssl.endpoint.identification.algorithm` to `none` to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either `https` or `none`. By default, it is `https`.
</Note>
Expand Down Expand Up @@ -121,20 +134,26 @@ WITH (
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message_name',
schema.registry = 'http://127.0.0.1:8081'
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
);
```

</Tab>
<Tab title="Upsert Avro">
```sql
CREATE TABLE IF NOT EXISTS source_abc
CREATE TABLE IF NOT EXISTS source_abc (
primary key (rw_key)
)
include key as rw_key
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_topic'
)
FORMAT UPSERT ENCODE AVRO (
message = 'message_name',
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
Expand Down Expand Up @@ -167,27 +186,16 @@ WITH (
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```

The additional Kafka parameters `queued.min.messages` and `queued.max.messages.kbytes` are specified with `properties.queued.min.messages` and `properties.queued.max.messages.kbytes`, respectively, when creating the source.

```sql
CREATE SOURCE s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
properties.queued.min.messages = 10000,
properties.queued.max.messages.kbytes = 65536
) FORMAT PLAIN ENCODE JSON;
```
</Tab>
<Tab title="Upsert JSON">
```sql
CREATE TABLE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
primary key (rw_key)
)
include key as rw_key
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
Expand Down

0 comments on commit e83a29e

Please sign in to comment.