Skip to content

Commit

Permalink
Reorganize file source part (#95)
Browse files Browse the repository at this point in the history
* minor revision

* add parquet as supported formats

* batch read

* add data type mapping in parquet

* Update delivery/overview.mdx

Co-authored-by: hengm3467 <[email protected]>
Signed-off-by: IrisWan <[email protected]>

* Update delivery/overview.mdx

Co-authored-by: hengm3467 <[email protected]>
Signed-off-by: IrisWan <[email protected]>

* Apply suggestions from code review

Co-authored-by: hengm3467 <[email protected]>
Signed-off-by: IrisWan <[email protected]>

* use parquet format

* Update ingestion/overview.mdx

Co-authored-by: congyi wang <[email protected]>
Signed-off-by: IrisWan <[email protected]>

* update data type mapping

* Revert "update data type mapping"

This reverts commit 11018d2.

* update mapping

---------

Signed-off-by: IrisWan <[email protected]>
Co-authored-by: hengm3467 <[email protected]>
Co-authored-by: congyi wang <[email protected]>
  • Loading branch information
3 people authored Nov 28, 2024
1 parent 4d56439 commit a595aa4
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 35 deletions.
2 changes: 1 addition & 1 deletion changelog/product-lifecycle.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Below is a list of all features in the public preview phase:
| [Approx percentile](/sql/functions/aggregate#approx-percentile) | 2.0 |
| [Auto schema change in MySQL CDC](/integrations/sources/mysql-cdc#automatically-change-schema) | 2.0 |
| [SQL Server CDC source](/integrations/sources/sql-server-cdc) | 2.0 |
| [Sink data in parquet encode](/delivery/overview#sink-data-in-parquet-or-json-encode) | 2.0 |
| [Sink data in Parquet format](/delivery/overview#sink-data-in-parquet-or-json-format) | 2.0 |
| [Time travel queries](/processing/time-travel-queries) | 2.0 |
| [Manage secrets](/operate/manage-secrets) | 2.0 |
| [Amazon DynamoDB sink](/integrations/destinations/amazon-dynamodb) | 1.10 |
Expand Down
6 changes: 3 additions & 3 deletions delivery/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ When creating an `upsert` sink, note whether or not you need to specify the prim
* If the downstream system supports primary keys but the table in the downstream system has no primary key, then RisingWave does not allow users to create an upsert sink. A primary key must be defined in the table in the downstream system.
* If the downstream system does not support primary keys, then users must define the primary key when creating an upsert sink.

## Sink data in parquet or json encode
## Sink data in parquet or json format

<Note>
**PUBLIC PREVIEW**

Sink data in parquet encode is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our [Slack channel](https://www.risingwave.com/slack). Your input is valuable in helping us improve the feature. For more information, see our [Public preview feature list](/changelog/product-lifecycle#features-in-the-public-preview-stage).
Sinking data in Parquet format is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our [Slack channel](https://www.risingwave.com/slack). Your input is valuable in helping us improve this feature. For more details, see our [Public Preview Feature List](/changelog/product-lifecycle#features-in-the-public-preview-stage).
</Note>

RisingWave supports sinking data in Parquet or JSON encode to file systems including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using the batch processing engine of RisingWave through the `file_scan` API. You can also leverage third-party OLAP query engines for further data processing.
RisingWave supports sinking data in Parquet or JSON formats to cloud storage services, including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using RisingWave's batch processing engine through the `file_scan` API. You can also leverage third-party OLAP query engines to enhance data processing capabilities.

Below is an example to sink data to S3:

Expand Down
49 changes: 48 additions & 1 deletion ingestion/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ The statement will create a streaming job that continuously ingests data from th

### Insert data into tables

You can load data in batch to RisingWave by creating a table ([CREATE TABLE](/sql/commands/sql-create-table)) and then inserting data into it ([INSERT](/sql/commands/sql-insert)). For example, the statement below creates a table `website_visits` and inserts 5 rows of data.
You can load data in batch mode to RisingWave by [creating a table](/sql/commands/sql-create-table) and then [inserting](/sql/commands/sql-insert) data into it. For example, the statement below creates a table `website_visits` and inserts 5 rows of data.

```sql
CREATE TABLE website_visits (
Expand Down Expand Up @@ -120,6 +120,53 @@ CREATE TABLE t1(
INSERT INTO t1 SELECT * FROM source_iceberg_t1;
```

## File source management

RisingWave supports reading data from file sources including AWS S3, GCS, and Azure Blob Storage.

### Batch reading from file source

To read data in batch from file sources, you need to create a materialized view from the source or create a table with the appropriate connector. You can also directly query the file source. Below are examples using AWS S3.

```sql
-- Create a source that connects to S3
CREATE SOURCE s3_source WITH ( connector = 's3', ... );

-- Create a materialized view from the source for batch processing
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table using the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );

-- Select from the source directly
SELECT count(*) from s3_source;
```

### Data type mapping in Parquet

When using file source to read parquet files, please define the schema information according to the following mapping.

| File source type | RisingWave type |
| :----------- | :-------------- |
| boolean | boolean |
| int16 | smallint |
| int32 | int |
| int64 | bigint |
| float | real |
| double | double precision|
| string | varchar |
| date | date |
| `timestamp(_, Some(_))` | timestamptz |
| `timestamp(_, None)` | timestamp |
| decimal | decimal |
| int8 | smallint |
| uint8 | smallint |
| uint16 | int |
| uint32 | bigint |
| uint64 | decimal |
| float16 | double precision|


## Topics in this section

The information presented above provides a brief overview of the data ingestion process in RisingWave. To gain a more comprehensive understanding of this process, the following topics in this section will delve more deeply into the subject matter. Here is a brief introduction to what you can expect to find in each topic:
Expand Down
49 changes: 31 additions & 18 deletions ingestion/supported-sources-and-formats.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ To ingest data in formats marked with "T", you need to create tables (with conne
| [Kinesis](/integrations/sources/kinesis) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) |
| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) |
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | |
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), CSV |
| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) |
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | |
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | [JSON](#json) | |
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json) |

<Note>
When a source is created, RisingWave does not ingest data immediately. RisingWave starts to process data when a materialized view is created based on the source.
Expand Down Expand Up @@ -63,6 +63,15 @@ FORMAT [ DEBEZIUM | UPSERT | PLAIN ] ENCODE AVRO (

Note that for `map.handling.mode = 'jsonb'`, the value types can only be: `null`, `boolean`, `int`, `string`, or `map`/`record`/`array` with these types.

### Bytes

RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data.

```sql
FORMAT PLAIN
ENCODE BYTES
```

### Debezium AVRO

When creating a source from streams in with Debezium AVRO, the schema of the source does not need to be defined in the `CREATE TABLE` statement as it can be inferred from the `SCHEMA REGISTRY`. This means that the schema file location must be specified. The schema file location can be an actual Web location, which is in `http://...`, `https://...`, or `S3://...` format, or a Confluent Schema Registry. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).
Expand Down Expand Up @@ -120,12 +129,12 @@ RisingWave supports the TiCDC dialect of the Canal CDC format. When creating a s

Syntax:

```js
```sql
FORMAT CANAL
ENCODE JSON
```

### Debezium json
### Debezium JSON

When creating a source from streams in Debezium JSON, you can define the schema of the source within the parentheses after the source name (`schema_definition` in the syntax), and specify the data and encoding formats in the `FORMAT` and `ENCODE` sections. You can directly reference data fields in the JSON payload by their names as column names in the schema.

Expand All @@ -135,7 +144,7 @@ Note that if you are ingesting data of type `timestamp` or `timestamptz` in Risi

Syntax:

```js
```sql
FORMAT DEBEZIUM
ENCODE JSON [ (
[ ignore_key = 'true | false ' ]
Expand All @@ -148,7 +157,7 @@ When loading data from MongoDB via Kafka topics in Debezium Mongo JSON format, t

Syntax:

```js
```sql
FORMAT DEBEZIUM_MONGO
ENCODE JSON
```
Expand All @@ -159,7 +168,7 @@ When creating a source from streams in Maxwell JSON, you can define the schema o

Syntax:

```js
```sql
FORMAT MAXWELL
ENCODE JSON
```
Expand All @@ -172,7 +181,7 @@ You can define the schema of the source within the parentheses after the source

Syntax:

```js
```sql
FORMAT UPSERT
ENCODE JSON [ (
schema.registry = 'schema_registry_url [, ...]',
Expand All @@ -181,6 +190,18 @@ ENCODE JSON [ (
) ]
```

### Parquet

Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.


Syntax:

```sql
FORMAT PLAIN
ENCODE PARQUET
```

### Protobuf

For data in protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be an actual Web location that is in `http://...`, `https://...`, or `S3://...` format. For Kafka data in protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry that RisingWave can get the schema from. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).
Expand All @@ -199,7 +220,7 @@ protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.

Syntax:

```js
```sql
FORMAT PLAIN
ENCODE PROTOBUF (
message = 'com.example.MyMessage',
Expand All @@ -209,14 +230,6 @@ ENCODE PROTOBUF (

For more information on supported protobuf types, refer to [Supported protobuf types](/sql/data-types/supported-protobuf-types).

### Bytes

RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data.

```js
FORMAT PLAIN
ENCODE BYTES
```

## General parameters for supported formats

Expand Down
2 changes: 1 addition & 1 deletion integrations/destinations/aws-s3.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ WITH (
)FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
```

For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
2 changes: 1 addition & 1 deletion integrations/destinations/azure-blob.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ WITH (
)FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
```

For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
2 changes: 1 addition & 1 deletion integrations/destinations/google-cloud-storage.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ WITH (
)FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
```

For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
2 changes: 1 addition & 1 deletion integrations/destinations/webhdfs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ WITH (
)FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
```

For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
13 changes: 5 additions & 8 deletions integrations/sources/s3.mdx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: "Ingest data from S3 buckets"
sidebarTitle: AWS S3
description: "Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports both CSV and [ndjson](https://github.com/ndjson) file formats."
description: "Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports CSV, [ndjson](https://github.com/ndjson) and Parquet file formats."
---

The S3 connector does not guarantee the sequential reading of files or complete file reading.
Expand Down Expand Up @@ -52,10 +52,12 @@ For CSV data, specify the delimiter in the `delimiter` option in `ENCODE propert
| s3.assume\_role | Optional. Specifies the ARN of an IAM role to assume when accessing S3\. It allows temporary, secure access to S3 resources without sharing long-term credentials. |
| refresh.interval.sec | Optional. Configure the time interval between operations of listing files. It determines the delay in discovering new files, with a default value of 60 seconds. |

note
<Note>

Empty cells in CSV files will be parsed to `NULL`.

</Note>

| Field | Notes |
| :---------------- | :--------------------------------------------------------------------------------------------------------------------------------------- |
| _data\_format_ | Supported data format: PLAIN. |
Expand Down Expand Up @@ -174,13 +176,8 @@ CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table with the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );
```

To view the progress of the source, you can also read the source directly

```sql
-- Create a materialized view from the source
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
-- Select from the source directly
SELECT count(*) from s3_source;
```

Expand Down

0 comments on commit a595aa4

Please sign in to comment.