diff --git a/dozer-ingestion/postgres/src/schema/helper.rs b/dozer-ingestion/postgres/src/schema/helper.rs index b0a037f521..aa869c7e2d 100644 --- a/dozer-ingestion/postgres/src/schema/helper.rs +++ b/dozer-ingestion/postgres/src/schema/helper.rs @@ -212,50 +212,76 @@ impl SchemaHelper { ) -> Result>, PostgresConnectorError> { let (results, tables_columns_map) = self.get_columns(Some(tables)).await?; - let mut columns_map: HashMap = HashMap::new(); + let mut columns_map: HashMap< + SchemaTableIdentifier, + Result, + > = HashMap::new(); + results .iter() - .filter(|row| { + .filter_map(|row| { let schema: String = row.get(8); let table_name: String = row.get(0); let column_name: String = row.get(1); tables_columns_map - .get(&(schema, table_name)) - .map_or(true, |columns| { - columns.is_empty() || columns.contains(&column_name) + .get(&(schema.clone(), table_name.clone())) + .map_or(Some((schema.clone(), table_name.clone(), row)), |columns| { + if columns.is_empty() || columns.contains(&column_name) { + Some((schema, table_name, row)) + } else { + None + } }) }) - .map(|r| self.convert_row(r)) - .try_for_each(|table_row| -> Result<(), PostgresSchemaError> { - let row = table_row?; - columns_map - .entry((row.schema, row.table_name)) - .and_modify(|table| { - table.add_field(row.field.clone(), row.is_column_used_in_index) - }) - .or_insert_with(|| { - let mut table = PostgresTable::new(row.replication_type); - table.add_field(row.field, row.is_column_used_in_index); - table - }); - - Ok(()) - })?; + .map(|(schema, table_name, r)| (schema, table_name, self.convert_row(r))) + .try_for_each( + |(schema, table_name, table_row)| -> Result<(), PostgresSchemaError> { + match table_row { + Ok(row) => { + columns_map + .entry((row.schema, row.table_name)) + .and_modify(|table| { + if let Ok(ref mut t) = table { + t.add_field(row.field.clone(), row.is_column_used_in_index); + } + }) + .or_insert_with(|| { + let mut table = PostgresTable::new(row.replication_type); + table.add_field(row.field, row.is_column_used_in_index); + Ok(table) + }); + } + Err(e) => { + columns_map.insert((schema, table_name), Err(e)); + } + } - let columns_map = sort_schemas(tables, &columns_map)?; + Ok(()) + }, + )?; - Ok(Self::map_columns_to_schemas(columns_map)) + Ok(Self::map_columns_to_schemas(sort_schemas( + tables, + columns_map, + )?)) } fn map_columns_to_schemas( - postgres_tables: Vec<(SchemaTableIdentifier, PostgresTable)>, + postgres_tables: Vec<( + SchemaTableIdentifier, + Result, + )>, ) -> Vec> { postgres_tables .into_iter() .map(|((_, table_name), table)| { - Self::map_schema(&table_name, table) + table .map_err(PostgresConnectorError::PostgresSchemaError) + .and_then(|table| { + Self::map_schema(&table_name, table) + .map_err(PostgresConnectorError::PostgresSchemaError) + }) }) .collect() } diff --git a/dozer-ingestion/postgres/src/schema/sorter.rs b/dozer-ingestion/postgres/src/schema/sorter.rs index a910463a2b..f6bb1a880e 100644 --- a/dozer-ingestion/postgres/src/schema/sorter.rs +++ b/dozer-ingestion/postgres/src/schema/sorter.rs @@ -6,11 +6,14 @@ use crate::PostgresSchemaError; use super::helper::{PostgresTable, SchemaTableIdentifier, DEFAULT_SCHEMA_NAME}; +pub type PostgresTableResult = Result; + pub fn sort_schemas( expected_tables_order: &[ListOrFilterColumns], - mapped_tables: &HashMap, -) -> Result, PostgresSchemaError> { - let mut sorted_tables: Vec<(SchemaTableIdentifier, PostgresTable)> = Vec::new(); + mut mapped_tables: HashMap, +) -> Result, PostgresSchemaError> { + let mut sorted_tables: Vec<(SchemaTableIdentifier, PostgresTableResult)> = Vec::new(); + for table in expected_tables_order.iter() { let table_identifier = ( table @@ -19,28 +22,36 @@ pub fn sort_schemas( .unwrap_or(DEFAULT_SCHEMA_NAME.to_string()), table.name.clone(), ); - let postgres_table = mapped_tables - .get(&table_identifier) - .ok_or(PostgresSchemaError::ColumnNotFound)?; - let sorted_table = table.columns.as_ref().map_or_else( - || Ok::(postgres_table.clone()), - |expected_order| { - if expected_order.is_empty() { - Ok(postgres_table.clone()) - } else { - let sorted_fields = sort_fields(postgres_table, expected_order)?; - let mut new_table = - PostgresTable::new(postgres_table.replication_type().clone()); - sorted_fields - .into_iter() - .for_each(|(f, is_index_field)| new_table.add_field(f, is_index_field)); - Ok(new_table) - } - }, - )?; + let postgres_table_result = mapped_tables + .remove(&table_identifier) + .ok_or(PostgresSchemaError::ColumnNotFound)?; - sorted_tables.push((table_identifier, sorted_table)); + let sorted_table = match postgres_table_result { + Ok(postgres_table) => table.columns.as_ref().map_or_else( + || Ok::(postgres_table.clone()), + |expected_order| { + if expected_order.is_empty() { + Ok(postgres_table.clone()) + } else { + match sort_fields(&postgres_table, expected_order) { + Ok(sorted_fields) => { + let mut new_table = + PostgresTable::new(postgres_table.replication_type().clone()); + sorted_fields.into_iter().for_each(|(f, is_index_field)| { + new_table.add_field(f.clone(), is_index_field) + }); + Ok(new_table) + } + Err(e) => Err(e), + } + } + }, + ), + Err(e) => Err(e), + }; + + sorted_tables.push((table_identifier, sorted_table)) } Ok(sorted_tables) @@ -142,7 +153,7 @@ mod tests { let mut mapped_tables = HashMap::new(); mapped_tables.insert( ("public".to_string(), "sort_test".to_string()), - postgres_table.clone(), + Ok(postgres_table.clone()), ); let expected_table_order = &[ListOrFilterColumns { @@ -151,30 +162,49 @@ mod tests { columns: None, }]; - let result = sort_schemas(expected_table_order, &mapped_tables).unwrap(); + let result = sort_schemas(expected_table_order, mapped_tables).unwrap(); + let fields = result.first().unwrap().1.as_ref().unwrap().fields(); assert_eq!( - result.first().unwrap().1.fields().first().unwrap().name, + fields.first().unwrap().name, postgres_table.get_field(0).unwrap().name ); assert_eq!( - result.first().unwrap().1.fields().get(1).unwrap().name, + fields.get(1).unwrap().name, postgres_table.get_field(1).unwrap().name ); assert_eq!( - result.first().unwrap().1.fields().get(2).unwrap().name, + fields.get(2).unwrap().name, postgres_table.get_field(2).unwrap().name ); assert_eq!( - result.first().unwrap().1.is_index_field(0), + result + .first() + .unwrap() + .1 + .as_ref() + .unwrap() + .is_index_field(0), postgres_table.is_index_field(0) ); assert_eq!( - result.first().unwrap().1.is_index_field(1), + result + .first() + .unwrap() + .1 + .as_ref() + .unwrap() + .is_index_field(1), postgres_table.is_index_field(1) ); assert_eq!( - result.first().unwrap().1.is_index_field(2), + result + .first() + .unwrap() + .1 + .as_ref() + .unwrap() + .is_index_field(2), postgres_table.is_index_field(2) ); } @@ -185,7 +215,7 @@ mod tests { let mut mapped_tables = HashMap::new(); mapped_tables.insert( ("public".to_string(), "sort_test".to_string()), - postgres_table, + Ok(postgres_table), ); let columns_order = vec!["third field".to_string()]; @@ -195,12 +225,24 @@ mod tests { columns: Some(columns_order.clone()), }]; - let result = sort_schemas(expected_table_order, &mapped_tables).unwrap(); + let result = sort_schemas(expected_table_order, mapped_tables).unwrap(); assert_eq!( - &result.first().unwrap().1.fields().first().unwrap().name, + &result + .first() + .unwrap() + .1 + .as_ref() + .unwrap() + .fields() + .first() + .unwrap() + .name, columns_order.first().unwrap() ); - assert_eq!(result.first().unwrap().1.fields().len(), 1); + assert_eq!( + result.first().unwrap().1.as_ref().unwrap().fields().len(), + 1 + ); } #[test] @@ -209,7 +251,7 @@ mod tests { let mut mapped_tables = HashMap::new(); mapped_tables.insert( ("public".to_string(), "sort_test".to_string()), - postgres_table, + Ok(postgres_table), ); let columns_order = vec![ @@ -223,20 +265,18 @@ mod tests { columns: Some(columns_order.clone()), }]; - let result = sort_schemas(expected_table_order, &mapped_tables).unwrap(); + let result = sort_schemas(expected_table_order, mapped_tables).unwrap(); + let fields = result.first().unwrap().1.as_ref().unwrap().fields(); assert_eq!( - &result.first().unwrap().1.fields().first().unwrap().name, + &fields.first().unwrap().name, columns_order.first().unwrap() ); + assert_eq!(&fields.get(1).unwrap().name, columns_order.get(1).unwrap()); + assert_eq!(&fields.get(2).unwrap().name, columns_order.get(2).unwrap()); assert_eq!( - &result.first().unwrap().1.fields().get(1).unwrap().name, - columns_order.get(1).unwrap() - ); - assert_eq!( - &result.first().unwrap().1.fields().get(2).unwrap().name, - columns_order.get(2).unwrap() + result.first().unwrap().1.as_ref().unwrap().fields().len(), + 3 ); - assert_eq!(result.first().unwrap().1.fields().len(), 3); } #[test] @@ -246,11 +286,11 @@ mod tests { let mut mapped_tables = HashMap::new(); mapped_tables.insert( ("public".to_string(), "sort_test_second".to_string()), - postgres_table_1, + Ok(postgres_table_1), ); mapped_tables.insert( ("public".to_string(), "sort_test_first".to_string()), - postgres_table_2, + Ok(postgres_table_2), ); let columns_order_1 = vec![ @@ -276,10 +316,16 @@ mod tests { }, ]; - let result = sort_schemas(expected_table_order, &mapped_tables).unwrap(); + let result = sort_schemas(expected_table_order, mapped_tables).unwrap(); let first_table_after_sort = result.first().unwrap(); let second_table_after_sort = result.get(1).unwrap(); + let first_table = first_table_after_sort.1.as_ref().unwrap().clone(); + let first_table_fields = first_table.fields(); + + let second_table = second_table_after_sort.1.as_ref().unwrap().clone(); + let second_table_fields = second_table.fields(); + assert_eq!( first_table_after_sort.0 .1, expected_table_order.first().unwrap().name @@ -289,27 +335,27 @@ mod tests { expected_table_order.get(1).unwrap().name ); assert_eq!( - &first_table_after_sort.1.fields().first().unwrap().name, + &first_table_fields.first().unwrap().name, columns_order_1.first().unwrap() ); assert_eq!( - &first_table_after_sort.1.fields().get(1).unwrap().name, + &first_table_fields.get(1).unwrap().name, columns_order_1.get(1).unwrap() ); assert_eq!( - &first_table_after_sort.1.fields().get(2).unwrap().name, + &first_table_fields.get(2).unwrap().name, columns_order_1.get(2).unwrap() ); assert_eq!( - &second_table_after_sort.1.fields().first().unwrap().name, + &second_table_fields.first().unwrap().name, columns_order_2.first().unwrap() ); assert_eq!( - &second_table_after_sort.1.fields().get(1).unwrap().name, + &second_table_fields.get(1).unwrap().name, columns_order_2.get(1).unwrap() ); assert_eq!( - &second_table_after_sort.1.fields().get(2).unwrap().name, + &second_table_fields.get(2).unwrap().name, columns_order_2.get(2).unwrap() ); } diff --git a/dozer-ingestion/postgres/src/schema/tests.rs b/dozer-ingestion/postgres/src/schema/tests.rs index d7ffe05209..1a31816b2d 100644 --- a/dozer-ingestion/postgres/src/schema/tests.rs +++ b/dozer-ingestion/postgres/src/schema/tests.rs @@ -152,9 +152,13 @@ async fn test_connector_view_cannot_be_used() { }; let result = schema_helper.get_schemas(&[table_info]).await; - assert!(result.is_err()); + assert!( + result.as_ref().unwrap().first().unwrap().is_err(), + "Result is not an error. Result: {:?}", + result + ); assert!(matches!( - result, + result.unwrap().first().unwrap(), Err(PostgresConnectorError::PostgresSchemaError( PostgresSchemaError::UnsupportedTableType(_, _) )) @@ -166,7 +170,7 @@ async fn test_connector_view_cannot_be_used() { columns: Some(vec![]), }; let result = schema_helper.get_schemas(&[table_info]).await; - assert!(result.is_ok()); + assert!(result.unwrap().first().unwrap().is_ok()); client.drop_schema(&schema).await; }