Skip to content

Commit

Permalink
Implement batch for pending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Mar 5, 2024
1 parent 5c40e56 commit 1e945ed
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl AerospikeConnector {
#[derive(Debug)]
struct PendingMessage {
message: IngestionMessage,
commit: bool,
sender: oneshot::Sender<()>,
}

Expand Down Expand Up @@ -235,11 +236,13 @@ async fn ingestor_loop(

// Ignore the error, because the server can be down.
let _ = ingestor.handle_message(message.message).await;
let _ = ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, operation_id)),
}))
.await;
if message.commit {
let _ = ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, operation_id)),
}))
.await;
}

operation_id += 1;
}
Expand Down Expand Up @@ -337,7 +340,11 @@ async fn event_request_handler(
Ok(None) => HttpResponse::Ok().finish(),
Ok(Some(message)) => {
let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage { message, sender }) {
if let Err(e) = state.sender.send(PendingMessage {
message,
sender,
commit: true,
}) {
error!("Ingestor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
Expand All @@ -360,13 +367,23 @@ async fn batch_event_request_handler(
let events = json.into_inner();
let state = data.into_inner();

let mut mapped_messages = vec![];
trace!("Aerospike events {:?}", events);
for event in events {
trace!("Aerospike event {:?}", event);
match map_events(event, &state.tables_index_map).await {
mapped_messages.push(map_events(event, &state.tables_index_map).await);
}

let last_idx = mapped_messages.len() - 1;
for (idx, mapped_message) in mapped_messages.into_iter().enumerate() {
match mapped_message {
Ok(Some(message)) => {
trace!("Mapped message {:?}", message);
let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage { message, sender }) {
if let Err(e) = state.sender.send(PendingMessage {
message,
sender,
commit: last_idx == idx,
}) {
error!("Aerospike ingestion message send error: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
Expand Down

0 comments on commit 1e945ed

Please sign in to comment.