Skip to content

Commit

Permalink
Fix postgres field reordering during CDC (#2151)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker authored Oct 12, 2023
1 parent 8129a18 commit abd3161
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
8 changes: 8 additions & 0 deletions dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ impl XlogMapper {
})
}

columns.sort_by_cached_key(|column| {
wanted_columns
.iter()
.position(|wanted| wanted == &column.name)
// Unwrap is safe because we filtered on present keys above
.unwrap()
});

let replica_identity = match relation.replica_identity() {
ReplicaIdentity::Default => ReplicaIdentity::Default,
ReplicaIdentity::Nothing => ReplicaIdentity::Nothing,
Expand Down
27 changes: 18 additions & 9 deletions dozer-ingestion/tests/test_suite/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use dozer_ingestion::{
connectors::{CdcType, Connector, SourceSchema, TableIdentifier},
connectors::{CdcType, Connector, SourceSchema, TableIdentifier, TableInfo},
test_util::spawn_connector,
};
use dozer_types::{
Expand All @@ -11,6 +11,8 @@ use dozer_types::{
};
use tokio::runtime::Runtime;

use crate::test_suite::data::reorder;

use super::{
data,
records::{Operation as RecordsOperation, Records},
Expand Down Expand Up @@ -197,20 +199,27 @@ pub async fn run_test_suite_basic_cud<T: CudConnectorTest>(runtime: Arc<Runtime>
// Create connector.
let schema_name = None;
let table_name = "test_table".to_string();
let (connector_test, connector, (_, actual_primary_index)) = T::new(
let (connector_test, connector, (actual_fields, actual_primary_index)) = T::new(
schema_name.clone(),
table_name.clone(),
(fields, primary_index),
(fields.clone(), primary_index),
vec![],
)
.await
.unwrap();

// Get schema.
let tables = connector
.list_columns(vec![TableIdentifier::new(schema_name, table_name)])
.await
.unwrap();
let ((reordered_fields, _reordered_primary_index), reordered_operations) =
reorder(&actual_fields, &actual_primary_index, &operations);

// Create schema.
let tables = vec![TableInfo {
schema: schema_name,
name: table_name,
column_names: reordered_fields
.into_iter()
.map(|field| field.name)
.collect(),
}];
let mut schemas = connector.get_schemas(&tables).await.unwrap();
let actual_schema = schemas.remove(0).unwrap().schema;

Expand Down Expand Up @@ -262,7 +271,7 @@ pub async fn run_test_suite_basic_cud<T: CudConnectorTest>(runtime: Arc<Runtime>
// We can't check operation exact match because the connector may have batched some of them,
// so we check that the final state is the same.
let mut expected_records = Records::new(actual_primary_index);
for operation in operations {
for operation in reordered_operations {
expected_records.append_operation(operation);
}
assert_eq!(records, expected_records);
Expand Down
22 changes: 22 additions & 0 deletions dozer-ingestion/tests/test_suite/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,25 @@ pub fn cud_operations() -> (FieldsAndPk, Vec<Operation>) {
];
(schema, operations)
}

pub fn reorder(
fields: &[FieldDefinition],
pk: &[usize],
operations: &[Operation],
) -> (FieldsAndPk, Vec<Operation>) {
let reversed_fields = fields.iter().rev().cloned().collect();
let reversed_pk = pk.iter().map(|pk| fields.len() - 1 - pk).collect();

let mut reversed_operations = operations.to_vec();
for op in reversed_operations.iter_mut() {
match op {
Operation::Insert { new } => new.reverse(),
Operation::Update { old, new } => {
old.reverse();
new.reverse();
}
Operation::Delete { old } => old.reverse(),
}
}
((reversed_fields, reversed_pk), reversed_operations)
}

0 comments on commit abd3161

Please sign in to comment.