From 7f78492cdca4c26bc1d9fb65f624916a2901d371 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 15:13:35 +0800 Subject: [PATCH 01/13] add new additional column: payload Signed-off-by: tabVersion --- proto/plan_common.proto | 4 ++ .../src/parser/additional_columns.rs | 47 ++++++++++++++----- src/prost/build.rs | 1 + 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index bc2e60503f103..c240d59052605 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -230,6 +230,8 @@ message AdditionalTableName {} message AdditionalCollectionName {} +message AdditionalColumnPayload {} + // this type means we read all headers as a whole message AdditionalColumnHeaders {} @@ -246,6 +248,7 @@ message AdditionalColumn { AdditionalSchemaName schema_name = 9; AdditionalTableName table_name = 10; AdditionalCollectionName collection_name = 11; + AdditionalColumnPayload payload = 12; } } @@ -258,4 +261,5 @@ enum AdditionalColumnType { ADDITIONAL_COLUMN_TYPE_HEADER = 5; ADDITIONAL_COLUMN_TYPE_FILENAME = 6; ADDITIONAL_COLUMN_TYPE_NORMAL = 7; + ADDITIONAL_COLUMN_TYPE_Payload = 8; } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index c30f5f74ba390..c889b3b36ddf1 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -24,15 +24,15 @@ use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColum use risingwave_pb::plan_common::{ AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset, - AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName, - AdditionalSchemaName, AdditionalTableName, + AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp, + AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, }; use crate::error::ConnectorResult; use crate::source::cdc::MONGODB_CDC_CONNECTOR; use crate::source::{ AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, - POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, }; // Hidden additional columns connectors which do not support `include` syntax. @@ -44,21 +44,38 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Jsonb, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})), + }, + ), "offset" => ColumnDesc::named_with_additional_column( column_name, column_id, diff --git a/src/prost/build.rs b/src/prost/build.rs index 18bc2d4ae9494..63c6313e35956 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -147,6 +147,7 @@ fn main() -> Result<(), Box> { "plan_common.AdditionalColumnPartition", "#[derive(Eq, Hash)]", ) + .type_attribute("plan_common.AdditionalColumnPayload", "#[derive(Eq, Hash)]") .type_attribute( "plan_common.AdditionalColumnTimestamp", "#[derive(Eq, Hash)]", From 20f6bfadbf84205068bf73486ffca7d1118c32f1 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 17:12:13 +0800 Subject: [PATCH 02/13] check on encode json Signed-off-by: tabVersion --- src/frontend/src/handler/create_source.rs | 50 +++++++++++++++++------ src/frontend/src/handler/create_table.rs | 1 + 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 432f814cd4c41..2af7aa8b4b098 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -67,7 +67,7 @@ use risingwave_sqlparser::ast::{ get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format, ObjectName, ProtobufSchema, SourceWatermark, TableConstraint, }; -use risingwave_sqlparser::parser::IncludeOption; +use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem}; use thiserror_ext::AsReport; use super::RwPgResponse; @@ -595,8 +595,43 @@ fn bind_columns_from_source_for_cdc( Ok((Some(columns), stream_source_info)) } +// check the additional column compatibility with the format and encode +fn check_additional_column_compatibility( + column_def: &IncludeOptionItem, + source_schema: Option<&ConnectorSchema>, +) -> Result<()> { + // only allow header column have inner field + if column_def.inner_field.is_some() + && !column_def + .column_type + .real_value() + .eq_ignore_ascii_case("header") + { + return Err(RwError::from(ProtocolError(format!( + "Only header column can have inner field, but got {:?}", + column_def.column_type.real_value(), + )))); + } + + // Payload column only allowed when encode is JSON + if let Some(schema) = source_schema + && column_def + .column_type + .real_value() + .eq_ignore_ascii_case("payload") + && !matches!(schema.row_encode, Encode::Json) + { + return Err(RwError::from(ProtocolError(format!( + "Payload column is only allowed when row encode is JSON, but got {:?}", + schema.row_encode + )))); + } + Ok(()) +} + /// add connector-spec columns to the end of column catalog pub fn handle_addition_columns( + source_schema: Option<&ConnectorSchema>, with_properties: &BTreeMap, mut additional_columns: IncludeOption, columns: &mut Vec, @@ -620,17 +655,7 @@ pub fn handle_addition_columns( .unwrap(); // there must be at least one column in the column catalog while let Some(item) = additional_columns.pop() { - { - // only allow header column have inner field - if item.inner_field.is_some() - && !item.column_type.real_value().eq_ignore_ascii_case("header") - { - return Err(RwError::from(ProtocolError(format!( - "Only header column can have inner field, but got {:?}", - item.column_type.real_value(), - )))); - } - } + check_additional_column_compatibility(&item, source_schema)?; let data_type_name: Option = item .header_inner_expect_type @@ -1513,6 +1538,7 @@ pub async fn bind_create_source_or_table_with_connector( // add additional columns before bind pk, because `format upsert` requires the key column handle_addition_columns( + Some(&source_schema), &with_properties, include_column_options, &mut columns, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a10453a43ea4e..e2ef048143cd2 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -772,6 +772,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( // append additional columns to the end handle_addition_columns( + None, &connect_properties, include_column_options, &mut columns, From 6daffc2de86f677d94e2257a67293ac82bd136dd Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 18:07:22 +0800 Subject: [PATCH 03/13] access layer Signed-off-by: tabVersion --- src/connector/src/parser/mod.rs | 10 ++++++++++ src/connector/src/parser/unified/json.rs | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 4b14654bf518d..91d5320b80a55 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -488,6 +488,16 @@ impl SourceStreamChunkRowWriter<'_> { .map(|ele| ScalarRefImpl::Utf8(ele.split_id)), )); } + (_, &Some(AdditionalColumnType::Payload(_))) => { + // Get the whole payload as a single column + // The frontend check guarantees that row encode must be `SourceEncode::Json` + // here fake a column named "." to represent the whole payload + // see the json accessor hack in `impl Access for JsonAccess<'_>` + let mut desc_mock = desc.clone(); + desc_mock.name = ".".to_string(); + desc_mock.additional_column.column_type = None; + parse_field(&desc_mock) + } (_, _) => { // For normal columns, call the user provided closure. parse_field(desc) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index ca709e2eebc73..b6f2ccdade0e0 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -646,6 +646,10 @@ impl<'a> JsonAccess<'a> { impl Access for JsonAccess<'_> { fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = &self.value; + if path.len() == 1 && path[0] == "." { + return self.options.parse(value, type_expected); + } + for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { json_object_get_case_insensitive(value, key) From 81107a68b3dd737901cd56c52557dd304cacba60 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 18:11:33 +0800 Subject: [PATCH 04/13] fix proto naming Signed-off-by: tabVersion --- proto/plan_common.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index c240d59052605..610f40968755c 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -261,5 +261,5 @@ enum AdditionalColumnType { ADDITIONAL_COLUMN_TYPE_HEADER = 5; ADDITIONAL_COLUMN_TYPE_FILENAME = 6; ADDITIONAL_COLUMN_TYPE_NORMAL = 7; - ADDITIONAL_COLUMN_TYPE_Payload = 8; + ADDITIONAL_COLUMN_TYPE_PAYLOAD = 8; } From cd4414da00fb429963b6a7e03fdea9a65e92f994 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 18:19:43 +0800 Subject: [PATCH 05/13] test Signed-off-by: tabVersion --- e2e_test/source/basic/kafka.slt | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 40e9b46036112..d5a1d630b941e 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -498,6 +498,16 @@ FORMAT DEBEZIUM ENCODE JSON ( ignore_key = 'true' ) +statement ok +CREATE TABLE test_include_payload (v1 int, v2 varchar) +INCLUDE payload +WITH ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON + statement ok flush; @@ -512,6 +522,13 @@ select v1, v2 from t0; 3 333 4 4444 +query ITT rowsort +select v1, v2, _rw_kafka_payload from test_include_payload; +---- +1 1 {"v1": 1, "v2": "1"} +2 22 {"v1": 2, "v2": "22"} +3 333 {"v1": 3, "v2": "333"} +4 4444 {"v1": 4, "v2": "4444"} query IT rowsort select v1, v2 from s0; @@ -916,3 +933,6 @@ drop table source_with_rdkafka_props; statement ok drop table debezium_ignore_key; + +statement ok +drop table test_include_payload; From 1d85abf656739b6d4383c6a0fc0a399d8538f729 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 23:53:58 +0800 Subject: [PATCH 06/13] test Signed-off-by: tabVersion --- e2e_test/source/basic/kafka.slt | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index d5a1d630b941e..6326c48263db8 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -498,6 +498,16 @@ FORMAT DEBEZIUM ENCODE JSON ( ignore_key = 'true' ) +statement error Payload column is only allowed when row encode is JSON, but got Bytes +CREATE TABLE test_include_payload (a bytea) +INCLUDE payload +WITH ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE BYTES + statement ok CREATE TABLE test_include_payload (v1 int, v2 varchar) INCLUDE payload From 02337a54a1e425da35d3ccd9d20ec1bdbed9a042 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 10 Sep 2024 14:38:49 +0800 Subject: [PATCH 07/13] Update src/frontend/src/handler/create_source.rs Co-authored-by: xxchan --- src/frontend/src/handler/create_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2af7aa8b4b098..480c30e66244c 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -622,7 +622,7 @@ fn check_additional_column_compatibility( && !matches!(schema.row_encode, Encode::Json) { return Err(RwError::from(ProtocolError(format!( - "Payload column is only allowed when row encode is JSON, but got {:?}", + "INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE {:?}", schema.row_encode )))); } From b5961b46db30749e2b4fc8c4c7e24d25485dcd8e Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 10 Sep 2024 15:03:57 +0800 Subject: [PATCH 08/13] fix Signed-off-by: tabVersion --- e2e_test/source/basic/kafka.slt | 2 +- src/connector/src/parser/mod.rs | 11 +---------- src/connector/src/parser/unified/kv_event.rs | 3 +++ 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 6326c48263db8..0e413c3389d58 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -498,7 +498,7 @@ FORMAT DEBEZIUM ENCODE JSON ( ignore_key = 'true' ) -statement error Payload column is only allowed when row encode is JSON, but got Bytes +statement error INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE Bytes CREATE TABLE test_include_payload (a bytea) INCLUDE payload WITH ( diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 91d5320b80a55..1ed9d84551524 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -488,16 +488,7 @@ impl SourceStreamChunkRowWriter<'_> { .map(|ele| ScalarRefImpl::Utf8(ele.split_id)), )); } - (_, &Some(AdditionalColumnType::Payload(_))) => { - // Get the whole payload as a single column - // The frontend check guarantees that row encode must be `SourceEncode::Json` - // here fake a column named "." to represent the whole payload - // see the json accessor hack in `impl Access for JsonAccess<'_>` - let mut desc_mock = desc.clone(); - desc_mock.name = ".".to_string(); - desc_mock.additional_column.column_type = None; - parse_field(&desc_mock) - } + (_, &Some(AdditionalColumnType::Payload(_))) => parse_field(desc), (_, _) => { // For normal columns, call the user provided closure. parse_field(desc) diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index 7e52d2f4c3c24..cad70bc23429e 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -79,6 +79,9 @@ where pub fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult> { match desc.additional_column.column_type { Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type), + // hack here: Get the whole payload as a single column + // use a special mark "." as column name to represent the whole payload + Some(AdditionalColumnType::Payload(_)) => self.access_value(&[&"."], &desc.data_type), None => self.access_value(&[&desc.name], &desc.data_type), _ => unreachable!(), } From a833f96a2cffc9a84296eb19a589de80d5e15462 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 10 Sep 2024 15:15:17 +0800 Subject: [PATCH 09/13] fix Signed-off-by: tabVersion --- src/connector/src/parser/unified/kv_event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index cad70bc23429e..3f3e1312f8121 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -81,7 +81,7 @@ where Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type), // hack here: Get the whole payload as a single column // use a special mark "." as column name to represent the whole payload - Some(AdditionalColumnType::Payload(_)) => self.access_value(&[&"."], &desc.data_type), + Some(AdditionalColumnType::Payload(_)) => self.access_value(&["."], &desc.data_type), None => self.access_value(&[&desc.name], &desc.data_type), _ => unreachable!(), } From d7c1dd0552e540bcce80b1793c764e3e72a2c486 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 10 Sep 2024 16:37:39 +0800 Subject: [PATCH 10/13] fix Signed-off-by: tabVersion --- src/connector/src/parser/unified/json.rs | 2 +- src/connector/src/parser/unified/kv_event.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index b6f2ccdade0e0..88ed6c94ab08b 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -646,7 +646,7 @@ impl<'a> JsonAccess<'a> { impl Access for JsonAccess<'_> { fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = &self.value; - if path.len() == 1 && path[0] == "." { + if path.is_empty() { return self.options.parse(value, type_expected); } diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index 3f3e1312f8121..ad09efe2e2699 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -81,7 +81,7 @@ where Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type), // hack here: Get the whole payload as a single column // use a special mark "." as column name to represent the whole payload - Some(AdditionalColumnType::Payload(_)) => self.access_value(&["."], &desc.data_type), + Some(AdditionalColumnType::Payload(_)) => self.access_value(&[], &desc.data_type), None => self.access_value(&[&desc.name], &desc.data_type), _ => unreachable!(), } From e097acbf92ba4004c327d4a63872b7b3844cbcee Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 10 Sep 2024 17:40:36 +0800 Subject: [PATCH 11/13] fix comments Signed-off-by: tabVersion --- src/connector/src/parser/mod.rs | 6 +++++- src/connector/src/parser/unified/json.rs | 3 --- src/connector/src/parser/unified/kv_event.rs | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 1ed9d84551524..a49390c2752f4 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -488,7 +488,11 @@ impl SourceStreamChunkRowWriter<'_> { .map(|ele| ScalarRefImpl::Utf8(ele.split_id)), )); } - (_, &Some(AdditionalColumnType::Payload(_))) => parse_field(desc), + (_, &Some(AdditionalColumnType::Payload(_))) => { + // ingest the whole payload as a single column + // do special logic in `KvEvent::access_field` + parse_field(desc) + } (_, _) => { // For normal columns, call the user provided closure. parse_field(desc) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 88ed6c94ab08b..8ee8f9fe9386f 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -646,9 +646,6 @@ impl<'a> JsonAccess<'a> { impl Access for JsonAccess<'_> { fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = &self.value; - if path.is_empty() { - return self.options.parse(value, type_expected); - } for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index ad09efe2e2699..6ab7925b9bb48 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -80,7 +80,7 @@ where match desc.additional_column.column_type { Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type), // hack here: Get the whole payload as a single column - // use a special mark "." as column name to represent the whole payload + // use a special mark empty slice as path to represent the whole payload Some(AdditionalColumnType::Payload(_)) => self.access_value(&[], &desc.data_type), None => self.access_value(&[&desc.name], &desc.data_type), _ => unreachable!(), From 22db5647a6241f1d39fb42f55a4665da302e00fc Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Wed, 11 Sep 2024 16:36:39 +0800 Subject: [PATCH 12/13] Update src/connector/src/parser/additional_columns.rs Co-authored-by: xxchan --- src/connector/src/parser/additional_columns.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index c889b3b36ddf1..645220b401c5a 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -61,8 +61,6 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock Date: Wed, 11 Sep 2024 17:27:49 +0800 Subject: [PATCH 13/13] check payload s3 source Signed-off-by: tabVersion --- e2e_test/s3/fs_source_v2.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/e2e_test/s3/fs_source_v2.py b/e2e_test/s3/fs_source_v2.py index eaef004dd600a..eb9ec69ae3dd5 100644 --- a/e2e_test/s3/fs_source_v2.py +++ b/e2e_test/s3/fs_source_v2.py @@ -69,7 +69,9 @@ def _encode(): name TEXT, sex int, mark int, - ) WITH ( + ) + INCLUDE payload as rw_payload + WITH ( connector = 's3', match_pattern = '{prefix}*.{fmt}', s3.region_name = '{config['S3_REGION']}', @@ -105,6 +107,18 @@ def _assert_eq(field, got, expect): _assert_eq('sum(sex)', result[2], total_rows / 2) _assert_eq('sum(mark)', result[3], 0) + # check rw_payload + print('Check rw_payload') + stmt = f"select id, name, sex, mark, rw_payload from {_table()} limit 1;" + cur.execute(stmt) + result = cur.fetchone() + print("Got one line with rw_payload: ", result) + payload = json.loads(result[4]) + _assert_eq('id', payload['id'], result[0]) + _assert_eq('name', payload['name'], result[1]) + _assert_eq('sex', payload['sex'], result[2]) + _assert_eq('mark', payload['mark'], result[3]) + print('Test pass') if need_drop_table: