From fcdeb3f75a1ef443b3dac1b1dfcde47379ec506a Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Mon, 26 Feb 2024 12:11:44 +0800
Subject: [PATCH] fix: handle upsert json in prev versions (#15226)

Signed-off-by: tabVersion
---
 backwards-compat-tests/scripts/utils.sh       | 28 +++++++++++------
 .../slt/kafka/upsert/deprecate_upsert.slt     | 16 ++++++++++
 .../slt/kafka/upsert/include_key_as.slt       | 18 +++++++++++
 .../slt/kafka/validate_restart.slt            | 13 ++++++++
 proto/plan_common.proto                       | 17 +++++++---
 src/common/src/catalog/column.rs              |  2 ++
 src/common/src/catalog/test_utils.rs          |  1 +
 src/connector/src/parser/avro/util.rs         |  1 +
 src/connector/src/parser/protobuf/parser.rs   |  1 +
 .../src/from_proto/source/trad_source.rs      | 31 +++++++++++++------
 10 files changed, 105 insertions(+), 23 deletions(-)
 create mode 100644 backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
 create mode 100644 backwards-compat-tests/slt/kafka/upsert/include_key_as.slt

diff --git a/backwards-compat-tests/scripts/utils.sh b/backwards-compat-tests/scripts/utils.sh
index 1afbf08dd4441..5990aac026077 100644
--- a/backwards-compat-tests/scripts/utils.sh
+++ b/backwards-compat-tests/scripts/utils.sh
@@ -103,19 +103,21 @@ insert_json_kafka() {
   local JSON=$1
   echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
     --topic backwards_compat_test_kafka_source \
-    --bootstrap-server localhost:29092
+    --bootstrap-server localhost:29092 \
+    --property "parse.key=true" \
+    --property "key.separator=,"
 }
 
 seed_json_kafka() {
-  insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
+  insert_json_kafka '{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
+  insert_json_kafka '{"user_id": 2},{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
+  insert_json_kafka '{"user_id": 3},{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
+  insert_json_kafka '{"user_id": 4},{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
+  insert_json_kafka '{"user_id": 5},{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
+  insert_json_kafka '{"user_id": 6},{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
+  insert_json_kafka '{"user_id": 7},{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
+  insert_json_kafka '{"user_id": 8},{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
+  insert_json_kafka '{"user_id": 9},{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
 }
 
 # https://stackoverflow.com/a/4024263
@@ -225,6 +227,12 @@ seed_old_cluster() {
   create_kafka_topic
   seed_json_kafka
   sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/seed.slt"
+  # use the old syntax for version at most 1.5.4
+  if version_le "$OLD_VERSION" "1.5.4" ; then
+    sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/deprecate_upsert.slt"
+  else
+    sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/include_key_as.slt"
+  fi
 
   echo "--- KAFKA TEST: wait 5s for kafka to process data"
   sleep 5
diff --git a/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
new file mode 100644
index 0000000000000..55cfce886455d
--- /dev/null
+++ b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
@@ -0,0 +1,16 @@
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+    action varchar,
+    user_id integer,
+    obj_id integer,
+    name varchar,
+    page_id integer,
+    age integer
+)
+WITH (
+    connector='kafka',
+    topic='backwards_compat_test_kafka_source',
+    properties.bootstrap.server='localhost:29092',
+    scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
\ No newline at end of file
diff --git a/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
new file mode 100644
index 0000000000000..36ef426574223
--- /dev/null
+++ b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
@@ -0,0 +1,18 @@
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+    action varchar,
+    user_id integer,
+    obj_id integer,
+    name varchar,
+    page_id integer,
+    age integer,
+    primary key (_rw_key)
+)
+INCLUDE key as _rw_key
+WITH (
+    connector='kafka',
+    topic='backwards_compat_test_kafka_source',
+    properties.bootstrap.server='localhost:29092',
+    scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
\ No newline at end of file
diff --git a/backwards-compat-tests/slt/kafka/validate_restart.slt b/backwards-compat-tests/slt/kafka/validate_restart.slt
index 7058b118f4d20..6d853007b9829 100644
--- a/backwards-compat-tests/slt/kafka/validate_restart.slt
+++ b/backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -50,3 +50,16 @@ werwerwwe 8 NULL NULL 4 NULL
 yjtyjtyyy 9 NULL NULL 4 NULL
 yjtyjtyyy 9 NULL NULL 4 NULL
 
+# kafka_table should do the upsert and overwrite the existing records
+query I rowsort
+SELECT action, user_id, obj_id, name, page_id, age, _rw_key FROM kafka_table;
+----
+6786745ge 6 NULL NULL 3 NULL \x7b22757365725f6964223a20367d
+erwerhghj 4 NULL NULL 2 NULL \x7b22757365725f6964223a20347d
+fgbgfnyyy 7 NULL NULL 3 NULL \x7b22757365725f6964223a20377d
+fsdfgerrg 2 NULL NULL 1 NULL \x7b22757365725f6964223a20327d
+gtrgretrg 1 NULL NULL 1 NULL \x7b22757365725f6964223a20317d
+kiku7ikkk 5 NULL NULL 2 NULL \x7b22757365725f6964223a20357d
+sdfergtth 3 NULL NULL 1 NULL \x7b22757365725f6964223a20337d
+werwerwwe 8 NULL NULL 4 NULL \x7b22757365725f6964223a20387d
+yjtyjtyyy 9 NULL NULL 4 NULL \x7b22757365725f6964223a20397d
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 1dd45ad08a6ef..79a1b1622704e 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -54,10 +54,8 @@ message ColumnDesc {
 
   // This field is used to represent the connector-spec additional column type.
   // UNSPECIFIED or unset for normal column.
-  // deprecated, use AdditionalColumn instead
-  // AdditionalColumnType additional_column_type = 9;
-  reserved "additional_column_type";
-  reserved 9;
+  // deprecated, use AdditionalColumn instead, keep for compatibility with v1.6.x
+  AdditionalColumnType additional_column_type = 9;
 
   ColumnDescVersion version = 10;
 
@@ -218,3 +216,14 @@ message AdditionalColumn {
     AdditionalColumnHeaders headers = 7;
   }
 }
+
+enum AdditionalColumnType {
+  ADDITIONAL_COLUMN_TYPE_UNSPECIFIED = 0;
+  ADDITIONAL_COLUMN_TYPE_KEY = 1;
+  ADDITIONAL_COLUMN_TYPE_TIMESTAMP = 2;
+  ADDITIONAL_COLUMN_TYPE_PARTITION = 3;
+  ADDITIONAL_COLUMN_TYPE_OFFSET = 4;
+  ADDITIONAL_COLUMN_TYPE_HEADER = 5;
+  ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
+  ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
+}
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index f82e96a80c0e2..82d2f22f41cb4 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -170,6 +170,7 @@ impl ColumnDesc {
             type_name: self.type_name.clone(),
             generated_or_default_column: self.generated_or_default_column.clone(),
             description: self.description.clone(),
+            additional_column_type: 0, // deprecated
             additional_column: Some(self.additional_column.clone()),
             version: self.version as i32,
         }
@@ -305,6 +306,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
             type_name: c.type_name.clone(),
             generated_or_default_column: c.generated_or_default_column.clone(),
             description: c.description.clone(),
+            additional_column_type: 0, // deprecated
            additional_column: c.additional_column.clone().into(),
             version: c.version as i32,
         }
diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs
index 9930a5717b849..ae87b3a881f84 100644
--- a/src/common/src/catalog/test_utils.rs
+++ b/src/common/src/catalog/test_utils.rs
@@ -60,6 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
             field_descs: fields,
             generated_or_default_column: None,
             description: None,
+            additional_column_type: 0, // deprecated
             additional_column: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
         }
diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs
index ba065b7da4dc4..958f4c9ca5db5 100644
--- a/src/connector/src/parser/avro/util.rs
+++ b/src/connector/src/parser/avro/util.rs
@@ -64,6 +64,7 @@ fn avro_field_to_column_desc(
             type_name: schema_name.to_string(),
             generated_or_default_column: None,
             description: None,
+            additional_column_type: 0, // deprecated
             additional_column: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
         })
diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs
index d4287a869b221..4248fa2b7470c 100644
--- a/src/connector/src/parser/protobuf/parser.rs
+++ b/src/connector/src/parser/protobuf/parser.rs
@@ -174,6 +174,7 @@ impl ProtobufParserConfig {
                 type_name: m.full_name().to_string(),
                 generated_or_default_column: None,
                 description: None,
+                additional_column_type: 0, // deprecated
                 additional_column: Some(AdditionalColumn { column_type: None }),
                 version: ColumnDescVersion::Pr13707 as i32,
             })
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 142b4ad9e1553..8ce6b88b0196b 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -22,8 +22,9 @@ use risingwave_connector::source::{
 use risingwave_pb::data::data_type::TypeName as PbTypeName;
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{
-    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp, ColumnDescVersion,
-    FormatType, PbEncodeType,
+    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
+    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
+    PbEncodeType,
 };
 use risingwave_pb::stream_plan::SourceNode;
 use risingwave_storage::panic_store::PanicStateStore;
@@ -75,16 +76,16 @@ impl ExecutorBuilder for SourceExecutorBuilder {
             }
 
             let mut source_columns = source.columns.clone();
-            {
                 // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
                 // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
                 if source_info.format() == FormatType::Upsert
                     && (source_info.row_encode() == PbEncodeType::Avro
-                        || source_info.row_encode() == PbEncodeType::Protobuf)
+                        || source_info.row_encode() == PbEncodeType::Protobuf
+                        || source_info.row_encode() == PbEncodeType::Json)
                 {
-                    let _ = source_columns.iter_mut().map(|c| {
-                        let _ = c.column_desc.as_mut().map(|desc| {
+                    for c in &mut source_columns {
+                        if let Some(desc) = c.column_desc.as_mut() {
                             let is_bytea = desc
                                 .get_column_type()
                                 .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
@@ -93,7 +94,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                                 &desc.version()
                             )
                             && is_bytea
-                            // the column is from a legacy version
+                            // the column is from a legacy version (before v1.5.x)
                             && desc.version == ColumnDescVersion::Unspecified as i32
                             {
                                 desc.additional_column = Some(AdditionalColumn {
@@ -102,8 +103,20 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                                 )),
                             });
                         }
-                    });
-                });
+
+                            // the column is from a legacy version (v1.6.x)
+                            // introduced in https://github.com/risingwavelabs/risingwave/pull/15226
+                            if desc.additional_column_type
+                                == LegacyAdditionalColumnType::Key as i32
+                            {
+                                desc.additional_column = Some(AdditionalColumn {
+                                    column_type: Some(AdditionalColumnType::Key(
+                                        AdditionalColumnKey {},
+                                    )),
+                                });
+                            }
+                        }
+                    }
             }
         }
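
A note on the keyed test data above (not part of the patch itself): with parse.key=true and key.separator=",", kafka-console-producer.sh splits each input line at the first separator, so insert_json_kafka '{"user_id": 1},{"timestamp": ...}' produces a message whose key is {"user_id": 1} and whose value is the JSON payload after the comma. The _rw_key values expected in validate_restart.slt are simply those raw key bytes rendered as hex. A quick sanity check, assuming a shell with xxd installed:

  # decode the expected _rw_key for user_id 1 back into the JSON key
  echo '7b22757365725f6964223a20317d' | xxd -r -p && echo
  # prints: {"user_id": 1}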