diff --git a/integrations/sources/kafka.mdx b/integrations/sources/kafka.mdx
index 8ff26140..19949177 100644
--- a/integrations/sources/kafka.mdx
+++ b/integrations/sources/kafka.mdx
@@ -93,6 +93,19 @@ When creating a source in RisingWave, you can specify the following Kafka parame
| receive.message.max.bytes | properties.receive.message.max.bytes | int |
| ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |
+The additional Kafka parameters `queued.min.messages` and `queued.max.messages.kbytes` are specified with `properties.queued.min.messages` and `properties.queued.max.messages.kbytes`, respectively, when creating the source.
+
+```sql
+CREATE SOURCE s1 (v1 int, v2 varchar) with (
+ connector = 'kafka',
+ topic = 'kafka_1_partition_topic',
+ properties.bootstrap.server = 'message_queue:29092',
+ scan.startup.mode = 'earliest',
+ properties.queued.min.messages = 10000,
+ properties.queued.max.messages.kbytes = 65536
+) FORMAT PLAIN ENCODE JSON;
+```
+
Set `properties.ssl.endpoint.identification.algorithm` to `none` to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either `https` or `none`. By default, it is `https`.
@@ -121,20 +134,26 @@ WITH (
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message_name',
- schema.registry = 'http://127.0.0.1:8081'
+ schema.registry = 'http://127.0.0.1:8081',
+ schema.registry.username='your_schema_registry_username',
+ schema.registry.password='your_schema_registry_password'
);
```
```sql
-CREATE TABLE IF NOT EXISTS source_abc
+CREATE TABLE IF NOT EXISTS source_abc (
+ primary key (rw_key)
+)
+include key as rw_key
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_topic'
)
FORMAT UPSERT ENCODE AVRO (
+ message = 'message_name',
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
@@ -167,19 +186,6 @@ WITH (
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
-```
-
-The additional Kafka parameters `queued.min.messages` and `queued.max.messages.kbytes` are specified with `properties.queued.min.messages` and `properties.queued.max.messages.kbytes`, respectively, when creating the source.
-
-```sql
-CREATE SOURCE s1 (v1 int, v2 varchar) with (
- connector = 'kafka',
- topic = 'kafka_1_partition_topic',
- properties.bootstrap.server = 'message_queue:29092',
- scan.startup.mode = 'earliest',
- properties.queued.min.messages = 10000,
- properties.queued.max.messages.kbytes = 65536
-) FORMAT PLAIN ENCODE JSON;
```
@@ -187,7 +193,9 @@ CREATE SOURCE s1 (v1 int, v2 varchar) with (
CREATE TABLE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
+ primary key (rw_key)
)
+include key as rw_key
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',