Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: only ingest key-ed value in additional header column #14628

Merged
merged 31 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4aae804
frontend
tabVersion Jan 17, 2024
c62e98f
stash
tabVersion Jan 18, 2024
c5d4197
fix prost
tabVersion Jan 18, 2024
7cfbb15
fix compile
tabVersion Jan 18, 2024
89dc44f
feat: Refactor `do_action` method and extract headers from Kafka mess…
tabVersion Jan 19, 2024
205b89a
fix
tabVersion Jan 19, 2024
0d07a66
more test
tabVersion Jan 19, 2024
d7c94ef
Merge branch 'main' into tab/header-col
tabVersion Jan 19, 2024
be58ada
format
tabVersion Jan 19, 2024
735dbba
format
tabVersion Jan 19, 2024
054cf0e
Merge branch 'main' into tab/header-col
tabVersion Jan 21, 2024
626be23
fix
tabVersion Jan 22, 2024
650f831
rename `additional_column_type` to `additional_columns`
tabVersion Jan 22, 2024
01f2116
fix
tabVersion Jan 22, 2024
2c0dbd0
fix
tabVersion Jan 22, 2024
9540879
rerun
tabVersion Jan 22, 2024
8b715b4
separate header inner and headers
tabVersion Jan 22, 2024
4c3b96a
Merge branch 'main' into tab/header-col
tabVersion Jan 22, 2024
b7310ff
fix comments
tabVersion Jan 23, 2024
8cba0b9
add header col type hint
tabVersion Jan 23, 2024
f00632a
handle col name
tabVersion Jan 23, 2024
c296e8d
add test case in e2e
tabVersion Jan 23, 2024
f9f4df5
rerun
tabVersion Jan 23, 2024
dc2f42e
Merge branch 'main' into tab/header-col
tabVersion Jan 23, 2024
37570d1
handle non exist header key
tabVersion Jan 24, 2024
8528acb
fix
tabVersion Jan 24, 2024
bea93b9
refactor
tabVersion Jan 24, 2024
372988d
remove additional_column_normal
tabVersion Jan 24, 2024
6653119
resolve comments
tabVersion Jan 24, 2024
d0053d5
fix misc
tabVersion Jan 24, 2024
9c1d2f8
fix ut
tabVersion Jan 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions e2e_test/source/basic/inlcude_key_as.slt
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,28 @@ WITH (
topic = 'kafka_additional_columns')
FORMAT PLAIN ENCODE JSON

statement ok
create table additional_columns_1 (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header 'header1' as header_col
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_additional_columns')
FORMAT PLAIN ENCODE JSON

statement ok
select * from upsert_students_default_key;

statement ok
select * from additional_columns;

statement ok
select * from additional_columns_1;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 3s

Expand Down Expand Up @@ -98,8 +114,16 @@ FROM additional_columns limit 1;
----
header1 \x7631

query T
select header_col from additional_columns_1 limit 1
----
\x7631

statement ok
drop table upsert_students_default_key

statement ok
drop table additional_columns

statement ok
drop table additional_columns_1
46 changes: 34 additions & 12 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@ message Field {
string name = 2;
}

enum AdditionalColumnType {
UNSPECIFIED = 0;
KEY = 1;
TIMESTAMP = 2;
PARTITION = 3;
OFFSET = 4;
HEADER = 5;
FILENAME = 6;
NORMAL = 7;
}

enum ColumnDescVersion {
COLUMN_DESC_VERSION_UNSPECIFIED = 0;
// Introduced in https://github.com/risingwavelabs/risingwave/pull/13707#discussion_r1429947537,
Expand Down Expand Up @@ -64,9 +53,14 @@ message ColumnDesc {

// This field is used to represent the connector-spec additional column type.
// UNSPECIFIED or unset for normal column.
AdditionalColumnType additional_column_type = 9;

// deprecated, use AdditionalColumn instead
// AdditionalColumnType additional_column_type = 9;
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
reserved 9;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional_column_type is included in 1.6, but not documented.

#14215 (comment)

Why do we need to deprecate this field here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, for source created in 1.6, it will have AdditionalColumnType::NORMAL. So we cannot change the type for field 9

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we told any poc user to try include before? We might need to tell them to rebuild the sources later. Or maybe we just document this breaking change in the release note.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the code is in v1.6.0 but the feature but not considered as released. It is ok to ignore the non-normal columns.
And I don't want to make breaking changes to normal columns here so I choose to use a new field.

I want to make things flexible when handling additional columns. Just like this change, the prev enum is not sufficient with handling an extra inner field arg. I don't know what comes next, so I make all columns a message instead of an enum.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The breaking change looks acceptable to me, although it seems not hard to make it backward compatible.


ColumnDescVersion version = 10;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add a new ColumnDescVersion? TBH I'm not sure about why it's added, and it doesn't seem to be needed here. Ask just in case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the field is introduced in #13707 to deal with DEFAULT_KEY_COLUMN_NAME change in future.
discussions are available here #13707 (comment)


AdditionalColumn additional_column_type = 11;
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
}

message ColumnCatalog {
Expand Down Expand Up @@ -190,3 +184,31 @@ message Cardinality {
message ExprContext {
string time_zone = 1;
}

message AdditionalColumnNormal {}
tabVersion marked this conversation as resolved.
Show resolved Hide resolved

message AdditionalColumnKey {}

message AdditionalColumnTimestamp {}

message AdditionalColumnPartition {}

message AdditionalColumnOffset {}

message AdditionalColumnFilename {}

message AdditionalColumnHeader {
optional string inner_field = 1;
}
tabVersion marked this conversation as resolved.
Show resolved Hide resolved

message AdditionalColumn {
oneof column_type {
AdditionalColumnNormal normal = 1;
AdditionalColumnKey key = 2;
AdditionalColumnTimestamp timestamp = 3;
AdditionalColumnPartition partition = 4;
AdditionalColumnOffset offset = 5;
AdditionalColumnHeader header = 6;
AdditionalColumnFilename filename = 7;
}
}
38 changes: 27 additions & 11 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use std::borrow::Cow;

use itertools::Itertools;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
AdditionalColumnType, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
AdditionalColumn, AdditionalColumnNormal, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
};

use super::row_id_column_desc;
Expand Down Expand Up @@ -103,7 +104,7 @@ pub struct ColumnDesc {
pub type_name: String,
pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
pub description: Option<String>,
pub additional_column_type: AdditionalColumnType,
pub additional_column_type: AdditionalColumn,
pub version: ColumnDescVersion,
}

Expand All @@ -117,7 +118,9 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_column_type: AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
},
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -131,7 +134,9 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_column_type: AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
},
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -140,7 +145,7 @@ impl ColumnDesc {
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
additional_column_type: AdditionalColumnType,
additional_column_type: AdditionalColumn,
) -> ColumnDesc {
ColumnDesc {
data_type,
Expand Down Expand Up @@ -170,7 +175,7 @@ impl ColumnDesc {
type_name: self.type_name.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
description: self.description.clone(),
additional_column_type: self.additional_column_type as i32,
additional_column_type: Some(self.additional_column_type.clone()),
version: self.version as i32,
}
}
Expand Down Expand Up @@ -198,7 +203,9 @@ impl ColumnDesc {
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_column_type: AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
},
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -221,7 +228,9 @@ impl ColumnDesc {
type_name: type_name.to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_column_type: AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
},
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -239,7 +248,9 @@ impl ColumnDesc {
type_name: field.type_name.clone(),
description: None,
generated_or_default_column: None,
additional_column_type: AdditionalColumnType::Normal,
additional_column_type: AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
},
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -265,7 +276,12 @@ impl ColumnDesc {

impl From<PbColumnDesc> for ColumnDesc {
fn from(prost: PbColumnDesc) -> Self {
let additional_column_type = prost.additional_column_type();
let additional_column_type = prost
.get_additional_column_type()
.unwrap_or(&AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
})
.clone();
let version = prost.version();
let field_descs: Vec<ColumnDesc> = prost
.field_descs
Expand Down Expand Up @@ -302,7 +318,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
type_name: c.type_name.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
description: c.description.clone(),
additional_column_type: c.additional_column_type as i32,
additional_column_type: c.additional_column_type.clone().into(),
version: c.version as i32,
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/common/src/catalog/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use itertools::Itertools;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion};
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnNormal, ColumnDesc, ColumnDescVersion,
};

pub trait ColumnDescTestExt {
/// Create a [`ColumnDesc`] with the given name and type.
Expand All @@ -35,7 +38,9 @@ impl ColumnDescTestExt for ColumnDesc {
column_type: Some(data_type),
column_id,
name: name.to_string(),
additional_column_type: AdditionalColumnType::Normal as i32,
additional_column_type: Some(AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
}),
version: ColumnDescVersion::Pr13707 as i32,
..Default::default()
}
Expand All @@ -60,7 +65,9 @@ impl ColumnDescTestExt for ColumnDesc {
field_descs: fields,
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal as i32,
additional_column_type: Some(AdditionalColumn {
column_type: Some(ColumnType::Normal(AdditionalColumnNormal {})),
}),
version: ColumnDescVersion::Pr13707 as i32,
}
}
Expand Down
Loading
Loading