Reorganize file source part #95

Merged (12 commits) on Nov 28, 2024
2 changes: 1 addition & 1 deletion delivery/overview.mdx
@@ -105,7 +105,7 @@ When creating an `upsert` sink, note whether or not you need to specify the prim
<Note>
**PUBLIC PREVIEW**

Sink data in parquet encode is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our [Slack channel](https://www.risingwave.com/slack). Your input is valuable in helping us improve the feature. For more information, see our [Public preview feature list](/changelog/product-lifecycle#features-in-the-public-preview-stage).
Sink data in Parquet encode is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our [Slack channel](https://www.risingwave.com/slack). Your input is valuable in helping us improve the feature. For more information, see our [Public preview feature list](/changelog/product-lifecycle#features-in-the-public-preview-stage).
Collaborator: @WanYixian Let's always refer to Parquet as a format. "Sinking data in Parquet format..."
</Note>

RisingWave supports sinking data in Parquet or JSON encode to file systems including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using the batch processing engine of RisingWave through the `file_scan` API. You can also leverage third-party OLAP query engines for further data processing.
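For orientation, a file sink in Parquet encode might be declared roughly as in the sketch below. The materialized view name, bucket, path, and credential values are placeholders, and the exact connector options should be verified against the S3 sink documentation.

```sql
-- Sketch: sink a materialized view to S3 as Parquet files (placeholder values).
CREATE SINK s3_parquet_sink FROM mv
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',          -- placeholder region
    s3.bucket_name = 'example-bucket',     -- placeholder bucket
    s3.path = 'sink-output/',              -- placeholder prefix for output files
    s3.credentials.access = 'xxxxx',       -- placeholder credentials
    s3.credentials.secret = 'xxxxx',
    type = 'append-only'
) FORMAT PLAIN ENCODE PARQUET;
```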
48 changes: 47 additions & 1 deletion ingestion/overview.mdx
@@ -78,7 +78,7 @@ The statement will create a streaming job that continuously ingests data from th

### Insert data into tables

You can load data in batch to RisingWave by creating a table ([CREATE TABLE](/sql/commands/sql-create-table)) and then inserting data into it ([INSERT](/sql/commands/sql-insert)). For example, the statement below creates a table `website_visits` and inserts 5 rows of data.
You can load data in batch to RisingWave by [creating a table](/sql/commands/sql-create-table) and then [inserting](/sql/commands/sql-insert) data into it. For example, the statement below creates a table `website_visits` and inserts 5 rows of data.

```sql
CREATE TABLE website_visits (
@@ -120,6 +120,52 @@ CREATE TABLE t1(
INSERT INTO t1 SELECT * FROM source_iceberg_t1;
```

## File source management

RisingWave supports reading data from file sources including AWS S3, GCS, and Azure Blob Storage.

### Batch reading from file source

To read data in batch from file sources, you need to create a materialized view from the source or create a table with the appropriate connector. You can also directly query the file source. Below are examples using AWS S3.

```sql
-- Create a source that connects to S3
CREATE SOURCE s3_source WITH ( connector = 's3', ... );

-- Create a materialized view from the source for batch processing
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table using the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );

-- Select from the source directly
SELECT count(*) FROM s3_source;
```

### Data type mapping in Parquet

You can use the table function `file_scan()` to read Parquet files from file sources; a sketch of such a query is shown below. The table that follows shows how RisingWave converts data types from file sources in Parquet format.
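The following is a minimal sketch of a batch read with `file_scan()`. The region, credentials, and S3 path are placeholders, and the argument order should be confirmed against the `file_scan` API reference.

```sql
-- Sketch: batch-read a Parquet file from S3 with the file_scan table function.
-- Arguments: file format, storage type, region, access key, secret key, file or directory path.
SELECT *
FROM file_scan(
    'parquet',
    's3',
    'us-east-1',                                 -- placeholder region
    'your_access_key',                           -- placeholder credentials
    'your_secret_key',
    's3://example-bucket/path/to/file.parquet'   -- placeholder path
)
LIMIT 10;
```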

| File source type | RisingWave type |
| :----------- | :-------------- |
| boolean | boolean |
| integer | int |
| long | bigint |
| float | real |
| double | double |
| string | varchar |
| date | date |
| timestamptz | timestamptz |
| timestamp | timestamp |
| decimal | decimal |
| Int8 | Int16 |
| UInt8 | Int16 |
| UInt16 | Int32 |
| UInt32 | Int64 |
| UInt64 | Decimal |
| Float16 | Float32 |

Contributor: I think Int16 should be changed into smallint, Int32 -> int, Int64 -> bigint? cc @xiangjinwu to confirm.

Reply: The right column for RisingWave shall use the names smallint, int, bigint, decimal, real, double precision. The left column for Parquet shall also be consistent. Names shall be int32 and int64 rather than integer and long. (And seems int16 is missing?)

Contributor: For unsupported datatypes, I think Int96 and FIXED_LEN_BYTE_ARRAY; refer to https://parquet.apache.org/docs/file-format/types/. Not sure if there is any omission, can you help to confirm?
Contributor: Ditto. Float32 -> real.



## Topics in this section

The information presented above provides a brief overview of the data ingestion process in RisingWave. To gain a more comprehensive understanding of this process, the following topics in this section will delve more deeply into the subject matter. Here is a brief introduction to what you can expect to find in each topic:
49 changes: 31 additions & 18 deletions ingestion/supported-sources-and-formats.mdx
@@ -18,11 +18,11 @@ To ingest data in formats marked with "T", you need to create tables (with conne
| [Kinesis](/integrations/sources/kinesis) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) |
| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) |
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | |
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), CSV |
| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) |
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | |
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | [JSON](#json) | |
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json) |

<Note>
When a source is created, RisingWave does not ingest data immediately. RisingWave starts to process data when a materialized view is created based on the source.
@@ -63,6 +63,15 @@ FORMAT [ DEBEZIUM | UPSERT | PLAIN ] ENCODE AVRO (

Note that for `map.handling.mode = 'jsonb'`, the value types can only be: `null`, `boolean`, `int`, `string`, or `map`/`record`/`array` with these types.
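As an illustration (a sketch only; the schema registry URL is a placeholder), the option is set inside the `ENCODE AVRO` options list:

```sql
FORMAT UPSERT
ENCODE AVRO (
    schema.registry = 'http://127.0.0.1:8081',  -- placeholder registry URL
    map.handling.mode = 'jsonb'                 -- Avro map values are ingested as JSONB
)
```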

### Bytes

RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source must contain exactly one field, and that field must be of type `BYTEA`.
Collaborator: Is it possible to have an example here? I don't get this exactly one field part. :)

Collaborator (author): Try to sort these formats in alphabetical order, so move this one from down below to above 🤣

```sql
FORMAT PLAIN
ENCODE BYTES
```
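As a minimal sketch of the single-`BYTEA`-column requirement (the table name, topic, and broker address below are placeholders), a table using `ENCODE BYTES` could look like this:

```sql
-- Sketch: the table has exactly one column, and it must be of type BYTEA;
-- each message's raw payload lands in that column undecoded.
CREATE TABLE raw_events (
    payload BYTEA
)
WITH (
    connector = 'kafka',
    topic = 'raw_events',                           -- placeholder topic
    properties.bootstrap.server = 'localhost:9092'  -- placeholder broker
)
FORMAT PLAIN ENCODE BYTES;
```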

### Debezium AVRO

When creating a source from streams with Debezium AVRO, the schema of the source does not need to be defined in the `CREATE TABLE` statement, as it can be inferred from the `SCHEMA REGISTRY`. This means that the schema file location must be specified. The schema file location can be an actual Web location, which is in `http://...`, `https://...`, or `S3://...` format, or a Confluent Schema Registry. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).
@@ -120,12 +129,12 @@ RisingWave supports the TiCDC dialect of the Canal CDC format. When creating a s

Syntax:

```js
```sql
FORMAT CANAL
ENCODE JSON
```

### Debezium json
### Debezium JSON

When creating a source from streams in Debezium JSON, you can define the schema of the source within the parentheses after the source name (`schema_definition` in the syntax), and specify the data and encoding formats in the `FORMAT` and `ENCODE` sections. You can directly reference data fields in the JSON payload by their names as column names in the schema.

@@ -135,7 +144,7 @@ Note that if you are ingesting data of type `timestamp` or `timestamptz` in Risi

Syntax:

```js
```sql
FORMAT DEBEZIUM
ENCODE JSON [ (
[ ignore_key = 'true | false ' ]
@@ -148,7 +157,7 @@ When loading data from MongoDB via Kafka topics in Debezium Mongo JSON format, t

Syntax:

```js
```sql
FORMAT DEBEZIUM_MONGO
ENCODE JSON
```
@@ -159,7 +168,7 @@ When creating a source from streams in Maxwell JSON, you can define the schema o

Syntax:

```js
```sql
FORMAT MAXWELL
ENCODE JSON
```
@@ -172,7 +181,7 @@ You can define the schema of the source within the parentheses after the source

Syntax:

```js
```sql
FORMAT UPSERT
ENCODE JSON [ (
schema.registry = 'schema_registry_url [, ...]',
@@ -181,6 +190,18 @@ ENCODE JSON [ (
) ]
```

### Parquet

Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.


Syntax:

```sql
FORMAT PLAIN
ENCODE PARQUET
```
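To make this concrete, a source reading Parquet files from S3 might be declared as sketched below. The schema, bucket, region, credentials, and `match_pattern` value are placeholders, and the connector options should be checked against the S3 source documentation.

```sql
-- Sketch: an S3 source that reads Parquet files matching a pattern (placeholder values).
CREATE SOURCE parquet_s3_source (
    user_id BIGINT,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',        -- placeholder region
    s3.bucket_name = 'example-bucket',   -- placeholder bucket
    match_pattern = '*.parquet',         -- placeholder file pattern
    s3.credentials.access = 'xxxxx',     -- placeholder credentials
    s3.credentials.secret = 'xxxxx'
)
FORMAT PLAIN ENCODE PARQUET;
```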

### Protobuf

For data in protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be an actual Web location that is in `http://...`, `https://...`, or `S3://...` format. For Kafka data in protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry that RisingWave can get the schema from. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).
Expand All @@ -199,7 +220,7 @@ protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.

Syntax:

```js
```sql
FORMAT PLAIN
ENCODE PROTOBUF (
message = 'com.example.MyMessage',
@@ -209,14 +230,6 @@ ENCODE PROTOBUF (

For more information on supported protobuf types, refer to [Supported protobuf types](/sql/data-types/supported-protobuf-types).

### Bytes

RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data.

```js
FORMAT PLAIN
ENCODE BYTES
```

## General parameters for supported formats

13 changes: 5 additions & 8 deletions integrations/sources/s3.mdx
@@ -1,7 +1,7 @@
---
title: "Ingest data from S3 buckets"
sidebarTitle: AWS S3
description: "Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports both CSV and [ndjson](https://github.com/ndjson) file formats."
description: "Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports CSV, [ndjson](https://github.com/ndjson), and Parquet file formats."
---

The S3 connector does not guarantee the sequential reading of files or complete file reading.
@@ -52,10 +52,12 @@ For CSV data, specify the delimiter in the `delimiter` option in `ENCODE propert
| s3.assume\_role | Optional. Specifies the ARN of an IAM role to assume when accessing S3\. It allows temporary, secure access to S3 resources without sharing long-term credentials. |
| refresh.interval.sec | Optional. Configure the time interval between operations of listing files. It determines the delay in discovering new files, with a default value of 60 seconds. |

note
<Note>

Empty cells in CSV files will be parsed to `NULL`.

</Note>

| Field | Notes |
| :---------------- | :--------------------------------------------------------------------------------------------------------------------------------------- |
| _data\_format_ | Supported data format: PLAIN. |
@@ -174,13 +176,8 @@ CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table with the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );
```

To view the progress of the source, you can also read the source directly

```sql
-- Create a materialized view from the source
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
-- Select from the source directly
SELECT count(*) from s3_source;
```

Expand Down
Loading