Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix error handling in postgres schema #2469

Merged
merged 5 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions dozer-ingestion/postgres/src/schema/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,50 +212,76 @@ impl SchemaHelper {
) -> Result<Vec<Result<SourceSchema, PostgresConnectorError>>, PostgresConnectorError> {
let (results, tables_columns_map) = self.get_columns(Some(tables)).await?;

let mut columns_map: HashMap<SchemaTableIdentifier, PostgresTable> = HashMap::new();
let mut columns_map: HashMap<
SchemaTableIdentifier,
Result<PostgresTable, PostgresSchemaError>,
> = HashMap::new();

results
.iter()
.filter(|row| {
.filter_map(|row| {
let schema: String = row.get(8);
let table_name: String = row.get(0);
let column_name: String = row.get(1);

tables_columns_map
.get(&(schema, table_name))
.map_or(true, |columns| {
columns.is_empty() || columns.contains(&column_name)
.get(&(schema.clone(), table_name.clone()))
.map_or(Some((schema.clone(), table_name.clone(), row)), |columns| {
if columns.is_empty() || columns.contains(&column_name) {
Some((schema, table_name, row))
} else {
None
}
})
})
.map(|r| self.convert_row(r))
.try_for_each(|table_row| -> Result<(), PostgresSchemaError> {
let row = table_row?;
columns_map
.entry((row.schema, row.table_name))
.and_modify(|table| {
table.add_field(row.field.clone(), row.is_column_used_in_index)
})
.or_insert_with(|| {
let mut table = PostgresTable::new(row.replication_type);
table.add_field(row.field, row.is_column_used_in_index);
table
});

Ok(())
})?;
.map(|(schema, table_name, r)| (schema, table_name, self.convert_row(r)))
.try_for_each(
|(schema, table_name, table_row)| -> Result<(), PostgresSchemaError> {
match table_row {
Ok(row) => {
columns_map
.entry((row.schema, row.table_name))
.and_modify(|table| {
if let Ok(ref mut t) = table {
t.add_field(row.field.clone(), row.is_column_used_in_index);
}
})
.or_insert_with(|| {
let mut table = PostgresTable::new(row.replication_type);
table.add_field(row.field, row.is_column_used_in_index);
Ok(table)
});
}
Err(e) => {
columns_map.insert((schema, table_name), Err(e));
}
}

let columns_map = sort_schemas(tables, &columns_map)?;
Ok(())
},
)?;

Ok(Self::map_columns_to_schemas(columns_map))
Ok(Self::map_columns_to_schemas(sort_schemas(
tables,
columns_map,
)?))
}

fn map_columns_to_schemas(
postgres_tables: Vec<(SchemaTableIdentifier, PostgresTable)>,
postgres_tables: Vec<(
SchemaTableIdentifier,
Result<PostgresTable, PostgresSchemaError>,
)>,
) -> Vec<Result<SourceSchema, PostgresConnectorError>> {
postgres_tables
.into_iter()
.map(|((_, table_name), table)| {
Self::map_schema(&table_name, table)
table
.map_err(PostgresConnectorError::PostgresSchemaError)
.and_then(|table| {
Self::map_schema(&table_name, table)
.map_err(PostgresConnectorError::PostgresSchemaError)
})
})
.collect()
}
Expand Down
154 changes: 100 additions & 54 deletions dozer-ingestion/postgres/src/schema/sorter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use crate::PostgresSchemaError;

use super::helper::{PostgresTable, SchemaTableIdentifier, DEFAULT_SCHEMA_NAME};

pub type PostgresTableResult = Result<PostgresTable, PostgresSchemaError>;

pub fn sort_schemas(
expected_tables_order: &[ListOrFilterColumns],
mapped_tables: &HashMap<SchemaTableIdentifier, PostgresTable>,
) -> Result<Vec<(SchemaTableIdentifier, PostgresTable)>, PostgresSchemaError> {
let mut sorted_tables: Vec<(SchemaTableIdentifier, PostgresTable)> = Vec::new();
mut mapped_tables: HashMap<SchemaTableIdentifier, PostgresTableResult>,
) -> Result<Vec<(SchemaTableIdentifier, PostgresTableResult)>, PostgresSchemaError> {
let mut sorted_tables: Vec<(SchemaTableIdentifier, PostgresTableResult)> = Vec::new();

for table in expected_tables_order.iter() {
let table_identifier = (
table
Expand All @@ -19,28 +22,36 @@ pub fn sort_schemas(
.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
table.name.clone(),
);
let postgres_table = mapped_tables
.get(&table_identifier)
.ok_or(PostgresSchemaError::ColumnNotFound)?;

let sorted_table = table.columns.as_ref().map_or_else(
|| Ok::<PostgresTable, PostgresSchemaError>(postgres_table.clone()),
|expected_order| {
if expected_order.is_empty() {
Ok(postgres_table.clone())
} else {
let sorted_fields = sort_fields(postgres_table, expected_order)?;
let mut new_table =
PostgresTable::new(postgres_table.replication_type().clone());
sorted_fields
.into_iter()
.for_each(|(f, is_index_field)| new_table.add_field(f, is_index_field));
Ok(new_table)
}
},
)?;
let postgres_table_result = mapped_tables
.remove(&table_identifier)
.ok_or(PostgresSchemaError::ColumnNotFound)?;

sorted_tables.push((table_identifier, sorted_table));
let sorted_table = match postgres_table_result {
Ok(postgres_table) => table.columns.as_ref().map_or_else(
|| Ok::<PostgresTable, PostgresSchemaError>(postgres_table.clone()),
|expected_order| {
if expected_order.is_empty() {
Ok(postgres_table.clone())
} else {
match sort_fields(&postgres_table, expected_order) {
Ok(sorted_fields) => {
let mut new_table =
PostgresTable::new(postgres_table.replication_type().clone());
sorted_fields.into_iter().for_each(|(f, is_index_field)| {
new_table.add_field(f.clone(), is_index_field)
});
Ok(new_table)
}
Err(e) => Err(e),
}
}
},
),
Err(e) => Err(e),
};

sorted_tables.push((table_identifier, sorted_table))
}

Ok(sorted_tables)
Expand Down Expand Up @@ -142,7 +153,7 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test".to_string()),
postgres_table.clone(),
Ok(postgres_table.clone()),
);

let expected_table_order = &[ListOrFilterColumns {
Expand All @@ -151,30 +162,49 @@ mod tests {
columns: None,
}];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
let fields = result.first().unwrap().1.as_ref().unwrap().fields();
assert_eq!(
result.first().unwrap().1.fields().first().unwrap().name,
fields.first().unwrap().name,
postgres_table.get_field(0).unwrap().name
);
assert_eq!(
result.first().unwrap().1.fields().get(1).unwrap().name,
fields.get(1).unwrap().name,
postgres_table.get_field(1).unwrap().name
);
assert_eq!(
result.first().unwrap().1.fields().get(2).unwrap().name,
fields.get(2).unwrap().name,
postgres_table.get_field(2).unwrap().name
);

assert_eq!(
result.first().unwrap().1.is_index_field(0),
result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.is_index_field(0),
postgres_table.is_index_field(0)
);
assert_eq!(
result.first().unwrap().1.is_index_field(1),
result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.is_index_field(1),
postgres_table.is_index_field(1)
);
assert_eq!(
result.first().unwrap().1.is_index_field(2),
result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.is_index_field(2),
postgres_table.is_index_field(2)
);
}
Expand All @@ -185,7 +215,7 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test".to_string()),
postgres_table,
Ok(postgres_table),
);

let columns_order = vec!["third field".to_string()];
Expand All @@ -195,12 +225,24 @@ mod tests {
columns: Some(columns_order.clone()),
}];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
assert_eq!(
&result.first().unwrap().1.fields().first().unwrap().name,
&result
.first()
.unwrap()
.1
.as_ref()
.unwrap()
.fields()
.first()
.unwrap()
.name,
columns_order.first().unwrap()
);
assert_eq!(result.first().unwrap().1.fields().len(), 1);
assert_eq!(
result.first().unwrap().1.as_ref().unwrap().fields().len(),
1
);
}

#[test]
Expand All @@ -209,7 +251,7 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test".to_string()),
postgres_table,
Ok(postgres_table),
);

let columns_order = vec![
Expand All @@ -223,20 +265,18 @@ mod tests {
columns: Some(columns_order.clone()),
}];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
let fields = result.first().unwrap().1.as_ref().unwrap().fields();
assert_eq!(
&result.first().unwrap().1.fields().first().unwrap().name,
&fields.first().unwrap().name,
columns_order.first().unwrap()
);
assert_eq!(&fields.get(1).unwrap().name, columns_order.get(1).unwrap());
assert_eq!(&fields.get(2).unwrap().name, columns_order.get(2).unwrap());
assert_eq!(
&result.first().unwrap().1.fields().get(1).unwrap().name,
columns_order.get(1).unwrap()
);
assert_eq!(
&result.first().unwrap().1.fields().get(2).unwrap().name,
columns_order.get(2).unwrap()
result.first().unwrap().1.as_ref().unwrap().fields().len(),
3
);
assert_eq!(result.first().unwrap().1.fields().len(), 3);
}

#[test]
Expand All @@ -246,11 +286,11 @@ mod tests {
let mut mapped_tables = HashMap::new();
mapped_tables.insert(
("public".to_string(), "sort_test_second".to_string()),
postgres_table_1,
Ok(postgres_table_1),
);
mapped_tables.insert(
("public".to_string(), "sort_test_first".to_string()),
postgres_table_2,
Ok(postgres_table_2),
);

let columns_order_1 = vec![
Expand All @@ -276,10 +316,16 @@ mod tests {
},
];

let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
let result = sort_schemas(expected_table_order, mapped_tables).unwrap();
let first_table_after_sort = result.first().unwrap();
let second_table_after_sort = result.get(1).unwrap();

let first_table = first_table_after_sort.1.as_ref().unwrap().clone();
let first_table_fields = first_table.fields();

let second_table = second_table_after_sort.1.as_ref().unwrap().clone();
let second_table_fields = second_table.fields();

assert_eq!(
first_table_after_sort.0 .1,
expected_table_order.first().unwrap().name
Expand All @@ -289,27 +335,27 @@ mod tests {
expected_table_order.get(1).unwrap().name
);
assert_eq!(
&first_table_after_sort.1.fields().first().unwrap().name,
&first_table_fields.first().unwrap().name,
columns_order_1.first().unwrap()
);
assert_eq!(
&first_table_after_sort.1.fields().get(1).unwrap().name,
&first_table_fields.get(1).unwrap().name,
columns_order_1.get(1).unwrap()
);
assert_eq!(
&first_table_after_sort.1.fields().get(2).unwrap().name,
&first_table_fields.get(2).unwrap().name,
columns_order_1.get(2).unwrap()
);
assert_eq!(
&second_table_after_sort.1.fields().first().unwrap().name,
&second_table_fields.first().unwrap().name,
columns_order_2.first().unwrap()
);
assert_eq!(
&second_table_after_sort.1.fields().get(1).unwrap().name,
&second_table_fields.get(1).unwrap().name,
columns_order_2.get(1).unwrap()
);
assert_eq!(
&second_table_after_sort.1.fields().get(2).unwrap().name,
&second_table_fields.get(2).unwrap().name,
columns_order_2.get(2).unwrap()
);
}
Expand Down
Loading
Loading