From 29a801d59c1280377fded65db3acd5cf21b90765 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Dec 2023 11:05:21 +0800 Subject: [PATCH 1/5] log --- src/connector/src/parser/json_parser.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 481c9b570acbe..168c4406b5af9 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -50,6 +50,7 @@ impl AccessBuilder for JsonAccessBuilder { } else { self.value = Some(payload); } + tracing::warn!("{:#?}", self.value); let value = simd_json::to_borrowed_value( &mut self.value.as_mut().unwrap()[self.payload_start_idx..], ) From 73de808278c42be1c726a97504504cf17ca7bbeb Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Dec 2023 12:32:07 +0800 Subject: [PATCH 2/5] try fix --- src/connector/src/parser/json_parser.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 168c4406b5af9..28bed4af3574b 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -45,12 +45,7 @@ pub struct JsonAccessBuilder { impl AccessBuilder for JsonAccessBuilder { #[allow(clippy::unused_async)] async fn generate_accessor(&mut self, payload: Vec) -> Result> { - if payload.is_empty() { - self.value = Some("{}".into()); - } else { - self.value = Some(payload); - } - tracing::warn!("{:#?}", self.value); + self.value = Some(payload); let value = simd_json::to_borrowed_value( &mut self.value.as_mut().unwrap()[self.payload_start_idx..], ) From 3370c0e247ed9713d3a775029e732e0901d0ef06 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Dec 2023 14:03:52 +0800 Subject: [PATCH 3/5] Revert "try fix" This reverts commit 73de808278c42be1c726a97504504cf17ca7bbeb. --- src/connector/src/parser/json_parser.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 28bed4af3574b..168c4406b5af9 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -45,7 +45,12 @@ pub struct JsonAccessBuilder { impl AccessBuilder for JsonAccessBuilder { #[allow(clippy::unused_async)] async fn generate_accessor(&mut self, payload: Vec) -> Result> { - self.value = Some(payload); + if payload.is_empty() { + self.value = Some("{}".into()); + } else { + self.value = Some(payload); + } + tracing::warn!("{:#?}", self.value); let value = simd_json::to_borrowed_value( &mut self.value.as_mut().unwrap()[self.payload_start_idx..], ) From 1f24386c171cce30635c3d8594b1e71d6b5123dd Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Dec 2023 14:04:12 +0800 Subject: [PATCH 4/5] Revert "log" This reverts commit 29a801d59c1280377fded65db3acd5cf21b90765. --- src/connector/src/parser/json_parser.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 168c4406b5af9..481c9b570acbe 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -50,7 +50,6 @@ impl AccessBuilder for JsonAccessBuilder { } else { self.value = Some(payload); } - tracing::warn!("{:#?}", self.value); let value = simd_json::to_borrowed_value( &mut self.value.as_mut().unwrap()[self.payload_start_idx..], ) From 4ef16dc4394ca4c9d6dd421c437d778e3ecc2884 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Dec 2023 17:37:32 +0800 Subject: [PATCH 5/5] fix --- src/connector/src/parser/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 448c98ec571ae..5c31408317f97 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -794,6 +794,9 @@ impl ByteStreamSourceParserImpl { (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => { CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv) } + (ProtocolProperties::Plain, EncodingProperties::Json(_config)) => { + JsonParser::new(parser_config.specific, rw_columns, source_ctx).map(Self::Json) + } (ProtocolProperties::DebeziumMongo, EncodingProperties::Json(_)) => { DebeziumMongoJsonParser::new(rw_columns, source_ctx).map(Self::DebeziumMongoJson) }