Skip to content

Commit

Permalink
feat: Support aerospike batching (#2403)
Browse files Browse the repository at this point in the history
* feat: Support aerospike batching

* Don't use batch insert

* Aerospike batching

* Implement batch for pending messages

* Testing

* Add json parsing error

* Fix info

* Don't wait for confirmation in the middle of a commit

---------

Co-authored-by: chubei <[email protected]>
  • Loading branch information
karolisg and chubei authored Mar 7, 2024
1 parent 9a5e355 commit f86cfd4
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl SinkFactory for DummySinkFactory {
.next()
.and_then(|schema| {
schema.fields.into_iter().enumerate().find(|(_, field)| {
field.name == "inserted_at" && field.typ == FieldType::Timestamp
field.name.to_lowercase() == "inserted_at" && field.typ == FieldType::Timestamp
})
})
.map(|(index, _)| index);
Expand Down
71 changes: 64 additions & 7 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,21 @@ impl AerospikeConnector {

Ok(HttpServer::new(move || {
App::new()
.app_data(web::JsonConfig::default().error_handler(|err, _req| {
error!("Error parsing json: {:?}", err);
actix_web::error::InternalError::from_response(
"",
HttpResponse::BadRequest()
.content_type("application/json")
.body(format!(r#"{{"error":"{}"}}"#, err)),
)
.into()
}))
.app_data(web::Data::new(server_state.clone()))
.service(healthcheck)
.service(healthcheck_batch)
.service(event_request_handler)
.service(batch_event_request_handler)
})
.bind(address)?
.run())
Expand Down Expand Up @@ -205,7 +217,7 @@ impl AerospikeConnector {

#[derive(Debug)]
struct PendingMessage {
message: IngestionMessage,
messages: Vec<IngestionMessage>,
sender: oneshot::Sender<()>,
}

Expand All @@ -232,7 +244,9 @@ async fn ingestor_loop(
operation_id_sender.send(pending_operation_id).unwrap();

// Ignore the error, because the server can be down.
let _ = ingestor.handle_message(message.message).await;
for message in message.messages {
let _ = ingestor.handle_message(message).await;
}
let _ = ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, operation_id)),
Expand Down Expand Up @@ -309,6 +323,11 @@ async fn healthcheck(_req: HttpRequest) -> HttpResponse {
HttpResponse::Ok().finish()
}

/// Health probe for the batch route: any `GET /batch` is answered with an empty `200 OK`.
///
/// The incoming request is accepted but never inspected.
#[get("/batch")]
async fn healthcheck_batch(_req: HttpRequest) -> HttpResponse {
    // Build the 200 response explicitly, then finalize it with no body.
    let mut ok_builder = HttpResponse::Ok();
    ok_builder.finish()
}

#[post("/")]
async fn event_request_handler(
json: web::Json<AerospikeEvent>,
Expand All @@ -323,14 +342,17 @@ async fn event_request_handler(
return HttpResponse::Ok().finish();
}

let operation_events = map_events(event, &state.tables_index_map).await;
let message = map_record(event, &state.tables_index_map).await;

trace!("Mapped events {:?}", operation_events);
match operation_events {
trace!("Mapped message {:?}", message);
match message {
Ok(None) => HttpResponse::Ok().finish(),
Ok(Some(message)) => {
let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage { message, sender }) {
if let Err(e) = state.sender.send(PendingMessage {
messages: vec![message],
sender,
}) {
error!("Ingestor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
Expand All @@ -345,6 +367,41 @@ async fn event_request_handler(
}
}

/// Handles `POST /batch`: ingests a JSON array of Aerospike events as one pending batch.
///
/// Every event is mapped with `map_record`. Events that map to `None` are dropped, and the
/// first mapping error aborts the whole request with the response built by `map_error`.
/// The remaining messages are sent to the ingestor as a single `PendingMessage`, and the
/// handler then waits on the paired oneshot channel before replying.
///
/// # Responses
/// - `200 OK` once the oneshot acknowledgement arrives, or immediately when no event
///   produced a message (nothing is sent to the ingestor in that case).
/// - `500 Internal Server Error` when sending to the ingestor fails or the acknowledgement
///   sender is dropped without signalling.
/// - Whatever `map_error` returns when an event cannot be mapped.
#[post("/batch")]
async fn batch_event_request_handler(
    json: web::Json<Vec<AerospikeEvent>>,
    data: web::Data<ServerState>,
) -> HttpResponse {
    let events = json.into_inner();
    let state = data.into_inner();

    trace!("Aerospike events {:?}", events);
    info!("Events counts: {}", events.len());

    // At most one message per event, so the event count is an upper bound.
    let mut messages = Vec::with_capacity(events.len());
    for event in events {
        match map_record(event, &state.tables_index_map).await {
            // Event carries nothing to ingest; skip it but keep processing the batch.
            Ok(None) => {}
            Ok(Some(message)) => messages.push(message),
            // One bad event rejects the whole batch; nothing has been sent yet.
            Err(e) => return map_error(e),
        }
    }

    trace!("Mapped messages {:?}", messages);
    info!("Mapped messages count {}", messages.len());

    // Mirror the single-event handler, which answers 200 without involving the ingestor
    // when an event maps to nothing: an empty batch has nothing to hand over or wait for.
    if messages.is_empty() {
        return HttpResponse::Ok().finish();
    }

    let (sender, receiver) = oneshot::channel::<()>();
    if let Err(e) = state.sender.send(PendingMessage { messages, sender }) {
        error!("Ingestor is down: {:?}", e);
        return HttpResponse::InternalServerError().finish();
    }

    // Hold the HTTP response until the other side of the oneshot signals; a dropped
    // sender surfaces here as an error.
    if let Err(e) = receiver.await {
        error!("Pipeline event processor is down: {:?}", e);
        return HttpResponse::InternalServerError().finish();
    }

    HttpResponse::Ok().finish()
}

#[derive(Clone, Debug)]
struct TableIndexMap {
table_index: usize,
Expand Down Expand Up @@ -604,7 +661,7 @@ impl Connector for AerospikeConnector {
}
}

async fn map_events(
async fn map_record(
event: AerospikeEvent,
tables_map: &HashMap<String, TableIndexMap>,
) -> Result<Option<IngestionMessage>, AerospikeConnectorError> {
Expand Down
2 changes: 2 additions & 0 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ enum OpKind {
impl OracleSink {
fn exec_batch(&mut self) -> oracle::Result<()> {
debug!("Executing batch of size {}", self.batch_params.len());
let started = std::time::Instant::now();
let mut batch = self
.conn
.batch(&self.merge_statement, self.batch_params.len())
Expand All @@ -620,6 +621,7 @@ impl OracleSink {
batch.append_row(&[])?;
}
batch.execute()?;
info!("Execution took {:?}", started.elapsed());
Ok(())
}

Expand Down

0 comments on commit f86cfd4

Please sign in to comment.