Skip to content

Commit

Permalink
Aerospike batching
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Mar 5, 2024
1 parent a6afd77 commit 5c40e56
Showing 1 changed file with 13 additions and 29 deletions.
42 changes: 13 additions & 29 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
};
use dozer_ingestion_connector::dozer_types::node::{NodeHandle, OpIdentifier, SourceState};
use dozer_ingestion_connector::dozer_types::types::Operation::Insert;
use dozer_ingestion_connector::dozer_types::types::{
Field, FieldDefinition, FieldType, Record, Schema,
};
use dozer_ingestion_connector::dozer_types::types::{Field, FieldDefinition, FieldType, Schema};
use dozer_ingestion_connector::tokio::sync::broadcast::error::RecvError;
use dozer_ingestion_connector::tokio::sync::broadcast::Receiver;
use dozer_ingestion_connector::tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -363,32 +361,26 @@ async fn batch_event_request_handler(
let state = data.into_inner();

for event in events {
match map_events(event, state.tables_index_map.clone()).await {
trace!("Aerospike event {:?}", event);
match map_events(event, &state.tables_index_map).await {
Ok(Some(message)) => {
if let Err(e) = state.ingestor.handle_message(message).await {
trace!("Mapped message {:?}", message);
let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage { message, sender }) {
error!("Aerospike ingestion message send error: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
if let Err(e) = receiver.await {
error!("Pipeline event processor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
}
Ok(None) => {}
Err(e) => return map_error(e),
}
}

let transaction_ingestion_result = state
.ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
}))
.await;

match transaction_ingestion_result {
Ok(_) => HttpResponse::Ok().finish(),
Err(e) => {
error!("Aerospike ingestion message send error: {:?}", e);
HttpResponse::InternalServerError().finish()
}
}
HttpResponse::Ok().finish()
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -734,17 +726,9 @@ async fn map_record(

async fn map_events(
event: AerospikeEvent,
tables_map: HashMap<String, TableIndexMap>,
tables_map: &HashMap<String, TableIndexMap>,
) -> Result<Option<IngestionMessage>, AerospikeConnectorError> {
let record = map_record(event, tables_map.clone()).await?;
match record {
Some((table_index, record)) => Ok(Some(IngestionMessage::OperationEvent {
table_index,
op: Insert { new: record },
id: None,
})),
None => Ok(None),
}
map_record(event, tables_map).await
}

pub(crate) fn map_value_to_field(
Expand Down

0 comments on commit 5c40e56

Please sign in to comment.