From 9e5c722860facccbf9985bd37a1ab47883e950cf Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 16:28:09 +0800
Subject: [PATCH 01/12] stash

---
 src/stream/src/from_proto/source/trad_source.rs | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 28d923ffb69cc..ce1e465b3f43e 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -75,13 +75,24 @@ impl ExecutorBuilder for SourceExecutorBuilder {
         }
 
         let mut source_columns = source.columns.clone();
+        // print format, row encode and column spec
+        tracing::info!(
+            "source_id: {:?}, source_name: {}, format: {:?}, row_encode: {:?}, columns: {:?}",
+            source_id,
+            source_name,
+            source_info.format(),
+            source_info.row_encode(),
+            source_columns.iter().map(|c| {let desc = c.column_desc.as_ref().unwrap();
+            (desc.name.clone(), desc.get_column_type())}).collect::<Vec<_>>()
+        );
 
         {
             // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
             // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
             if source_info.format() == FormatType::Upsert
                 && (source_info.row_encode() == PbEncodeType::Avro
-                    || source_info.row_encode() == PbEncodeType::Protobuf)
+                    || source_info.row_encode() == PbEncodeType::Protobuf
+                    || source_info.row_encode() == PbEncodeType::Json)
             {
                 let _ = source_columns.iter_mut().map(|c| {
                     let _ = c.column_desc.as_mut().map(|desc| {
@@ -101,6 +112,10 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                                     AdditionalColumnKey {},
                                 )),
                             });
+                            tracing::info!(
+                                "set column {}'s additional_column_type to Key",
+                                desc.name
+                            );
                         }
                     });
                 });

From b618167a8216383ddd8e5b9fcdf870738ba729d6 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 16:54:33 +0800
Subject: [PATCH 02/12] fix

---
 .../src/from_proto/source/trad_source.rs      | 19 ++++---------------
 1 file changed, 4 insertions(+), 15 deletions(-)

diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index ce1e465b3f43e..a9ee3310b3100 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -75,17 +75,6 @@ impl ExecutorBuilder for SourceExecutorBuilder {
         }
 
         let mut source_columns = source.columns.clone();
-        // print format, row encode and column spec
-        tracing::info!(
-            "source_id: {:?}, source_name: {}, format: {:?}, row_encode: {:?}, columns: {:?}",
-            source_id,
-            source_name,
-            source_info.format(),
-            source_info.row_encode(),
-            source_columns.iter().map(|c| {let desc = c.column_desc.as_ref().unwrap();
-            (desc.name.clone(), desc.get_column_type())}).collect::<Vec<_>>()
-        );
-
         {
             // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
             // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
@@ -94,8 +83,8 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                 || source_info.row_encode() == PbEncodeType::Protobuf
                 || source_info.row_encode() == PbEncodeType::Json)
             {
-                let _ = source_columns.iter_mut().map(|c| {
-                    let _ = c.column_desc.as_mut().map(|desc| {
+                for c in &mut source_columns {
+                    if let Some(desc) = c.column_desc.as_mut() {
                         let is_bytea = desc
                             .get_column_type()
                             .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
@@ -117,8 +106,8 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                                 desc.name
                             );
                         }
-                    });
-                });
+                    }
+                }
             }
         }
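Why patch 02 is needed: Rust iterator adapters are lazy, so the `let _ = source_columns.iter_mut().map(...)` chains in patch 01 build a `Map` adapter and immediately drop it without ever consuming it — the closures never run, and the `_rw_key` rewrite is silently a no-op (the `let _ =` also discards the `#[must_use]` warning that would have flagged it). A minimal standalone sketch of the pitfall and the fix — illustrative code, not RisingWave's:

```rust
fn main() {
    let mut xs = vec![1, 2, 3];

    // No-op: `map` is lazy and this adapter is never consumed; `let _ =`
    // additionally silences the #[must_use] warning that would catch it.
    let _ = xs.iter_mut().map(|x| *x += 1);
    assert_eq!(xs, [1, 2, 3]);

    // The eager equivalent patch 02 switches to: a plain `for` loop.
    for x in &mut xs {
        *x += 1;
    }
    assert_eq!(xs, [2, 3, 4]);
}
```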
From c5444ab687211386c81042586541bba03c3c1f4f Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 17:15:58 +0800
Subject: [PATCH 03/12] update test

---
 backwards-compat-tests/scripts/utils.sh   | 22 ++++++++++----------
 backwards-compat-tests/slt/kafka/seed.slt | 19 +++++++++++++++++
 .../slt/kafka/validate_restart.slt         | 13 +++++++++++
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/backwards-compat-tests/scripts/utils.sh b/backwards-compat-tests/scripts/utils.sh
index 1afbf08dd4441..80e269263f3fb 100644
--- a/backwards-compat-tests/scripts/utils.sh
+++ b/backwards-compat-tests/scripts/utils.sh
@@ -103,19 +103,21 @@ insert_json_kafka() {
   local JSON=$1
   echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
     --topic backwards_compat_test_kafka_source \
-    --bootstrap-server localhost:29092
+    --bootstrap-server localhost:29092 \
+    --property "parse.key=true" \
+    --property "key.separator=,"
 }
 
 seed_json_kafka() {
-  insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
-  insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
+  insert_json_kafka '{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
+  insert_json_kafka '{"user_id": 2},{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
+  insert_json_kafka '{"user_id": 3},{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
+  insert_json_kafka '{"user_id": 4},{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
+  insert_json_kafka '{"user_id": 5},{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
+  insert_json_kafka '{"user_id": 6},{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
+  insert_json_kafka '{"user_id": 7},{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
+  insert_json_kafka '{"user_id": 8},{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
+  insert_json_kafka '{"user_id": 9},{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
 }
 
 # https://stackoverflow.com/a/4024263
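A note on the producer change above: with `parse.key=true`, kafka-console-producer splits each input line into key and value at the first occurrence of `key.separator` (here `,`). That works for this seed data only because the key JSON `{"user_id": N}` contains no comma of its own; everything after the first comma, commas inside the value JSON included, lands in the value. A hedged sketch of that split rule (first-occurrence semantics assumed):

```rust
// Split a seed line the way `key.separator=,` is expected to:
// key = text before the FIRST separator, value = everything after it.
fn split_key_value(line: &str) -> Option<(&str, &str)> {
    let mut parts = line.splitn(2, ',');
    Some((parts.next()?, parts.next()?))
}

fn main() {
    let line = r#"{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}"#;
    let (key, value) = split_key_value(line).unwrap();
    assert_eq!(key, r#"{"user_id": 1}"#);
    // Later commas stay inside the value untouched.
    assert!(value.contains(r#""page_id": 1"#));
}
```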
diff --git a/backwards-compat-tests/slt/kafka/seed.slt b/backwards-compat-tests/slt/kafka/seed.slt
index 3840ce0c96b15..16a12498bbf1d 100644
--- a/backwards-compat-tests/slt/kafka/seed.slt
+++ b/backwards-compat-tests/slt/kafka/seed.slt
@@ -17,3 +17,22 @@ WITH (
 
 statement ok
 CREATE MATERIALIZED VIEW kafka_mv1 as SELECT * FROM kafka_source;
+
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+    action varchar,
+    user_id integer,
+    obj_id integer,
+    name varchar,
+    page_id integer,
+    age integer,
+    primary key (_rw_key)
+)
+INCLUDE key as _rw_key
+WITH (
+    connector='kafka',
+    topic='backwards_compat_test_kafka_source',
+    properties.bootstrap.server='localhost:29092',
+    scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
diff --git a/backwards-compat-tests/slt/kafka/validate_restart.slt b/backwards-compat-tests/slt/kafka/validate_restart.slt
index 7058b118f4d20..ee050ca4bb8c5 100644
--- a/backwards-compat-tests/slt/kafka/validate_restart.slt
+++ b/backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -50,3 +50,16 @@ werwerwwe 8 NULL NULL 4 NULL
 yjtyjtyyy 9 NULL NULL 4 NULL
 yjtyjtyyy 9 NULL NULL 4 NULL
 
+# kafka_table should do the upsert and overwrite the existing records
+query I rowsort
+SELECT * FROM kafka_table;
+----
+6786745ge 6 NULL NULL 3 NULL
+erwerhghj 4 NULL NULL 2 NULL
+fgbgfnyyy 7 NULL NULL 3 NULL
+fsdfgerrg 2 NULL NULL 1 NULL
+gtrgretrg 1 NULL NULL 1 NULL
+kiku7ikkk 5 NULL NULL 2 NULL
+sdfergtth 3 NULL NULL 1 NULL
+werwerwwe 8 NULL NULL 4 NULL
+yjtyjtyyy 9 NULL NULL 4 NULL
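The new `kafka_table` declares `primary key (_rw_key)` over the column brought in by `INCLUDE key as _rw_key`, so with `FORMAT UPSERT` the table keeps exactly one row per Kafka key — unlike the append-only `kafka_source` above it, whose materialized view shows the same event twice after a reseed. A toy model of that upsert semantics (plain Rust, not RisingWave internals):

```rust
use std::collections::HashMap;

fn main() {
    // (kafka key, payload) pairs; replaying the seed duplicates keys.
    let events = [
        (r#"{"user_id": 1}"#, "gtrgretrg"),
        (r#"{"user_id": 2}"#, "fsdfgerrg"),
        (r#"{"user_id": 1}"#, "gtrgretrg"), // replayed after restart
    ];

    let mut table: HashMap<&str, &str> = HashMap::new();
    for (key, value) in events {
        table.insert(key, value); // upsert: the latest value per key wins
    }

    // One row per distinct key, no duplicates.
    assert_eq!(table.len(), 2);
}
```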
From d50bc6b0512e07ef3cc9ba229a57046751138d3d Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 21:48:57 +0800
Subject: [PATCH 04/12] fix compatibility v1.6.x

---
 proto/plan_common.proto                         | 18 ++++++++++++++----
 src/common/src/catalog/column.rs                |  2 ++
 src/common/src/catalog/test_utils.rs            |  1 +
 src/connector/src/parser/avro/util.rs           |  1 +
 src/connector/src/parser/protobuf/parser.rs     |  1 +
 .../src/from_proto/source/trad_source.rs        | 22 +++++++++++++-------
 6 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 1dd45ad08a6ef..da4810c7df550 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -54,10 +54,8 @@ message ColumnDesc {
   // This field is used to represent the connector-spec additional column type.
   // UNSPECIFIED or unset for normal column.
-  // deprecated, use AdditionalColumn instead
-  // AdditionalColumnType additional_column_type = 9;
-  reserved "additional_column_type";
-  reserved 9;
+  // deprecated, use AdditionalColumn instead, keep for compatibility with v1.6.x
+  AdditionalColumnType additional_column_type = 9;
 
   ColumnDescVersion version = 10;
 
@@ -218,3 +216,15 @@ message AdditionalColumn {
     AdditionalColumnHeaders headers = 7;
   }
 }
+
+enum AdditionalColumnType {
+  AdditionalColumnType_UNSPECIFIED = 0;
+  AdditionalColumnType_KEY = 1;
+  AdditionalColumnType_TIMESTAMP = 2;
+  AdditionalColumnType_PARTITION = 3;
+  AdditionalColumnType_OFFSET = 4;
+  AdditionalColumnType_HEADER = 5;
+  AdditionalColumnType_FILENAME = 6;
+  AdditionalColumnType_NORMAL = 7;
+}
+
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index f82e96a80c0e2..82d2f22f41cb4 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -170,6 +170,7 @@ impl ColumnDesc {
             type_name: self.type_name.clone(),
             generated_or_default_column: self.generated_or_default_column.clone(),
             description: self.description.clone(),
+            additional_column_type: 0, // deprecated
             additional_column: Some(self.additional_column.clone()),
             version: self.version as i32,
         }
@@ -305,6 +306,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
             type_name: c.type_name.clone(),
             generated_or_default_column: c.generated_or_default_column.clone(),
             description: c.description.clone(),
+            additional_column_type: 0, // deprecated
             additional_column: c.additional_column.clone().into(),
             version: c.version as i32,
         }
diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs
index 9930a5717b849..ae87b3a881f84 100644
--- a/src/common/src/catalog/test_utils.rs
+++ b/src/common/src/catalog/test_utils.rs
@@ -60,6 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
             field_descs: fields,
             generated_or_default_column: None,
             description: None,
+            additional_column_type: 0, // deprecated
             additional_column: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
         }
diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs
index 8d2d4265883e6..354201e55419d 100644
--- a/src/connector/src/parser/avro/util.rs
+++ b/src/connector/src/parser/avro/util.rs
@@ -61,6 +61,7 @@ fn avro_field_to_column_desc(
             type_name: schema_name.to_string(),
             generated_or_default_column: None,
             description: None,
+            additional_column_type: 0, // deprecated
             additional_column: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
         })
diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs
index 922705e3d3f8f..3064562057317 100644
--- a/src/connector/src/parser/protobuf/parser.rs
+++ b/src/connector/src/parser/protobuf/parser.rs
@@ -172,6 +172,7 @@ impl ProtobufParserConfig {
                 type_name: m.full_name().to_string(),
                 generated_or_default_column: None,
                 description: None,
+                additional_column_type: 0, // deprecated
                 additional_column: Some(AdditionalColumn { column_type: None }),
                 version: ColumnDescVersion::Pr13707 as i32,
             })
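Why patch 04 re-declares field 9 instead of leaving it `reserved`: a reserved field number cannot be read back through the generated code, so catalogs written by v1.6.x (which still populate `additional_column_type`) would lose that tag on upgrade. Declaring the legacy enum again lets new readers see the old value, while every writer touched above now emits `0`, so the deprecated field never propagates forward. A schematic sketch of the read-side rule this sets up (hand-written, not the generated prost code):

```rust
// Legacy tag values as re-declared in plan_common.proto.
enum LegacyAdditionalColumnType {
    Unspecified = 0,
    Key = 1,
}

// A `_rw_key` column written by v1.6.x must be upgraded to the new
// AdditionalColumn representation; normal columns are left untouched.
fn needs_key_rewrite(additional_column_type: i32) -> bool {
    additional_column_type == LegacyAdditionalColumnType::Key as i32
}

fn main() {
    assert!(!needs_key_rewrite(LegacyAdditionalColumnType::Unspecified as i32));
    assert!(needs_key_rewrite(LegacyAdditionalColumnType::Key as i32));
}
```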
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index a9ee3310b3100..e2f20af63ed08 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -22,8 +22,9 @@ use risingwave_connector::source::{
 use risingwave_pb::data::data_type::TypeName as PbTypeName;
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{
-    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp, ColumnDescVersion,
-    FormatType, PbEncodeType,
+    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
+    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
+    PbEncodeType,
 };
 use risingwave_pb::stream_plan::SourceNode;
 use risingwave_storage::panic_store::PanicStateStore;
@@ -93,7 +94,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                             &desc.version()
                         )
                         && is_bytea
-                        // the column is from a legacy version
+                        // the column is from a legacy version (before v1.5.x)
                         && desc.version == ColumnDescVersion::Unspecified as i32
                         {
                             desc.additional_column = Some(AdditionalColumn {
@@ -101,10 +102,17 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                                     AdditionalColumnKey {},
                                 )),
                             });
-                            tracing::info!(
-                                "set column {}'s additional_column_type to Key",
-                                desc.name
-                            );
+                        }
+
+                        // the column is from a legacy version (v1.6.x)
+                        if desc.additional_column_type
+                            == LegacyAdditionalColumnType::Key as i32
+                        {
+                            desc.additional_column = Some(AdditionalColumn {
+                                column_type: Some(AdditionalColumnType::Key(
+                                    AdditionalColumnKey {},
+                                )),
+                            });
                         }
                     }
                 }

From 38ba30e98d521662fe7e6df31f187d549c6b61fe Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 22:00:48 +0800
Subject: [PATCH 05/12] fix

---
 proto/plan_common.proto | 1 -
 1 file changed, 1 deletion(-)

diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index da4810c7df550..866829c64140c 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -227,4 +227,3 @@ enum AdditionalColumnType {
   AdditionalColumnType_FILENAME = 6;
   AdditionalColumnType_NORMAL = 7;
 }
-

From 598532936acaf2f7ea424b36ab2689c36a0079e1 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 22:05:53 +0800
Subject: [PATCH 06/12] fix

---
 proto/plan_common.proto | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 866829c64140c..79a1b1622704e 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -218,12 +218,12 @@ message AdditionalColumn {
 }
 
 enum AdditionalColumnType {
-  AdditionalColumnType_UNSPECIFIED = 0;
-  AdditionalColumnType_KEY = 1;
-  AdditionalColumnType_TIMESTAMP = 2;
-  AdditionalColumnType_PARTITION = 3;
-  AdditionalColumnType_OFFSET = 4;
-  AdditionalColumnType_HEADER = 5;
-  AdditionalColumnType_FILENAME = 6;
-  AdditionalColumnType_NORMAL = 7;
+  ADDITIONAL_COLUMN_TYPE_UNSPECIFIED = 0;
+  ADDITIONAL_COLUMN_TYPE_KEY = 1;
+  ADDITIONAL_COLUMN_TYPE_TIMESTAMP = 2;
+  ADDITIONAL_COLUMN_TYPE_PARTITION = 3;
+  ADDITIONAL_COLUMN_TYPE_OFFSET = 4;
+  ADDITIONAL_COLUMN_TYPE_HEADER = 5;
+  ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
+  ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
 }

From 903b8a5a7cca7dfb8caadaccb5d125396c0d5bd6 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 22:14:53 +0800
Subject: [PATCH 07/12] rerun

Signed-off-by: tabVersion

From ccbb1875bf215bd4d6d0701835fb60ae02cc635d Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Fri, 23 Feb 2024 22:15:01 +0800
Subject: [PATCH 08/12] rerun

Signed-off-by: tabVersion

From 0560e94babf8566c69c7a1da39b86896c48341d9 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Sun, 25 Feb 2024 23:01:37 +0800
Subject: [PATCH 09/12] add key part in slt

---
 .../slt/kafka/validate_restart.slt            | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/backwards-compat-tests/slt/kafka/validate_restart.slt b/backwards-compat-tests/slt/kafka/validate_restart.slt
index ee050ca4bb8c5..1e53b4dd5673c 100644
--- a/backwards-compat-tests/slt/kafka/validate_restart.slt
+++ b/backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -54,12 +54,12 @@
 query I rowsort
 SELECT * FROM kafka_table;
 ----
-6786745ge 6 NULL NULL 3 NULL
-erwerhghj 4 NULL NULL 2 NULL
-fgbgfnyyy 7 NULL NULL 3 NULL
-fsdfgerrg 2 NULL NULL 1 NULL
-gtrgretrg 1 NULL NULL 1 NULL
-kiku7ikkk 5 NULL NULL 2 NULL
-sdfergtth 3 NULL NULL 1 NULL
-werwerwwe 8 NULL NULL 4 NULL
-yjtyjtyyy 9 NULL NULL 4 NULL
+6786745ge 6 NULL NULL 3 NULL \x7b22757365725f6964223a20367d
+erwerhghj 4 NULL NULL 2 NULL \x7b22757365725f6964223a20347d
+fgbgfnyyy 7 NULL NULL 3 NULL \x7b22757365725f6964223a20377d
+fsdfgerrg 2 NULL NULL 1 NULL \x7b22757365725f6964223a20327d
+gtrgretrg 1 NULL NULL 1 NULL \x7b22757365725f6964223a20317d
+kiku7ikkk 5 NULL NULL 2 NULL \x7b22757365725f6964223a20357d
+sdfergtth 3 NULL NULL 1 NULL \x7b22757365725f6964223a20337d
+werwerwwe 8 NULL NULL 4 NULL \x7b22757365725f6964223a20387d
+yjtyjtyyy 9 NULL NULL 4 NULL \x7b22757365725f6964223a20397d
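The `\x…` literals patch 09 adds are simply how sqllogictest renders the BYTEA `_rw_key` column: the raw Kafka key bytes in Postgres hex form. A quick self-check that the first row's key decodes back to the JSON written by `seed_json_kafka`:

```rust
fn main() {
    // `\x7b22757365725f6964223a20367d` without the `\x` prefix.
    let hex = "7b22757365725f6964223a20367d";
    let bytes: Vec<u8> = (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).unwrap())
        .collect();
    assert_eq!(String::from_utf8(bytes).unwrap(), r#"{"user_id": 6}"#);
}
```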
From bf6d3c6f5375be252e072ad23895d6bf3f927524 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Sun, 25 Feb 2024 23:03:25 +0800
Subject: [PATCH 10/12] add pr link in comments

---
 src/stream/src/from_proto/source/trad_source.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index e2f20af63ed08..ca74369178eb8 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -105,6 +105,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                         }
 
                         // the column is from a legacy version (v1.6.x)
+                        // introduced in https://github.com/risingwavelabs/risingwave/pull/15226
                         if desc.additional_column_type
                             == LegacyAdditionalColumnType::Key as i32
                         {

From 8df830d919a13825cb5e46e7b6daab2ae195c585 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Mon, 26 Feb 2024 10:50:18 +0800
Subject: [PATCH 11/12] rerun

Signed-off-by: tabVersion

From a95e142af738560322573b25038c38c15f818a38 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Mon, 26 Feb 2024 11:26:55 +0800
Subject: [PATCH 12/12] config syntax for different version

---
 backwards-compat-tests/scripts/utils.sh       |  6 ++++++
 backwards-compat-tests/slt/kafka/seed.slt     | 19 -------------------
 .../slt/kafka/upsert/deprecate_upsert.slt     | 16 ++++++++++++++++
 .../slt/kafka/upsert/include_key_as.slt       | 18 ++++++++++++++++++
 .../slt/kafka/validate_restart.slt            |  2 +-
 5 files changed, 41 insertions(+), 20 deletions(-)
 create mode 100644 backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
 create mode 100644 backwards-compat-tests/slt/kafka/upsert/include_key_as.slt

diff --git a/backwards-compat-tests/scripts/utils.sh b/backwards-compat-tests/scripts/utils.sh
index 80e269263f3fb..5990aac026077 100644
--- a/backwards-compat-tests/scripts/utils.sh
+++ b/backwards-compat-tests/scripts/utils.sh
@@ -227,6 +227,12 @@ seed_old_cluster() {
   create_kafka_topic
   seed_json_kafka
   sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/seed.slt"
+  # use the old syntax for version at most 1.5.4
+  if version_le "$OLD_VERSION" "1.5.4" ; then
+    sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/deprecate_upsert.slt"
+  else
+    sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/include_key_as.slt"
+  fi
 
   echo "--- KAFKA TEST: wait 5s for kafka to process data"
   sleep 5
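`version_le` is defined elsewhere in utils.sh (the StackOverflow link kept as context in patch 03 suggests a `sort -V`-style comparison); the gate above runs the legacy `FORMAT UPSERT` syntax against clusters at or below v1.5.4 and the `INCLUDE key` syntax otherwise. A hypothetical Rust rendering of the comparison being relied on:

```rust
// Compare dot-separated numeric versions component-wise, so that
// "1.5.4" <= "1.5.4" and "1.5.0" <= "1.5.4", but not "1.6.1" <= "1.5.4".
fn version_le(a: &str, b: &str) -> bool {
    let parse = |v: &str| -> Vec<u64> {
        v.split('.').map(|part| part.parse().unwrap_or(0)).collect()
    };
    parse(a) <= parse(b) // Vec comparison is lexicographic
}

fn main() {
    assert!(version_le("1.5.4", "1.5.4"));
    assert!(version_le("1.5.0", "1.5.4"));
    assert!(!version_le("1.6.1", "1.5.4"));
}
```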
diff --git a/backwards-compat-tests/slt/kafka/seed.slt b/backwards-compat-tests/slt/kafka/seed.slt
index 16a12498bbf1d..3840ce0c96b15 100644
--- a/backwards-compat-tests/slt/kafka/seed.slt
+++ b/backwards-compat-tests/slt/kafka/seed.slt
@@ -17,22 +17,3 @@ WITH (
 
 statement ok
 CREATE MATERIALIZED VIEW kafka_mv1 as SELECT * FROM kafka_source;
-
-statement ok
-CREATE TABLE IF NOT EXISTS kafka_table
-(
-    action varchar,
-    user_id integer,
-    obj_id integer,
-    name varchar,
-    page_id integer,
-    age integer,
-    primary key (_rw_key)
-)
-INCLUDE key as _rw_key
-WITH (
-    connector='kafka',
-    topic='backwards_compat_test_kafka_source',
-    properties.bootstrap.server='localhost:29092',
-    scan.startup.mode='earliest',
-) FORMAT UPSERT ENCODE JSON;
diff --git a/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
new file mode 100644
index 0000000000000..55cfce886455d
--- /dev/null
+++ b/backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
@@ -0,0 +1,16 @@
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+    action varchar,
+    user_id integer,
+    obj_id integer,
+    name varchar,
+    page_id integer,
+    age integer
+)
+WITH (
+    connector='kafka',
+    topic='backwards_compat_test_kafka_source',
+    properties.bootstrap.server='localhost:29092',
+    scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
\ No newline at end of file
diff --git a/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
new file mode 100644
index 0000000000000..36ef426574223
--- /dev/null
+++ b/backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
@@ -0,0 +1,18 @@
+statement ok
+CREATE TABLE IF NOT EXISTS kafka_table
+(
+    action varchar,
+    user_id integer,
+    obj_id integer,
+    name varchar,
+    page_id integer,
+    age integer,
+    primary key (_rw_key)
+)
+INCLUDE key as _rw_key
+WITH (
+    connector='kafka',
+    topic='backwards_compat_test_kafka_source',
+    properties.bootstrap.server='localhost:29092',
+    scan.startup.mode='earliest',
+) FORMAT UPSERT ENCODE JSON;
\ No newline at end of file
diff --git a/backwards-compat-tests/slt/kafka/validate_restart.slt b/backwards-compat-tests/slt/kafka/validate_restart.slt
index 1e53b4dd5673c..6d853007b9829 100644
--- a/backwards-compat-tests/slt/kafka/validate_restart.slt
+++ b/backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -52,7 +52,7 @@ yjtyjtyyy 9 NULL NULL 4 NULL
 
 # kafka_table should do the upsert and overwrite the existing records
 query I rowsort
-SELECT * FROM kafka_table;
+SELECT action, user_id, obj_id, name, page_id, age, _rw_key FROM kafka_table;
 ----
 6786745ge 6 NULL NULL 3 NULL \x7b22757365725f6964223a20367d
 erwerhghj 4 NULL NULL 2 NULL \x7b22757365725f6964223a20347d