Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Sep 10, 2024
1 parent 19325fe commit b5961b4
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 11 deletions.
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ FORMAT DEBEZIUM ENCODE JSON (
ignore_key = 'true'
)

statement error Payload column is only allowed when row encode is JSON, but got Bytes
statement error INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE Bytes
CREATE TABLE test_include_payload (a bytea)
INCLUDE payload
WITH (
Expand Down
11 changes: 1 addition & 10 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,16 +488,7 @@ impl SourceStreamChunkRowWriter<'_> {
.map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
));
}
(_, &Some(AdditionalColumnType::Payload(_))) => {
// Get the whole payload as a single column
// The frontend check guarantees that row encode must be `SourceEncode::Json`
// here fake a column named "." to represent the whole payload
// see the json accessor hack in `impl Access for JsonAccess<'_>`
let mut desc_mock = desc.clone();
desc_mock.name = ".".to_string();
desc_mock.additional_column.column_type = None;
parse_field(&desc_mock)
}
(_, &Some(AdditionalColumnType::Payload(_))) => parse_field(desc),
(_, _) => {
// For normal columns, call the user provided closure.
parse_field(desc)
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/parser/unified/kv_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ where
pub fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>> {
match desc.additional_column.column_type {
Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type),
// hack here: Get the whole payload as a single column
// use a special mark "." as column name to represent the whole payload
Some(AdditionalColumnType::Payload(_)) => self.access_value(&[&"."], &desc.data_type),
None => self.access_value(&[&desc.name], &desc.data_type),
_ => unreachable!(),
}
Expand Down

0 comments on commit b5961b4

Please sign in to comment.