diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index 80463cc7fe..5c99c192b4 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -7,9 +7,7 @@ use dozer_ingestion_connector::dozer_types::models::ingestion_types::{ }; use dozer_ingestion_connector::dozer_types::node::{NodeHandle, OpIdentifier, SourceState}; use dozer_ingestion_connector::dozer_types::types::Operation::Insert; -use dozer_ingestion_connector::dozer_types::types::{ - Field, FieldDefinition, FieldType, Record, Schema, -}; +use dozer_ingestion_connector::dozer_types::types::{Field, FieldDefinition, FieldType, Schema}; use dozer_ingestion_connector::tokio::sync::broadcast::error::RecvError; use dozer_ingestion_connector::tokio::sync::broadcast::Receiver; use dozer_ingestion_connector::tokio::sync::{mpsc, oneshot}; @@ -363,32 +361,26 @@ async fn batch_event_request_handler( let state = data.into_inner(); for event in events { - match map_events(event, state.tables_index_map.clone()).await { + trace!("Aerospike event {:?}", event); + match map_events(event, &state.tables_index_map).await { Ok(Some(message)) => { - if let Err(e) = state.ingestor.handle_message(message).await { + trace!("Mapped message {:?}", message); + let (sender, receiver) = oneshot::channel::<()>(); + if let Err(e) = state.sender.send(PendingMessage { message, sender }) { error!("Aerospike ingestion message send error: {:?}", e); return HttpResponse::InternalServerError().finish(); } + if let Err(e) = receiver.await { + error!("Pipeline event processor is down: {:?}", e); + return HttpResponse::InternalServerError().finish(); + } } Ok(None) => {} Err(e) => return map_error(e), } } - let transaction_ingestion_result = state - .ingestor - .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { - id: None, - })) - .await; - - match transaction_ingestion_result { - Ok(_) => HttpResponse::Ok().finish(), - Err(e) => { - error!("Aerospike ingestion message send error: {:?}", e); - HttpResponse::InternalServerError().finish() - } - } + HttpResponse::Ok().finish() } #[derive(Clone, Debug)] @@ -734,17 +726,9 @@ async fn map_record( async fn map_events( event: AerospikeEvent, - tables_map: HashMap, + tables_map: &HashMap, ) -> Result, AerospikeConnectorError> { - let record = map_record(event, tables_map.clone()).await?; - match record { - Some((table_index, record)) => Ok(Some(IngestionMessage::OperationEvent { - table_index, - op: Insert { new: record }, - id: None, - })), - None => Ok(None), - } + map_record(event, tables_map).await } pub(crate) fn map_value_to_field(