Update ingest-additional-fields-with-include-clause.mdx
WanYixian committed Dec 12, 2024
1 parent b1d5f98 commit 025b719
Showing 1 changed file, ingestion/ingest-additional-fields-with-include-clause.mdx, with 19 additions and 14 deletions.

To add additional columns, use the `INCLUDE` clause.

```sql
INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]
```

If `<column_name>` is not specified, a default one will be generated in the format `_rw_{connector}_{col}`, where `connector` is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.), and `col` is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name would be `_rw_kafka_offset`.
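
As a minimal sketch of that default naming (the topic and broker address below are placeholders, not part of the original example):

```sql
-- No AS alias is given, so the offset column gets the default name
-- _rw_kafka_offset (connector = kafka, column type = offset).
-- Topic and broker address are hypothetical placeholders.
CREATE TABLE t (v int)
INCLUDE offset
WITH (
  connector = 'kafka',
  topic = 'example_topic',
  properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;

-- The generated column is then queryable under its default name.
SELECT v, _rw_kafka_offset FROM t;
```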
### Kafka

When ingesting data from Kafka, the following additional fields can be included.

| Allowed components | Default type | Note |
| :----------------- | :------------------------- | :----------------------------------------------- |
| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| timestamp | `timestamp with time zone` | The timestamp of the message. |
| partition | `varchar` | The partition the message is from. |
| offset | `varchar` | The offset in the partition. |
| headers | `struct<varchar, bytea>[]` | Key-value pairs along with the message. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |
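
Because `payload` carries the whole message as JSON, individual fields can be read back out with JSON operators. A minimal sketch, assuming a hypothetical topic whose messages look like `{"a": 1}`:

```sql
-- payload_col receives the entire JSON message; topic and broker are hypothetical.
CREATE TABLE kafka_payload_demo (a int)
INCLUDE payload AS payload_col
WITH (
  connector = 'kafka',
  topic = 'example_topic',
  properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;

-- Compare the typed column with the same field extracted from the raw payload.
SELECT a, payload_col ->> 'a' AS a_from_payload FROM kafka_payload_demo;
```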

In the case of headers, there are two ways to define the column.

### Kinesis

When ingesting data from Kinesis, here are some things to note when including the additional fields.

| Allowed components | Default type | Note |
| :----------------- | :---------------- | :-------------- |
| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| timestamp | `timestamp with time zone` | See the approximate\_arrival\_timestamp field at [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html). |
| partition | `varchar` | The partition the message is from. |
| offset | `varchar` | The offset in the partition, which corresponds to Kinesis sequence numbers. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |

For more components, see [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html).
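
As a sketch of how these components map onto a Kinesis source (the stream name and region are placeholders, and the FORMAT/ENCODE choice is an assumption):

```sql
-- Stream name and region are hypothetical placeholders.
CREATE TABLE kinesis_demo (v int)
INCLUDE timestamp AS arrival_ts -- the record's approximate_arrival_timestamp
INCLUDE offset AS sequence_no   -- maps to the Kinesis sequence number
WITH (
  connector = 'kinesis',
  stream = 'example_stream',
  aws.region = 'us-east-1'
) FORMAT PLAIN ENCODE JSON;
```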

### Pulsar

When ingesting data from Pulsar, here are some things to note when including the additional fields.

| Allowed components | Default type | Note |
| :----------------- | :----------- | :------------------------------------------- |
| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| partition | `varchar` | The partition the message is from. |
| offset | `varchar` | The offset in the partition. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |

For more components, see [Struct pulsar::message::proto::MessageMetadata](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html).
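
A comparable sketch for Pulsar (the topic and service URL are placeholders):

```sql
-- Topic and service URL are hypothetical placeholders.
CREATE TABLE pulsar_demo (v int)
INCLUDE key AS key_col
INCLUDE offset AS offset_col
WITH (
  connector = 'pulsar',
  topic = 'persistent://public/default/example',
  service.url = 'pulsar://localhost:6650'
) FORMAT PLAIN ENCODE JSON;
```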

### S3, GCS, and Azure Blob

When ingesting data from AWS S3, GCS, or Azure Blob, the following additional fields can be included.

| Allowed components | Default type | Note |
| :----------------- | :----------- | :--------------------------- |
| file | `varchar` | The file the record is from. |
| offset | `varchar` | The offset in the file. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |
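
A sketch for a file source, with `file` and `offset` aliased to friendlier names (the bucket, region, and match pattern are placeholders, and the connector options are assumptions):

```sql
-- Bucket, region, and file pattern are hypothetical placeholders.
CREATE TABLE s3_demo (v int)
INCLUDE file AS file_name
INCLUDE offset AS file_offset
WITH (
  connector = 's3',
  s3.region_name = 'us-east-1',
  s3.bucket_name = 'example-bucket',
  match_pattern = '*.json'
) FORMAT PLAIN ENCODE JSON;
```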

### MQTT

When ingesting data from MQTT, the following additional fields can be included.

## Examples

Here we create a table, `additional_columns`, that ingests data from a Kafka broker. Aside from the `a` column, which is part of the message payload, the additional fields `key`, `partition`, `offset`, `timestamp`, `header`, and `payload` are also added to the table.

```sql
CREATE TABLE additional_columns (
  a int,
  primary key (key_col)
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
  connector = 'kafka',
  properties.bootstrap.server = 'message_queue:29092',
  ...
```
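
To confirm the extra columns were created, the table can be inspected and queried by the aliases chosen above (a sketch; the exact schema output depends on your RisingWave version):

```sql
-- Inspect the schema to confirm the INCLUDE columns exist.
DESCRIBE additional_columns;

-- Query the metadata columns directly by their aliases.
SELECT key_col, partition_col, offset_col, timestamp_col, payload_col
FROM additional_columns
LIMIT 5;
```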
