From f86cfd4c831f1a64cd625ec56a396c9adf5a112a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?=
Date: Thu, 7 Mar 2024 10:11:36 +0200
Subject: [PATCH] feat: Support aerospike batching (#2403)

* feat: Support aerospike batching

* Don't use batch insert

* Aerospike batching

* Implement batch for pending messages

* Testing

* Add json parsing error

* Fix info

* Don't wait for confirmation in the middle of a commit

---------

Co-authored-by: chubei <914745487@qq.com>
---
 dozer-cli/src/pipeline/dummy_sink.rs       |  2 +-
 dozer-ingestion/aerospike/src/connector.rs | 71 +++++++++++++++++++---
 dozer-sink-oracle/src/lib.rs               |  2 ++
 3 files changed, 67 insertions(+), 8 deletions(-)

diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs
index f5d07ddca2..9607278bad 100644
--- a/dozer-cli/src/pipeline/dummy_sink.rs
+++ b/dozer-cli/src/pipeline/dummy_sink.rs
@@ -44,7 +44,7 @@ impl SinkFactory for DummySinkFactory {
             .next()
             .and_then(|schema| {
                 schema.fields.into_iter().enumerate().find(|(_, field)| {
-                    field.name == "inserted_at" && field.typ == FieldType::Timestamp
+                    field.name.to_lowercase() == "inserted_at" && field.typ == FieldType::Timestamp
                 })
             })
             .map(|(index, _)| index);
diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs
index 97956f7fd8..9502f1d63b 100644
--- a/dozer-ingestion/aerospike/src/connector.rs
+++ b/dozer-ingestion/aerospike/src/connector.rs
@@ -166,9 +166,21 @@ impl AerospikeConnector {
 
         Ok(HttpServer::new(move || {
             App::new()
+                .app_data(web::JsonConfig::default().error_handler(|err, _req| {
+                    error!("Error parsing json: {:?}", err);
+                    actix_web::error::InternalError::from_response(
+                        "",
+                        HttpResponse::BadRequest()
+                            .content_type("application/json")
+                            .body(format!(r#"{{"error":"{}"}}"#, err)),
+                    )
+                    .into()
+                }))
                 .app_data(web::Data::new(server_state.clone()))
                 .service(healthcheck)
+                .service(healthcheck_batch)
                 .service(event_request_handler)
+                .service(batch_event_request_handler)
         })
         .bind(address)?
         .run())
@@ -205,7 +217,7 @@
 
 #[derive(Debug)]
 struct PendingMessage {
-    message: IngestionMessage,
+    messages: Vec<IngestionMessage>,
     sender: oneshot::Sender<()>,
 }
 
@@ -232,7 +244,9 @@ async fn ingestor_loop(
         operation_id_sender.send(pending_operation_id).unwrap();
 
         // Ignore the error, because the server can be down.
-        let _ = ingestor.handle_message(message.message).await;
+        for message in message.messages {
+            let _ = ingestor.handle_message(message).await;
+        }
         let _ = ingestor
             .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
                 id: Some(OpIdentifier::new(0, operation_id)),
@@ -309,6 +323,11 @@ async fn healthcheck(_req: HttpRequest) -> HttpResponse {
     HttpResponse::Ok().finish()
 }
 
+#[get("/batch")]
+async fn healthcheck_batch(_req: HttpRequest) -> HttpResponse {
+    HttpResponse::Ok().finish()
+}
+
 #[post("/")]
 async fn event_request_handler(
     json: web::Json<AerospikeEvent>,
     data: web::Data<ServerState>,
 ) -> HttpResponse {
@@ -323,14 +342,17 @@
         return HttpResponse::Ok().finish();
     }
 
-    let operation_events = map_events(event, &state.tables_index_map).await;
+    let message = map_record(event, &state.tables_index_map).await;
 
-    trace!("Mapped events {:?}", operation_events);
-    match operation_events {
+    trace!("Mapped message {:?}", message);
+    match message {
         Ok(None) => HttpResponse::Ok().finish(),
         Ok(Some(message)) => {
             let (sender, receiver) = oneshot::channel::<()>();
-            if let Err(e) = state.sender.send(PendingMessage { message, sender }) {
+            if let Err(e) = state.sender.send(PendingMessage {
+                messages: vec![message],
+                sender,
+            }) {
                 error!("Ingestor is down: {:?}", e);
                 return HttpResponse::InternalServerError().finish();
             }
@@ -345,6 +367,41 @@
     }
 }
 
+#[post("/batch")]
+async fn batch_event_request_handler(
+    json: web::Json<Vec<AerospikeEvent>>,
+    data: web::Data<ServerState>,
+) -> HttpResponse {
+    let events = json.into_inner();
+    let state = data.into_inner();
+
+    let mut messages = vec![];
+    trace!("Aerospike events {:?}", events);
+    info!("Events counts: {}", events.len());
+    for event in events {
+        match map_record(event, &state.tables_index_map).await {
+            Ok(None) => {}
+            Ok(Some(message)) => messages.push(message),
+            Err(e) => return map_error(e),
+        }
+    }
+
+    trace!("Mapped messages {:?}", messages);
+    info!("Mapped messages count {}", messages.len());
+    let (sender, receiver) = oneshot::channel::<()>();
+    if let Err(e) = state.sender.send(PendingMessage { messages, sender }) {
+        error!("Ingestor is down: {:?}", e);
+        return HttpResponse::InternalServerError().finish();
+    }
+
+    if let Err(e) = receiver.await {
+        error!("Pipeline event processor is down: {:?}", e);
+        return HttpResponse::InternalServerError().finish();
+    }
+
+    HttpResponse::Ok().finish()
+}
+
 #[derive(Clone, Debug)]
 struct TableIndexMap {
     table_index: usize,
@@ -604,7 +661,7 @@ impl Connector for AerospikeConnector {
     }
 }
 
-async fn map_events(
+async fn map_record(
     event: AerospikeEvent,
     tables_map: &HashMap<String, TableIndexMap>,
 ) -> Result<Option<IngestionMessage>, AerospikeConnectorError> {
diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index 3299a52f7a..4f25ed9250 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -598,6 +598,7 @@ enum OpKind {
 impl OracleSink {
     fn exec_batch(&mut self) -> oracle::Result<()> {
         debug!("Executing batch of size {}", self.batch_params.len());
+        let started = std::time::Instant::now();
         let mut batch = self
             .conn
             .batch(&self.merge_statement, self.batch_params.len())
@@ -620,6 +621,7 @@ impl OracleSink {
             batch.append_row(&[])?;
         }
         batch.execute()?;
+        info!("Execution took {:?}", started.elapsed());
        Ok(())
     }
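The heart of this change is the new shape of PendingMessage: it now carries a whole Vec of ingestion messages plus a oneshot confirmation channel, so the HTTP handlers (both "/" and the new "/batch") queue one batch and wait on a single confirmation instead of one per event, while the ingestor loop forwards every message and then emits a single commit for the batch. The sketch below shows that pattern in isolation. It is a minimal illustration assuming tokio's mpsc and oneshot channels; Message, PendingBatch, and batch_loop are invented stand-ins for the connector's IngestionMessage, PendingMessage, and ingestor_loop, not the actual dozer code.

// Illustrative only; not part of the patch above.
// Assumed Cargo dependency: tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] }
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Message(u64);

// Stand-in for the connector's PendingMessage: a batch plus a confirmation channel.
#[derive(Debug)]
struct PendingBatch {
    messages: Vec<Message>,
    done: oneshot::Sender<()>,
}

// Stand-in for ingestor_loop: drain batches, forward every message, commit once,
// then confirm the whole batch back to the waiting HTTP handler.
async fn batch_loop(mut rx: mpsc::UnboundedReceiver<PendingBatch>) {
    while let Some(batch) = rx.recv().await {
        for msg in batch.messages {
            println!("ingest {:?}", msg); // the real loop calls ingestor.handle_message here
        }
        println!("commit"); // one commit per batch, not per message
        let _ = batch.done.send(()); // ignore the error if the requester is already gone
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let worker = tokio::spawn(batch_loop(rx));

    // What the /batch handler does in spirit: collect the mapped messages,
    // send them as one batch, then await the single confirmation.
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(PendingBatch {
        messages: vec![Message(1), Message(2), Message(3)],
        done: done_tx,
    })
    .expect("batch loop is down");
    done_rx.await.expect("batch loop dropped the confirmation");

    drop(tx); // closing the channel lets batch_loop exit
    worker.await.unwrap();
}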