diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index 5c99c192b4..344354ad57 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -208,6 +208,7 @@ impl AerospikeConnector { #[derive(Debug)] struct PendingMessage { message: IngestionMessage, + commit: bool, sender: oneshot::Sender<()>, } @@ -235,11 +236,13 @@ async fn ingestor_loop( // Ignore the error, because the server can be down. let _ = ingestor.handle_message(message.message).await; - let _ = ingestor - .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { - id: Some(OpIdentifier::new(0, operation_id)), - })) - .await; + if message.commit { + let _ = ingestor + .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { + id: Some(OpIdentifier::new(0, operation_id)), + })) + .await; + } operation_id += 1; } @@ -337,7 +340,11 @@ async fn event_request_handler( Ok(None) => HttpResponse::Ok().finish(), Ok(Some(message)) => { let (sender, receiver) = oneshot::channel::<()>(); - if let Err(e) = state.sender.send(PendingMessage { message, sender }) { + if let Err(e) = state.sender.send(PendingMessage { + message, + sender, + commit: true, + }) { error!("Ingestor is down: {:?}", e); return HttpResponse::InternalServerError().finish(); } @@ -360,13 +367,23 @@ async fn batch_event_request_handler( let events = json.into_inner(); let state = data.into_inner(); + let mut mapped_messages = vec![]; + trace!("Aerospike events {:?}", events); for event in events { - trace!("Aerospike event {:?}", event); - match map_events(event, &state.tables_index_map).await { + mapped_messages.push(map_events(event, &state.tables_index_map).await); + } + + let last_idx = mapped_messages.len() - 1; + for (idx, mapped_message) in mapped_messages.into_iter().enumerate() { + match mapped_message { Ok(Some(message)) => { trace!("Mapped message {:?}", message); let (sender, receiver) = oneshot::channel::<()>(); - if let Err(e) = state.sender.send(PendingMessage { message, sender }) { + if let Err(e) = state.sender.send(PendingMessage { + message, + sender, + commit: last_idx == idx, + }) { error!("Aerospike ingestion message send error: {:?}", e); return HttpResponse::InternalServerError().finish(); }