Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

complete source format table #121

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions ingestion/supported-sources-and-formats.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@ To ingest data in formats marked with "T", you need to create tables (with conne

| Connector | Version | Format |
| :------------ | :------------ | :------------------- |
| [Kafka](/integrations/sources/kafka) | 3.1.0 or later versions | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Debezium AVRO](#debezium-avro) (T), [DEBEZIUM\_MONGO\_JSON](#debezium-mongo-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T), [Upsert JSON](#upsert-json) (T), [Upsert AVRO](#upsert-avro) (T), [Bytes](#bytes) |
| [Redpanda](/integrations/sources/redpanda) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf) |
| [Pulsar](/integrations/sources/pulsar) | 2.8.0 or later versions | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Kinesis](/integrations/sources/kinesis) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) |
| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) |
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), CSV |
| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) |
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json) |
| [Kafka](/integrations/sources/kafka) | 3.1.0 or later versions | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [CSV](#csv), [Upsert JSON](#upsert-json) (T), [Upsert Avro](#upsert-avro) (T), Upsert Protobuf (T), [Debezium JSON](#debezium-json) (T), [Debezium Avro](#debezium-avro) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T), [Debezium Mongo JSON](#debezium-mongo-json) (T) |
| [Redpanda](/integrations/sources/redpanda) | Latest | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro) |
| [Pulsar](/integrations/sources/pulsar) | 2.8.0 or later versions | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [Upsert JSON](#upsert-json) (T), [Upsert Avro](#upsert-avro) (T), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Kinesis](/integrations/sources/kinesis) | Latest | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [Upsert JSON](#upsert-json) (T), [Upsert Avro](#upsert-avro) (T), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) |
| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) |
| [SQL Server CDC](/integrations/sources/sql-server-cdc) | 2019, 2022 | [Debezium JSON](#debezium-json) (T) |
| [MongoDB CDC](/integrations/sources/mongodb-cdc) | | [Debezium Mongo JSON](#debezium-mongo-json) (T) |
| [Citus CDC](/integrations/sources/citus-cdc) | 10.2 | [Debezium JSON](#debezium-json) (T) |
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [JSON](#json), [Protobuf](#protobuf), [Avro](#avro), [Bytes](#bytes), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), [CSV](#csv), [Parquet](#parquet) |
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | Latest | [JSON](#json), [CSV](#csv), [Parquet](#parquet) |
| [Azure Blob](/integrations/sources/azure-blob) | Latest | [JSON](#json), [CSV](#csv), [Parquet](#parquet) |
| [NATS JetStream](/integrations/sources/nats-jetstream) | | [JSON](#json), [Protobuf](#protobuf), [Bytes](#bytes) |
| [MQTT](/integrations/sources/mqtt) | | [JSON](#json), [Bytes](#bytes) |
| [Apache Iceberg](/integrations/sources/apache-iceberg) | | No need to specify `FORMAT` |
| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) |

<Note>
When a source is created, RisingWave does not ingest data immediately. RisingWave starts to process data when a materialized view is created based on the source.
Expand Down Expand Up @@ -72,7 +79,7 @@ FORMAT PLAIN
ENCODE BYTES
```

### Debezium AVRO
### Debezium Avro

When creating a source from streams in with Debezium AVRO, the schema of the source does not need to be defined in the `CREATE TABLE` statement as it can be inferred from the `SCHEMA REGISTRY`. This means that the schema file location must be specified. The schema file location can be an actual Web location, which is in `http://...`, `https://...`, or `S3://...` format, or a Confluent Schema Registry. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).

Expand Down Expand Up @@ -190,11 +197,26 @@ ENCODE JSON [ (
) ]
```

### CSV

To consume data in CSV format, you can use `ENCODE PLAIN FORMAT CSV` with options. Configurable options include `delimiter` and `without_header`.

Syntax:

```sql
FORMAT PLAIN
ENCODE CSV (
delimiter = 'delimiter',
without_header = 'false' | 'true'
)
```

The `delimiter` option is required, while the `without_header` option is optional, with a default value of `false`.

### Parquet

Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.


Syntax:

```sql
Expand Down Expand Up @@ -230,7 +252,6 @@ ENCODE PROTOBUF (

For more information on supported protobuf types, refer to [Supported protobuf types](/sql/data-types/supported-protobuf-types).


## General parameters for supported formats

Here are some notes regarding parameters that can be applied to multiple formats supported by our systems.
Expand Down
Loading