Skip to content

Commit

Permalink
feat: Use temp table in oracle sink and improve logging in aerospike …
Browse files Browse the repository at this point in the history
…sink
  • Loading branch information
karolisg committed Mar 28, 2024
1 parent f155736 commit a44fcc7
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 42 deletions.
7 changes: 2 additions & 5 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ impl Connector for AerospikeConnector {
} else {
FieldType::String
},
nullable: true,
nullable: name != "PK",
source: Default::default(),
})
.collect(),
Expand Down Expand Up @@ -713,10 +713,7 @@ async fn map_record(
fields[*pk] = Field::String(s.clone());
}
serde_json::Value::Number(n) => {
fields[*pk] = Field::UInt(
n.as_u64()
.ok_or(AerospikeConnectorError::ParsingUIntFailed)?,
);
fields[*pk] = Field::String(n.as_str().to_string());
}
v => return Err(AerospikeConnectorError::KeyNotSupported(v)),
}
Expand Down
3 changes: 2 additions & 1 deletion dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct Connector {

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("oracle error: {0}")]
#[error("oracle error: {0:?}")]
Oracle(#[from] oracle::Error),
#[error("pdb not found: {0}")]
PdbNotFound(String),
Expand Down Expand Up @@ -264,6 +264,7 @@ impl Connector {
let columns = table.column_names.join(", ");
let owner = table.schema.unwrap_or_else(|| self.username.clone());
let sql = format!("SELECT {} FROM {}.{}", columns, owner, table.name);
debug!("{}", sql);
let rows = self.connection.query(&sql, &[])?;

let mut batch = Vec::with_capacity(self.batch_size);
Expand Down
7 changes: 4 additions & 3 deletions dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{sync::mpsc::SyncSender, time::Duration};

use dozer_ingestion_connector::dozer_types::log::debug;
use dozer_ingestion_connector::{
dozer_types::{
chrono::{DateTime, Utc},
Expand Down Expand Up @@ -72,7 +73,7 @@ fn log_reader_loop(
let mut last_rba: Option<LastRba> = None;

loop {
info!("Listing logs starting from SCN {}", start_scn);
debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
let mut logs = match list_logs(connection, start_scn) {
Ok(logs) => logs,
Err(e) => {
Expand All @@ -95,7 +96,7 @@ fn log_reader_loop(

'replicate_logs: while !logs.is_empty() {
let log = logs.remove(0);
info!(
debug!(target: "oracle_replication",
"Reading log {} ({}) ({}, {}), starting from {:?}",
log.name, log.sequence, log.first_change, log.next_change, last_rba
);
Expand Down Expand Up @@ -145,7 +146,7 @@ fn log_reader_loop(
if ingestor.is_closed() {
return;
}
info!("Read all logs, retrying after {:?}", poll_interval);
debug!(target: "oracle_replication", "Read all logs, retrying after {:?}", poll_interval);
std::thread::sleep(poll_interval);
} else {
// If there are more logs, we need to start from the next log's first change.
Expand Down
24 changes: 21 additions & 3 deletions dozer-sink-aerospike/src/aerospike.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use std::{
alloc::{handle_alloc_error, Layout},
ffi::{c_char, c_void, CStr, CString, NulError},
Expand All @@ -9,6 +10,7 @@ use std::{
use itertools::Itertools;

use aerospike_client_sys::*;
use dozer_types::log::{debug, info};
use dozer_types::{
chrono::{DateTime, NaiveDate},
geo::{Coord, Point},
Expand Down Expand Up @@ -147,7 +149,9 @@ impl Client {
as_config_init(config.as_mut_ptr());
config.assume_init()
};
config.policies.batch.base.total_timeout = 10000;

config.policies.batch.base.total_timeout = 30000;
config.policies.batch.base.socket_timeout = 30000;
config.policies.write.key = as_policy_key_e_AS_POLICY_KEY_SEND;
config.policies.batch_write.key = as_policy_key_e_AS_POLICY_KEY_SEND;
unsafe {
Expand Down Expand Up @@ -185,6 +189,7 @@ impl Client {
if let Some(filter) = filter {
policy.base.filter_exp = filter.as_ptr();
}

as_try(|err| {
aerospike_key_put(
self.inner.as_ptr(),
Expand All @@ -204,6 +209,7 @@ impl Client {
) -> Result<(), AerospikeError> {
let mut policy = self.inner.as_ref().config.policies.write;
policy.exists = as_policy_exists_e_AS_POLICY_EXISTS_CREATE;

self.put(key, new, policy, filter)
}

Expand Down Expand Up @@ -252,7 +258,18 @@ impl Client {
&self,
batch: *mut as_batch_records,
) -> Result<(), AerospikeError> {
as_try(|err| aerospike_batch_write(self.inner.as_ptr(), err, std::ptr::null(), batch))
let started = Instant::now();
let policy = self.inner.as_ref().config.policies.batch;
as_try(|err| {
aerospike_batch_write(
self.inner.as_ptr(),
err,
&policy as *const as_policy_batch,
batch,
)
})?;
debug!(target: "aerospike_sink", "Batch write took {:?}", started.elapsed());
Ok(())
}

pub(crate) unsafe fn _select(
Expand Down Expand Up @@ -293,7 +310,7 @@ impl Client {
&self,
batch: *mut as_batch_records,
) -> Result<(), AerospikeError> {
dbg!("Batch get {} records", (*batch).list.size);
info!("Batch get {} records", (*batch).list.size);
as_try(|err| aerospike_batch_read(self.inner.as_ptr(), err, std::ptr::null(), batch))
}

Expand All @@ -306,6 +323,7 @@ impl Client {
request: &CStr,
response: &mut *mut i8,
) -> Result<(), AerospikeError> {
info!("Info");
as_try(|err| {
aerospike_info_any(
self.inner.as_ptr(),
Expand Down
2 changes: 2 additions & 0 deletions dozer-sink-aerospike/src/denorm_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use dozer_core::petgraph::visit::{
EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences,
};
use dozer_types::indexmap::IndexMap;
use dozer_types::log::info;
use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable};
use dozer_types::thiserror;
use dozer_types::types::{Field, Record, Schema, TableOperation};
Expand Down Expand Up @@ -498,6 +499,7 @@ impl DenormalizationState {
.sum();

let batch_size: u32 = batch_size_upper_bound.try_into().unwrap();
info!("Writing denorm batch of size {}", batch_size);
let mut write_batch = RecordBatch::new(batch_size, batch_size);

for node in self.dag.node_weights_mut() {
Expand Down
24 changes: 14 additions & 10 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use crate::aerospike::Client;
use aerospike_client_sys::*;
use denorm_dag::DenormalizationState;
use dozer_core::event::EventHub;
use dozer_types::log::error;
use dozer_types::log::{error, info};
use dozer_types::models::connection::AerospikeConnection;
use dozer_types::node::OpIdentifier;
use dozer_types::thiserror;
Expand Down Expand Up @@ -239,7 +239,6 @@ impl Drop for AsRecord<'_> {
struct AerospikeSink {
config: AerospikeSinkConfig,
replication_worker: AerospikeSinkWorker,
current_transaction: Option<u64>,
metadata_namespace: CString,
metadata_set: CString,
client: Arc<Client>,
Expand Down Expand Up @@ -365,7 +364,6 @@ impl AerospikeSink {
Ok(Self {
config,
replication_worker: worker_instance,
current_transaction: None,
metadata_namespace,
metadata_set,
client,
Expand Down Expand Up @@ -441,6 +439,8 @@ impl AerospikeSinkWorker {
.sum();
// Write denormed tables
let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32);

info!("Sink batch size {batch_size_est}");
for table in denormalized_tables {
for (key, record) in table.records {
batch.add_write(
Expand Down Expand Up @@ -475,17 +475,21 @@ impl Sink for AerospikeSink {
Ok(())
}

fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
self.replication_worker
.commit(self.current_transaction.take())?;
fn commit(&mut self, epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
debug_assert_eq!(epoch_details.common_info.source_states.len(), 1);
let txid = epoch_details
.common_info
.source_states
.iter()
.next()
.and_then(|(_, state)| state.op_id())
.map(|op_id| op_id.txid);

self.replication_worker.commit(txid)?;
Ok(())
}

fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> {
// Set current transaction before any error can be thrown, so we don't
// get stuck in an error loop if this error gets ignored by the caller
self.current_transaction = op.id.map(|id| id.txid);

self.replication_worker.process(op)?;
Ok(())
}
Expand Down
Loading

0 comments on commit a44fcc7

Please sign in to comment.