Skip to content

Commit

Permalink
fix debezium
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jul 2, 2024
1 parent 2710f11 commit eb2fb5a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ impl Access for AvroAccess<'_> {
let mut options: AvroParseOptions<'_> = self.options.clone();

debug_assert!(
path.len() == 1 || (path.len() == 2 && path[0] == "before"),
path.len() == 1
|| (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
"unexpected path access: {:?}",
path
);
Expand Down
4 changes: 3 additions & 1 deletion src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ pub trait Access {
///
/// What `path` to access is decided by the CDC layer, i.e., the `FORMAT ...` part (`ChangeEvent`).
/// e.g.,
/// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value, `["op"]` for op type.
/// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value,
/// `["source", "db"]`, `["source", "table"]` etc. for additional columns' values,
/// `["op"]` for op type.
/// - `MaxwellChangeEvent` accesses `["data", "col_name"]` for value, `["type"]` for op type.
/// - In the simplest case, for `FORMAT PLAIN/UPSERT` (`KvEvent`), they just access `["col_name"]` for value, and op type is derived.
///
Expand Down
8 changes: 8 additions & 0 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ impl DebeziumAvroParserConfig {
// "default": null
// },
// ...]

// Other fields are:
// - source: describes the source metadata for the event
// - op
// - ts_ms
// - transaction
// See <https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-events>

avro_schema_to_column_descs(
avro_schema_skip_nullable_union(avro_extract_field_schema(
// FIXME: use resolved schema here.
Expand Down

0 comments on commit eb2fb5a

Please sign in to comment.