diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs
index cd61a42c56..a6246638da 100644
--- a/dozer-ingestion/aerospike/src/connector.rs
+++ b/dozer-ingestion/aerospike/src/connector.rs
@@ -506,7 +506,7 @@ impl Connector for AerospikeConnector {
                         } else {
                             FieldType::String
                         },
-                        nullable: true,
+                        nullable: name != "PK",
                         source: Default::default(),
                     })
                     .collect(),
diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs
index 78c7e13b7a..4b8f874a81 100644
--- a/dozer-ingestion/oracle/src/connector/mod.rs
+++ b/dozer-ingestion/oracle/src/connector/mod.rs
@@ -33,7 +33,7 @@ pub struct Connector {
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("oracle error: {0}")]
+    #[error("oracle error: {0:?}")]
     Oracle(#[from] oracle::Error),
     #[error("pdb not found: {0}")]
     PdbNotFound(String),
@@ -264,6 +264,7 @@ impl Connector {
         let columns = table.column_names.join(", ");
         let owner = table.schema.unwrap_or_else(|| self.username.clone());
         let sql = format!("SELECT {} FROM {}.{}", columns, owner, table.name);
+        debug!("{}", sql);
         let rows = self.connection.query(&sql, &[])?;
 
         let mut batch = Vec::with_capacity(self.batch_size);
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
index 65196ab4cc..aa0a15a098 100644
--- a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
+++ b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
@@ -1,5 +1,6 @@
 use std::{sync::mpsc::SyncSender, time::Duration};
 
+use dozer_ingestion_connector::dozer_types::log::debug;
 use dozer_ingestion_connector::{
     dozer_types::{
         chrono::{DateTime, Utc},
@@ -72,7 +73,7 @@ fn log_reader_loop(
     let mut last_rba: Option = None;
 
     loop {
-        info!("Listing logs starting from SCN {}", start_scn);
+        debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
         let mut logs = match list_logs(connection, start_scn) {
             Ok(logs) => logs,
             Err(e) => {
@@ -95,7 +96,7 @@ fn log_reader_loop(
 
         'replicate_logs: while !logs.is_empty() {
             let log = logs.remove(0);
-            info!(
+            debug!(target: "oracle_replication",
                 "Reading log {} ({}) ({}, {}), starting from {:?}",
                 log.name, log.sequence, log.first_change, log.next_change, last_rba
             );
@@ -145,7 +146,7 @@ fn log_reader_loop(
             if ingestor.is_closed() {
                 return;
             }
-            info!("Read all logs, retrying after {:?}", poll_interval);
+            debug!(target: "oracle_replication", "Read all logs, retrying after {:?}", poll_interval);
             std::thread::sleep(poll_interval);
         } else {
             // If there are more logs, we need to start from the next log's first change.
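A note on the logging changes above: giving these messages an explicit target (`debug!(target: "oracle_replication", ...)`) names the log stream independently of the module path, so replication tracing can be switched on in isolation. A minimal sketch of the filtering this enables, assuming the standard `log` facade with `env_logger` (dozer wires up its own logger, so the initialization below is illustrative only):

```rust
use log::debug;

fn main() {
    // Hypothetical setup: with env_logger, running the binary with
    // RUST_LOG=oracle_replication=debug enables just this stream while
    // every other module stays at its default level.
    env_logger::init();

    let start_scn = 42u64;
    // Same pattern as the connector code above: the explicit target survives
    // module moves and renames, unlike the default module-path-based target.
    debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
}
```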
diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs
index be4f7fa39e..0b1c53305c 100644
--- a/dozer-sink-aerospike/src/aerospike.rs
+++ b/dozer-sink-aerospike/src/aerospike.rs
@@ -1,3 +1,4 @@
+use std::time::Instant;
 use std::{
     alloc::{handle_alloc_error, Layout},
     ffi::{c_char, c_void, CStr, CString, NulError},
@@ -9,6 +10,7 @@ use std::{
 use itertools::Itertools;
 
 use aerospike_client_sys::*;
+use dozer_types::log::debug;
 use dozer_types::{
     chrono::{DateTime, NaiveDate},
     geo::{Coord, Point},
@@ -147,7 +149,9 @@ impl Client {
             as_config_init(config.as_mut_ptr());
             config.assume_init()
         };
-        config.policies.batch.base.total_timeout = 10000;
+
+        config.policies.batch.base.total_timeout = 30000;
+        config.policies.batch.base.socket_timeout = 30000;
         config.policies.write.key = as_policy_key_e_AS_POLICY_KEY_SEND;
         config.policies.batch_write.key = as_policy_key_e_AS_POLICY_KEY_SEND;
         unsafe {
@@ -185,6 +189,7 @@ impl Client {
         if let Some(filter) = filter {
             policy.base.filter_exp = filter.as_ptr();
         }
+
         as_try(|err| {
             aerospike_key_put(
                 self.inner.as_ptr(),
@@ -204,6 +209,7 @@ impl Client {
     ) -> Result<(), AerospikeError> {
         let mut policy = self.inner.as_ref().config.policies.write;
         policy.exists = as_policy_exists_e_AS_POLICY_EXISTS_CREATE;
+
         self.put(key, new, policy, filter)
     }
 
@@ -252,7 +258,20 @@ impl Client {
         &self,
         batch: *mut as_batch_records,
     ) -> Result<(), AerospikeError> {
-        as_try(|err| aerospike_batch_write(self.inner.as_ptr(), err, std::ptr::null(), batch))
+        debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size);
+
+        let started = Instant::now();
+        let policy = self.inner.as_ref().config.policies.batch;
+        as_try(|err| {
+            aerospike_batch_write(
+                self.inner.as_ptr(),
+                err,
+                &policy as *const as_policy_batch,
+                batch,
+            )
+        })?;
+        debug!(target: "aerospike_sink", "Batch write took {:?}", started.elapsed());
+        Ok(())
     }
 
     pub(crate) unsafe fn _select(
diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs
index 9d80c0265f..488eae0892 100644
--- a/dozer-sink-aerospike/src/denorm_dag.rs
+++ b/dozer-sink-aerospike/src/denorm_dag.rs
@@ -10,6 +10,7 @@ use dozer_core::petgraph::visit::{
     EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences,
 };
 use dozer_types::indexmap::IndexMap;
+
 use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable};
 use dozer_types::thiserror;
 use dozer_types::types::{Field, Record, Schema, TableOperation};
diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs
index 7c7e1883e3..762436f8a7 100644
--- a/dozer-sink-aerospike/src/lib.rs
+++ b/dozer-sink-aerospike/src/lib.rs
@@ -239,7 +239,6 @@ impl Drop for AsRecord<'_> {
 struct AerospikeSink {
     config: AerospikeSinkConfig,
     replication_worker: AerospikeSinkWorker,
-    current_transaction: Option<u64>,
     metadata_namespace: CString,
     metadata_set: CString,
     client: Arc<Client>,
@@ -365,7 +364,6 @@ impl AerospikeSink {
         Ok(Self {
             config,
             replication_worker: worker_instance,
-            current_transaction: None,
             metadata_namespace,
             metadata_set,
             client,
@@ -441,6 +439,7 @@ impl AerospikeSinkWorker {
             .sum();
         // Write denormed tables
         let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32);
+
         for table in denormalized_tables {
             for (key, record) in table.records {
                 batch.add_write(
@@ -475,17 +474,21 @@ impl Sink for AerospikeSink {
         Ok(())
     }
 
-    fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
-        self.replication_worker
-            .commit(self.current_transaction.take())?;
+    fn commit(&mut self, epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
+        debug_assert_eq!(epoch_details.common_info.source_states.len(), 1);
+        let txid = epoch_details
+            .common_info
+            .source_states
+            .iter()
+            .next()
+            .and_then(|(_, state)| state.op_id())
+            .map(|op_id| op_id.txid);
+
+        self.replication_worker.commit(txid)?;
         Ok(())
     }
 
     fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> {
-        // Set current transaction before any error can be thrown, so we don't
-        // get stuck in an error loop if this error gets ignored by the caller
-        self.current_transaction = op.id.map(|id| id.txid);
-
         self.replication_worker.process(op)?;
         Ok(())
     }
diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index 757659d4e1..088b9b7c92 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -27,6 +27,7 @@ use oracle::{
 
 const TXN_ID_COL: &str = "__txn_id";
 const TXN_SEQ_COL: &str = "__txn_seq";
+const OPKIND_COL: &str = "DOZER_OPKIND";
 const METADATA_TABLE: &str = "__replication_metadata";
 const META_TXN_ID_COL: &str = "txn_id";
 const META_TABLE_COL: &str = "table";
@@ -96,6 +97,8 @@ struct OracleSink {
     update_metadata: String,
     select_metadata: String,
     latest_txid: Option<u64>,
+    insert_statement: String,
+    delete_statement: String,
 }
 
 #[derive(Debug)]
@@ -265,13 +268,10 @@ impl OracleSinkFactory {
         &self,
         connection: &Connection,
         table: &Table,
+        temp_table: Option<&Table>,
         schema: &Schema,
     ) -> Result<(), Error> {
-        if self.validate_table(connection, table, schema)? {
-            return Ok(());
-        }
-
-        let mut column_defs = Vec::with_capacity(schema.fields.len() + 2);
+        let mut column_defs = Vec::with_capacity(schema.fields.len());
         for field in &schema.fields {
             let name = &field.name;
             let col_type = match field.typ {
@@ -297,9 +297,18 @@ impl OracleSinkFactory {
                 if field.nullable { "" } else { " NOT NULL" }
             ));
         }
-        let table = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
-        info!("### CREATE TABLE #### \n: {:?}", table);
-        connection.execute(&table, &[])?;
+
+        if !(self.validate_table(connection, table, schema)?) {
+            let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
+            info!("### CREATE TABLE ####\n{}", table_query);
+            connection.execute(&table_query, &[])?;
+        }
+
+        if let Some(temp_table) = temp_table {
+            let temp_table_query = format!("CREATE PRIVATE TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT PRESERVE DEFINITION", column_defs.join(",\n")).replace("NOT NULL", "");
+            info!("### CREATE TEMPORARY TABLE ####\n{}", temp_table_query);
+            connection.execute(&temp_table_query, &[])?;
+        }
 
         Ok(())
     }
@@ -349,20 +358,14 @@ impl OracleSinkFactory {
         Ok(())
     }
 }
-fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
+
+fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) -> String {
     let field_names = schema
         .fields
         .iter()
         .map(|field| field.name.as_str())
         .chain([TXN_ID_COL, TXN_SEQ_COL]);
 
-    let mut parameter_index = 1usize..;
-    let input_fields = field_names
-        .clone()
-        .zip(&mut parameter_index)
-        .map(|(name, i)| format!(":{i} \"{name}\""))
-        .collect::<Vec<_>>()
-        .join(", ");
     let destination_columns = field_names
         .clone()
         .map(|name| format!("D.\"{name}\""))
@@ -394,8 +397,6 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
         pk_select = "1 = 1".to_owned();
     }
 
-    let opkind_idx = parameter_index.next().unwrap();
-
     let opid_select = format!(
         r#"(D."{TXN_ID_COL}" IS NULL
             OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
@@ -408,7 +409,7 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
     // do nothing (if the op is INSERT,
     format!(
         r#"MERGE INTO {table} D
-        USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
+        USING {temp_table} S
         ON ({pk_select})
         WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
         WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select}
@@ -417,6 +418,37 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
     )
 }
 
+fn generate_insert_statement(table: &Table, schema: &Schema) -> String {
+    let field_names = schema
+        .fields
+        .iter()
+        .map(|field| field.name.as_str())
+        .chain([TXN_ID_COL, TXN_SEQ_COL, OPKIND_COL]);
+
+    let mut parameter_index = 1usize..;
+    let input_fields = field_names
+        .clone()
+        .zip(&mut parameter_index)
+        .map(|(name, i)| format!(":{i} \"{name}\""))
+        .collect::<Vec<_>>()
+        .join(", ");
+
+    // Build a parameterized INSERT into the temporary table.
+    // Each batched operation is staged there together with its opkind
+    // (insert, update or delete), so that the MERGE statement generated
+    // above can apply the staged rows to the destination table in one pass.
+    format!(
+        r#"INSERT INTO {table}
+        SELECT *
+        FROM
+        (SELECT {input_fields} FROM DUAL)
+        "#
+    )
+}
+
+fn generate_delete_statement(table: &Table) -> String {
+    format!(r#"DELETE FROM {table}"#)
+}
+
 #[derive(Debug, Clone)]
 struct Table {
     owner: String,
@@ -477,7 +509,17 @@ impl SinkFactory for OracleSinkFactory {
             false,
         );
 
-        self.validate_or_create_table(&connection, &self.table, &amended_schema)?;
+        let temp_table = Table {
+            owner: self.table.owner.clone(),
+            name: format!("ORA$PTT_{}", &self.table.name),
+        };
+
+        self.validate_or_create_table(
+            &connection,
+            &self.table,
+            Some(&temp_table),
+            &amended_schema,
+        )?;
         self.create_index(&connection, &self.table, &amended_schema)?;
         let meta_table = Table {
             owner: self.table.owner.clone(),
@@ -486,6 +528,7 @@ impl SinkFactory for OracleSinkFactory {
         self.validate_or_create_table(
             &connection,
             &meta_table,
+            None,
             Schema::new()
                 .field(
                     FieldDefinition {
@@ -518,10 +561,22 @@ impl SinkFactory for OracleSinkFactory {
         );
 
         let field_types = schema.fields.iter().map(|field| field.typ).collect();
+
+        let merge_statement = generate_merge_statement(&self.table, &temp_table, &schema);
+        info!(target: "oracle_sink", "Merge statement {}", merge_statement);
+
+        let insert_statement = generate_insert_statement(&temp_table, &schema);
+        info!(target: "oracle_sink", "Insert statement {}", insert_statement);
+
+        let delete_statement = generate_delete_statement(&temp_table);
+        info!(target: "oracle_sink", "Delete statement {}", delete_statement);
+
         Ok(Box::new(OracleSink {
             conn: connection,
             insert_append,
-            merge_statement: generate_merge_statement(&self.table, &schema),
+            merge_statement,
+            insert_statement,
+            delete_statement,
             field_types,
             pk: schema.primary_index,
             batch_params: Vec::new(),
@@ -599,9 +654,10 @@ impl OracleSink {
     fn exec_batch(&mut self) -> oracle::Result<()> {
         debug!(target: "oracle_sink", "Executing batch of size {}", self.batch_params.len());
         let started = std::time::Instant::now();
+
         let mut batch = self
             .conn
-            .batch(&self.merge_statement, self.batch_params.len())
+            .batch(&self.insert_statement, self.batch_params.len())
             .build()?;
         for params in self.batch_params.drain(..) {
             let mut bind_idx = 1..;
@@ -621,6 +677,11 @@ impl OracleSink {
             batch.append_row(&[])?;
         }
         batch.execute()?;
+
+        self.conn.execute(&self.merge_statement, &[])?;
+
+        self.conn.execute(&self.delete_statement, &[])?;
+
         debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed());
         Ok(())
     }
@@ -648,6 +709,7 @@ impl Sink for OracleSink {
         &mut self,
         _epoch_details: &dozer_core::epoch::Epoch,
     ) -> Result<(), dozer_types::errors::internal::BoxedError> {
+        // Ok(self.conn.commit()?)
         Ok(())
     }
 
@@ -786,13 +848,19 @@ mod tests {
             owner: "owner".to_owned(),
             name: "tablename".to_owned(),
         };
-        let stmt = generate_merge_statement(&table, &schema);
+
+        let temp_table = Table {
+            owner: "owner".to_owned(),
+            name: "tablename_temp".to_owned(),
+        };
+
+        let stmt = generate_merge_statement(&table, &temp_table, &schema);
         assert_eq!(
             trim_str(stmt),
             trim_str(
                 r#"
                 MERGE INTO "owner"."tablename" D
-                USING (SELECT :1 "id", :2 "name", :3 "content", :4 "__txn_id", :5 "__txn_seq", :6 DOZER_OPKIND FROM DUAL) S
+                USING "owner"."tablename_temp" S
                 ON (D."id" = S."id" AND D."name" = S."name")
                 WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
                 WHEN MATCHED THEN UPDATE SET D."content" = S."content", D."__txn_id" = S."__txn_id", D."__txn_seq" = S."__txn_seq"
diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs
index f84485f0f7..b5b5562ae5 100644
--- a/dozer-types/src/node.rs
+++ b/dozer-types/src/node.rs
@@ -117,6 +117,16 @@ pub enum SourceState {
     Restartable(OpIdentifier),
 }
 
+impl SourceState {
+    pub fn op_id(&self) -> Option<&OpIdentifier> {
+        if let Self::Restartable(op_id) = self {
+            Some(op_id)
+        } else {
+            None
+        }
+    }
+}
+
 /// Map from a `Source` node's handle to its state.
 ///
 /// This uniquely identifies the state of the Dozer pipeline.
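The `op_id` accessor added above is what lets a sink recover the transaction id at commit time from the epoch's source states, as the new `AerospikeSink::commit` does. A minimal sketch of that pattern, with the map's key type simplified to `&str` (the real map is keyed by node handle) and a single-source pipeline assumed:

```rust
use dozer_types::node::SourceState;

/// Returns the txid of the single source's last restartable operation, if any.
fn single_source_txid<'a>(
    source_states: impl IntoIterator<Item = (&'a str, &'a SourceState)>,
) -> Option<u64> {
    source_states
        .into_iter()
        .next() // single-source assumption, mirroring the debug_assert in commit()
        .and_then(|(_, state)| state.op_id()) // None unless the state is Restartable
        .map(|op_id| op_id.txid)
}
```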