diff --git a/ingestion/supported-sources-and-formats.mdx b/ingestion/supported-sources-and-formats.mdx index 7079f957..5d563c72 100644 --- a/ingestion/supported-sources-and-formats.mdx +++ b/ingestion/supported-sources-and-formats.mdx @@ -12,17 +12,25 @@ To ingest data in formats marked with "T", you need to create tables (with conne | Connector | Version | Format | | :------------ | :------------ | :------------------- | -| [Kafka](/integrations/sources/kafka) | 3.1.0 or later versions | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Debezium AVRO](#debezium-avro) (T), [DEBEZIUM\_MONGO\_JSON](#debezium-mongo-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T), [Upsert JSON](#upsert-json) (T), [Upsert AVRO](#upsert-avro) (T), [Bytes](#bytes) | -| [Redpanda](/integrations/sources/redpanda) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf) | -| [Pulsar](/integrations/sources/pulsar) | 2.8.0 or later versions | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | -| [Kinesis](/integrations/sources/kinesis) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | -| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) | -| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) | -| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | -| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), CSV | -| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) | -| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | -| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json) | +| [Kafka](/integrations/sources/kafka) | 3.1.0 or later versions | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [CSV](#csv), [Upsert JSON](#upsert-json) (T), [Upsert Avro](#upsert-avro) (T), Upsert Protobuf (T), [Debezium JSON](#debezium-json) (T), [Debezium Avro](#debezium-avro) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T), [Debezium Mongo JSON](#debezium-mongo-json) (T) | +| [Redpanda](/integrations/sources/redpanda) | Latest | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro) | +| [Pulsar](/integrations/sources/pulsar) | 2.8.0 or later versions | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [Upsert JSON](#upsert-json) (T), [Upsert Avro](#upsert-avro) (T), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | +| [Kinesis](/integrations/sources/kinesis) | Latest | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [Upsert JSON](#upsert-json) (T), [Upsert Avro](#upsert-avro) (T), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | +| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) | +| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) | +| [SQL Server CDC](/integrations/sources/sql-server-cdc) | 2019, 2022 | [Debezium JSON](#debezium-json) (T) | +| [MongoDB CDC](/integrations/sources/mongodb-cdc) | | [Debezium Mongo JSON](#debezium-mongo-json) (T) | +| [Citus CDC](/integrations/sources/citus-cdc) | 10.2 | [Debezium JSON](#debezium-json) (T) | +| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | +| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | +| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), [CSV](#csv), [Parquet](#parquet) | +| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json), [CSV](#csv), [Parquet](#parquet) | +| [Azure Blob](/integrations/sources/azure-blob) | | [JSON](#json), [CSV](#csv), [Parquet](#parquet) | +{/* | [POSIX File System]() | | [CSV](#csv) | */} +| [NATS JetStream](/integrations/sources/nats-jetstream) | | [JSON](#json), [Protobuf](#protobuf), [Bytes](#bytes) | +| [MQTT](/integrations/sources/mqtt) | | [JSON](#json), [Bytes](#bytes) | +| [Apache Iceberg](/integrations/sources/apache-iceberg) | | No need to specify `FORMAT` | +| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) | When a source is created, RisingWave does not ingest data immediately. RisingWave starts to process data when a materialized view is created based on the source. @@ -72,7 +80,7 @@ FORMAT PLAIN ENCODE BYTES ``` -### Debezium AVRO +### Debezium Avro When creating a source from streams in with Debezium AVRO, the schema of the source does not need to be defined in the `CREATE TABLE` statement as it can be inferred from the `SCHEMA REGISTRY`. This means that the schema file location must be specified. The schema file location can be an actual Web location, which is in `http://...`, `https://...`, or `S3://...` format, or a Confluent Schema Registry. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry). @@ -190,11 +198,26 @@ ENCODE JSON [ ( ) ] ``` +### CSV + +To consume data in CSV format, you can use `ENCODE PLAIN FORMAT CSV` with options. Configurable options include `delimiter` and `without_header`. + +Syntax: + +```sql +FORMAT PLAIN +ENCODE CSV ( + delimiter = 'delimiter', + without_header = 'false' | 'true' +) +``` + +The `delimiter` option is required, while the `without_header` option is optional, with a default value of `false`. + ### Parquet Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage. - Syntax: ```sql @@ -230,7 +253,6 @@ ENCODE PROTOBUF ( For more information on supported protobuf types, refer to [Supported protobuf types](/sql/data-types/supported-protobuf-types). - ## General parameters for supported formats Here are some notes regarding parameters that can be applied to multiple formats supported by our systems.