From ca811facc6c007bd19496da5c0af3d011db81fa3 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 2 Jul 2024 23:16:20 +0800 Subject: [PATCH] fix debezium Signed-off-by: xxchan --- src/connector/codec/src/decoder/avro/mod.rs | 3 ++- src/connector/codec/src/decoder/mod.rs | 4 +++- src/connector/src/parser/debezium/avro_parser.rs | 8 ++++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index a001531019759..bafb5cfe4e075 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -337,7 +337,8 @@ impl Access for AvroAccess<'_> { let mut options: AvroParseOptions<'_> = self.options.clone(); debug_assert!( - path.len() == 1 || (path.len() == 2 && path[0] == "before"), + path.len() == 1 + || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")), "unexpected path access: {:?}", path ); diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index a2823cceca2ff..158c5b532b29d 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -67,7 +67,9 @@ pub trait Access { /// /// What `path` to access is decided by the CDC layer, i.e., the `FORMAT ...` part (`ChangeEvent`). /// e.g., - /// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value, `["op"]` for op type. + /// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value, + /// `["source", "db"]`, `["source", "table"]` etc. for additional columns' values, + /// `["op"]` for op type. /// - `MaxwellChangeEvent` accesses `["data", "col_name"]` for value, `["type"]` for op type. /// - In the simplest case, for `FORMAT PLAIN/UPSERT` (`KvEvent`), they just access `["col_name"]` for value, and op type is derived. /// diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 70e7304c0eea8..04f80ebba1ca1 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -149,6 +149,14 @@ impl DebeziumAvroParserConfig { // "default": null // }, // ...] + + // Other fields are: + // - source: describes the source metadata for the event + // - op + // - ts_ms + // - transaction + // See + avro_schema_to_column_descs( avro_schema_skip_nullable_union(avro_extract_field_schema( // FIXME: use resolved schema here.