From 8c30ee02f9930cf9877db45bf678c5af2347a289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= Date: Mon, 19 Feb 2024 16:22:33 +0200 Subject: [PATCH] Dont use batch insert --- dozer-cli/src/pipeline/dummy_sink.rs | 2 +- dozer-ingestion/aerospike/src/connector.rs | 31 ++++++++++++---------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs index 500bddfecf..c5985b6785 100644 --- a/dozer-cli/src/pipeline/dummy_sink.rs +++ b/dozer-cli/src/pipeline/dummy_sink.rs @@ -39,7 +39,7 @@ impl SinkFactory for DummySinkFactory { .next() .and_then(|schema| { schema.fields.into_iter().enumerate().find(|(_, field)| { - field.name == "inserted_at" && field.typ == FieldType::Timestamp + field.name.to_lowercase() == "inserted_at" && field.typ == FieldType::Timestamp }) }) .map(|(index, _)| index); diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index f096b09d18..4c86715b2d 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -5,7 +5,7 @@ use dozer_ingestion_connector::dozer_types::models::ingestion_types::{ IngestionMessage, TransactionInfo, }; use dozer_ingestion_connector::dozer_types::node::OpIdentifier; -use dozer_ingestion_connector::dozer_types::types::Operation::{BatchInsert, Insert}; +use dozer_ingestion_connector::dozer_types::types::Operation::Insert; use dozer_ingestion_connector::dozer_types::types::{ Field, FieldDefinition, FieldType, Record, Schema, }; @@ -36,6 +36,7 @@ use base64::prelude::*; use dozer_ingestion_connector::dozer_types::chrono::{ DateTime, FixedOffset, NaiveDate, NaiveDateTime, Utc, }; + use dozer_ingestion_connector::dozer_types::thiserror::{self, Error}; use dozer_ingestion_connector::schema_parser::SchemaParser; @@ -217,31 +218,33 @@ async fn batch_event_request_handler( let events = json.into_inner(); let state = data.into_inner(); - let mut mapped_events: HashMap> = HashMap::new(); for event in events { - match map_record(event, state.tables_index_map.clone()).await { - Ok(Some((table_index, record))) => { - mapped_events.entry(table_index).or_default().push(record); + match map_events(event, state.tables_index_map.clone()).await { + Ok(Some(message)) => { + if let Err(e) = state.ingestor.handle_message(message).await { + error!("Aerospike ingestion message send error: {:?}", e); + return HttpResponse::InternalServerError().finish(); + } } Ok(None) => {} Err(e) => return map_error(e), } } - for (table_index, records) in mapped_events { - let event = IngestionMessage::OperationEvent { - table_index, - op: BatchInsert { new: records }, + let transaction_ingestion_result = state + .ingestor + .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None, - }; + })) + .await; - if let Err(e) = state.ingestor.handle_message(event).await { + match transaction_ingestion_result { + Ok(_) => HttpResponse::Ok().finish(), + Err(e) => { error!("Aerospike ingestion message send error: {:?}", e); - return HttpResponse::InternalServerError().finish(); + HttpResponse::InternalServerError().finish() } } - - HttpResponse::Ok().finish() } #[derive(Clone, Debug)]