Skip to content

Commit

Permalink
fix: Fix error handling in postgres schema
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Mar 29, 2024
1 parent 2963edb commit 8e6055c
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 79 deletions.
76 changes: 51 additions & 25 deletions dozer-ingestion/postgres/src/schema/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,50 +212,76 @@ impl SchemaHelper {
) -> Result<Vec<Result<SourceSchema, PostgresConnectorError>>, PostgresConnectorError> {
let (results, tables_columns_map) = self.get_columns(Some(tables)).await?;

let mut columns_map: HashMap<SchemaTableIdentifier, PostgresTable> = HashMap::new();
let mut columns_map: HashMap<
SchemaTableIdentifier,
Result<PostgresTable, PostgresSchemaError>,
> = HashMap::new();

results
.iter()
.filter(|row| {
.filter_map(|row| {
let schema: String = row.get(8);
let table_name: String = row.get(0);
let column_name: String = row.get(1);

tables_columns_map
.get(&(schema, table_name))
.map_or(true, |columns| {
columns.is_empty() || columns.contains(&column_name)
.get(&(schema.clone(), table_name.clone()))
.map_or(Some((schema.clone(), table_name.clone(), row)), |columns| {
if columns.is_empty() || columns.contains(&column_name) {
Some((schema, table_name, row))
} else {
None
}
})
})
.map(|r| self.convert_row(r))
.try_for_each(|table_row| -> Result<(), PostgresSchemaError> {
let row = table_row?;
columns_map
.entry((row.schema, row.table_name))
.and_modify(|table| {
table.add_field(row.field.clone(), row.is_column_used_in_index)
})
.or_insert_with(|| {
let mut table = PostgresTable::new(row.replication_type);
table.add_field(row.field, row.is_column_used_in_index);
table
});

Ok(())
})?;
.map(|(schema, table_name, r)| (schema, table_name, self.convert_row(r)))
.try_for_each(
|(schema, table_name, table_row)| -> Result<(), PostgresSchemaError> {
match table_row {
Ok(row) => {
columns_map
.entry((row.schema, row.table_name))
.and_modify(|table| {
if let Ok(ref mut t) = table {
t.add_field(row.field.clone(), row.is_column_used_in_index);
}
})
.or_insert_with(|| {
let mut table = PostgresTable::new(row.replication_type);
table.add_field(row.field, row.is_column_used_in_index);
Ok(table)
});
}
Err(e) => {
columns_map.insert((schema, table_name), Err(e));
}
}

let columns_map = sort_schemas(tables, &columns_map)?;
Ok(())
},
)?;

Ok(Self::map_columns_to_schemas(columns_map))
Ok(Self::map_columns_to_schemas(sort_schemas(
tables,
columns_map,
)?))
}

fn map_columns_to_schemas(
postgres_tables: Vec<(SchemaTableIdentifier, PostgresTable)>,
postgres_tables: Vec<(
SchemaTableIdentifier,
Result<PostgresTable, PostgresSchemaError>,
)>,
) -> Vec<Result<SourceSchema, PostgresConnectorError>> {
postgres_tables
.into_iter()
.map(|((_, table_name), table)| {
Self::map_schema(&table_name, table)
table
.map_err(PostgresConnectorError::PostgresSchemaError)
.and_then(|table| {
Self::map_schema(&table_name, table)
.map_err(PostgresConnectorError::PostgresSchemaError)
})
})
.collect()
}
Expand Down
154 changes: 100 additions & 54 deletions dozer-ingestion/postgres/src/schema/sorter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use crate::PostgresSchemaError;

use super::helper::{PostgresTable, SchemaTableIdentifier, DEFAULT_SCHEMA_NAME};

pub type PostgresTableResult = Result<PostgresTable, PostgresSchemaError>;

pub fn sort_schemas(
expected_tables_order: &[ListOrFilterColumns],
mapped_tables: &HashMap<SchemaTableIdentifier, PostgresTable>,
) -> Result<Vec<(SchemaTableIdentifier, PostgresTable)>, PostgresSchemaError> {
let mut sorted_tables: Vec<(SchemaTableIdentifier, PostgresTable)> = Vec::new();
mut mapped_tables: HashMap<SchemaTableIdentifier, PostgresTableResult>,
) -> Result<Vec<(SchemaTableIdentifier, PostgresTableResult)>, PostgresSchemaError> {
let mut sorted_tables: Vec<(SchemaTableIdentifier, PostgresTableResult)> = Vec::new();

for table in expected_tables_order.iter() {
let table_identifier = (
table
Expand All @@ -19,28 +22,36 @@ pub fn sort_schemas(
.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
table.name.clone(),
);
let postgres_table = mapped_tables
.get(&table_identifier)
.ok_or(PostgresSchemaError::ColumnNotFound)?;

let sorted_table = table.columns.as_ref().map_or_else(
|| Ok::<PostgresTable, PostgresSchemaError>(postgres_table.clone()),
|expected_order| {
if expected_order.is_empty() {
Ok(postgres_table.clone())
} else {
let sorted_fields = sort_fields(postgres_table, expected_order)?;
let mut new_table =
PostgresTable::new(postgres_table.replication_type().clone());
sorted_fields
.into_iter()
.for_each(|(f, is_index_field)| new_table.add_field(f, is_index_field));
Ok(new_table)
}
},
)?;
let postgres_table_result = mapped_tables
.remove(&table_identifier)
.ok_or(PostgresSchemaError::ColumnNotFound)?;

sorted_tables.push((table_identifier, sorted_table));
let sorted_table = match postgres_table_result {
Ok(postgres_table) => table.columns.as_ref().map_or_else(
|| Ok::<PostgresTable, PostgresSchemaError>(postgres_table.clone()),
|expected_order| {
if expected_order.is_empty() {
Ok(postgres_table.clone())
} else {
match sort_fields(&postgres_table, expected_order) {
Ok(sorted_fields) => {
let mut new_table =
PostgresTable::new(postgres_table.replication_type().clone());
sorted_fields.into_iter().for_each(|(f, is_index_field)| {
new_table.add_field(f.clone(), is_index_field)
});
Ok(new_table)
}
Err(e) => Err(e),
}
}
},
),
Err(e) => Err(e),
};

sorted_tables.push((table_identifier, sorted_table))
}

Ok(sorted_tables)
Expand Down Expand Up @@ -142,7 +153,7 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test".to_string()),
postgres_table.clone(),
Ok(postgres_table.clone()),
);

let expected_table_order = &[ListOrFilterColumns {
Expand All @@ -151,30 +162,49 @@ mod tests {
columns: None,
}];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
let fields = result.first().unwrap().1.as_ref().unwrap().fields();
assert_eq!(
result.first().unwrap().1.fields().first().unwrap().name,
fields.first().unwrap().name,
postgres_table.get_field(0).unwrap().name
);
assert_eq!(
result.first().unwrap().1.fields().get(1).unwrap().name,
fields.get(1).unwrap().name,
postgres_table.get_field(1).unwrap().name
);
assert_eq!(
result.first().unwrap().1.fields().get(2).unwrap().name,
fields.get(2).unwrap().name,
postgres_table.get_field(2).unwrap().name
);

assert_eq!(
result.first().unwrap().1.is_index_field(0),
result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.is_index_field(0),
postgres_table.is_index_field(0)
);
assert_eq!(
result.first().unwrap().1.is_index_field(1),
result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.is_index_field(1),
postgres_table.is_index_field(1)
);
assert_eq!(
result.first().unwrap().1.is_index_field(2),
result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.is_index_field(2),
postgres_table.is_index_field(2)
);
}
Expand All @@ -185,7 +215,7 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test".to_string()),
postgres_table,
Ok(postgres_table),
);

let columns_order = vec!["third field".to_string()];
Expand All @@ -195,12 +225,24 @@ mod tests {
columns: Some(columns_order.clone()),
}];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
assert_eq!(
&result.first().unwrap().1.fields().first().unwrap().name,
&result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.fields()
.first()
.unwrap()
.name,
columns_order.first().unwrap()
);
assert_eq!(result.first().unwrap().1.fields().len(), 1);
assert_eq!(
result.first().unwrap().1.as_ref().unwrap().fields().len(),
1
);
}

#[test]
Expand All @@ -209,7 +251,7 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test".to_string()),
postgres_table,
Ok(postgres_table),
);

let columns_order = vec![
Expand All @@ -223,20 +265,18 @@ mod tests {
columns: Some(columns_order.clone()),
}];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
let fields = result.first().unwrap().1.as_ref().unwrap().fields();
assert_eq!(
&result.first().unwrap().1.fields().first().unwrap().name,
&fields.first().unwrap().name,
columns_order.first().unwrap()
);
assert_eq!(&fields.get(1).unwrap().name, columns_order.get(1).unwrap());
assert_eq!(&fields.get(2).unwrap().name, columns_order.get(2).unwrap());
assert_eq!(
&result.first().unwrap().1.fields().get(1).unwrap().name,
columns_order.get(1).unwrap()
);
assert_eq!(
&result.first().unwrap().1.fields().get(2).unwrap().name,
columns_order.get(2).unwrap()
result.first().unwrap().1.as_ref().unwrap().fields().len(),
3
);
assert_eq!(result.first().unwrap().1.fields().len(), 3);
}

#[test]
Expand All @@ -246,11 +286,11 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test_second".to_string()),
postgres_table_1,
Ok(postgres_table_1),
);
mapped_tables.insert(
("public".to_string(), "sort_test_first".to_string()),
postgres_table_2,
Ok(postgres_table_2),
);

let columns_order_1 = vec![
Expand All @@ -276,10 +316,16 @@ mod tests {
},
];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
let first_table_after_sort = result.first().unwrap();
let second_table_after_sort = result.get(1).unwrap();

let first_table = first_table_after_sort.1.as_ref().unwrap().clone();
let first_table_fields = first_table.fields();

let second_table = second_table_after_sort.1.as_ref().unwrap().clone();
let second_table_fields = second_table.fields();

assert_eq!(
first_table_after_sort.0 .1,
expected_table_order.first().unwrap().name
Expand All @@ -289,27 +335,27 @@ mod tests {
expected_table_order.get(1).unwrap().name
);
assert_eq!(
&first_table_after_sort.1.fields().first().unwrap().name,
&first_table_fields.first().unwrap().name,
columns_order_1.first().unwrap()
);
assert_eq!(
&first_table_after_sort.1.fields().get(1).unwrap().name,
&first_table_fields.get(1).unwrap().name,
columns_order_1.get(1).unwrap()
);
assert_eq!(
&first_table_after_sort.1.fields().get(2).unwrap().name,
&first_table_fields.get(2).unwrap().name,
columns_order_1.get(2).unwrap()
);
assert_eq!(
&second_table_after_sort.1.fields().first().unwrap().name,
&second_table_fields.first().unwrap().name,
columns_order_2.first().unwrap()
);
assert_eq!(
&second_table_after_sort.1.fields().get(1).unwrap().name,
&second_table_fields.get(1).unwrap().name,
columns_order_2.get(1).unwrap()
);
assert_eq!(
&second_table_after_sort.1.fields().get(2).unwrap().name,
&second_table_fields.get(2).unwrap().name,
columns_order_2.get(2).unwrap()
);
}
Expand Down

0 comments on commit 8e6055c

Please sign in to comment.