Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle upsert json in prev versions #15226

Merged
merged 12 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions backwards-compat-tests/scripts/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,21 @@ insert_json_kafka() {
local JSON=$1
echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
--topic backwards_compat_test_kafka_source \
--bootstrap-server localhost:29092
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=,"
}

seed_json_kafka() {
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
insert_json_kafka '{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
insert_json_kafka '{"user_id": 2},{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
insert_json_kafka '{"user_id": 3},{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
insert_json_kafka '{"user_id": 4},{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
insert_json_kafka '{"user_id": 5},{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
insert_json_kafka '{"user_id": 6},{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
insert_json_kafka '{"user_id": 7},{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
insert_json_kafka '{"user_id": 8},{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
insert_json_kafka '{"user_id": 9},{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
}

# https://stackoverflow.com/a/4024263
Expand Down
19 changes: 19 additions & 0 deletions backwards-compat-tests/slt/kafka/seed.slt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,22 @@ WITH (

statement ok
CREATE MATERIALIZED VIEW kafka_mv1 as SELECT * FROM kafka_source;

statement ok
CREATE TABLE IF NOT EXISTS kafka_table
(
action varchar,
user_id integer,
obj_id integer,
name varchar,
page_id integer,
age integer,
primary key (_rw_key)
)
INCLUDE key as _rw_key
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
13 changes: 13 additions & 0 deletions backwards-compat-tests/slt/kafka/validate_restart.slt
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,16 @@ werwerwwe 8 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL

# kafka_table should do the upsert and overwrite the existing records
query I rowsort
SELECT * FROM kafka_table;
----
6786745ge 6 NULL NULL 3 NULL
erwerhghj 4 NULL NULL 2 NULL
fgbgfnyyy 7 NULL NULL 3 NULL
fsdfgerrg 2 NULL NULL 1 NULL
gtrgretrg 1 NULL NULL 1 NULL
kiku7ikkk 5 NULL NULL 2 NULL
sdfergtth 3 NULL NULL 1 NULL
werwerwwe 8 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL
17 changes: 13 additions & 4 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
COLUMN_DESC_VERSION_MAX = 2147483647;
}

message ColumnDesc {

Check failure on line 28 in proto/plan_common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present reserved name "additional_column_type" on message "ColumnDesc" was deleted.

Check failure on line 28 in proto/plan_common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present reserved range "[9]" on message "ColumnDesc" was deleted.
data.DataType column_type = 1;
int32 column_id = 2;
// we store the column name in column desc now just for debug, but in future
Expand Down Expand Up @@ -54,10 +54,8 @@
// This field is used to represent the connector-spec additional column type.
// UNSPECIFIED or unset for normal column.

// deprecated, use AdditionalColumn instead
// AdditionalColumnType additional_column_type = 9;
reserved "additional_column_type";
reserved 9;
// Deprecated: use `additional_column` (AdditionalColumn) instead. Kept only for compatibility with v1.6.x.
AdditionalColumnType additional_column_type = 9;

ColumnDescVersion version = 10;

Expand Down Expand Up @@ -218,3 +216,14 @@
AdditionalColumnHeaders headers = 7;
}
}

// Legacy flat enum describing the connector-specific additional column type.
// It is the type of the deprecated `ColumnDesc.additional_column_type` field (tag 9),
// which is kept for compatibility with v1.6.x; use `AdditionalColumn` instead.
// UNSPECIFIED (or unset) denotes a normal column.
enum AdditionalColumnType {
ADDITIONAL_COLUMN_TYPE_UNSPECIFIED = 0;
ADDITIONAL_COLUMN_TYPE_KEY = 1;
ADDITIONAL_COLUMN_TYPE_TIMESTAMP = 2;
ADDITIONAL_COLUMN_TYPE_PARTITION = 3;
ADDITIONAL_COLUMN_TYPE_OFFSET = 4;
ADDITIONAL_COLUMN_TYPE_HEADER = 5;
ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
}
2 changes: 2 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl ColumnDesc {
type_name: self.type_name.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
description: self.description.clone(),
additional_column_type: 0, // deprecated
additional_column: Some(self.additional_column.clone()),
version: self.version as i32,
}
Expand Down Expand Up @@ -305,6 +306,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
type_name: c.type_name.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
description: c.description.clone(),
additional_column_type: 0, // deprecated
additional_column: c.additional_column.clone().into(),
version: c.version as i32,
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/catalog/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
field_descs: fields,
generated_or_default_column: None,
description: None,
additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn avro_field_to_column_desc(
type_name: schema_name.to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
})
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl ProtobufParserConfig {
type_name: m.full_name().to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: 0, // deprecated
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
})
Expand Down
30 changes: 21 additions & 9 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use risingwave_connector::source::{
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp, ColumnDescVersion,
FormatType, PbEncodeType,
AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
PbEncodeType,
};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_storage::panic_store::PanicStateStore;
Expand Down Expand Up @@ -75,16 +76,16 @@ impl ExecutorBuilder for SourceExecutorBuilder {
}

let mut source_columns = source.columns.clone();

{
// compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
// for upsert and (avro | protobuf | json), overwrite the `_rw_key` column's ColumnDesc.additional_column to Key
if source_info.format() == FormatType::Upsert
&& (source_info.row_encode() == PbEncodeType::Avro
|| source_info.row_encode() == PbEncodeType::Protobuf)
|| source_info.row_encode() == PbEncodeType::Protobuf
|| source_info.row_encode() == PbEncodeType::Json)
{
let _ = source_columns.iter_mut().map(|c| {
let _ = c.column_desc.as_mut().map(|desc| {
for c in &mut source_columns {
if let Some(desc) = c.column_desc.as_mut() {
let is_bytea = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
Expand All @@ -93,7 +94,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
&desc.version()
)
&& is_bytea
// the column is from a legacy version
// the column is from a legacy version (before v1.5.x)
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
Expand All @@ -102,8 +103,19 @@ impl ExecutorBuilder for SourceExecutorBuilder {
)),
});
}
});
});

// the column is from a legacy version (v1.6.x)
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
if desc.additional_column_type
== LegacyAdditionalColumnType::Key as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(
AdditionalColumnKey {},
)),
});
}
}
}
}
}

Expand Down
Loading