Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Feb 18, 2024
1 parent 45398f9 commit 5c7420f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 41 deletions.
18 changes: 10 additions & 8 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,11 @@ where
&bson_doc.take(),
)?)
} else {
unreachable!(
"access result must match the type_expected. path: {:?}, payload: {:?}, type_expected: {:?}",
path, payload, type_expected
)
// fail to extract the "_id" field from the message payload
Err(AccessError::Undefined {
name: "_id".to_string(),
path: path[0].to_string(),
})
}
}
["after" | "before", "payload"] => self.access(&[path[0]], Some(&DataType::Jsonb)),
Expand All @@ -277,10 +278,11 @@ where
&bson_doc.take(),
)?)
} else {
unreachable!(
"access result must match the type_expected. path: [\"id\"], id: {:?}, type_expected: {:?}",
id_bson, type_expected
)
// fail to extract the "_id" field from the message key
Err(AccessError::Undefined {
name: "_id".to_string(),
path: "id".to_string(),
})
}
} else {
ret
Expand Down
52 changes: 19 additions & 33 deletions src/connector/src/source/cdc/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,13 @@ impl CdcSplitTrait for MongoDbCdcSplit {
let mut snapshot_done = self.inner.snapshot_done;
// extract snapshot state from debezium offset
if !snapshot_done {
let dbz_offset: DebeziumOffset = serde_json::from_str(&start_offset).map_err(|e| {
anyhow!(
"invalid mongodb offset: {}, error: {}, split: {}",
start_offset,
e,
self.inner.split_id
)
})?;
let dbz_offset: DebeziumOffset =
serde_json::from_str(&start_offset).with_context(|| {
format!(
"invalid mongodb offset: {}, split: {}",
start_offset, self.inner.split_id
)
})?;

// heartbeat event should not update the `snapshot_done` flag
if !dbz_offset.is_heartbeat {
Expand Down Expand Up @@ -235,7 +234,7 @@ pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
pub _phantom: PhantomData<T>,
}

macro_rules! dispatch_cdc_split {
macro_rules! dispatch_cdc_split_inner {
($dbz_split:expr, $as_type:tt, {$($cdc_source_type:tt),*}, $body:expr) => {
match T::source_type() {
$(
Expand All @@ -255,6 +254,13 @@ macro_rules! dispatch_cdc_split {
}
}

// call corresponding split method of the specific cdc source type
macro_rules! dispatch_cdc_split {
($dbz_split:expr, $as_type:tt, $body:expr) => {
dispatch_cdc_split_inner!($dbz_split, $as_type, {Mysql, Postgres, Citus, Mongodb}, $body)
}
}

impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
fn id(&self) -> SplitId {
format!("{}", self.split_id()).into()
Expand Down Expand Up @@ -307,39 +313,19 @@ impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
}

pub fn split_id(&self) -> u32 {
dispatch_cdc_split!(self, ref, {
Mysql,
Postgres,
Citus,
Mongodb
}, split_id())
dispatch_cdc_split!(self, ref, split_id())
}

pub fn start_offset(&self) -> &Option<String> {
dispatch_cdc_split!(self, ref, {
Mysql,
Postgres,
Citus,
Mongodb
}, start_offset())
dispatch_cdc_split!(self, ref, start_offset())
}

pub fn snapshot_done(&self) -> bool {
dispatch_cdc_split!(self, ref, {
Mysql,
Postgres,
Citus,
Mongodb
}, is_snapshot_done())
dispatch_cdc_split!(self, ref, is_snapshot_done())
}

pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> {
dispatch_cdc_split!(self, mut, {
Mysql,
Postgres,
Citus,
Mongodb
}, update_with_offset(start_offset)?);
dispatch_cdc_split!(self, mut, update_with_offset(start_offset)?);
Ok(())
}
}

0 comments on commit 5c7420f

Please sign in to comment.