From a44fcc705bd29bd653c4baec68d417c674f9d417 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= Date: Tue, 12 Mar 2024 11:43:36 +0200 Subject: [PATCH] feat: Use temp table in oracle sink and improve logging in aerospike sink --- dozer-ingestion/aerospike/src/connector.rs | 7 +- dozer-ingestion/oracle/src/connector/mod.rs | 3 +- .../oracle/src/connector/replicate/log/mod.rs | 7 +- dozer-sink-aerospike/src/aerospike.rs | 24 ++- dozer-sink-aerospike/src/denorm_dag.rs | 2 + dozer-sink-aerospike/src/lib.rs | 24 +-- dozer-sink-oracle/src/lib.rs | 137 +++++++++++++++--- dozer-types/src/node.rs | 10 ++ 8 files changed, 172 insertions(+), 42 deletions(-) diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index cd61a42c56..cef23499b9 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -506,7 +506,7 @@ impl Connector for AerospikeConnector { } else { FieldType::String }, - nullable: true, + nullable: name != "PK", source: Default::default(), }) .collect(), @@ -713,10 +713,7 @@ async fn map_record( fields[*pk] = Field::String(s.clone()); } serde_json::Value::Number(n) => { - fields[*pk] = Field::UInt( - n.as_u64() - .ok_or(AerospikeConnectorError::ParsingUIntFailed)?, - ); + fields[*pk] = Field::String(n.as_str().to_string()); } v => return Err(AerospikeConnectorError::KeyNotSupported(v)), } diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs index 78c7e13b7a..4b8f874a81 100644 --- a/dozer-ingestion/oracle/src/connector/mod.rs +++ b/dozer-ingestion/oracle/src/connector/mod.rs @@ -33,7 +33,7 @@ pub struct Connector { #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("oracle error: {0}")] + #[error("oracle error: {0:?}")] Oracle(#[from] oracle::Error), #[error("pdb not found: {0}")] PdbNotFound(String), @@ -264,6 +264,7 @@ impl Connector { let columns = table.column_names.join(", "); let owner = table.schema.unwrap_or_else(|| self.username.clone()); let sql = format!("SELECT {} FROM {}.{}", columns, owner, table.name); + debug!("{}", sql); let rows = self.connection.query(&sql, &[])?; let mut batch = Vec::with_capacity(self.batch_size); diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs index 65196ab4cc..aa0a15a098 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs @@ -1,5 +1,6 @@ use std::{sync::mpsc::SyncSender, time::Duration}; +use dozer_ingestion_connector::dozer_types::log::debug; use dozer_ingestion_connector::{ dozer_types::{ chrono::{DateTime, Utc}, @@ -72,7 +73,7 @@ fn log_reader_loop( let mut last_rba: Option = None; loop { - info!("Listing logs starting from SCN {}", start_scn); + debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn); let mut logs = match list_logs(connection, start_scn) { Ok(logs) => logs, Err(e) => { @@ -95,7 +96,7 @@ fn log_reader_loop( 'replicate_logs: while !logs.is_empty() { let log = logs.remove(0); - info!( + debug!(target: "oracle_replication", "Reading log {} ({}) ({}, {}), starting from {:?}", log.name, log.sequence, log.first_change, log.next_change, last_rba ); @@ -145,7 +146,7 @@ fn log_reader_loop( if ingestor.is_closed() { return; } - info!("Read all logs, retrying after {:?}", poll_interval); + debug!(target: "oracle_replication", "Read all logs, retrying after {:?}", poll_interval); std::thread::sleep(poll_interval); } else { // If there are more logs, we need to start from the next log's first change. diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs index be4f7fa39e..615d92fe62 100644 --- a/dozer-sink-aerospike/src/aerospike.rs +++ b/dozer-sink-aerospike/src/aerospike.rs @@ -1,3 +1,4 @@ +use std::time::Instant; use std::{ alloc::{handle_alloc_error, Layout}, ffi::{c_char, c_void, CStr, CString, NulError}, @@ -9,6 +10,7 @@ use std::{ use itertools::Itertools; use aerospike_client_sys::*; +use dozer_types::log::{debug, info}; use dozer_types::{ chrono::{DateTime, NaiveDate}, geo::{Coord, Point}, @@ -147,7 +149,9 @@ impl Client { as_config_init(config.as_mut_ptr()); config.assume_init() }; - config.policies.batch.base.total_timeout = 10000; + + config.policies.batch.base.total_timeout = 30000; + config.policies.batch.base.socket_timeout = 30000; config.policies.write.key = as_policy_key_e_AS_POLICY_KEY_SEND; config.policies.batch_write.key = as_policy_key_e_AS_POLICY_KEY_SEND; unsafe { @@ -185,6 +189,7 @@ impl Client { if let Some(filter) = filter { policy.base.filter_exp = filter.as_ptr(); } + as_try(|err| { aerospike_key_put( self.inner.as_ptr(), @@ -204,6 +209,7 @@ impl Client { ) -> Result<(), AerospikeError> { let mut policy = self.inner.as_ref().config.policies.write; policy.exists = as_policy_exists_e_AS_POLICY_EXISTS_CREATE; + self.put(key, new, policy, filter) } @@ -252,7 +258,18 @@ impl Client { &self, batch: *mut as_batch_records, ) -> Result<(), AerospikeError> { - as_try(|err| aerospike_batch_write(self.inner.as_ptr(), err, std::ptr::null(), batch)) + let started = Instant::now(); + let policy = self.inner.as_ref().config.policies.batch; + as_try(|err| { + aerospike_batch_write( + self.inner.as_ptr(), + err, + &policy as *const as_policy_batch, + batch, + ) + })?; + debug!(target: "aerospike_sink", "Batch write took {:?}", started.elapsed()); + Ok(()) } pub(crate) unsafe fn _select( @@ -293,7 +310,7 @@ impl Client { &self, batch: *mut as_batch_records, ) -> Result<(), AerospikeError> { - dbg!("Batch get {} records", (*batch).list.size); + info!("Batch get {} records", (*batch).list.size); as_try(|err| aerospike_batch_read(self.inner.as_ptr(), err, std::ptr::null(), batch)) } @@ -306,6 +323,7 @@ impl Client { request: &CStr, response: &mut *mut i8, ) -> Result<(), AerospikeError> { + info!("Info"); as_try(|err| { aerospike_info_any( self.inner.as_ptr(), diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs index 9d80c0265f..60f7029257 100644 --- a/dozer-sink-aerospike/src/denorm_dag.rs +++ b/dozer-sink-aerospike/src/denorm_dag.rs @@ -10,6 +10,7 @@ use dozer_core::petgraph::visit::{ EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences, }; use dozer_types::indexmap::IndexMap; +use dozer_types::log::info; use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable}; use dozer_types::thiserror; use dozer_types::types::{Field, Record, Schema, TableOperation}; @@ -498,6 +499,7 @@ impl DenormalizationState { .sum(); let batch_size: u32 = batch_size_upper_bound.try_into().unwrap(); + info!("Writing denorm batch of size {}", batch_size); let mut write_batch = RecordBatch::new(batch_size, batch_size); for node in self.dag.node_weights_mut() { diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index 7c7e1883e3..4c68005f6b 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -3,7 +3,7 @@ pub use crate::aerospike::Client; use aerospike_client_sys::*; use denorm_dag::DenormalizationState; use dozer_core::event::EventHub; -use dozer_types::log::error; +use dozer_types::log::{error, info}; use dozer_types::models::connection::AerospikeConnection; use dozer_types::node::OpIdentifier; use dozer_types::thiserror; @@ -239,7 +239,6 @@ impl Drop for AsRecord<'_> { struct AerospikeSink { config: AerospikeSinkConfig, replication_worker: AerospikeSinkWorker, - current_transaction: Option, metadata_namespace: CString, metadata_set: CString, client: Arc, @@ -365,7 +364,6 @@ impl AerospikeSink { Ok(Self { config, replication_worker: worker_instance, - current_transaction: None, metadata_namespace, metadata_set, client, @@ -441,6 +439,8 @@ impl AerospikeSinkWorker { .sum(); // Write denormed tables let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32); + + info!("Sink batch size {batch_size_est}"); for table in denormalized_tables { for (key, record) in table.records { batch.add_write( @@ -475,17 +475,21 @@ impl Sink for AerospikeSink { Ok(()) } - fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> { - self.replication_worker - .commit(self.current_transaction.take())?; + fn commit(&mut self, epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> { + debug_assert_eq!(epoch_details.common_info.source_states.len(), 1); + let txid = epoch_details + .common_info + .source_states + .iter() + .next() + .and_then(|(_, state)| state.op_id()) + .map(|op_id| op_id.txid); + + self.replication_worker.commit(txid)?; Ok(()) } fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> { - // Set current transaction before any error can be thrown, so we don't - // get stuck in an error loop if this error gets ignored by the caller - self.current_transaction = op.id.map(|id| id.txid); - self.replication_worker.process(op)?; Ok(()) } diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index 757659d4e1..974ca2f9f1 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -25,8 +25,9 @@ use oracle::{ Connection, }; -const TXN_ID_COL: &str = "__txn_id"; -const TXN_SEQ_COL: &str = "__txn_seq"; +const TXN_ID_COL: &str = "txn_id"; +const TXN_SEQ_COL: &str = "txn_seq"; +const OPKIND_COL: &str = "DOZER_OPKIND"; const METADATA_TABLE: &str = "__replication_metadata"; const META_TXN_ID_COL: &str = "txn_id"; const META_TABLE_COL: &str = "table"; @@ -96,6 +97,7 @@ struct OracleSink { update_metadata: String, select_metadata: String, latest_txid: Option, + insert_statement: String, } #[derive(Debug)] @@ -265,6 +267,7 @@ impl OracleSinkFactory { &self, connection: &Connection, table: &Table, + temp_table: Option<&Table>, schema: &Schema, ) -> Result<(), Error> { if self.validate_table(connection, table, schema)? { @@ -297,9 +300,15 @@ impl OracleSinkFactory { if field.nullable { "" } else { " NOT NULL" } )); } - let table = format!("CREATE TABLE {table} ({})", column_defs.join(",\n")); - info!("### CREATE TABLE #### \n: {:?}", table); - connection.execute(&table, &[])?; + let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n")); + info!("### CREATE TABLE #### \n: {:?}", table_query); + connection.execute(&table_query, &[])?; + + if let Some(temp_table) = temp_table { + let temp_table_query = format!("CREATE GLOBAL TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT DELETE ROWS", column_defs.join(",\n")); + info!("### CREATE TEMPORARY TABLE #### \n: {:?}", temp_table_query); + connection.execute(&temp_table_query, &[])?; + } Ok(()) } @@ -348,21 +357,51 @@ impl OracleSinkFactory { Ok(()) } + + fn create_pk( + &self, + connection: &Connection, + table: &Table, + schema: &Schema, + ) -> Result<(), Error> { + let mut columns = schema + .primary_index + .iter() + .map(|ix| schema.fields[*ix].name.clone()) + .collect::>(); + + columns.iter_mut().for_each(|col| { + *col = col.to_uppercase(); + }); + + let index_name = + format!("{}_{}_{}_PK", table.owner, table.name, columns.join("_")).replace('#', ""); + + let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2"; + info!("Index check query {query}"); + + let mut index_exist = connection.query(query, &[&table.name, &table.owner])?; + if index_exist.next().is_some() { + info!("Index {index_name} already exist"); + } else { + let query = format!( + "ALTER TABLE {table} ADD CONSTRAINT {index_name} PRIMARY KEY ({})", + columns.join(", ") + ); + info!("### CREATE PK #### \n: {index_name}. Query: {query}"); + connection.execute(&query, &[])?; + } + + Ok(()) + } } -fn generate_merge_statement(table: &Table, schema: &Schema) -> String { +fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) -> String { let field_names = schema .fields .iter() .map(|field| field.name.as_str()) .chain([TXN_ID_COL, TXN_SEQ_COL]); - let mut parameter_index = 1usize..; - let input_fields = field_names - .clone() - .zip(&mut parameter_index) - .map(|(name, i)| format!(":{i} \"{name}\"")) - .collect::>() - .join(", "); let destination_columns = field_names .clone() .map(|name| format!("D.\"{name}\"")) @@ -394,8 +433,6 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String { pk_select = "1 = 1".to_owned(); } - let opkind_idx = parameter_index.next().unwrap(); - let opid_select = format!( r#"(D."{TXN_ID_COL}" IS NULL OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}" @@ -408,7 +445,7 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String { // do nothing (if the op is INSERT, format!( r#"MERGE INTO {table} D - USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S + USING {temp_table} S ON ({pk_select}) WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0 WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select} @@ -417,6 +454,40 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String { ) } +fn generate_insert_statement(table: &Table, schema: &Schema) -> String { + let field_names = schema + .fields + .iter() + .map(|field| field.name.as_str()) + .chain([TXN_ID_COL, TXN_SEQ_COL, OPKIND_COL]); + + let destination_columns = field_names + .clone() + .map(|name| format!("\"{name}\"")) + .collect::>() + .join(", "); + + let mut parameter_index = 1usize..; + let input_fields = field_names + .clone() + .zip(&mut parameter_index) + .map(|(name, i)| format!(":{i} \"{name}\"")) + .collect::>() + .join(", "); + + // Match on PK and txn_id. + // If the record does not exist and the op is INSERT, do the INSERT + // If the record exists, but the txid is higher than the operation's txid, + // do nothing (if the op is INSERT, + format!( + r#"INSERT INTO {table} ({destination_columns}) + SELECT * + FROM + (SELECT {input_fields} FROM DUAL) + "# + ) +} + #[derive(Debug, Clone)] struct Table { owner: String, @@ -477,8 +548,19 @@ impl SinkFactory for OracleSinkFactory { false, ); - self.validate_or_create_table(&connection, &self.table, &amended_schema)?; + let temp_table = Table { + owner: self.table.owner.clone(), + name: format!("{}_TEMP", &self.table.name), + }; + + self.validate_or_create_table( + &connection, + &self.table, + Some(&temp_table), + &amended_schema, + )?; self.create_index(&connection, &self.table, &amended_schema)?; + self.create_pk(&connection, &temp_table, &amended_schema)?; let meta_table = Table { owner: self.table.owner.clone(), name: METADATA_TABLE.to_owned(), @@ -486,6 +568,7 @@ impl SinkFactory for OracleSinkFactory { self.validate_or_create_table( &connection, &meta_table, + None, Schema::new() .field( FieldDefinition { @@ -518,10 +601,12 @@ impl SinkFactory for OracleSinkFactory { ); let field_types = schema.fields.iter().map(|field| field.typ).collect(); + Ok(Box::new(OracleSink { conn: connection, insert_append, - merge_statement: generate_merge_statement(&self.table, &schema), + merge_statement: generate_merge_statement(&self.table, &temp_table, &schema), + insert_statement: generate_insert_statement(&temp_table, &schema), field_types, pk: schema.primary_index, batch_params: Vec::new(), @@ -599,9 +684,10 @@ impl OracleSink { fn exec_batch(&mut self) -> oracle::Result<()> { debug!(target: "oracle_sink", "Executing batch of size {}", self.batch_params.len()); let started = std::time::Instant::now(); + let mut batch = self .conn - .batch(&self.merge_statement, self.batch_params.len()) + .batch(&self.insert_statement, self.batch_params.len()) .build()?; for params in self.batch_params.drain(..) { let mut bind_idx = 1..; @@ -621,6 +707,11 @@ impl OracleSink { batch.append_row(&[])?; } batch.execute()?; + + self.conn.execute(&self.merge_statement, &[])?; + + self.conn.commit()?; + debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed()); Ok(()) } @@ -786,7 +877,13 @@ mod tests { owner: "owner".to_owned(), name: "tablename".to_owned(), }; - let stmt = generate_merge_statement(&table, &schema); + + let temp_table = Table { + owner: "owner".to_owned(), + name: "tablename_temp".to_owned(), + }; + + let stmt = generate_merge_statement(&table, &temp_table, &schema); assert_eq!( trim_str(stmt), trim_str( diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs index f84485f0f7..b5b5562ae5 100644 --- a/dozer-types/src/node.rs +++ b/dozer-types/src/node.rs @@ -117,6 +117,16 @@ pub enum SourceState { Restartable(OpIdentifier), } +impl SourceState { + pub fn op_id(&self) -> Option<&OpIdentifier> { + if let Self::Restartable(op_id) = self { + Some(op_id) + } else { + None + } + } +} + /// Map from a `Source` node's handle to it state. /// /// This uniquely identifies the state of the Dozer pipeline.