Skip to content

Commit

Permalink
add new additional column: payload
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Sep 6, 2024
1 parent df7f54f commit 7f78492
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
4 changes: 4 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ message AdditionalTableName {}

message AdditionalCollectionName {}

message AdditionalColumnPayload {}

// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

Expand All @@ -246,6 +248,7 @@ message AdditionalColumn {
AdditionalSchemaName schema_name = 9;
AdditionalTableName table_name = 10;
AdditionalCollectionName collection_name = 11;
AdditionalColumnPayload payload = 12;
}
}

Expand All @@ -258,4 +261,5 @@ enum AdditionalColumnType {
ADDITIONAL_COLUMN_TYPE_HEADER = 5;
ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
ADDITIONAL_COLUMN_TYPE_Payload = 8;
}
47 changes: 36 additions & 11 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColum
use risingwave_pb::plan_common::{
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName,
AdditionalSchemaName, AdditionalTableName,
AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp,
AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName,
};

use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};

// Hidden additional columns connectors which do not support `include` syntax.
Expand All @@ -44,21 +44,38 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
HashMap::from([
(
KAFKA_CONNECTOR,
HashSet::from(["key", "timestamp", "partition", "offset", "header"]),
HashSet::from([
"key",
"timestamp",
"partition",
"offset",
"header",
"payload",
]),
),
(
PULSAR_CONNECTOR,
HashSet::from(["key", "partition", "offset"]),
HashSet::from(["key", "partition", "offset", "payload"]),
),
(
KINESIS_CONNECTOR,
HashSet::from(["key", "partition", "offset", "timestamp"]),
HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
),
// remove s3 for no longer use
// (S3_CONNECTOR, HashSet::from(["file", "offset"])),
(
OPENDAL_S3_CONNECTOR,
HashSet::from(["file", "offset", "payload"]),
),
(GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
(
AZBLOB_CONNECTOR,
HashSet::from(["file", "offset", "payload"]),
),
(
POSIX_FS_CONNECTOR,
HashSet::from(["file", "offset", "payload"]),
),
(OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
(S3_CONNECTOR, HashSet::from(["file", "offset"])),
(GCS_CONNECTOR, HashSet::from(["file", "offset"])),
(AZBLOB_CONNECTOR, HashSet::from(["file", "offset"])),
(POSIX_FS_CONNECTOR, HashSet::from(["file", "offset"])),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
Expand Down Expand Up @@ -186,6 +203,14 @@ pub fn build_additional_column_desc(
)),
},
),
"payload" => ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Jsonb,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
},
),
"offset" => ColumnDesc::named_with_additional_column(
column_name,
column_id,
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"plan_common.AdditionalColumnPartition",
"#[derive(Eq, Hash)]",
)
.type_attribute("plan_common.AdditionalColumnPayload", "#[derive(Eq, Hash)]")
.type_attribute(
"plan_common.AdditionalColumnTimestamp",
"#[derive(Eq, Hash)]",
Expand Down

0 comments on commit 7f78492

Please sign in to comment.