From abd3161494d485243c81cced4f694b0f7cb4d6ed Mon Sep 17 00:00:00 2001 From: Jesse Date: Thu, 12 Oct 2023 04:37:04 +0200 Subject: [PATCH] Fix postgres field reordering during CDC (#2151) --- .../src/connectors/postgres/xlog_mapper.rs | 8 ++++++ dozer-ingestion/tests/test_suite/basic.rs | 27 ++++++++++++------- dozer-ingestion/tests/test_suite/data.rs | 22 +++++++++++++++ 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs b/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs index 1b1d28e800..3e0ebaf119 100644 --- a/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs +++ b/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs @@ -178,6 +178,14 @@ impl XlogMapper { }) } + columns.sort_by_cached_key(|column| { + wanted_columns + .iter() + .position(|wanted| wanted == &column.name) + // Unwrap is safe because we filtered on present keys above + .unwrap() + }); + let replica_identity = match relation.replica_identity() { ReplicaIdentity::Default => ReplicaIdentity::Default, ReplicaIdentity::Nothing => ReplicaIdentity::Nothing, diff --git a/dozer-ingestion/tests/test_suite/basic.rs b/dozer-ingestion/tests/test_suite/basic.rs index 5971621ec5..2da1d8da41 100644 --- a/dozer-ingestion/tests/test_suite/basic.rs +++ b/dozer-ingestion/tests/test_suite/basic.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use dozer_ingestion::{ - connectors::{CdcType, Connector, SourceSchema, TableIdentifier}, + connectors::{CdcType, Connector, SourceSchema, TableIdentifier, TableInfo}, test_util::spawn_connector, }; use dozer_types::{ @@ -11,6 +11,8 @@ use dozer_types::{ }; use tokio::runtime::Runtime; +use crate::test_suite::data::reorder; + use super::{ data, records::{Operation as RecordsOperation, Records}, @@ -197,20 +199,27 @@ pub async fn run_test_suite_basic_cud(runtime: Arc // Create connector. let schema_name = None; let table_name = "test_table".to_string(); - let (connector_test, connector, (_, actual_primary_index)) = T::new( + let (connector_test, connector, (actual_fields, actual_primary_index)) = T::new( schema_name.clone(), table_name.clone(), - (fields, primary_index), + (fields.clone(), primary_index), vec![], ) .await .unwrap(); - // Get schema. - let tables = connector - .list_columns(vec![TableIdentifier::new(schema_name, table_name)]) - .await - .unwrap(); + let ((reordered_fields, _reordered_primary_index), reordered_operations) = + reorder(&actual_fields, &actual_primary_index, &operations); + + // Create schema. + let tables = vec![TableInfo { + schema: schema_name, + name: table_name, + column_names: reordered_fields + .into_iter() + .map(|field| field.name) + .collect(), + }]; let mut schemas = connector.get_schemas(&tables).await.unwrap(); let actual_schema = schemas.remove(0).unwrap().schema; @@ -262,7 +271,7 @@ pub async fn run_test_suite_basic_cud(runtime: Arc // We can't check operation exact match because the connector may have batched some of them, // so we check that the final state is the same. let mut expected_records = Records::new(actual_primary_index); - for operation in operations { + for operation in reordered_operations { expected_records.append_operation(operation); } assert_eq!(records, expected_records); diff --git a/dozer-ingestion/tests/test_suite/data.rs b/dozer-ingestion/tests/test_suite/data.rs index 0e59e5975d..7892a35689 100644 --- a/dozer-ingestion/tests/test_suite/data.rs +++ b/dozer-ingestion/tests/test_suite/data.rs @@ -45,3 +45,25 @@ pub fn cud_operations() -> (FieldsAndPk, Vec) { ]; (schema, operations) } + +pub fn reorder( + fields: &[FieldDefinition], + pk: &[usize], + operations: &[Operation], +) -> (FieldsAndPk, Vec) { + let reversed_fields = fields.iter().rev().cloned().collect(); + let reversed_pk = pk.iter().map(|pk| fields.len() - 1 - pk).collect(); + + let mut reversed_operations = operations.to_vec(); + for op in reversed_operations.iter_mut() { + match op { + Operation::Insert { new } => new.reverse(), + Operation::Update { old, new } => { + old.reverse(); + new.reverse(); + } + Operation::Delete { old } => old.reverse(), + } + } + ((reversed_fields, reversed_pk), reversed_operations) +}