diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index 23dd38bdde..87b3efc6aa 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -14,6 +14,7 @@ use dozer_ingestion_connector::{ use std::collections::HashMap; use std::ffi::CString; use std::num::TryFromIntError; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use dozer_ingestion_connector::dozer_types::serde::Deserialize; @@ -370,16 +371,29 @@ impl Connector for AerospikeConnector { &mut self, ingestor: &Ingestor, tables: Vec, - _last_checkpoint: Option, + last_checkpoint: Option, ) -> Result<(), BoxedError> { let hosts = CString::new(self.config.hosts.as_str())?; let client = Client::new(&hosts).map_err(Box::new)?; + unsafe { let dc_name = self.config.replication.datacenter.clone(); let namespace = self.config.namespace.clone(); - let request = CString::new(format!( + + let request = if let Some(last_checkpoint) = last_checkpoint { + let start = SystemTime::now(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Invalid time"); + let lut = Duration::from_millis(last_checkpoint.txid); + let seconds = (since_the_epoch - lut + Duration::from_secs(60)).as_secs() as u64; + CString::new(format!( + "set-config:context=xdr;dc={}namespace={};action=add;rewind={}", + dc_name, namespace, seconds + ))? + } else { + CString::new(format!( "set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all" - ))?; + ))? + }; let mut response: *mut i8 = std::ptr::null_mut(); client.info(&request, &mut response).map_err(Box::new)?; }