Skip to content

Commit

Permalink
Don't use batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Feb 19, 2024
1 parent 9d8c5ee commit 8c30ee0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl SinkFactory for DummySinkFactory {
.next()
.and_then(|schema| {
schema.fields.into_iter().enumerate().find(|(_, field)| {
field.name == "inserted_at" && field.typ == FieldType::Timestamp
field.name.to_lowercase() == "inserted_at" && field.typ == FieldType::Timestamp
})
})
.map(|(index, _)| index);
Expand Down
31 changes: 17 additions & 14 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
IngestionMessage, TransactionInfo,
};
use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
use dozer_ingestion_connector::dozer_types::types::Operation::{BatchInsert, Insert};
use dozer_ingestion_connector::dozer_types::types::Operation::Insert;
use dozer_ingestion_connector::dozer_types::types::{
Field, FieldDefinition, FieldType, Record, Schema,
};
Expand Down Expand Up @@ -36,6 +36,7 @@ use base64::prelude::*;
use dozer_ingestion_connector::dozer_types::chrono::{
DateTime, FixedOffset, NaiveDate, NaiveDateTime, Utc,
};

use dozer_ingestion_connector::dozer_types::thiserror::{self, Error};
use dozer_ingestion_connector::schema_parser::SchemaParser;

Expand Down Expand Up @@ -217,31 +218,33 @@ async fn batch_event_request_handler(
let events = json.into_inner();
let state = data.into_inner();

let mut mapped_events: HashMap<usize, Vec<Record>> = HashMap::new();
for event in events {
match map_record(event, state.tables_index_map.clone()).await {
Ok(Some((table_index, record))) => {
mapped_events.entry(table_index).or_default().push(record);
match map_events(event, state.tables_index_map.clone()).await {
Ok(Some(message)) => {
if let Err(e) = state.ingestor.handle_message(message).await {
error!("Aerospike ingestion message send error: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
}
Ok(None) => {}
Err(e) => return map_error(e),
}
}

for (table_index, records) in mapped_events {
let event = IngestionMessage::OperationEvent {
table_index,
op: BatchInsert { new: records },
let transaction_ingestion_result = state
.ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
};
}))
.await;

if let Err(e) = state.ingestor.handle_message(event).await {
match transaction_ingestion_result {
Ok(_) => HttpResponse::Ok().finish(),
Err(e) => {
error!("Aerospike ingestion message send error: {:?}", e);
return HttpResponse::InternalServerError().finish();
HttpResponse::InternalServerError().finish()
}
}

HttpResponse::Ok().finish()
}

#[derive(Clone, Debug)]
Expand Down

0 comments on commit 8c30ee0

Please sign in to comment.