Skip to content

Commit

Permalink
Add one min
Browse files Browse the repository at this point in the history
  • Loading branch information
mediuminvader committed Feb 15, 2024
1 parent 9b9fd3d commit 93dca54
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use dozer_ingestion_connector::{
use std::collections::HashMap;
use std::ffi::CString;
use std::num::TryFromIntError;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use dozer_ingestion_connector::dozer_types::serde::Deserialize;

Expand Down Expand Up @@ -370,16 +371,29 @@ impl Connector for AerospikeConnector {
&mut self,
ingestor: &Ingestor,
tables: Vec<TableInfo>,
_last_checkpoint: Option<OpIdentifier>,
last_checkpoint: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let hosts = CString::new(self.config.hosts.as_str())?;
let client = Client::new(&hosts).map_err(Box::new)?;

unsafe {
let dc_name = self.config.replication.datacenter.clone();
let namespace = self.config.namespace.clone();
let request = CString::new(format!(

let request = if let Some(last_checkpoint) = last_checkpoint {
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Invalid time");
let lut = Duration::from_millis(last_checkpoint.txid);
let seconds = (since_the_epoch - lut + Duration::from_secs(60)).as_secs() as u64;
CString::new(format!(
"set-config:context=xdr;dc={}namespace={};action=add;rewind={}",
dc_name, namespace, seconds
))?
} else {
CString::new(format!(
"set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all"
))?;
))?
};
let mut response: *mut i8 = std::ptr::null_mut();
client.info(&request, &mut response).map_err(Box::new)?;
}
Expand Down

0 comments on commit 93dca54

Please sign in to comment.