diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 067324ec01980..5a54a550e816d 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -257,10 +257,11 @@ where &bson_doc.take(), )?) } else { - unreachable!( - "access result must match the type_expected. path: {:?}, payload: {:?}, type_expected: {:?}", - path, payload, type_expected - ) + // fail to extract the "_id" field from the message payload + Err(AccessError::Undefined { + name: "_id".to_string(), + path: path[0].to_string(), + }) } } ["after" | "before", "payload"] => self.access(&[path[0]], Some(&DataType::Jsonb)), @@ -277,10 +278,11 @@ where &bson_doc.take(), )?) } else { - unreachable!( - "access result must match the type_expected. path: [\"id\"], id: {:?}, type_expected: {:?}", - id_bson, type_expected - ) + // fail to extract the "_id" field from the message key + Err(AccessError::Undefined { + name: "_id".to_string(), + path: "id".to_string(), + }) } } else { ret diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index 27a39946c015b..0fa639ebbaebe 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -196,14 +196,13 @@ impl CdcSplitTrait for MongoDbCdcSplit { let mut snapshot_done = self.inner.snapshot_done; // extract snapshot state from debezium offset if !snapshot_done { - let dbz_offset: DebeziumOffset = serde_json::from_str(&start_offset).map_err(|e| { - anyhow!( - "invalid mongodb offset: {}, error: {}, split: {}", - start_offset, - e, - self.inner.split_id - ) - })?; + let dbz_offset: DebeziumOffset = + serde_json::from_str(&start_offset).with_context(|| { + format!( + "invalid mongodb offset: {}, split: {}", + start_offset, self.inner.split_id + ) + })?; // heartbeat event should not update the `snapshot_done` flag if !dbz_offset.is_heartbeat { @@ -235,7 +234,7 @@ pub struct DebeziumCdcSplit { pub _phantom: PhantomData, } -macro_rules! dispatch_cdc_split { +macro_rules! dispatch_cdc_split_inner { ($dbz_split:expr, $as_type:tt, {$($cdc_source_type:tt),*}, $body:expr) => { match T::source_type() { $( @@ -255,6 +254,13 @@ macro_rules! dispatch_cdc_split { } } +// call corresponding split method of the specific cdc source type +macro_rules! dispatch_cdc_split { + ($dbz_split:expr, $as_type:tt, $body:expr) => { + dispatch_cdc_split_inner!($dbz_split, $as_type, {Mysql, Postgres, Citus, Mongodb}, $body) + } +} + impl SplitMetaData for DebeziumCdcSplit { fn id(&self) -> SplitId { format!("{}", self.split_id()).into() @@ -307,39 +313,19 @@ impl DebeziumCdcSplit { } pub fn split_id(&self) -> u32 { - dispatch_cdc_split!(self, ref, { - Mysql, - Postgres, - Citus, - Mongodb - }, split_id()) + dispatch_cdc_split!(self, ref, split_id()) } pub fn start_offset(&self) -> &Option { - dispatch_cdc_split!(self, ref, { - Mysql, - Postgres, - Citus, - Mongodb - }, start_offset()) + dispatch_cdc_split!(self, ref, start_offset()) } pub fn snapshot_done(&self) -> bool { - dispatch_cdc_split!(self, ref, { - Mysql, - Postgres, - Citus, - Mongodb - }, is_snapshot_done()) + dispatch_cdc_split!(self, ref, is_snapshot_done()) } pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - dispatch_cdc_split!(self, mut, { - Mysql, - Postgres, - Citus, - Mongodb - }, update_with_offset(start_offset)?); + dispatch_cdc_split!(self, mut, update_with_offset(start_offset)?); Ok(()) } }