Skip to content

Commit

Permalink
Add targets in log
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Mar 7, 2024
1 parent 75ee6ee commit e737e49
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
14 changes: 9 additions & 5 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::event::Event;
use dozer_ingestion_connector::dozer_types::log::{error, info, trace, warn};
use dozer_ingestion_connector::dozer_types::log::{debug, error, info, trace, warn};
use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection;
use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
IngestionMessage, TransactionInfo,
Expand Down Expand Up @@ -336,15 +336,15 @@ async fn event_request_handler(
let event = json.into_inner();
let state = data.into_inner();

trace!("Event data: {:?}", event);
trace!(target: "aerospike_http_server", "Event data: {:?}", event);
// TODO: Handle delete
if event.msg != "write" {
return HttpResponse::Ok().finish();
}

let message = map_record(event, &state.tables_index_map).await;

trace!("Mapped message {:?}", message);
trace!(target: "aerospike_http_server", "Mapped message {:?}", message);
match message {
Ok(None) => HttpResponse::Ok().finish(),
Ok(Some(message)) => {
Expand Down Expand Up @@ -376,7 +376,9 @@ async fn batch_event_request_handler(
let state = data.into_inner();

let mut messages = vec![];
trace!("Aerospike events (count {:?}) {:?}", events.len(), events);
debug!(target: "aerospike_http_server", "Aerospike events count {:?}", events.len());
trace!(target: "aerospike_http_server", "Aerospike events {:?}", events);

for event in events {
match map_record(event, &state.tables_index_map).await {
Ok(None) => {}
Expand All @@ -385,7 +387,9 @@ async fn batch_event_request_handler(
}
}

trace!("Mapped {:?} messages {:?}", messages.len(), messages);
debug!(target: "aerospike_http_server", "Mapped {:?} messages", messages.len());
trace!(target: "aerospike_http_server", "Mapped messages {:?}", messages);

let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage { messages, sender }) {
error!("Ingestor is down: {:?}", e);
Expand Down
4 changes: 2 additions & 2 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ enum OpKind {

impl OracleSink {
fn exec_batch(&mut self) -> oracle::Result<()> {
debug!("Executing batch of size {}", self.batch_params.len());
debug!(target: "oracle_sink", "Executing batch of size {}", self.batch_params.len());
let started = std::time::Instant::now();
let mut batch = self
.conn
Expand All @@ -621,7 +621,7 @@ impl OracleSink {
batch.append_row(&[])?;
}
batch.execute()?;
info!("Execution took {:?}", started.elapsed());
debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed());
Ok(())
}

Expand Down

0 comments on commit e737e49

Please sign in to comment.