fix: handle upsert json in prev versions (#15226)
Signed-off-by: tabVersion <[email protected]>
tabVersion authored Feb 26, 2024
1 parent e19aaec commit fcdeb3f
Showing 10 changed files with 105 additions and 23 deletions.
28 changes: 18 additions & 10 deletions backwards-compat-tests/scripts/utils.sh
@@ -103,19 +103,21 @@ insert_json_kafka() {
local JSON=$1
echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
--topic backwards_compat_test_kafka_source \
--bootstrap-server localhost:29092
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=,"
}

seed_json_kafka() {
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
insert_json_kafka '{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
insert_json_kafka '{"user_id": 2},{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
insert_json_kafka '{"user_id": 3},{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
insert_json_kafka '{"user_id": 4},{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
insert_json_kafka '{"user_id": 5},{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
insert_json_kafka '{"user_id": 6},{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
insert_json_kafka '{"user_id": 7},{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
insert_json_kafka '{"user_id": 8},{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
insert_json_kafka '{"user_id": 9},{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
}

# https://stackoverflow.com/a/4024263
@@ -225,6 +227,12 @@ seed_old_cluster() {
create_kafka_topic
seed_json_kafka
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/seed.slt"
# use the old syntax for versions at most 1.5.4
if version_le "$OLD_VERSION" "1.5.4" ; then
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/deprecate_upsert.slt"
else
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/include_key_as.slt"
fi

echo "--- KAFKA TEST: wait 5s for kafka to process data"
sleep 5
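
Note on the script change above: the producer is now invoked with parse.key=true and key.separator=",", so each seeded line is sent as a key,value pair and the Kafka message key carries the JSON object that the upsert tables below treat as the primary key. As a quick sanity check outside the harness (not part of this commit; assumes the same Kafka installation, broker address, and topic as utils.sh), the seeded keys can be inspected with the standard console consumer:

"$KAFKA_PATH"/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:29092 \
    --topic backwards_compat_test_kafka_source \
    --from-beginning \
    --max-messages 9 \
    --property print.key=true \
    --property key.separator=' | '
# each line should look like: {"user_id": 1} | {"timestamp": "2023-07-28 07:11:00", "user_id": 1, ...}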
16 changes: 16 additions & 0 deletions backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
@@ -0,0 +1,16 @@
statement ok
CREATE TABLE IF NOT EXISTS kafka_table
(
action varchar,
user_id integer,
obj_id integer,
name varchar,
page_id integer,
age integer
)
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
18 changes: 18 additions & 0 deletions backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
@@ -0,0 +1,18 @@
statement ok
CREATE TABLE IF NOT EXISTS kafka_table
(
action varchar,
user_id integer,
obj_id integer,
name varchar,
page_id integer,
age integer,
primary key (_rw_key)
)
INCLUDE key as _rw_key
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
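
The two new SLT files above pin down both sides of the syntax change: on versions at most 1.5.4 a FORMAT UPSERT ENCODE JSON table could be declared without naming the key column (deprecate_upsert.slt), while later versions pull the message key in explicitly with INCLUDE key AS _rw_key and declare it as the primary key (include_key_as.slt). To check which shape a running cluster ended up with, one option (a sketch, not part of this commit; assumes psql and the default dev endpoint used by the sqllogictest calls) is:

psql -h localhost -p 4566 -d dev -U root -c 'SHOW COLUMNS FROM kafka_table;'
# with the INCLUDE KEY syntax the output should list a _rw_key column of type bytea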
13 changes: 13 additions & 0 deletions backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -50,3 +50,16 @@ werwerwwe 8 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL

# kafka_table should do the upsert and overwrite the existing records
query I rowsort
SELECT action, user_id, obj_id, name, page_id, age, _rw_key FROM kafka_table;
----
6786745ge 6 NULL NULL 3 NULL \x7b22757365725f6964223a20367d
erwerhghj 4 NULL NULL 2 NULL \x7b22757365725f6964223a20347d
fgbgfnyyy 7 NULL NULL 3 NULL \x7b22757365725f6964223a20377d
fsdfgerrg 2 NULL NULL 1 NULL \x7b22757365725f6964223a20327d
gtrgretrg 1 NULL NULL 1 NULL \x7b22757365725f6964223a20317d
kiku7ikkk 5 NULL NULL 2 NULL \x7b22757365725f6964223a20357d
sdfergtth 3 NULL NULL 1 NULL \x7b22757365725f6964223a20337d
werwerwwe 8 NULL NULL 4 NULL \x7b22757365725f6964223a20387d
yjtyjtyyy 9 NULL NULL 4 NULL \x7b22757365725f6964223a20397d
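For reference, the _rw_key values expected above are the raw Kafka message keys rendered in Postgres hex (bytea) notation. For example, \x7b22757365725f6964223a20317d is just the UTF-8 encoding of the key {"user_id": 1} produced by seed_json_kafka, which can be confirmed outside the database (not part of this commit):

printf '%s' '{"user_id": 1}' | xxd -p
# prints: 7b22757365725f6964223a20317d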
17 changes: 13 additions & 4 deletions proto/plan_common.proto
@@ -54,10 +54,8 @@ message ColumnDesc {
// This field is used to represent the connector-spec additional column type.
// UNSPECIFIED or unset for normal column.

// deprecated, use AdditionalColumn instead
// AdditionalColumnType additional_column_type = 9;
reserved "additional_column_type";
reserved 9;
// deprecated, use AdditionalColumn instead, keep for compatibility with v1.6.x
AdditionalColumnType additional_column_type = 9;

ColumnDescVersion version = 10;

@@ -218,3 +216,14 @@ message AdditionalColumn {
AdditionalColumnHeaders headers = 7;
}
}

enum AdditionalColumnType {
ADDITIONAL_COLUMN_TYPE_UNSPECIFIED = 0;
ADDITIONAL_COLUMN_TYPE_KEY = 1;
ADDITIONAL_COLUMN_TYPE_TIMESTAMP = 2;
ADDITIONAL_COLUMN_TYPE_PARTITION = 3;
ADDITIONAL_COLUMN_TYPE_OFFSET = 4;
ADDITIONAL_COLUMN_TYPE_HEADER = 5;
ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
}
2 changes: 2 additions & 0 deletions src/common/src/catalog/column.rs
@@ -170,6 +170,7 @@ impl ColumnDesc {
type_name: self.type_name.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
description: self.description.clone(),
additional_column_type: 0, // deprecated
additional_column: Some(self.additional_column.clone()),
version: self.version as i32,
}
@@ -305,6 +306,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
type_name: c.type_name.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
description: c.description.clone(),
additional_column_type: 0, // deprecated
additional_column: c.additional_column.clone().into(),
version: c.version as i32,
}
1 change: 1 addition & 0 deletions src/common/src/catalog/test_utils.rs
@@ -60,6 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
field_descs: fields,
generated_or_default_column: None,
description: None,
additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
}
1 change: 1 addition & 0 deletions src/connector/src/parser/avro/util.rs
@@ -64,6 +64,7 @@ fn avro_field_to_column_desc(
type_name: schema_name.to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
})
1 change: 1 addition & 0 deletions src/connector/src/parser/protobuf/parser.rs
@@ -174,6 +174,7 @@ impl ProtobufParserConfig {
type_name: m.full_name().to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
})
31 changes: 22 additions & 9 deletions src/stream/src/from_proto/source/trad_source.rs
@@ -22,8 +22,9 @@ use risingwave_connector::source::{
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp, ColumnDescVersion,
FormatType, PbEncodeType,
AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
PbEncodeType,
};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_storage::panic_store::PanicStateStore;
@@ -75,16 +76,16 @@ impl ExecutorBuilder for SourceExecutorBuilder {
}

let mut source_columns = source.columns.clone();

{
// compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
// for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
if source_info.format() == FormatType::Upsert
&& (source_info.row_encode() == PbEncodeType::Avro
|| source_info.row_encode() == PbEncodeType::Protobuf)
|| source_info.row_encode() == PbEncodeType::Protobuf
|| source_info.row_encode() == PbEncodeType::Json)
{
let _ = source_columns.iter_mut().map(|c| {
let _ = c.column_desc.as_mut().map(|desc| {
for c in &mut source_columns {
if let Some(desc) = c.column_desc.as_mut() {
let is_bytea = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
@@ -93,7 +94,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
&desc.version()
)
&& is_bytea
// the column is from a legacy version
// the column is from a legacy version (before v1.5.x)
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
Expand All @@ -102,8 +103,20 @@ impl ExecutorBuilder for SourceExecutorBuilder {
)),
});
}
});
});

// the column is from a legacy version (v1.6.x)
// introduced in https://github.com/risingwavelabs/risingwave/pull/15226
if desc.additional_column_type
== LegacyAdditionalColumnType::Key as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(
AdditionalColumnKey {},
)),
});
}
}
}
}
}

