diff --git a/changelog/product-lifecycle.mdx b/changelog/product-lifecycle.mdx
index e70447d6..9355df90 100644
--- a/changelog/product-lifecycle.mdx
+++ b/changelog/product-lifecycle.mdx
@@ -30,7 +30,7 @@ Below is a list of all features in the public preview phase:
 | [Approx percentile](/sql/functions/aggregate#approx-percentile) | 2.0 |
 | [Auto schema change in MySQL CDC](/integrations/sources/mysql-cdc#automatically-change-schema) | 2.0 |
 | [SQL Server CDC source](/integrations/sources/sql-server-cdc) | 2.0 |
-| [Sink data in parquet encode](/delivery/overview#sink-data-in-parquet-or-json-encode) | 2.0 |
+| [Sink data in Parquet format](/delivery/overview#sink-data-in-parquet-or-json-format) | 2.0 |
 | [Time travel queries](/processing/time-travel-queries) | 2.0 |
 | [Manage secrets](/operate/manage-secrets) | 2.0 |
 | [Amazon DynamoDB sink](/integrations/destinations/amazon-dynamodb) | 1.10 |
diff --git a/delivery/overview.mdx b/delivery/overview.mdx
index e5eff912..b937eaa0 100644
--- a/delivery/overview.mdx
+++ b/delivery/overview.mdx
@@ -100,15 +100,15 @@ When creating an `upsert` sink, note whether or not you need to specify the prim
 * If the downstream system supports primary keys but the table in the downstream system has no primary key, then RisingWave does not allow users to create an upsert sink. A primary key must be defined in the table in the downstream system.
 * If the downstream system does not support primary keys, then users must define the primary key when creating an upsert sink.
-## Sink data in parquet or json encode
+## Sink data in parquet or json format
 **PUBLIC PREVIEW**
-Sink data in parquet encode is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our [Slack channel](https://www.risingwave.com/slack). Your input is valuable in helping us improve the feature. For more information, see our [Public preview feature list](/changelog/product-lifecycle#features-in-the-public-preview-stage).
+Sinking data in Parquet format is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our [Slack channel](https://www.risingwave.com/slack). Your input is valuable in helping us improve this feature. For more details, see our [Public Preview Feature List](/changelog/product-lifecycle#features-in-the-public-preview-stage).
-RisingWave supports sinking data in Parquet or JSON encode to file systems including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using the batch processing engine of RisingWave through the `file_scan` API. You can also leverage third-party OLAP query engines for further data processing.
+RisingWave supports sinking data in Parquet or JSON formats to cloud storage services, including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using RisingWave's batch processing engine through the `file_scan` API. You can also leverage third-party OLAP query engines to enhance data processing capabilities.
 Below is an example to sink data to S3:
diff --git a/ingestion/overview.mdx b/ingestion/overview.mdx
index 898d0157..bade4093 100644
--- a/ingestion/overview.mdx
+++ b/ingestion/overview.mdx
@@ -78,7 +78,7 @@ The statement will create a streaming job that continuously ingests data from th
 ### Insert data into tables
-You can load data in batch to RisingWave by creating a table ([CREATE TABLE](/sql/commands/sql-create-table)) and then inserting data into it ([INSERT](/sql/commands/sql-insert)). For example, the statement below creates a table `website_visits` and inserts 5 rows of data.
+You can load data in batch mode to RisingWave by [creating a table](/sql/commands/sql-create-table) and then [inserting](/sql/commands/sql-insert) data into it. For example, the statement below creates a table `website_visits` and inserts 5 rows of data.
 ```sql
 CREATE TABLE website_visits (
@@ -120,6 +120,53 @@ CREATE TABLE t1(
 INSERT INTO t1 SELECT * FROM source_iceberg_t1;
 ```
+## File source management
+
+RisingWave supports reading data from file sources including AWS S3, GCS, and Azure Blob Storage.
+
+### Batch reading from file source
+
+To read data in batch from file sources, you need to create a materialized view from the source or create a table with the appropriate connector. You can also directly query the file source. Below are examples using AWS S3.
+
+```sql
+-- Create a source that connects to S3
+CREATE SOURCE s3_source WITH ( connector = 's3', ... );
+
+-- Create a materialized view from the source for batch processing
+CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;
+
+-- Create a table using the S3 connector
+CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );
+
+-- Select from the source directly
+SELECT count(*) from s3_source;
+```
+
+### Data type mapping in Parquet
+
+When using file source to read parquet files, please define the schema information according to the following mapping.
+
+| File source type | RisingWave type |
+| :----------- | :-------------- |
+| boolean | boolean |
+| int16 | smallint |
+| int32 | int |
+| int64 | bigint |
+| float | real |
+| double | double precision|
+| string | varchar |
+| date | date |
+| `timestamp(_, Some(_))` | timestamptz |
+| `timestamp(_, None)` | timestamp |
+| decimal | decimal |
+| int8 | smallint |
+| uint8 | smallint |
+| uint16 | int |
+| uint32 | bigint |
+| uint64 | decimal |
+| float16 | double precision|
+
+
 ## Topics in this section
 The information presented above provides a brief overview of the data ingestion process in RisingWave. To gain a more comprehensive understanding of this process, the following topics in this section will delve more deeply into the subject matter. Here is a brief introduction to what you can expect to find in each topic:
diff --git a/ingestion/supported-sources-and-formats.mdx b/ingestion/supported-sources-and-formats.mdx
index 9522962e..7079f957 100644
--- a/ingestion/supported-sources-and-formats.mdx
+++ b/ingestion/supported-sources-and-formats.mdx
@@ -18,11 +18,11 @@ To ingest data in formats marked with "T", you need to create tables (with conne
 | [Kinesis](/integrations/sources/kinesis) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
 | [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) |
 | [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) |
-| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | |
+| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
 | [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), CSV |
 | [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) |
-| [Google Pub/Sub](/integrations/sources/google-pub-sub) | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | |
-| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | [JSON](#json) | |
+| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) |
+| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json) |
 When a source is created, RisingWave does not ingest data immediately. RisingWave starts to process data when a materialized view is created based on the source.
@@ -63,6 +63,15 @@ FORMAT [ DEBEZIUM | UPSERT | PLAIN ] ENCODE AVRO (
 Note that for `map.handling.mode = 'jsonb'`, the value types can only be: `null`, `boolean`, `int`, `string`, or `map`/`record`/`array` with these types.
+### Bytes
+
+RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data.
+
+```sql
+FORMAT PLAIN
+ENCODE BYTES
+```
+
 ### Debezium AVRO
 When creating a source from streams in with Debezium AVRO, the schema of the source does not need to be defined in the `CREATE TABLE` statement as it can be inferred from the `SCHEMA REGISTRY`. This means that the schema file location must be specified. The schema file location can be an actual Web location, which is in `http://...`, `https://...`, or `S3://...` format, or a Confluent Schema Registry. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).
@@ -120,12 +129,12 @@ RisingWave supports the TiCDC dialect of the Canal CDC format. When creating a s
 Syntax:
-```js
+```sql
 FORMAT CANAL
 ENCODE JSON
 ```
-### Debezium json
+### Debezium JSON
 When creating a source from streams in Debezium JSON, you can define the schema of the source within the parentheses after the source name (`schema_definition` in the syntax), and specify the data and encoding formats in the `FORMAT` and `ENCODE` sections. You can directly reference data fields in the JSON payload by their names as column names in the schema.
@@ -135,7 +144,7 @@ Note that if you are ingesting data of type `timestamp` or `timestamptz` in Risi
 Syntax:
-```js
+```sql
 FORMAT DEBEZIUM
 ENCODE JSON [ (
 [ ignore_key = 'true | false ' ]
@@ -148,7 +157,7 @@ When loading data from MongoDB via Kafka topics in Debezium Mongo JSON format, t
 Syntax:
-```js
+```sql
 FORMAT DEBEZIUM_MONGO
 ENCODE JSON
 ```
@@ -159,7 +168,7 @@ When creating a source from streams in Maxwell JSON, you can define the schema o
 Syntax:
-```js
+```sql
 FORMAT MAXWELL
 ENCODE JSON
 ```
@@ -172,7 +181,7 @@ You can define the schema of the source within the parentheses after the source
 Syntax:
-```js
+```sql
 FORMAT UPSERT
 ENCODE JSON [ (
 schema.registry = 'schema_registry_url [, ...]',
@@ -181,6 +190,18 @@ ENCODE JSON [ (
 ) ]
 ```
+### Parquet
+
+Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.
+
+
+Syntax:
+
+```sql
+FORMAT PLAIN
+ENCODE PARQUET
+```
+
 ### Protobuf
 For data in protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be an actual Web location that is in `http://...`, `https://...`, or `S3://...` format. For Kafka data in protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry that RisingWave can get the schema from. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry).
@@ -199,7 +220,7 @@ protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.
 Syntax:
-```js
+```sql
 FORMAT PLAIN
 ENCODE PROTOBUF (
 message = 'com.example.MyMessage',
@@ -209,14 +230,6 @@ ENCODE PROTOBUF (
 For more information on supported protobuf types, refer to [Supported protobuf types](/sql/data-types/supported-protobuf-types).
-### Bytes
-
-RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data.
-
-```js
-FORMAT PLAIN
-ENCODE BYTES
-```
 ## General parameters for supported formats
diff --git a/integrations/destinations/aws-s3.mdx b/integrations/destinations/aws-s3.mdx
index 28111b7b..1014096c 100644
--- a/integrations/destinations/aws-s3.mdx
+++ b/integrations/destinations/aws-s3.mdx
@@ -48,4 +48,4 @@ WITH (
 )FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
 ```
-For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
\ No newline at end of file
+For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
\ No newline at end of file
diff --git a/integrations/destinations/azure-blob.mdx b/integrations/destinations/azure-blob.mdx
index 32903bfd..d97455c5 100644
--- a/integrations/destinations/azure-blob.mdx
+++ b/integrations/destinations/azure-blob.mdx
@@ -52,4 +52,4 @@ WITH (
 )FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
 ```
-For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
+For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
diff --git a/integrations/destinations/google-cloud-storage.mdx b/integrations/destinations/google-cloud-storage.mdx
index e159cbea..f87ad542 100644
--- a/integrations/destinations/google-cloud-storage.mdx
+++ b/integrations/destinations/google-cloud-storage.mdx
@@ -43,4 +43,4 @@ WITH (
 )FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
 ```
-For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
\ No newline at end of file
+For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
\ No newline at end of file
diff --git a/integrations/destinations/webhdfs.mdx b/integrations/destinations/webhdfs.mdx
index 6e133cbf..e60f787f 100644
--- a/integrations/destinations/webhdfs.mdx
+++ b/integrations/destinations/webhdfs.mdx
@@ -39,4 +39,4 @@ WITH (
 )FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
 ```
-For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json encode](/delivery/overview).
\ No newline at end of file
+For more information about encode `Parquet` or `JSON`, see [Sink data in parquet or json format](/delivery/overview#sink-data-in-parquet-or-json-format).
\ No newline at end of file
diff --git a/integrations/sources/s3.mdx b/integrations/sources/s3.mdx
index 4a1ff358..c63c66c5 100644
--- a/integrations/sources/s3.mdx
+++ b/integrations/sources/s3.mdx
@@ -1,7 +1,7 @@
 ---
 title: "Ingest data from S3 buckets"
 sidebarTitle: AWS S3
-description: "Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports both CSV and [ndjson](https://github.com/ndjson) file formats."
+description: "Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports CSV, [ndjson](https://github.com/ndjson) and Parquet file formats."
 ---
 The S3 connector does not guarantee the sequential reading of files or complete file reading.
@@ -52,10 +52,12 @@ For CSV data, specify the delimiter in the `delimiter` option in `ENCODE propert
 | s3.assume\_role | Optional. Specifies the ARN of an IAM role to assume when accessing S3\. It allows temporary, secure access to S3 resources without sharing long-term credentials. |
 | refresh.interval.sec | Optional. Configure the time interval between operations of listing files. It determines the delay in discovering new files, with a default value of 60 seconds. |
-note
+
 Empty cells in CSV files will be parsed to `NULL`.
+
+
 | Field | Notes |
 | :---------------- | :--------------------------------------------------------------------------------------------------------------------------------------- |
 | _data\_format_ | Supported data format: PLAIN. |
@@ -174,13 +176,8 @@ CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;
 -- Create a table with the S3 connector
 CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );
-```
-To view the progress of the source, you can also read the source directly
-
-```sql
--- Create a materialized view from the source
-CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
+-- Select from the source directly
 SELECT count(*) from s3_source;
 ```