From 695d5f9102e19136e644352eb65adf21398314c4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 1 Feb 2024 14:25:30 +0800 Subject: [PATCH] resolve comments Signed-off-by: Bugen Zhao --- src/connector/src/parser/debezium/mongo_json_parser.rs | 6 +++--- src/connector/src/parser/protobuf/parser.rs | 7 ++----- src/connector/src/parser/unified/debezium.rs | 8 ++++---- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 899068a4cccb5..21129a816e0a3 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -50,14 +50,14 @@ impl DebeziumMongoJsonParser { ) }) .ok_or_else(|| RwError::from(ProtocolError( - "Debezuim Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table".into(), + "Debezium Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table".into(), )))?.clone(); let payload_column = rw_columns .iter() .find(|desc| desc.name == "payload" && matches!(desc.data_type, DataType::Jsonb)) .ok_or_else(|| { RwError::from(ProtocolError( - "Debezuim Mongo needs a `payload` column with supported types Jsonb in table" + "Debezium Mongo needs a `payload` column with supported types Jsonb in table" .into(), )) })? @@ -66,7 +66,7 @@ impl DebeziumMongoJsonParser { // _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically. if rw_columns.iter().filter(|desc| desc.is_visible()).count() != 2 { return Err(RwError::from(ProtocolError( - "Debezuim Mongo needs no more columns except `_id` and `payload` in table".into(), + "Debezium Mongo needs no more columns except `_id` and `payload` in table".into(), ))); } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index c0b4f86d1e045..13da5c5b86b2d 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -458,11 +458,8 @@ pub fn from_protobuf_value( } } Value::List(values) => { - let data_type = protobuf_type_mapping(field_desc, &mut vec![]).map_err(|e| { - AccessError::Uncategorized { - message: e.to_report_string(), - } - })?; + let data_type = protobuf_type_mapping(field_desc, &mut vec![]) + .map_err(|e| uncategorized!("{}", e.to_report_string()))?; let mut builder = data_type.as_list().create_array_builder(values.len()); for value in values { builder.append(from_protobuf_value(field_desc, value, descriptor_pool)?); diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 98147db00078a..86d7e3d4e5e61 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -15,6 +15,7 @@ use risingwave_common::types::{DataType, Datum, ScalarImpl}; use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation}; +use crate::parser::unified::uncategorized; use crate::parser::TransactionControl; use crate::source::{ConnectorProperties, SourceColumnDesc}; @@ -163,10 +164,9 @@ pub struct MongoProjection { } pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult { - let id_field = bson_doc.get("_id").ok_or_else(|| AccessError::Undefined { - name: "_id".to_owned(), - path: "Debezuim Mongo".to_owned(), - })?; + let id_field = bson_doc + .get("_id") + .ok_or_else(|| uncategorized!("Debezium Mongo requires document has a `_id` field"))?; let type_error = || AccessError::TypeError { expected: id_type.to_string(),