diff --git a/dozer-admin/src/services/connection_service.rs b/dozer-admin/src/services/connection_service.rs
index f66d505785..1fc08c38be 100644
--- a/dozer-admin/src/services/connection_service.rs
+++ b/dozer-admin/src/services/connection_service.rs
@@ -34,7 +34,7 @@ impl ConnectionService {
     ) -> Result<Vec<TableInfo>, ErrorResponse> {
         let res = thread::spawn(|| {
             let connector = get_connector(connection).map_err(|err| err.to_string())?;
-            connector.get_tables(None).map_err(|err| err.to_string())
+            connector.get_tables().map_err(|err| err.to_string())
         })
         .join()
         .unwrap();
diff --git a/dozer-core/src/dag_impl.rs b/dozer-core/src/dag_impl.rs
index e15273792f..f85a2bc4d8 100644
--- a/dozer-core/src/dag_impl.rs
+++ b/dozer-core/src/dag_impl.rs
@@ -392,7 +392,7 @@ fn contains_port(
         }
         NodeKind::Source(s) => {
             if direction == PortDirection::Output {
-                s.get_output_ports()?.iter().any(|e| e.handle == port)
+                s.get_output_ports().iter().any(|e| e.handle == port)
             } else {
                 false
             }
diff --git a/dozer-core/src/dag_schemas.rs b/dozer-core/src/dag_schemas.rs
index bee4b3f54b..78d78a7ed2 100644
--- a/dozer-core/src/dag_schemas.rs
+++ b/dozer-core/src/dag_schemas.rs
@@ -203,7 +203,7 @@ fn populate_schemas(
     match &node.kind {
         NodeKind::Source(source) => {
-            let ports = source.get_output_ports()?;
+            let ports = source.get_output_ports();

             for edge in dag.graph().edges(node_index) {
                 let port = find_output_port_def(&ports, edge);
diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs
index 9b79704c97..af2129aa44 100644
--- a/dozer-core/src/errors.rs
+++ b/dozer-core/src/errors.rs
@@ -28,10 +28,6 @@ pub enum ExecutionError {
     InvalidDatabase,
     #[error("Field not found at position {0}")]
     FieldNotFound(String),
-    #[error("Port not found in source for schema_id: {0}.")]
-    PortNotFound(String),
-    #[error("Replication type not found")]
-    ReplicationTypeNotFound,
     #[error("Record not found")]
     RecordNotFound(),
     #[error("Node {node} has incompatible {typ:?} schemas: {source}")]
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index 49c58e84e1..7dd63e288b 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -47,7 +47,7 @@ impl OutputPortDef {

 pub trait SourceFactory<T>: Send + Sync + Debug {
     fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), ExecutionError>;
-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError>;
+    fn get_output_ports(&self) -> Vec<OutputPortDef>;
     fn build(
         &self,
         output_schemas: HashMap<PortHandle, Schema>,
diff --git a/dozer-core/src/tests/app.rs b/dozer-core/src/tests/app.rs
index 102f3311ac..a636d1ce14 100644
--- a/dozer-core/src/tests/app.rs
+++ b/dozer-core/src/tests/app.rs
@@ -36,7 +36,7 @@ impl SourceFactory<NoneContext> for NoneSourceFactory {
         todo!()
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
         todo!()
     }
diff --git a/dozer-core/src/tests/dag_base_create_errors.rs b/dozer-core/src/tests/dag_base_create_errors.rs
index 3814bebfa9..bc57c26fe4 100644
--- a/dozer-core/src/tests/dag_base_create_errors.rs
+++ b/dozer-core/src/tests/dag_base_create_errors.rs
@@ -54,11 +54,11 @@ impl SourceFactory<NoneContext> for CreateErrSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             DEFAULT_PORT_HANDLE,
             OutputPortType::Stateless,
-        )])
+        )]
     }

     fn build(
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index 9680765896..dc5f6d6d25 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -320,11 +320,11 @@ impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             GENERATOR_SOURCE_OUTPUT_PORT,
             OutputPortType::Stateless,
-        )])
+        )]
     }

     fn build(
diff --git a/dozer-core/src/tests/dag_ports.rs b/dozer-core/src/tests/dag_ports.rs
index 4d24c67a2a..a103edac8c 100644
--- a/dozer-core/src/tests/dag_ports.rs
+++ b/dozer-core/src/tests/dag_ports.rs
@@ -28,12 +28,11 @@ impl SourceFactory<NoneContext> for DynPortsSourceFactory {
         todo!()
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(self
-            .output_ports
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        self.output_ports
             .iter()
             .map(|p| OutputPortDef::new(*p, OutputPortType::Stateless))
-            .collect())
+            .collect()
     }

     fn build(
diff --git a/dozer-core/src/tests/dag_recordreader_update.rs b/dozer-core/src/tests/dag_recordreader_update.rs
index 4f3a888f18..b2b79a9754 100644
--- a/dozer-core/src/tests/dag_recordreader_update.rs
+++ b/dozer-core/src/tests/dag_recordreader_update.rs
@@ -72,8 +72,8 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             GENERATOR_SOURCE_OUTPUT_PORT,
             if self.stateful {
                 OutputPortType::StatefulWithPrimaryKeyLookup {
@@ -83,7 +83,7 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
             } else {
                 OutputPortType::Stateless
             },
-        )])
+        )]
     }

     fn build(
diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs
index c3d918f070..c13b3cc8c9 100644
--- a/dozer-core/src/tests/dag_schemas.rs
+++ b/dozer-core/src/tests/dag_schemas.rs
@@ -62,11 +62,11 @@ impl SourceFactory<NoneContext> for TestUsersSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             DEFAULT_PORT_HANDLE,
             OutputPortType::Stateless,
-        )])
+        )]
     }

     fn build(
@@ -110,11 +110,11 @@ impl SourceFactory<NoneContext> for TestCountriesSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             DEFAULT_PORT_HANDLE,
             OutputPortType::Stateless,
-        )])
+        )]
     }

     fn build(
diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs
index ad657e7c13..350d534b20 100644
--- a/dozer-core/src/tests/sources.rs
+++ b/dozer-core/src/tests/sources.rs
@@ -64,8 +64,8 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             GENERATOR_SOURCE_OUTPUT_PORT,
             if self.stateful {
                 OutputPortType::StatefulWithPrimaryKeyLookup {
@@ -75,7 +75,7 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
             } else {
                 OutputPortType::Stateless
             },
-        )])
+        )]
     }

     fn build(
@@ -188,8 +188,8 @@ impl SourceFactory<NoneContext> for DualPortGeneratorSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![
             OutputPortDef::new(
                 DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
                 if self.stateful {
@@ -212,7 +212,7 @@ impl SourceFactory<NoneContext> for DualPortGeneratorSourceFactory {
                     OutputPortType::Stateless
                 },
             ),
-        ])
+        ]
     }

     fn build(
@@ -335,15 +335,15 @@ impl SourceFactory<NoneContext> for NoPkGeneratorSourceFactory {
         ))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             GENERATOR_SOURCE_OUTPUT_PORT,
             if self.stateful {
                 OutputPortType::AutogenRowKeyLookup
             } else {
                 OutputPortType::Stateless
             },
-        )])
+        )]
     }

     fn build(
@@ -417,11 +417,11 @@ impl SourceFactory<NoneContext> for ConnectivityTestSourceFactory {
         unimplemented!("This struct is for connectivity test, only output ports are defined")
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![OutputPortDef::new(
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![OutputPortDef::new(
             DEFAULT_PORT_HANDLE,
             OutputPortType::Stateless,
-        )])
+        )]
     }

     fn build(
diff --git a/dozer-ingestion/benches/helper.rs b/dozer-ingestion/benches/helper.rs
index 8c9326cbde..63c1bd3a55 100644
--- a/dozer-ingestion/benches/helper.rs
+++ b/dozer-ingestion/benches/helper.rs
@@ -42,7 +42,7 @@ pub async fn get_connection_iterator(config: TestConfig) -> IngestionIterator {
     std::thread::spawn(move || {
         let grpc_connector =
             dozer_ingestion::connectors::get_connector(config.connection).unwrap();
-        let mut tables = grpc_connector.get_tables(None).unwrap();
+        let mut tables = grpc_connector.get_tables().unwrap();
         if let Some(tables_filter) = config.tables_filter {
             tables.retain(|t| tables_filter.contains(&t.name));
         }
diff --git a/dozer-ingestion/examples/postgres/main.rs b/dozer-ingestion/examples/postgres/main.rs
index f4fa2459be..9321d511f9 100644
--- a/dozer-ingestion/examples/postgres/main.rs
+++ b/dozer-ingestion/examples/postgres/main.rs
@@ -13,8 +13,6 @@ fn main() {
     let (ingestor, mut iterator) = Ingestor::initialize_channel(IngestionConfig::default());
     let tables = vec![TableInfo {
         name: "users".to_string(),
-        table_name: "users".to_string(),
-        id: 0,
         columns: None,
     }];
     let postgres_config = PostgresConfig {
diff --git a/dozer-ingestion/src/connectors/ethereum/log/connector.rs b/dozer-ingestion/src/connectors/ethereum/log/connector.rs
index 6b4a23a827..bb467f0ac3 100644
--- a/dozer-ingestion/src/connectors/ethereum/log/connector.rs
+++ b/dozer-ingestion/src/connectors/ethereum/log/connector.rs
@@ -141,7 +141,7 @@ impl Connector for EthLogConnector {
         let schemas = if let Some(tables) = tables {
             schemas
                 .iter()
-                .filter(|s| tables.iter().any(|t| t.table_name == s.name))
+                .filter(|s| tables.iter().any(|t| t.name == s.name))
                 .cloned()
                 .collect()
         } else {
@@ -191,8 +191,8 @@ impl Connector for EthLogConnector {
         Ok(HashMap::new())
     }

-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
-        self.get_tables_default(tables)
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
+        self.get_tables_default()
     }

     fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
diff --git a/dozer-ingestion/src/connectors/ethereum/log/helper.rs b/dozer-ingestion/src/connectors/ethereum/log/helper.rs
index cd179d2713..ded5adb915 100644
--- a/dozer-ingestion/src/connectors/ethereum/log/helper.rs
+++ b/dozer-ingestion/src/connectors/ethereum/log/helper.rs
@@ -105,7 +105,7 @@ pub fn decode_event(
         .to_owned();

     let table_name = get_table_name(contract_tuple, &event.name);
-    let is_table_required = tables.iter().any(|t| t.table_name == table_name);
+    let is_table_required = tables.iter().any(|t| t.name == table_name);
     if is_table_required {
         let parsed_event = event.parse_log(RawLog {
             topics: log.topics,
@@ -172,10 +172,7 @@ pub fn map_abitype_to_field(f: web3::ethabi::Token) -> Field {
 }

 pub fn map_log_to_event(log: Log, details: Arc<EthDetails>) -> Option<Operation> {
     // Check if table is requested
-    let is_table_required = details
-        .tables
-        .iter()
-        .any(|t| t.table_name == ETH_LOGS_TABLE);
+    let is_table_required = details.tables.iter().any(|t| t.name == ETH_LOGS_TABLE);

     if !is_table_required {
         None
diff --git a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs
index a9bf6a8dd9..4c15307695 100644
--- a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs
+++ b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs
@@ -69,8 +69,8 @@ impl Connector for EthTraceConnector {
         Ok(HashMap::new())
     }

-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
-        self.get_tables_default(tables)
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
+        self.get_tables_default()
    }

     fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
diff --git a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs
index d3de07c985..50dc7617fd 100644
--- a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs
+++ b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs
@@ -44,7 +44,7 @@ impl IngestAdapter for ArrowAdapter {
             serde_json::from_str(schemas_str).map_err(ConnectorError::map_serialization_error)?;
         let mut schemas = vec![];
-        for (id, grpc_schema) in grpc_schemas.iter().enumerate() {
+        for (id, grpc_schema) in grpc_schemas.into_iter().enumerate() {
             let mut schema = arrow_types::from_arrow::map_schema_to_dozer(&grpc_schema.schema)
                 .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;
             schema.identifier = Some(SchemaIdentifier {
@@ -53,7 +53,7 @@
             });

             schemas.push(SourceSchema {
-                name: grpc_schema.name.clone(),
+                name: grpc_schema.name,
                 schema,
                 replication_type: grpc_schema.replication_type.clone(),
             });
diff --git a/dozer-ingestion/src/connectors/grpc/connector.rs b/dozer-ingestion/src/connectors/grpc/connector.rs
index bf7e72198b..9109871177 100644
--- a/dozer-ingestion/src/connectors/grpc/connector.rs
+++ b/dozer-ingestion/src/connectors/grpc/connector.rs
@@ -174,8 +174,8 @@ where
         Ok(results)
     }

-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
-        self.get_tables_default(tables)
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
+        self.get_tables_default()
     }

     fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
diff --git a/dozer-ingestion/src/connectors/grpc/tests.rs b/dozer-ingestion/src/connectors/grpc/tests.rs
index 9b9065fb31..c2bb7f3312 100644
--- a/dozer-ingestion/src/connectors/grpc/tests.rs
+++ b/dozer-ingestion/src/connectors/grpc/tests.rs
@@ -43,7 +43,7 @@ async fn ingest_grpc(
         })
         .unwrap();

-        let tables = grpc_connector.get_tables(None).unwrap();
+        let tables = grpc_connector.get_tables().unwrap();
         grpc_connector.start(None, &ingestor, tables).unwrap();
     });
diff --git a/dozer-ingestion/src/connectors/kafka/connector.rs b/dozer-ingestion/src/connectors/kafka/connector.rs
index f56b82469f..6176ac2ad0 100644
--- a/dozer-ingestion/src/connectors/kafka/connector.rs
+++ b/dozer-ingestion/src/connectors/kafka/connector.rs
@@ -44,7 +44,7 @@ impl Connector for KafkaConnector {
     ) -> Result<(), ConnectorError> {
         let topic = tables
             .get(0)
-            .map_or(Err(TopicNotDefined), |table| Ok(&table.table_name))?;
+            .map_or(Err(TopicNotDefined), |table| Ok(&table.name))?;
         let broker = self.config.broker.to_owned();
         Runtime::new()
@@ -60,8 +60,8 @@
         todo!()
     }

-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
-        self.get_tables_default(tables)
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
+        self.get_tables_default()
     }

     fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
diff --git a/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs b/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs
index 443044ab88..6c7f1340a4 100644
--- a/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs
+++ b/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs
@@ -20,7 +20,7 @@ impl NoSchemaRegistry {
         table_names.map_or(Ok(vec![]), |tables| {
             tables.get(0).map_or(Ok(vec![]), |table| {
                 let mut con = Consumer::from_hosts(vec![config.broker.clone()])
-                    .with_topic(table.table_name.clone())
+                    .with_topic(table.name.clone())
                     .with_fallback_offset(FetchOffset::Earliest)
                     .with_offset_storage(GroupOffsetStorage::Kafka)
                     .create()
@@ -52,7 +52,7 @@
                 })?;

                 schemas.push(SourceSchema::new(
-                    table.table_name.clone(),
+                    table.name.clone(),
                     mapped_schema,
                     ReplicationChangesTrackingType::FullChanges,
                 ));
diff --git a/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs b/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
index 7857508403..264e85a33a 100644
--- a/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
+++ b/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
@@ -86,10 +86,8 @@ impl SchemaRegistry {
         let sr_settings = SrSettings::new(config.schema_registry_url.unwrap());
         table_names.map_or(Ok(vec![]), |tables| {
             tables.get(0).map_or(Ok(vec![]), |table| {
-                let key_result =
-                    SchemaRegistry::fetch_struct(&sr_settings, &table.table_name, true)?;
-                let schema_result =
-                    SchemaRegistry::fetch_struct(&sr_settings, &table.table_name, false)?;
+                let key_result = SchemaRegistry::fetch_struct(&sr_settings, &table.name, true)?;
+                let schema_result = SchemaRegistry::fetch_struct(&sr_settings, &table.name, false)?;

                 let pk_fields = key_result.fields.map_or(vec![], |fields| {
                     fields
@@ -142,7 +140,7 @@
                 };

                 schema_data = Some(Ok(vec![SourceSchema::new(
-                    table.table_name.clone(),
+                    table.name.clone(),
                     schema,
                     ReplicationChangesTrackingType::FullChanges,
                 )]));
diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs
index bd4b7e2655..6e010fb554 100644
--- a/dozer-ingestion/src/connectors/mod.rs
+++ b/dozer-ingestion/src/connectors/mod.rs
@@ -47,27 +47,22 @@ pub trait Connector: Send + Sync + Debug {
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
     ) -> Result<(), ConnectorError>;
-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError>;
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError>;

     // This is a default table mapping from schemas. It will result in errors if connector has unsupported data types.
-    fn get_tables_default(
-        &self,
-        tables: Option<&[TableInfo]>,
-    ) -> Result<Vec<TableInfo>, ConnectorError> {
+    fn get_tables_default(&self) -> Result<Vec<TableInfo>, ConnectorError> {
         Ok(self
-            .get_schemas(tables.map(|t| t.to_vec()))?
-            .iter()
-            .enumerate()
-            .map(|(id, s)| TableInfo {
-                name: s.name.to_string(),
-                table_name: s.name.to_string(),
-                id: id as u32,
+            .get_schemas(None)?
+            .into_iter()
+            .map(|source_schema| TableInfo {
+                name: source_schema.name,
                 columns: Some(
-                    s.schema
+                    source_schema
+                        .schema
                         .fields
-                        .iter()
+                        .into_iter()
                         .map(|f| ColumnInfo {
-                            name: f.name.to_string(),
+                            name: f.name,
                             data_type: Some(f.typ.to_string()),
                         })
                         .collect(),
                 ),
             })
             .collect())
     }
@@ -81,8 +76,6 @@ pub trait Connector: Send + Sync + Debug {
 #[serde(crate = "self::serde")]
 pub struct TableInfo {
     pub name: String,
-    pub table_name: String,
-    pub id: u32,
     pub columns: Option<Vec<ColumnInfo>>,
 }
diff --git a/dozer-ingestion/src/connectors/object_store/connector.rs b/dozer-ingestion/src/connectors/object_store/connector.rs
index ea8193d367..8469adfa31 100644
--- a/dozer-ingestion/src/connectors/object_store/connector.rs
+++ b/dozer-ingestion/src/connectors/object_store/connector.rs
@@ -53,7 +53,7 @@ impl Connector for ObjectStoreConnector {
         TableReader::new(self.config.clone()).read_tables(&tables, ingestor)
     }

-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> ConnectorResult<Vec<TableInfo>> {
-        self.get_tables_default(tables)
+    fn get_tables(&self) -> ConnectorResult<Vec<TableInfo>> {
+        self.get_tables_default()
     }
 }
diff --git a/dozer-ingestion/src/connectors/object_store/schema_mapper.rs b/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
index a6a42be856..6542cdb5ac 100644
--- a/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
+++ b/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
@@ -68,8 +68,6 @@ impl Mapper for SchemaMapper {
                 .iter()
                 .map(|t| TableInfo {
                     name: t.name.clone(),
-                    table_name: t.name.clone(),
-                    id: 0,
                     columns: None,
                 })
                 .collect()
@@ -79,7 +77,7 @@
             .iter()
             .enumerate()
             .map(|(id, table)| {
-                let table_name = table.table_name.clone();
+                let table_name = table.name.clone();

                 let params = self.config.table_params(&table_name)?;
diff --git a/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs b/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs
index 247432b415..4d20d37eb8 100644
--- a/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs
+++ b/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs
@@ -75,8 +75,6 @@ fn test_read_parquet_file() {
     let table = TableInfo {
         name: "all_types_parquet".to_string(),
-        table_name: "all_types_parquet".to_string(),
-        id: 0,
         columns: None,
     };
     thread::spawn(move || {
@@ -127,8 +125,6 @@ fn test_csv_read() {
     let table = TableInfo {
         name: "all_types_csv".to_string(),
-        table_name: "all_types_csv".to_string(),
-        id: 0,
         columns: None,
     };

@@ -203,9 +199,7 @@ fn test_missing_directory() {
         None,
         &ingestor,
         vec![TableInfo {
-            name: table.name.clone(),
-            table_name: table.name,
-            id: 0,
+            name: table.name,
             columns: None,
         }],
     );
diff --git a/dozer-ingestion/src/connectors/postgres/connection/validator.rs b/dozer-ingestion/src/connectors/postgres/connection/validator.rs
index e7e95f8685..21b97117e1 100644
--- a/dozer-ingestion/src/connectors/postgres/connection/validator.rs
+++ b/dozer-ingestion/src/connectors/postgres/connection/validator.rs
@@ -135,8 +135,8 @@ fn validate_wal_level(client: &mut Client) -> Result<(), PostgresConnectorError>
 fn validate_tables_names(table_info: &Vec<TableInfo>) -> Result<(), PostgresConnectorError> {
     let table_regex = Regex::new(r"^([[:lower:]_][[:alnum:]_]*)$").unwrap();
     for t in table_info {
-        if !table_regex.is_match(&t.table_name) {
-            return Err(TableNameNotValid(t.table_name.clone()));
+        if !table_regex.is_match(&t.name) {
+            return Err(TableNameNotValid(t.name.clone()));
         }
     }

@@ -164,7 +164,7 @@ fn validate_tables(
 ) -> Result<(), PostgresConnectorError> {
     let mut tables_names: HashMap<String, bool> = HashMap::new();
     table_info.iter().for_each(|t| {
-        tables_names.insert(t.table_name.clone(), true);
+        tables_names.insert(t.name.clone(), true);
     });

     validate_tables_names(table_info)?;
@@ -209,7 +209,7 @@ fn validate_tables(

         let columns = table_info
             .iter()
-            .find(|x| x.table_name == table_name)
+            .find(|x| x.name == table_name)
             .unwrap()
             .clone()
             .columns;
@@ -295,8 +295,8 @@ pub fn validate_slot(
         }

         for t in tables_list {
-            if !publication_tables.contains(&t.table_name) {
-                return Err(MissingTableInReplicationSlot(t.table_name.clone()));
+            if !publication_tables.contains(&t.name) {
+                return Err(MissingTableInReplicationSlot(t.name.clone()));
             }
         }
     }
@@ -428,8 +428,6 @@ mod tests {

     let tables = vec![TableInfo {
         name: "not_existing".to_string(),
-        table_name: "not_existing".to_string(),
-        id: 0,
         columns: None,
     }];
     let result = validate_connection("pg_test_conn", config, Some(&tables), None);
@@ -478,8 +476,6 @@

     let tables = vec![TableInfo {
         name: "existing".to_string(),
-        table_name: "existing".to_string(),
-        id: 0,
         columns: Some(columns),
     }];

@@ -658,8 +654,6 @@
     for (table_name, expected_result) in tables_with_result {
         let res = validate_tables_names(&vec![TableInfo {
             name: table_name.to_string(),
-            table_name: table_name.to_string(),
-            id: 0,
             columns: None,
         }]);

@@ -679,8 +673,6 @@
     for (column_name, expected_result) in columns_names_with_result {
         let res = validate_columns_names(&vec![TableInfo {
             name: "column_test_table".to_string(),
-            table_name: "column_test_table".to_string(),
-            id: 0,
             columns: Some(vec![ColumnInfo {
                 name: column_name.to_string(),
                 data_type: None,
@@ -714,9 +706,7 @@
     let result = validate_tables(
         &mut pg_client,
         &vec![TableInfo {
-            name: table_name.clone(),
-            table_name,
-            id: 0,
+            name: table_name,
             columns: None,
         }],
     );
@@ -726,9 +716,7 @@
     let result = validate_tables(
         &mut pg_client,
         &vec![TableInfo {
-            name: view_name.clone(),
-            table_name: view_name,
-            id: 0,
+            name: view_name,
             columns: None,
         }],
     );
diff --git a/dozer-ingestion/src/connectors/postgres/connector.rs b/dozer-ingestion/src/connectors/postgres/connector.rs
index eacd92c95b..83f6434a10 100644
--- a/dozer-ingestion/src/connectors/postgres/connector.rs
+++ b/dozer-ingestion/src/connectors/postgres/connector.rs
@@ -85,7 +85,7 @@ impl Connector for PostgresConnector {
         table_names: Option<Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         self.schema_helper
-            .get_schemas(table_names)
+            .get_schemas(table_names.as_deref())
             .map_err(PostgresConnectorError)
     }

@@ -106,7 +106,7 @@
             self.name.clone(),
             self.get_publication_name(),
             self.get_slot_name(),
-            tables,
+            self.schema_helper.get_tables(Some(&tables))?,
             self.replication_conn_config.clone(),
             ingestor,
             self.conn_config.clone(),
@@ -130,8 +130,15 @@
         SchemaHelper::validate(&self.schema_helper, tables).map_err(PostgresConnectorError)
     }

-    fn get_tables(&self, _tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
-        self.schema_helper.get_tables(None)
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
+        let tables = self.schema_helper.get_tables(None)?;
+        Ok(tables
+            .into_iter()
+            .map(|table_info| TableInfo {
+                name: table_info.name,
+                columns: Some(table_info.columns),
+            })
+            .collect())
     }

     fn can_start_from(&self, (lsn, _): (u64, u64)) -> Result<bool, ConnectorError> {
@@ -162,7 +169,7 @@ impl PostgresConnector {
         let table_str: String = match self.tables.as_ref() {
TABLES".to_string(), Some(arr) => { - let table_names: Vec = arr.iter().map(|t| t.table_name.clone()).collect(); + let table_names: Vec = arr.iter().map(|t| t.name.clone()).collect(); format!("TABLE {}", table_names.join(" , ")) } }; diff --git a/dozer-ingestion/src/connectors/postgres/iterator.rs b/dozer-ingestion/src/connectors/postgres/iterator.rs index 60e4f4b811..1bd1b9f5ed 100644 --- a/dozer-ingestion/src/connectors/postgres/iterator.rs +++ b/dozer-ingestion/src/connectors/postgres/iterator.rs @@ -1,5 +1,4 @@ use crate::connectors::TableInfo; - use crate::errors::{ConnectorError, PostgresConnectorError}; use crate::ingestion::Ingestor; use dozer_types::ingestion_types::IngestionMessage; @@ -20,11 +19,13 @@ use crate::errors::PostgresConnectorError::{ use postgres_types::PgLsn; use tokio::runtime::Runtime; +use super::schema::helper::PostgresTableInfo; + pub struct Details { name: String, publication_name: String, slot_name: String, - tables: Vec, + tables: Vec, replication_conn_config: tokio_postgres::Config, conn_config: tokio_postgres::Config, } @@ -49,7 +50,7 @@ impl<'a> PostgresIterator<'a> { name: String, publication_name: String, slot_name: String, - tables: Vec, + tables: Vec, replication_conn_config: tokio_postgres::Config, ingestor: &'a Ingestor, conn_config: tokio_postgres::Config, @@ -122,7 +123,6 @@ impl<'a> PostgresIteratorHandler<'a> { // - When snapshot replication is not completed // - When there is gap between available lsn (in case when slot dropped and new created) and last lsn // - When publication tables changes - let tables = details.tables.clone(); if self.lsn.clone().into_inner().is_none() { debug!("\nCreating Slot...."); let slot_exist = @@ -161,19 +161,26 @@ impl<'a> PostgresIteratorHandler<'a> { debug!("\nInitializing snapshots..."); let snapshotter = PostgresSnapshotter { - tables: details.tables.clone(), conn_config: details.conn_config.to_owned(), ingestor: self.ingestor, connector_id: self.connector_id, }; - snapshotter.sync_tables(details.tables.clone())?; + let tables = details + .tables + .iter() + .map(|table_info| TableInfo { + name: table_info.name.clone(), + columns: Some(table_info.columns.clone()), + }) + .collect::>(); + snapshotter.sync_tables(&tables)?; let lsn = self.lsn.borrow().map_or(0, |(lsn, _)| u64::from(lsn)); self.ingestor .handle_message(IngestionMessage::new_snapshotting_done(lsn, 0)) .map_err(ConnectorError::IngestorError)?; - debug!("\nInitialized with tables: {:?}", tables); + debug!("\nInitialized with tables: {:?}", details.tables); client.borrow_mut().simple_query("COMMIT;").map_err(|_e| { debug!("failed to commit txn for replication"); @@ -184,10 +191,10 @@ impl<'a> PostgresIteratorHandler<'a> { self.state.replace(ReplicationState::Replicating); /* #################### Replicating ###################### */ - self.replicate(tables) + self.replicate() } - fn replicate(&self, tables: Vec) -> Result<(), ConnectorError> { + fn replicate(&self) -> Result<(), ConnectorError> { let rt = Runtime::new().unwrap(); let lsn = self.lsn.borrow(); let (lsn, offset) = lsn @@ -196,6 +203,7 @@ impl<'a> PostgresIteratorHandler<'a> { let publication_name = self.details.publication_name.clone(); let slot_name = self.details.slot_name.clone(); + let tables = self.details.tables.clone(); rt.block_on(async { let mut replicator = CDCHandler { replication_conn_config: self.details.replication_conn_config.clone(), diff --git a/dozer-ingestion/src/connectors/postgres/replicator.rs b/dozer-ingestion/src/connectors/postgres/replicator.rs index 
index 022cc6e868..a39670e73d 100644
--- a/dozer-ingestion/src/connectors/postgres/replicator.rs
+++ b/dozer-ingestion/src/connectors/postgres/replicator.rs
@@ -14,13 +14,12 @@
 use futures::StreamExt;
 use postgres_protocol::message::backend::ReplicationMessage::*;
 use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
 use postgres_types::PgLsn;
-use std::collections::HashMap;

-use crate::connectors::{ColumnInfo, TableInfo};
 use std::time::SystemTime;
 use tokio_postgres::replication::LogicalReplicationStream;
 use tokio_postgres::Error;

+use super::schema::helper::PostgresTableInfo;
 use super::xlog_mapper::MappedReplicationMessage;

 pub struct CDCHandler<'a> {
@@ -42,7 +41,7 @@ pub struct CDCHandler<'a> {
 }

 impl<'a> CDCHandler<'a> {
-    pub async fn start(&mut self, tables: Vec<TableInfo>) -> Result<(), ConnectorError> {
+    pub async fn start(&mut self, tables: Vec<PostgresTableInfo>) -> Result<(), ConnectorError> {
         let replication_conn_config = self.replication_conn_config.clone();
         let client: tokio_postgres::Client = helper::async_connect(replication_conn_config).await?;
@@ -72,10 +71,10 @@
             .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;

         let stream = LogicalReplicationStream::new(copy_stream);
-        let mut tables_columns: HashMap<u32, Vec<ColumnInfo>> = HashMap::new();
-        tables.iter().for_each(|t| {
-            tables_columns.insert(t.id, t.clone().columns.map_or(vec![], |t| t));
-        });
+        let tables_columns = tables
+            .into_iter()
+            .map(|table_info| (table_info.id, table_info.columns))
+            .collect();
         let mut mapper = XlogMapper::new(tables_columns);

         tokio::pin!(stream);
diff --git a/dozer-ingestion/src/connectors/postgres/schema/helper.rs b/dozer-ingestion/src/connectors/postgres/schema/helper.rs
index 1363bb1bc8..b3f29fbc4d 100644
--- a/dozer-ingestion/src/connectors/postgres/schema/helper.rs
+++ b/dozer-ingestion/src/connectors/postgres/schema/helper.rs
@@ -28,13 +28,12 @@ pub struct SchemaHelper {
     schema: String,
 }

-pub struct PostgresTableRow {
-    pub table_name: String,
-    pub column_name: String,
-    pub field: FieldDefinition,
-    pub is_column_used_in_index: bool,
-    pub table_id: u32,
-    pub replication_type: String,
+struct PostgresTableRow {
+    table_name: String,
+    field: FieldDefinition,
+    is_column_used_in_index: bool,
+    table_id: u32,
+    replication_type: String,
 }

 #[derive(Clone)]
@@ -87,6 +86,13 @@ impl PostgresTable {
     }
 }

+#[derive(Debug, Clone)]
+pub struct PostgresTableInfo {
+    pub name: String,
+    pub id: u32,
+    pub columns: Vec<ColumnInfo>,
+}
+
 type RowsWithColumnsMap = (Vec<Row>, HashMap<String, Vec<String>>);

 impl SchemaHelper {
@@ -101,11 +107,10 @@
     pub fn get_tables(
         &self,
         tables: Option<&[TableInfo]>,
-    ) -> Result<Vec<TableInfo>, ConnectorError> {
+    ) -> Result<Vec<PostgresTableInfo>, ConnectorError> {
         let (results, tables_columns_map) = self.get_columns(tables)?;

-        let mut columns_map: HashMap<String, Vec<ColumnInfo>> = HashMap::new();
-        let mut tables_id: HashMap<String, u32> = HashMap::new();
+        let mut id_columns_map: HashMap<String, (u32, Vec<ColumnInfo>)> = HashMap::new();
         for row in results {
             let table_name: String = row.get(0);
             let column_name: String = row.get(1);
@@ -122,26 +127,36 @@
             });

             if add_column_table {
-                let vals = columns_map.get(&table_name);
-                let mut columns = vals.map_or_else(Vec::new, |columns| columns.clone());
-
-                columns.push(ColumnInfo {
-                    name: column_name,
-                    data_type: None,
-                });
-
-                columns_map.insert(table_name.clone(), columns);
-                tables_id.insert(table_name, table_id);
+                match id_columns_map.get_mut(&table_name) {
+                    Some((id, columns)) => {
+                        columns.push(ColumnInfo {
+                            name: column_name,
+                            data_type: None,
+                        });
+                        *id = table_id;
+                    }
+                    None => {
+                        id_columns_map.insert(
+                            table_name.clone(),
+                            (
+                                table_id,
+                                vec![ColumnInfo {
+                                    name: column_name,
+                                    data_type: None,
+                                }],
+                            ),
+                        );
+                    }
+                }
             }
         }

-        Ok(columns_map
-            .iter()
-            .map(|(table_name, columns)| TableInfo {
-                name: table_name.clone(),
-                table_name: table_name.clone(),
-                id: *tables_id.get(&table_name.clone()).unwrap(),
-                columns: Some(columns.clone()),
+        Ok(id_columns_map
+            .into_iter()
+            .map(|(table_name, (id, columns))| PostgresTableInfo {
+                name: table_name,
+                id,
+                columns,
             })
             .collect())
     }
@@ -157,12 +172,12 @@
         tables.iter().for_each(|t| {
             if let Some(columns) = t.columns.clone() {
                 tables_columns_map.insert(
-                    t.table_name.clone(),
+                    t.name.clone(),
                     columns.iter().map(|c| c.name.clone()).collect(),
                 );
             }
         });
-        let table_names: Vec<String> = tables.iter().map(|t| t.table_name.clone()).collect();
+        let table_names: Vec<String> = tables.iter().map(|t| t.name.clone()).collect();
         let sql = str::replace(SQL, ":tables_name_condition", "t.table_name = ANY($2)");
         client.query(&sql, &[&schema, &table_names])
     } else {
@@ -177,9 +192,9 @@

     pub fn get_schemas(
         &self,
-        tables: Option<Vec<TableInfo>>,
+        tables: Option<&[TableInfo]>,
     ) -> Result<Vec<SourceSchema>, PostgresConnectorError> {
-        let (results, tables_columns_map) = self.get_columns(tables.as_deref())?;
+        let (results, tables_columns_map) = self.get_columns(tables)?;

         let mut columns_map: HashMap<String, PostgresTable> = HashMap::new();
         results
@@ -209,7 +224,7 @@
                 Ok(())
             })?;

-        let columns_map = sort_schemas(tables.as_ref().map(Vec::as_ref), &columns_map)?;
+        let columns_map = sort_schemas(tables, &columns_map)?;

         Self::map_columns_to_schemas(columns_map)
             .map_err(PostgresConnectorError::PostgresSchemaError)
@@ -302,7 +317,7 @@
         for table in tables {
             if let Some(columns) = &table.columns {
                 let mut existing_columns = HashMap::new();
-                if let Some(res) = validation_result.get(&table.table_name) {
+                if let Some(res) = validation_result.get(&table.name) {
                     for (col_name, _) in res {
                         if let Some(name) = col_name {
                             existing_columns.insert(name.clone(), ());
@@ -313,14 +328,14 @@
                 for ColumnInfo { name, .. } in columns {
                     if existing_columns.get(name).is_none() {
                         validation_result
-                            .entry(table.table_name.clone())
+                            .entry(table.name.clone())
                             .and_modify(|r| {
                                 r.push((
                                     None,
                                     Err(ConnectorError::PostgresConnectorError(
                                         PostgresConnectorError::ColumnNotFound(
                                             name.to_string(),
-                                            table.table_name.clone(),
+                                            table.name.clone(),
                                         ),
                                     )),
                                 ))
@@ -366,7 +381,6 @@

         Ok(PostgresTableRow {
             table_name,
-            column_name: column_name.clone(),
             field: FieldDefinition::new(column_name, typ, is_nullable, SourceDefinition::Dynamic),
             is_column_used_in_index,
             table_id,
diff --git a/dozer-ingestion/src/connectors/postgres/schema/sorter.rs b/dozer-ingestion/src/connectors/postgres/schema/sorter.rs
index ba000c1c9a..24ca7a5d45 100644
--- a/dozer-ingestion/src/connectors/postgres/schema/sorter.rs
+++ b/dozer-ingestion/src/connectors/postgres/schema/sorter.rs
@@ -19,7 +19,7 @@ pub fn sort_schemas(
         |tables| {
             let mut sorted_tables: Vec<(String, PostgresTable)> = Vec::new();
             for table in tables.iter() {
-                let postgres_table = mapped_tables.get(&table.table_name).ok_or(ColumnNotFound)?;
+                let postgres_table = mapped_tables.get(&table.name).ok_or(ColumnNotFound)?;

                 let sorted_table = table.columns.as_ref().map_or_else(
                     || Ok(postgres_table.clone()),
@@ -40,7 +40,7 @@
                 },
             )?;

-            sorted_tables.push((table.table_name.clone(), sorted_table));
+            sorted_tables.push((table.name.clone(), sorted_table));
         }

         Ok(sorted_tables)
@@ -152,8 +152,6 @@ mod tests {

     let expected_table_order = &[TableInfo {
         name: "sort_test".to_string(),
-        table_name: "sort_test".to_string(),
-        id: 0,
         columns: None,
     }];

@@ -197,8 +195,6 @@
     }];
     let expected_table_order = &[TableInfo {
         name: "sort_test".to_string(),
-        table_name: "sort_test".to_string(),
-        id: 0,
         columns: Some(columns_order.clone()),
     }];

@@ -232,8 +228,6 @@
     ];
     let expected_table_order = &[TableInfo {
         name: "sort_test".to_string(),
-        table_name: "sort_test".to_string(),
-        id: 0,
         columns: Some(columns_order.clone()),
     }];

@@ -292,14 +286,10 @@
     let expected_table_order = &[
         TableInfo {
             name: "sort_test_first".to_string(),
-            table_name: "sort_test_first".to_string(),
-            id: 0,
             columns: Some(columns_order_1.clone()),
         },
         TableInfo {
             name: "sort_test_second".to_string(),
-            table_name: "sort_test_second".to_string(),
-            id: 0,
             columns: Some(columns_order_2.clone()),
         },
     ];
diff --git a/dozer-ingestion/src/connectors/postgres/schema/tests.rs b/dozer-ingestion/src/connectors/postgres/schema/tests.rs
index 253a346425..9e05febda3 100644
--- a/dozer-ingestion/src/connectors/postgres/schema/tests.rs
+++ b/dozer-ingestion/src/connectors/postgres/schema/tests.rs
@@ -10,7 +10,7 @@
 use serial_test::serial;
 use std::collections::HashSet;
 use std::hash::Hash;

-fn assert_vec_eq<T>(a: Vec<T>, b: Vec<T>) -> bool
+fn assert_vec_eq<T>(a: &[T], b: &[T]) -> bool
 where
     T: Eq + Hash,
 {
@@ -39,15 +39,15 @@ fn test_connector_get_tables() {

     let result = schema_helper.get_tables(None).unwrap();
     let table = result.get(0).unwrap();
-    assert_eq!(table_name, table.table_name.clone());
+    assert_eq!(table_name, table.name);
     assert!(assert_vec_eq(
-        vec![
+        &[
             ColumnInfo::new("name".to_string(), None),
             ColumnInfo::new("description".to_string(), None),
             ColumnInfo::new("weight".to_string(), None),
             ColumnInfo::new("id".to_string(), None),
         ],
-        table.columns.clone().unwrap()
+        &table.columns
     ));

     client.drop_schema(&schema);
@@ -72,8 +70,8 @@ fn test_connector_get_schema_with_selected_columns() {

     let schema_helper = SchemaHelper::new(client.postgres_config.clone(), Some(schema.clone()));
     let table_info = TableInfo {
         name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
         columns: Some(vec![
             ColumnInfo::new("name".to_string(), None),
             ColumnInfo::new("id".to_string(), None),
         ]),
     };

     let result = schema_helper.get_tables(Some(&[table_info])).unwrap();
     let table = result.get(0).unwrap();
-    assert_eq!(table_name, table.table_name.clone());
+    assert_eq!(table_name, table.name);
     assert!(assert_vec_eq(
-        vec![
+        &[
             ColumnInfo::new("name".to_string(), None),
             ColumnInfo::new("id".to_string(), None)
         ],
-        table.columns.clone().unwrap()
+        &table.columns
     ));

     client.drop_schema(&schema);
@@ -113,22 +111,20 @@ fn test_connector_get_schema_without_selected_columns() {

     let schema_helper = SchemaHelper::new(client.postgres_config.clone(), Some(schema.clone()));
     let table_info = TableInfo {
         name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
         columns: Some(vec![]),
     };

     let result = schema_helper.get_tables(Some(&[table_info])).unwrap();
     let table = result.get(0).unwrap();
-    assert_eq!(table_name, table.table_name.clone());
+    assert_eq!(table_name, table.name.clone());
     assert!(assert_vec_eq(
-        vec![
+        &[
             ColumnInfo::new("id".to_string(), None),
             ColumnInfo::new("name".to_string(), None),
             ColumnInfo::new("description".to_string(), None),
             ColumnInfo::new("weight".to_string(), None),
         ],
-        table.columns.clone().unwrap()
+        &table.columns
     ));

     client.drop_schema(&schema);
@@ -154,13 +150,11 @@ fn test_connector_view_cannot_be_used() {

     let schema_helper = SchemaHelper::new(client.postgres_config.clone(), Some(schema.clone()));
     let table_info = TableInfo {
-        name: view_name.clone(),
-        table_name: view_name.clone(),
-        id: 0,
+        name: view_name,
         columns: Some(vec![]),
     };

-    let result = schema_helper.get_schemas(Some(vec![table_info]));
+    let result = schema_helper.get_schemas(Some(&[table_info]));
     assert!(result.is_err());
     assert!(matches!(
         result,
     ));

     let table_info = TableInfo {
-        name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
+        name: table_name,
         columns: Some(vec![]),
     };

-    let result = schema_helper.get_schemas(Some(vec![table_info]));
+    let result = schema_helper.get_schemas(Some(&[table_info]));
     assert!(result.is_ok());

     client.drop_schema(&schema);
diff --git a/dozer-ingestion/src/connectors/postgres/snapshotter.rs b/dozer-ingestion/src/connectors/postgres/snapshotter.rs
index 3c9ef12089..ae5ba58703 100644
--- a/dozer-ingestion/src/connectors/postgres/snapshotter.rs
+++ b/dozer-ingestion/src/connectors/postgres/snapshotter.rs
@@ -20,14 +20,13 @@
 use dozer_types::ingestion_types::IngestionMessage;
 use dozer_types::types::Operation;

 pub struct PostgresSnapshotter<'a> {
-    pub tables: Vec<TableInfo>,
     pub conn_config: tokio_postgres::Config,
     pub ingestor: &'a Ingestor,
     pub connector_id: u64,
 }

 impl<'a> PostgresSnapshotter<'a> {
-    pub fn get_tables(&self, tables: Vec<TableInfo>) -> Result<Vec<SourceSchema>, ConnectorError> {
+    pub fn get_tables(&self, tables: &[TableInfo]) -> Result<Vec<SourceSchema>, ConnectorError> {
         let helper = SchemaHelper::new(self.conn_config.clone(), None);
         helper
             .get_schemas(Some(tables))
@@ -85,7 +84,7 @@
         Ok(())
     }

-    pub fn sync_tables(&self, tables: Vec<TableInfo>) -> Result<(), ConnectorError> {
+    pub fn sync_tables(&self, tables: &[TableInfo]) -> Result<(), ConnectorError> {
         let tables = self.get_tables(tables)?;

         let mut left_tables_count = tables.len();
@@ -175,8 +174,6 @@ mod tests {

     let tables = vec![TableInfo {
         name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
         columns: None,
     }];

@@ -191,9 +188,7 @@
     let connector = PostgresConnector::new(1, postgres_config);

     let input_tables = vec![TableInfo {
-        name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
+        name: table_name,
         columns: None,
     }];

@@ -201,13 +196,12 @@
     let (ingestor, mut iterator) = Ingestor::initialize_channel(ingestion_config);

     let snapshotter = PostgresSnapshotter {
-        tables,
         conn_config,
         ingestor: &ingestor,
         connector_id: connector.id,
     };

-    let actual = snapshotter.sync_tables(input_tables);
+    let actual = snapshotter.sync_tables(&input_tables);

     assert!(actual.is_ok());

@@ -250,9 +244,7 @@
     test_client.insert_rows(&table_name, 2, None);

     let tables = vec![TableInfo {
-        name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
+        name: table_name,
         columns: None,
     }];

@@ -268,9 +260,7 @@
     let input_table_name = String::from("not_existing_table");
     let input_tables = vec![TableInfo {
-        name: input_table_name.clone(),
-        table_name: input_table_name,
-        id: 0,
+        name: input_table_name,
         columns: None,
     }];

@@ -278,13 +268,12 @@
     let (ingestor, mut _iterator) = Ingestor::initialize_channel(ingestion_config);

     let snapshotter = PostgresSnapshotter {
-        tables,
         conn_config,
         ingestor: &ingestor,
         connector_id: connector.id,
     };

-    let actual = snapshotter.sync_tables(input_tables);
+    let actual = snapshotter.sync_tables(&input_tables);

     assert!(actual.is_err());

@@ -316,8 +305,6 @@

     let tables = vec![TableInfo {
         name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
         columns: None,
     }];

@@ -332,9 +319,7 @@
     let connector = PostgresConnector::new(1, postgres_config);

     let input_tables = vec![TableInfo {
-        name: table_name.clone(),
-        table_name,
-        id: 0,
+        name: table_name,
         columns: None,
     }];

@@ -342,13 +327,12 @@
     let (ingestor, mut _iterator) = Ingestor::initialize_channel(ingestion_config);

     let snapshotter = PostgresSnapshotter {
-        tables,
         conn_config,
         ingestor: &ingestor,
         connector_id: connector.id,
     };

-    let actual = snapshotter.sync_tables(input_tables);
+    let actual = snapshotter.sync_tables(&input_tables);

     assert!(actual.is_err());

diff --git a/dozer-ingestion/src/connectors/postgres/test_utils.rs b/dozer-ingestion/src/connectors/postgres/test_utils.rs
index bdd388a538..466c2ffa08 100644
--- a/dozer-ingestion/src/connectors/postgres/test_utils.rs
+++ b/dozer-ingestion/src/connectors/postgres/test_utils.rs
@@ -32,9 +32,7 @@ pub fn get_iterator(config: Connection, table_name: String) -> IngestionIterator
     thread::spawn(move || {
         let tables: Vec<TableInfo> = vec![TableInfo {
-            name: table_name.clone(),
-            table_name: table_name.clone(),
-            id: 0,
+            name: table_name,
             columns: None,
         }];
diff --git a/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs b/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs
index 1cfd9bc366..2a68e18a40 100644
--- a/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs
+++ b/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs
@@ -89,8 +89,6 @@
     let tables = vec![TableInfo {
         name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
         columns: None,
     }];

diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs
index 46a0221c8f..3694ef0ec9 100644
--- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs
+++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs
@@ -399,8 +399,7 @@ impl Client {
             if idx > 0 {
                 buf.write_char(',').unwrap();
             }
-            buf.write_str(&format!("\'{}\'", table_info.table_name))
-                .unwrap();
+            buf.write_str(&format!("\'{}\'", table_info.name)).unwrap();
         }
         buf.write_char(')').unwrap();
         buf
diff --git a/dozer-ingestion/src/connectors/snowflake/connector.rs b/dozer-ingestion/src/connectors/snowflake/connector.rs
index 2913005b37..58a56822ca 100644
--- a/dozer-ingestion/src/connectors/snowflake/connector.rs
+++ b/dozer-ingestion/src/connectors/snowflake/connector.rs
@@ -86,8 +86,8 @@ impl Connector for SnowflakeConnector {
         })
     }

-    fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
-        self.get_tables_default(tables)
+    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
+        self.get_tables_default()
     }

     fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
@@ -117,19 +117,17 @@ async fn run(
         if iteration == 0 {
             match from_seq {
                 None | Some((0, _)) => {
-                    info!("[{}][{}] Creating new stream", name, table.table_name);
-                    StreamConsumer::drop_stream(&client, &table.table_name)?;
-                    StreamConsumer::create_stream(&client, &table.table_name)?;
+                    info!("[{}][{}] Creating new stream", name, table.name);
+                    StreamConsumer::drop_stream(&client, &table.name)?;
+                    StreamConsumer::create_stream(&client, &table.name)?;
                 }
                 Some((lsn, seq)) => {
                     info!(
                         "[{}][{}] Continuing ingestion from {}/{}",
-                        name, table.table_name, lsn, seq
+                        name, table.name, lsn, seq
                     );
                     iteration = lsn;
-                    if let Ok(false) =
-                        StreamConsumer::is_stream_created(&client, &table.table_name)
-                    {
+                    if let Ok(false) = StreamConsumer::is_stream_created(&client, &table.name) {
                         return Err(ConnectorError::SnowflakeError(
                             SnowflakeError::SnowflakeStreamError(
                                 SnowflakeStreamError::StreamNotFound,
@@ -140,12 +138,9 @@
             }
         }

-        debug!(
-            "[{}][{}] Reading from changes stream",
-            name, table.table_name
-        );
+        debug!("[{}][{}] Reading from changes stream", name, table.name);

-        consumer.consume_stream(&stream_client, &table.table_name, ingestor, idx, iteration)?;
+        consumer.consume_stream(&stream_client, &table.name, ingestor, idx, iteration)?;

         interval.tick().await;
     }
diff --git a/dozer-ingestion/src/connectors/snowflake/schema_helper.rs b/dozer-ingestion/src/connectors/snowflake/schema_helper.rs
index 7b50fa5dfb..204897fb10 100644
--- a/dozer-ingestion/src/connectors/snowflake/schema_helper.rs
+++ b/dozer-ingestion/src/connectors/snowflake/schema_helper.rs
@@ -29,7 +29,7 @@
         let tables_indexes = table_names.clone().map_or(HashMap::new(), |tables| {
             let mut result = HashMap::new();
             for (idx, table) in tables.iter().enumerate() {
-                result.insert(table.table_name.clone(), idx);
+                result.insert(table.name.clone(), idx);
             }

             result
@@ -74,8 +74,8 @@
         let existing_schemas_names: Vec<String> = schemas.iter().map(|s| s.name.clone()).collect();
         for table in tables {
             let mut result = vec![];
-            if !existing_schemas_names.contains(&table.table_name) {
-                result.push((None, Err(TableNotFound(table.table_name.clone()))));
+            if !existing_schemas_names.contains(&table.name) {
+                result.push((None, Err(TableNotFound(table.name.clone()))));
             }

             validation_result.insert(table.name.clone(), result);
diff --git a/dozer-ingestion/src/connectors/snowflake/tests.rs b/dozer-ingestion/src/connectors/snowflake/tests.rs
index c97e35d963..048257970e 100644
--- a/dozer-ingestion/src/connectors/snowflake/tests.rs
+++ b/dozer-ingestion/src/connectors/snowflake/tests.rs
@@ -55,8 +55,6 @@ fn test_disabled_connector_and_read_from_stream() {
     let connection_config = connection.clone();
     let table = TableInfo {
         name: table_name.clone(),
-        table_name: table_name.clone(),
-        id: 0,
         columns: None,
     };
     thread::spawn(move || {
@@ -141,9 +139,7 @@ fn test_disabled_connector_get_schemas_test() {
     let schemas = connector
         .as_ref()
        .get_schemas(Some(vec![TableInfo {
-            name: table_name.to_string(),
-            table_name: table_name.to_string(),
-            id: 0,
+            name: table_name.clone(),
             columns: None,
         }]))
         .unwrap();
@@ -185,9 +181,7 @@ fn test_disabled_connector_missing_table_validator() {
     let not_existing_table = "not_existing_table".to_string();
     let result = connector
         .validate_schemas(&[TableInfo {
-            name: not_existing_table.clone(),
-            table_name: not_existing_table,
-            id: 0,
+            name: not_existing_table,
             columns: None,
         }])
         .unwrap();
@@ -201,8 +195,6 @@
     let result = connector
         .validate_schemas(&[TableInfo {
             name: existing_table.clone(),
-            table_name: existing_table.clone(),
-            id: 0,
             columns: None,
         }])
         .unwrap();
diff --git a/dozer-orchestrator/src/pipeline/connector_source.rs b/dozer-orchestrator/src/pipeline/connector_source.rs
index b039c3ed1b..7a6bb03b28 100644
--- a/dozer-orchestrator/src/pipeline/connector_source.rs
+++ b/dozer-orchestrator/src/pipeline/connector_source.rs
@@ -1,8 +1,8 @@
 use dozer_core::channels::SourceChannelForwarder;
-use dozer_core::errors::ExecutionError::{InternalError, ReplicationTypeNotFound};
+use dozer_core::errors::ExecutionError::InternalError;
 use dozer_core::errors::{ExecutionError, SourceError};
 use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
-use dozer_ingestion::connectors::{get_connector, Connector, TableInfo};
+use dozer_ingestion::connectors::{get_connector, ColumnInfo, Connector, TableInfo};
 use dozer_ingestion::errors::ConnectorError;
 use dozer_ingestion::ingestion::{IngestionConfig, IngestionIterator, Ingestor};
 use dozer_sql::pipeline::builder::SchemaSQLContext;
@@ -13,7 +13,6 @@
 use dozer_types::models::connection::Connection;
 use dozer_types::parking_lot::Mutex;
 use dozer_types::types::{
     Operation, ReplicationChangesTrackingType, Schema, SchemaIdentifier, SourceDefinition,
-    SourceSchema,
 };
 use std::collections::HashMap;
 use std::thread;
@@ -39,15 +38,22 @@ fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
     pb
 }

+#[derive(Debug)]
+struct Table {
+    name: String,
+    columns: Option<Vec<ColumnInfo>>,
+    schema: Schema,
+    replication_type: ReplicationChangesTrackingType,
+    port: PortHandle,
+}
+
 #[derive(Debug)]
 pub struct ConnectorSourceFactory {
-    pub ports: HashMap<String, u16>,
-    pub schema_port_map: HashMap<u32, u16>,
-    pub schema_map: HashMap<u16, Schema>,
-    pub replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType>,
-    pub tables: Vec<TableInfo>,
-    pub connection: Connection,
-    pub progress: Option<MultiProgress>,
+    connection_name: String,
+    tables: Vec<Table>,
+    /// Will be moved to `ConnectorSource` in `build`.
+    connector: Mutex<Option<Box<dyn Connector>>>,
+    progress: Option<MultiProgress>,
 }

 fn map_replication_type_to_output_port_type(
@@ -70,70 +76,48 @@ impl ConnectorSourceFactory {
     pub fn new(
-        ports: HashMap<String, u16>,
-        tables: Vec<TableInfo>,
+        table_and_ports: Vec<(TableInfo, PortHandle)>,
         connection: Connection,
         progress: Option<MultiProgress>,
     ) -> Result<Self, ExecutionError> {
-        let (schema_map, schema_port_map, replication_changes_type_map) =
-            Self::get_schema_map(connection.clone(), tables.clone(), ports.clone())?;
-        Ok(Self {
-            ports,
-            schema_port_map,
-            schema_map,
-            replication_changes_type_map,
-            tables,
-            connection,
-            progress,
-        })
-    }
-
-    #[allow(clippy::type_complexity)]
-    fn get_schema_map(
-        connection: Connection,
-        tables: Vec<TableInfo>,
-        ports: HashMap<String, u16>,
-    ) -> Result<
-        (
-            HashMap<u16, Schema>,
-            HashMap<u32, u16>,
-            HashMap<u16, ReplicationChangesTrackingType>,
-        ),
-        ExecutionError,
-    > {
-        let tables_map = tables
-            .iter()
-            .map(|table| (table.table_name.clone(), table.name.clone()))
-            .collect::<HashMap<_, _>>();
+        let connection_name = connection.name.clone();
         let connector = get_connector(connection).map_err(|e| InternalError(Box::new(e)))?;
-        let schema_tuples = connector
-            .get_schemas(Some(tables))
+        let source_schemas = connector
+            .get_schemas(Some(
+                table_and_ports
+                    .iter()
+                    .map(|(table, _)| table.clone())
+                    .collect(),
+            ))
             .map_err(|e| InternalError(Box::new(e)))?;

-        let mut schema_map = HashMap::new();
-        let mut schema_port_map: HashMap<u32, u16> = HashMap::new();
-        let mut replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType> =
-            HashMap::new();
-
-        for SourceSchema {
-            name,
-            schema,
-            replication_type,
-        } in schema_tuples
+        let mut tables = vec![];
+        for ((table, port), source_schema) in
+            table_and_ports.into_iter().zip(source_schemas.into_iter())
         {
-            let source_name = tables_map.get(&name).unwrap();
-            let port: u16 = *ports
-                .get(source_name)
-                .ok_or(ExecutionError::PortNotFound(name))?;
-            let schema_id = get_schema_id(schema.identifier)?;
+            let name = table.name;
+            let columns = table.columns;
+            let schema = source_schema.schema;
+            let replication_type = source_schema.replication_type;
+
+            let table = Table {
+                name,
+                columns,
+                schema,
+                replication_type,
+                port,
+            };

-            schema_port_map.insert(schema_id, port);
-            schema_map.insert(port, schema);
-            replication_changes_type_map.insert(port, replication_type);
+            tables.push(table);
         }

-        Ok((schema_map, schema_port_map, replication_changes_type_map))
+        Ok(Self {
+            connection_name,
+            tables,
+            connector: Mutex::new(Some(connector)),
+            progress,
+        })
     }
 }
@@ -142,25 +126,21 @@ impl SourceFactory<SchemaSQLContext> for ConnectorSourceFactory {
         &self,
         port: &PortHandle,
     ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
-        let mut schema = self
-            .schema_map
-            .get(port)
-            .map_or(Err(ExecutionError::PortNotFoundInSource(*port)), |s| {
-                Ok(s.clone())
-            })?;
+        let table = self
+            .tables
+            .iter()
+            .find(|table| table.port == *port)
+            .ok_or(ExecutionError::PortNotFoundInSource(*port))?;
+        let mut schema = table.schema.clone();
+        let table_name = &table.name;

-        let table_name = self.ports.iter().find(|(_, p)| **p == *port).unwrap().0;
         // Add source information to the schema.
-        let mut fields = vec![];
-        for field in schema.fields {
-            let mut f = field.clone();
-            f.source = SourceDefinition::Table {
-                connection: self.connection.name.clone(),
+        for field in &mut schema.fields {
+            field.source = SourceDefinition::Table {
+                connection: self.connection_name.clone(),
                 name: table_name.clone(),
             };
-            fields.push(f);
         }
-        schema.fields = fields;

         use std::println as info;
         info!("Source: Initializing input schema: {table_name}");
@@ -169,19 +149,12 @@
         Ok((schema, SchemaSQLContext::default()))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        self.ports
-            .values()
-            .map(|e| {
-                self.replication_changes_type_map.get(e).map_or(
-                    Err(ReplicationTypeNotFound),
-                    |typ| {
-                        Ok(OutputPortDef::new(
-                            *e,
-                            map_replication_type_to_output_port_type(typ),
-                        ))
-                    },
-                )
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        self.tables
+            .iter()
+            .map(|table| {
+                let typ = map_replication_type_to_output_port_type(&table.replication_type);
+                OutputPortDef::new(table.port, typ)
             })
             .collect()
     }
@@ -191,21 +164,40 @@
     fn build(
         &self,
         _output_schemas: HashMap<PortHandle, Schema>,
     ) -> Result<Box<dyn Source>, ExecutionError> {
         let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
-        let connector = get_connector(self.connection.clone())
-            .map_err(|e| ExecutionError::ConnectorError(Box::new(e)))?;
+
+        let mut schema_port_map = HashMap::new();
+        for table in &self.tables {
+            let schema_id = get_schema_id(table.schema.identifier)?;
+            schema_port_map.insert(schema_id, table.port);
+        }
+
+        let tables = self
+            .tables
+            .iter()
+            .map(|table| TableInfo {
+                name: table.name.clone(),
+                columns: table.columns.clone(),
+            })
+            .collect();
+
+        let connector = self
+            .connector
+            .lock()
+            .take()
+            .expect("ConnectorSource was already built");

         let mut bars = HashMap::new();
-        for (name, port) in self.ports.iter() {
+        for table in &self.tables {
             let pb = attach_progress(self.progress.clone());
-            pb.set_message(name.clone());
-            bars.insert(*port, pb);
+            pb.set_message(table.name.clone());
+            bars.insert(table.port, pb);
         }

         Ok(Box::new(ConnectorSource {
             ingestor,
             iterator: Mutex::new(iterator),
-            schema_port_map: self.schema_port_map.clone(),
-            tables: self.tables.clone(),
+            schema_port_map,
+            tables,
             connector,
             bars,
         }))
@@ -216,10 +208,10 @@
 pub struct ConnectorSource {
     ingestor: Ingestor,
     iterator: Mutex<IngestionIterator>,
-    schema_port_map: HashMap<u32, u16>,
+    schema_port_map: HashMap<u32, PortHandle>,
     tables: Vec<TableInfo>,
     connector: Box<dyn Connector>,
-    bars: HashMap<u16, ProgressBar>,
+    bars: HashMap<PortHandle, ProgressBar>,
 }

 impl Source for ConnectorSource {
diff --git a/dozer-orchestrator/src/pipeline/source_builder.rs b/dozer-orchestrator/src/pipeline/source_builder.rs
index f964b29fc8..f6a6e356db 100644
--- a/dozer-orchestrator/src/pipeline/source_builder.rs
+++ b/dozer-orchestrator/src/pipeline/source_builder.rs
@@ -56,34 +56,34 @@ impl<'a> SourceBuilder<'a> {
         if let Some(connection) = &first_source.connection {
             let mut ports = HashMap::new();

-            let mut tables = vec![];
+            let mut table_and_ports = vec![];
             for source in &sources_group {
                 if self.used_sources.contains(&source.name) {
                     ports.insert(source.name.clone(), port);

-                    tables.push(TableInfo {
-                        name: source.name.clone(),
-                        table_name: source.table_name.clone(),
-                        id: port as u32,
-                        columns: Some(
-                            source
-                                .columns
-                                .iter()
-                                .map(|c| ColumnInfo {
-                                    name: c.clone(),
-                                    data_type: None,
-                                })
-                                .collect(),
-                        ),
-                    });
+                    table_and_ports.push((
+                        TableInfo {
+                            name: source.table_name.clone(),
+                            columns: Some(
+                                source
+                                    .columns
+                                    .iter()
+                                    .map(|c| ColumnInfo {
+                                        name: c.clone(),
+                                        data_type: None,
+                                    })
+                                    .collect(),
+                            ),
+                        },
+                        port,
+                    ));

                     port += 1;
                 }
             }

             let source_factory = ConnectorSourceFactory::new(
-                ports.clone(),
-                tables,
+                table_and_ports,
                 connection.clone(),
                 self.progress.cloned(),
             )?;
diff --git a/dozer-orchestrator/src/pipeline/validate.rs b/dozer-orchestrator/src/pipeline/validate.rs
index c29b7b13e6..81c9577192 100644
--- a/dozer-orchestrator/src/pipeline/validate.rs
+++ b/dozer-orchestrator/src/pipeline/validate.rs
@@ -23,9 +23,7 @@ pub fn validate_grouped_connections(
     let tables: Vec<TableInfo> = sources_group
         .iter()
         .map(|source| TableInfo {
-            name: source.name.clone(),
-            table_name: source.table_name.clone(),
-            id: 0,
+            name: source.table_name.clone(),
             columns: Some(
                 source
                     .columns
diff --git a/dozer-sql/src/pipeline/product/tests/left_join_test.rs b/dozer-sql/src/pipeline/product/tests/left_join_test.rs
index ff5cf696bd..32ba964728 100644
--- a/dozer-sql/src/pipeline/product/tests/left_join_test.rs
+++ b/dozer-sql/src/pipeline/product/tests/left_join_test.rs
@@ -41,8 +41,8 @@ impl TestSourceFactory {
 }

 impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![
             OutputPortDef::new(
                 USER_PORT,
                 OutputPortType::StatefulWithPrimaryKeyLookup {
@@ -64,7 +64,7 @@
                     retr_old_records_for_deletes: true,
                 },
             ),
-        ])
+        ]
     }

     fn get_output_schema(
diff --git a/dozer-sql/src/pipeline/product/tests/pipeline_test.rs b/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
index 160e587444..e581398083 100644
--- a/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
+++ b/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
@@ -41,8 +41,8 @@ impl TestSourceFactory {
 }

 impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![
             OutputPortDef::new(
                 USER_PORT,
                 OutputPortType::StatefulWithPrimaryKeyLookup {
@@ -58,7 +58,7 @@
                     retr_old_records_for_deletes: true,
                 },
             ),
-        ])
+        ]
     }

     fn get_output_schema(
diff --git a/dozer-sql/src/pipeline/product/tests/set_operator_test.rs b/dozer-sql/src/pipeline/product/tests/set_operator_test.rs
index 7de934398d..0641bbefc4 100644
--- a/dozer-sql/src/pipeline/product/tests/set_operator_test.rs
+++ b/dozer-sql/src/pipeline/product/tests/set_operator_test.rs
@@ -192,8 +192,8 @@ impl TestSourceFactory {
 }

 impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(vec![
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        vec![
             OutputPortDef::new(
                 SUPPLIERS_PORT,
                 OutputPortType::StatefulWithPrimaryKeyLookup {
@@ -208,7 +208,7 @@
                     retr_old_records_for_deletes: true,
                 },
             ),
-        ])
+        ]
     }

     fn get_output_schema(
diff --git a/dozer-sql/src/pipeline/tests/builder_test.rs b/dozer-sql/src/pipeline/tests/builder_test.rs
index b1a63f9bdc..c38dac9369 100644
--- a/dozer-sql/src/pipeline/tests/builder_test.rs
+++ b/dozer-sql/src/pipeline/tests/builder_test.rs
@@ -41,12 +41,11 @@ impl TestSourceFactory {
 }

 impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(self
-            .output_ports
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        self.output_ports
             .iter()
             .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
-            .collect())
+            .collect()
     }

     fn get_output_schema(
diff --git a/dozer-tests/src/sql_tests/pipeline.rs b/dozer-tests/src/sql_tests/pipeline.rs
index f087899be2..8b0417e426 100644
--- a/dozer-tests/src/sql_tests/pipeline.rs
+++ b/dozer-tests/src/sql_tests/pipeline.rs
@@ -91,9 +91,8 @@ impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
         Ok((schema, SchemaSQLContext::default()))
     }

-    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
-        Ok(self
-            .schemas
+    fn get_output_ports(&self) -> Vec<OutputPortDef> {
+        self.schemas
             .iter()
             .enumerate()
             .map(|(idx, _)| {
@@ -105,7 +104,7 @@ impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
                 },
                 )
             })
-            .collect())
+            .collect()
     }

     fn build(
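For readers updating call sites, a brief usage sketch of the API after this patch (illustrative only, not part of the diff; `connector` stands for any `Box<dyn Connector>` value):

    // `get_tables` no longer takes a filter argument; filter on `name` instead.
    let mut tables = connector.get_tables()?;
    tables.retain(|t| t.name == "users");

    // `TableInfo` now carries only the table name and an optional column list;
    // the redundant `table_name` and `id` fields are gone.
    let table = TableInfo {
        name: "users".to_string(),
        columns: None,
    };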