Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Feb 1, 2024
1 parent a0c9f1c commit 695d5f9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
6 changes: 3 additions & 3 deletions src/connector/src/parser/debezium/mongo_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ impl DebeziumMongoJsonParser {
)
})
.ok_or_else(|| RwError::from(ProtocolError(
"Debezuim Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table".into(),
"Debezium Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table".into(),
)))?.clone();
let payload_column = rw_columns
.iter()
.find(|desc| desc.name == "payload" && matches!(desc.data_type, DataType::Jsonb))
.ok_or_else(|| {
RwError::from(ProtocolError(
"Debezuim Mongo needs a `payload` column with supported types Jsonb in table"
"Debezium Mongo needs a `payload` column with supported types Jsonb in table"
.into(),
))
})?
Expand All @@ -66,7 +66,7 @@ impl DebeziumMongoJsonParser {
// _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically.
if rw_columns.iter().filter(|desc| desc.is_visible()).count() != 2 {
return Err(RwError::from(ProtocolError(
"Debezuim Mongo needs no more columns except `_id` and `payload` in table".into(),
"Debezium Mongo needs no more columns except `_id` and `payload` in table".into(),
)));
}

Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,8 @@ pub fn from_protobuf_value(
}
}
Value::List(values) => {
let data_type = protobuf_type_mapping(field_desc, &mut vec![]).map_err(|e| {
AccessError::Uncategorized {
message: e.to_report_string(),
}
})?;
let data_type = protobuf_type_mapping(field_desc, &mut vec![])
.map_err(|e| uncategorized!("{}", e.to_report_string()))?;
let mut builder = data_type.as_list().create_array_builder(values.len());
for value in values {
builder.append(from_protobuf_value(field_desc, value, descriptor_pool)?);
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use risingwave_common::types::{DataType, Datum, ScalarImpl};

use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
use crate::parser::unified::uncategorized;
use crate::parser::TransactionControl;
use crate::source::{ConnectorProperties, SourceColumnDesc};

Expand Down Expand Up @@ -163,10 +164,9 @@ pub struct MongoProjection<A> {
}

pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
let id_field = bson_doc.get("_id").ok_or_else(|| AccessError::Undefined {
name: "_id".to_owned(),
path: "Debezuim Mongo".to_owned(),
})?;
let id_field = bson_doc
.get("_id")
.ok_or_else(|| uncategorized!("Debezium Mongo requires document has a `_id` field"))?;

let type_error = || AccessError::TypeError {
expected: id_type.to_string(),
Expand Down

0 comments on commit 695d5f9

Please sign in to comment.