diff --git a/integrations/sources/kafka.mdx b/integrations/sources/kafka.mdx index 8ff26140..19949177 100644 --- a/integrations/sources/kafka.mdx +++ b/integrations/sources/kafka.mdx @@ -93,6 +93,19 @@ When creating a source in RisingWave, you can specify the following Kafka parame | receive.message.max.bytes | properties.receive.message.max.bytes | int | | ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str | +The additional Kafka parameters `queued.min.messages` and `queued.max.messages.kbytes` are specified with `properties.queued.min.messages` and `properties.queued.max.messages.kbytes`, respectively, when creating the source. + +```sql +CREATE SOURCE s1 (v1 int, v2 varchar) with ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest', + properties.queued.min.messages = 10000, + properties.queued.max.messages.kbytes = 65536 +) FORMAT PLAIN ENCODE JSON; +``` + Set `properties.ssl.endpoint.identification.algorithm` to `none` to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either `https` or `none`. By default, it is `https`. @@ -121,20 +134,26 @@ WITH ( scan.startup.timestamp.millis='140000000' ) FORMAT PLAIN ENCODE AVRO ( message = 'message_name', - schema.registry = 'http://127.0.0.1:8081' + schema.registry = 'http://127.0.0.1:8081', + schema.registry.username='your_schema_registry_username', + schema.registry.password='your_schema_registry_password' ); ``` ```sql -CREATE TABLE IF NOT EXISTS source_abc +CREATE TABLE IF NOT EXISTS source_abc ( + primary key (rw_key) +) +include key as rw_key WITH ( connector='kafka', properties.bootstrap.server='localhost:9092', topic='test_topic' ) FORMAT UPSERT ENCODE AVRO ( + message = 'message_name', schema.registry = 'http://127.0.0.1:8081', schema.registry.username='your_schema_registry_username', schema.registry.password='your_schema_registry_password' @@ -167,19 +186,6 @@ WITH ( properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE JSON; -``` - -The additional Kafka parameters `queued.min.messages` and `queued.max.messages.kbytes` are specified with `properties.queued.min.messages` and `properties.queued.max.messages.kbytes`, respectively, when creating the source. - -```sql -CREATE SOURCE s1 (v1 int, v2 varchar) with ( - connector = 'kafka', - topic = 'kafka_1_partition_topic', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - properties.queued.min.messages = 10000, - properties.queued.max.messages.kbytes = 65536 -) FORMAT PLAIN ENCODE JSON; ``` @@ -187,7 +193,9 @@ CREATE SOURCE s1 (v1 int, v2 varchar) with ( CREATE TABLE IF NOT EXISTS source_abc ( column1 varchar, column2 integer, + primary key (rw_key) ) +include key as rw_key WITH ( connector='kafka', properties.bootstrap.server='localhost:9092',