From a44fcc705bd29bd653c4baec68d417c674f9d417 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?=
Date: Tue, 12 Mar 2024 11:43:36 +0200
Subject: [PATCH 1/5] feat: Use temp table in oracle sink and improve logging
 in aerospike sink

---
 dozer-ingestion/aerospike/src/connector.rs    |   7 +-
 dozer-ingestion/oracle/src/connector/mod.rs   |   3 +-
 .../oracle/src/connector/replicate/log/mod.rs |   7 +-
 dozer-sink-aerospike/src/aerospike.rs         |  24 ++-
 dozer-sink-aerospike/src/denorm_dag.rs        |   2 +
 dozer-sink-aerospike/src/lib.rs               |  24 +--
 dozer-sink-oracle/src/lib.rs                  | 137 +++++++++++++++---
 dozer-types/src/node.rs                       |  10 ++
 8 files changed, 172 insertions(+), 42 deletions(-)

diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs
index cd61a42c56..cef23499b9 100644
--- a/dozer-ingestion/aerospike/src/connector.rs
+++ b/dozer-ingestion/aerospike/src/connector.rs
@@ -506,7 +506,7 @@
                     } else {
                         FieldType::String
                     },
-                    nullable: true,
+                    nullable: name != "PK",
                     source: Default::default(),
                 })
                 .collect(),
@@ -713,10 +713,7 @@ async fn map_record(
                 fields[*pk] = Field::String(s.clone());
             }
             serde_json::Value::Number(n) => {
-                fields[*pk] = Field::UInt(
-                    n.as_u64()
-                        .ok_or(AerospikeConnectorError::ParsingUIntFailed)?,
-                );
+                fields[*pk] = Field::String(n.as_str().to_string());
             }
             v => return Err(AerospikeConnectorError::KeyNotSupported(v)),
         }
diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs
index 78c7e13b7a..4b8f874a81 100644
--- a/dozer-ingestion/oracle/src/connector/mod.rs
+++ b/dozer-ingestion/oracle/src/connector/mod.rs
@@ -33,7 +33,7 @@ pub struct Connector {
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("oracle error: {0}")]
+    #[error("oracle error: {0:?}")]
     Oracle(#[from] oracle::Error),
     #[error("pdb not found: {0}")]
     PdbNotFound(String),
@@ -264,6 +264,7 @@ impl Connector {
         let columns = table.column_names.join(", ");
         let owner = table.schema.unwrap_or_else(|| self.username.clone());
         let sql = format!("SELECT {} FROM {}.{}", columns, owner, table.name);
+        debug!("{}", sql);
         let rows = self.connection.query(&sql, &[])?;
 
         let mut batch = Vec::with_capacity(self.batch_size);
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
index 65196ab4cc..aa0a15a098 100644
--- a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
+++ b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
@@ -1,5 +1,6 @@
 use std::{sync::mpsc::SyncSender, time::Duration};
 
+use dozer_ingestion_connector::dozer_types::log::debug;
 use dozer_ingestion_connector::{
     dozer_types::{
         chrono::{DateTime, Utc},
@@ -72,7 +73,7 @@ fn log_reader_loop(
     let mut last_rba: Option = None;
 
     loop {
-        info!("Listing logs starting from SCN {}", start_scn);
+        debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
         let mut logs = match list_logs(connection, start_scn) {
             Ok(logs) => logs,
             Err(e) => {
@@ -95,7 +96,7 @@ fn log_reader_loop(
 
     'replicate_logs: while !logs.is_empty() {
         let log = logs.remove(0);
-        info!(
+        debug!(target: "oracle_replication",
             "Reading log {} ({}) ({}, {}), starting from {:?}",
             log.name, log.sequence, log.first_change, log.next_change, last_rba
         );
@@ -145,7 +146,7 @@ fn log_reader_loop(
             if ingestor.is_closed() {
                 return;
             }
-            info!("Read all logs, retrying after {:?}", poll_interval);
+            debug!(target: "oracle_replication", "Read all logs, retrying after {:?}", poll_interval);
             std::thread::sleep(poll_interval);
         } else {
             // If there are more logs, we need to start from the next log's first change.
diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs
index be4f7fa39e..615d92fe62 100644
--- a/dozer-sink-aerospike/src/aerospike.rs
+++ b/dozer-sink-aerospike/src/aerospike.rs
@@ -1,3 +1,4 @@
+use std::time::Instant;
 use std::{
     alloc::{handle_alloc_error, Layout},
     ffi::{c_char, c_void, CStr, CString, NulError},
@@ -9,6 +10,7 @@ use std::{
 use itertools::Itertools;
 
 use aerospike_client_sys::*;
+use dozer_types::log::{debug, info};
 use dozer_types::{
     chrono::{DateTime, NaiveDate},
     geo::{Coord, Point},
@@ -147,7 +149,9 @@ impl Client {
             as_config_init(config.as_mut_ptr());
             config.assume_init()
         };
-        config.policies.batch.base.total_timeout = 10000;
+
+        config.policies.batch.base.total_timeout = 30000;
+        config.policies.batch.base.socket_timeout = 30000;
         config.policies.write.key = as_policy_key_e_AS_POLICY_KEY_SEND;
         config.policies.batch_write.key = as_policy_key_e_AS_POLICY_KEY_SEND;
         unsafe {
@@ -185,6 +189,7 @@ impl Client {
         if let Some(filter) = filter {
             policy.base.filter_exp = filter.as_ptr();
         }
+
         as_try(|err| {
             aerospike_key_put(
                 self.inner.as_ptr(),
@@ -204,6 +209,7 @@ impl Client {
     ) -> Result<(), AerospikeError> {
         let mut policy = self.inner.as_ref().config.policies.write;
         policy.exists = as_policy_exists_e_AS_POLICY_EXISTS_CREATE;
+
         self.put(key, new, policy, filter)
     }
 
@@ -252,7 +258,18 @@ impl Client {
         &self,
         batch: *mut as_batch_records,
     ) -> Result<(), AerospikeError> {
-        as_try(|err| aerospike_batch_write(self.inner.as_ptr(), err, std::ptr::null(), batch))
+        let started = Instant::now();
+        let policy = self.inner.as_ref().config.policies.batch;
+        as_try(|err| {
+            aerospike_batch_write(
+                self.inner.as_ptr(),
+                err,
+                &policy as *const as_policy_batch,
+                batch,
+            )
+        })?;
+        debug!(target: "aerospike_sink", "Batch write took {:?}", started.elapsed());
+        Ok(())
     }
 
     pub(crate) unsafe fn _select(
@@ -293,7 +310,7 @@ impl Client {
         &self,
         batch: *mut as_batch_records,
     ) -> Result<(), AerospikeError> {
-        dbg!("Batch get {} records", (*batch).list.size);
+        info!("Batch get {} records", (*batch).list.size);
         as_try(|err| aerospike_batch_read(self.inner.as_ptr(), err, std::ptr::null(), batch))
     }
 
@@ -306,6 +323,7 @@ impl Client {
         request: &CStr,
         response: &mut *mut i8,
     ) -> Result<(), AerospikeError> {
+        info!("Info");
         as_try(|err| {
             aerospike_info_any(
                 self.inner.as_ptr(),
diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs
index 9d80c0265f..60f7029257 100644
--- a/dozer-sink-aerospike/src/denorm_dag.rs
+++ b/dozer-sink-aerospike/src/denorm_dag.rs
@@ -10,6 +10,7 @@ use dozer_core::petgraph::visit::{
     EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences,
 };
 use dozer_types::indexmap::IndexMap;
+use dozer_types::log::info;
 use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable};
 use dozer_types::thiserror;
 use dozer_types::types::{Field, Record, Schema, TableOperation};
@@ -498,6 +499,7 @@ impl DenormalizationState {
             .sum();
         let batch_size: u32 = batch_size_upper_bound.try_into().unwrap();
 
+        info!("Writing denorm batch of size {}", batch_size);
         let mut write_batch = RecordBatch::new(batch_size, batch_size);
 
         for node in self.dag.node_weights_mut() {
diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs
index 7c7e1883e3..4c68005f6b 100644
--- a/dozer-sink-aerospike/src/lib.rs
+++ b/dozer-sink-aerospike/src/lib.rs
@@ -3,7 +3,7 @@ pub use crate::aerospike::Client;
 use aerospike_client_sys::*;
 use denorm_dag::DenormalizationState;
 use dozer_core::event::EventHub;
-use dozer_types::log::error;
+use dozer_types::log::{error, info};
 use dozer_types::models::connection::AerospikeConnection;
 use dozer_types::node::OpIdentifier;
 use dozer_types::thiserror;
@@ -239,7 +239,6 @@ impl Drop for AsRecord<'_> {
 struct AerospikeSink {
     config: AerospikeSinkConfig,
     replication_worker: AerospikeSinkWorker,
-    current_transaction: Option<u64>,
     metadata_namespace: CString,
     metadata_set: CString,
     client: Arc<Client>,
@@ -365,7 +364,6 @@ impl AerospikeSink {
         Ok(Self {
             config,
             replication_worker: worker_instance,
-            current_transaction: None,
             metadata_namespace,
             metadata_set,
             client,
@@ -441,6 +439,8 @@ impl AerospikeSinkWorker {
             .sum();
         // Write denormed tables
         let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32);
+
+        info!("Sink batch size {batch_size_est}");
         for table in denormalized_tables {
             for (key, record) in table.records {
                 batch.add_write(
@@ -475,17 +475,21 @@ impl Sink for AerospikeSink {
         Ok(())
     }
 
-    fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
-        self.replication_worker
-            .commit(self.current_transaction.take())?;
+    fn commit(&mut self, epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
+        debug_assert_eq!(epoch_details.common_info.source_states.len(), 1);
+        let txid = epoch_details
+            .common_info
+            .source_states
+            .iter()
+            .next()
+            .and_then(|(_, state)| state.op_id())
+            .map(|op_id| op_id.txid);
+
+        self.replication_worker.commit(txid)?;
         Ok(())
     }
 
     fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> {
-        // Set current transaction before any error can be thrown, so we don't
-        // get stuck in an error loop if this error gets ignored by the caller
-        self.current_transaction = op.id.map(|id| id.txid);
-
         self.replication_worker.process(op)?;
         Ok(())
     }
diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index 757659d4e1..974ca2f9f1 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -25,8 +25,9 @@ use oracle::{
     Connection,
 };
 
-const TXN_ID_COL: &str = "__txn_id";
-const TXN_SEQ_COL: &str = "__txn_seq";
+const TXN_ID_COL: &str = "txn_id";
+const TXN_SEQ_COL: &str = "txn_seq";
+const OPKIND_COL: &str = "DOZER_OPKIND";
 const METADATA_TABLE: &str = "__replication_metadata";
 const META_TXN_ID_COL: &str = "txn_id";
 const META_TABLE_COL: &str = "table";
@@ -96,6 +97,7 @@ struct OracleSink {
     update_metadata: String,
     select_metadata: String,
     latest_txid: Option<u64>,
+    insert_statement: String,
 }
 
 #[derive(Debug)]
@@ -265,6 +267,7 @@ impl OracleSinkFactory {
         &self,
         connection: &Connection,
         table: &Table,
+        temp_table: Option<&Table>,
         schema: &Schema,
     ) -> Result<(), Error> {
         if self.validate_table(connection, table, schema)? {
@@ -297,9 +300,15 @@ impl OracleSinkFactory {
                 if field.nullable { "" } else { " NOT NULL" }
             ));
         }
-        let table = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
-        info!("### CREATE TABLE #### \n: {:?}", table);
-        connection.execute(&table, &[])?;
+        let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
+        info!("### CREATE TABLE #### \n: {:?}", table_query);
+        connection.execute(&table_query, &[])?;
+
+        if let Some(temp_table) = temp_table {
+            let temp_table_query = format!("CREATE GLOBAL TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT DELETE ROWS", column_defs.join(",\n"));
+            info!("### CREATE TEMPORARY TABLE #### \n: {:?}", temp_table_query);
+            connection.execute(&temp_table_query, &[])?;
+        }
 
         Ok(())
     }
@@ -348,21 +357,51 @@ impl OracleSinkFactory {
 
         Ok(())
     }
+
+    fn create_pk(
+        &self,
+        connection: &Connection,
+        table: &Table,
+        schema: &Schema,
+    ) -> Result<(), Error> {
+        let mut columns = schema
+            .primary_index
+            .iter()
+            .map(|ix| schema.fields[*ix].name.clone())
+            .collect::<Vec<_>>();
+
+        columns.iter_mut().for_each(|col| {
+            *col = col.to_uppercase();
+        });
+
+        let index_name =
+            format!("{}_{}_{}_PK", table.owner, table.name, columns.join("_")).replace('#', "");
+
+        let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2";
+        info!("Index check query {query}");
+
+        let mut index_exist = connection.query(query, &[&table.name, &table.owner])?;
+        if index_exist.next().is_some() {
+            info!("Index {index_name} already exists");
+        } else {
+            let query = format!(
+                "ALTER TABLE {table} ADD CONSTRAINT {index_name} PRIMARY KEY ({})",
+                columns.join(", ")
+            );
+            info!("### CREATE PK #### \n: {index_name}. Query: {query}");
+            connection.execute(&query, &[])?;
+        }
+
+        Ok(())
+    }
 }
 
-fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
+fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) -> String {
     let field_names = schema
         .fields
         .iter()
         .map(|field| field.name.as_str())
         .chain([TXN_ID_COL, TXN_SEQ_COL]);
-    let mut parameter_index = 1usize..;
-    let input_fields = field_names
-        .clone()
-        .zip(&mut parameter_index)
-        .map(|(name, i)| format!(":{i} \"{name}\""))
-        .collect::<Vec<_>>()
-        .join(", ");
     let destination_columns = field_names
         .clone()
         .map(|name| format!("D.\"{name}\""))
@@ -394,8 +433,6 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
         pk_select = "1 = 1".to_owned();
     }
 
-    let opkind_idx = parameter_index.next().unwrap();
-
     let opid_select = format!(
         r#"(D."{TXN_ID_COL}" IS NULL
             OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
@@ -408,7 +445,7 @@
     // do nothing (if the op is INSERT,
     format!(
         r#"MERGE INTO {table} D
-        USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
+        USING {temp_table} S
         ON ({pk_select})
         WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
         WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select}
@@ -417,6 +454,40 @@ fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
     )
 }
 
+fn generate_insert_statement(table: &Table, schema: &Schema) -> String {
+    let field_names = schema
+        .fields
+        .iter()
+        .map(|field| field.name.as_str())
+        .chain([TXN_ID_COL, TXN_SEQ_COL, OPKIND_COL]);
+
+    let destination_columns = field_names
+        .clone()
+        .map(|name| format!("\"{name}\""))
+        .collect::<Vec<_>>()
+        .join(", ");
+
+    let mut parameter_index = 1usize..;
+    let input_fields = field_names
+        .clone()
+        .zip(&mut parameter_index)
+        .map(|(name, i)| format!(":{i} \"{name}\""))
+        .collect::<Vec<_>>()
+        .join(", ");
+
+    // Stage the raw operations (with their op kind) in the temporary table;
+    // the MERGE statement then applies them to the destination table.
+    format!(
+        r#"INSERT INTO {table} ({destination_columns})
            SELECT *
            FROM
            (SELECT {input_fields} FROM DUAL)
        "#
+    )
+}
+
 #[derive(Debug, Clone)]
 struct Table {
     owner: String,
@@ -477,8 +548,19 @@ impl SinkFactory for OracleSinkFactory {
             false,
         );
 
-        self.validate_or_create_table(&connection, &self.table, &amended_schema)?;
+        let temp_table = Table {
+            owner: self.table.owner.clone(),
+            name: format!("{}_TEMP", &self.table.name),
+        };
+
+        self.validate_or_create_table(
+            &connection,
+            &self.table,
+            Some(&temp_table),
+            &amended_schema,
+        )?;
         self.create_index(&connection, &self.table, &amended_schema)?;
+        self.create_pk(&connection, &temp_table, &amended_schema)?;
         let meta_table = Table {
             owner: self.table.owner.clone(),
             name: METADATA_TABLE.to_owned(),
@@ -486,6 +568,7 @@
         self.validate_or_create_table(
             &connection,
             &meta_table,
+            None,
             Schema::new()
                 .field(
                     FieldDefinition {
@@ -518,10 +601,12 @@
         );
 
         let field_types = schema.fields.iter().map(|field| field.typ).collect();
+
         Ok(Box::new(OracleSink {
             conn: connection,
             insert_append,
-            merge_statement: generate_merge_statement(&self.table, &schema),
+            merge_statement: generate_merge_statement(&self.table, &temp_table, &schema),
+            insert_statement: generate_insert_statement(&temp_table, &schema),
             field_types,
             pk: schema.primary_index,
             batch_params: Vec::new(),
@@ -599,9 +684,10 @@ impl OracleSink {
     fn exec_batch(&mut self) -> oracle::Result<()> {
         debug!(target: "oracle_sink", "Executing batch of size {}", self.batch_params.len());
         let started = std::time::Instant::now();
+
         let mut batch = self
             .conn
-            .batch(&self.merge_statement, self.batch_params.len())
+            .batch(&self.insert_statement, self.batch_params.len())
             .build()?;
         for params in self.batch_params.drain(..) {
             let mut bind_idx = 1..;
@@ -621,6 +707,11 @@
             batch.append_row(&[])?;
         }
         batch.execute()?;
+
+        self.conn.execute(&self.merge_statement, &[])?;
+
+        self.conn.commit()?;
+
         debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed());
         Ok(())
     }
@@ -786,7 +877,13 @@ mod tests {
             owner: "owner".to_owned(),
             name: "tablename".to_owned(),
         };
-        let stmt = generate_merge_statement(&table, &schema);
+
+        let temp_table = Table {
+            owner: "owner".to_owned(),
+            name: "tablename_temp".to_owned(),
+        };
+
+        let stmt = generate_merge_statement(&table, &temp_table, &schema);
         assert_eq!(
             trim_str(stmt),
             trim_str(
diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs
index f84485f0f7..b5b5562ae5 100644
--- a/dozer-types/src/node.rs
+++ b/dozer-types/src/node.rs
@@ -117,6 +117,16 @@ pub enum SourceState {
     Restartable(OpIdentifier),
 }
 
+impl SourceState {
+    pub fn op_id(&self) -> Option<&OpIdentifier> {
+        if let Self::Restartable(op_id) = self {
+            Some(op_id)
+        } else {
+            None
+        }
+    }
+}
+
 /// Map from a `Source` node's handle to its state.
 ///
 /// This uniquely identifies the state of the Dozer pipeline.
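
Note on the write path this first patch introduces: exec_batch() now array-binds every buffered operation (plus its op-kind discriminator) into the {table}_TEMP staging table via the generated INSERT, then applies the whole batch with the single set-based MERGE, instead of merging one row at a time from DUAL. A rough sketch of the generated statement pair, assuming a hypothetical destination table "OWNER"."T" with primary key "id" and one value column "name" — names, types, and bind positions are illustrative, mirroring generate_insert_statement and generate_merge_statement above:

    -- Illustrative sketch only; table and column names are hypothetical.
    INSERT INTO "OWNER"."T_TEMP" ("id", "name", "txn_id", "txn_seq", "DOZER_OPKIND")
        SELECT *
        FROM
        (SELECT :1 "id", :2 "name", :3 "txn_id", :4 "txn_seq", :5 "DOZER_OPKIND" FROM DUAL);

    MERGE INTO "OWNER"."T" D
    USING "OWNER"."T_TEMP" S
    ON (D."id" = S."id")
    WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."txn_id", D."txn_seq")
        VALUES (S."id", S."name", S."txn_id", S."txn_seq") WHERE S.DOZER_OPKIND = 0
    WHEN MATCHED THEN UPDATE SET D."name" = S."name", D."txn_id" = S."txn_id", D."txn_seq" = S."txn_seq"
        WHERE S.DOZER_OPKIND = 1 AND (D."txn_id" IS NULL OR S."txn_id" > D."txn_id");

Because the GLOBAL TEMPORARY staging table is declared ON COMMIT DELETE ROWS, the commit issued at the end of exec_batch also clears the staged rows for the next batch.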
From 2659935d97f5d99c571644257ba7592f1b65d2df Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?=
Date: Thu, 14 Mar 2024 06:55:55 +0200
Subject: [PATCH 2/5] Revert aerospike numeric key change

---
 dozer-ingestion/aerospike/src/connector.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs
index cef23499b9..a6246638da 100644
--- a/dozer-ingestion/aerospike/src/connector.rs
+++ b/dozer-ingestion/aerospike/src/connector.rs
@@ -713,7 +713,10 @@ async fn map_record(
                 fields[*pk] = Field::String(s.clone());
             }
             serde_json::Value::Number(n) => {
-                fields[*pk] = Field::String(n.as_str().to_string());
+                fields[*pk] = Field::UInt(
+                    n.as_u64()
+                        .ok_or(AerospikeConnectorError::ParsingUIntFailed)?,
+                );
             }
             v => return Err(AerospikeConnectorError::KeyNotSupported(v)),
         }

From 6de3ac5aa5e3e1a461015d66a9fc905d34788fa5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?=
Date: Thu, 14 Mar 2024 07:12:36 +0200
Subject: [PATCH 3/5] Apply clippy fixes

---
 dozer-sink-aerospike/src/aerospike.rs  | 2 +-
 dozer-sink-aerospike/src/denorm_dag.rs | 4 ++--
 dozer-sink-aerospike/src/lib.rs        | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs
index 615d92fe62..b7ca2e010a 100644
--- a/dozer-sink-aerospike/src/aerospike.rs
+++ b/dozer-sink-aerospike/src/aerospike.rs
@@ -310,7 +310,7 @@ impl Client {
         &self,
         batch: *mut as_batch_records,
     ) -> Result<(), AerospikeError> {
-        info!("Batch get {} records", (*batch).list.size);
+        debug!(target: "aerospike_sink", "Batch get {} records", (*batch).list.size);
         as_try(|err| aerospike_batch_read(self.inner.as_ptr(), err, std::ptr::null(), batch))
     }
 
diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs
index 60f7029257..a435259f47 100644
--- a/dozer-sink-aerospike/src/denorm_dag.rs
+++ b/dozer-sink-aerospike/src/denorm_dag.rs
@@ -10,7 +10,7 @@ use dozer_core::petgraph::visit::{
     EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences,
 };
 use dozer_types::indexmap::IndexMap;
-use dozer_types::log::info;
+use dozer_types::log::debug;
 use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable};
 use dozer_types::thiserror;
 use dozer_types::types::{Field, Record, Schema, TableOperation};
@@ -499,7 +499,7 @@ impl DenormalizationState {
             .sum();
         let batch_size: u32 = batch_size_upper_bound.try_into().unwrap();
 
-        info!("Writing denorm batch of size {}", batch_size);
+        debug!(target: "aerospike_sink", "Writing denorm batch of size {}", batch_size);
         let mut write_batch = RecordBatch::new(batch_size, batch_size);
 
         for node in self.dag.node_weights_mut() {
diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs
index 4c68005f6b..c3f6f985ac 100644
--- a/dozer-sink-aerospike/src/lib.rs
+++ b/dozer-sink-aerospike/src/lib.rs
@@ -3,7 +3,7 @@ pub use crate::aerospike::Client;
 use aerospike_client_sys::*;
 use denorm_dag::DenormalizationState;
 use dozer_core::event::EventHub;
-use dozer_types::log::{error, info};
+use dozer_types::log::{debug, error};
 use dozer_types::models::connection::AerospikeConnection;
 use dozer_types::node::OpIdentifier;
 use dozer_types::thiserror;
@@ -440,7 +440,7 @@ impl AerospikeSinkWorker {
         // Write denormed tables
         let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32);
 
-        info!("Sink batch size {batch_size_est}");
+        debug!(target: "aerospike_sink", "Sink batch size {batch_size_est}");
         for table in denormalized_tables {
             for (key, record) in table.records {
                 batch.add_write(

From 7a772b07a47cad83ac507c08779165893bcfa3ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?=
Date: Thu, 14 Mar 2024 07:49:49 +0200
Subject: [PATCH 4/5] Fix tests

---
 dozer-sink-oracle/src/lib.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index 974ca2f9f1..78f9fc0224 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -25,8 +25,8 @@ use oracle::{
     Connection,
 };
 
-const TXN_ID_COL: &str = "txn_id";
-const TXN_SEQ_COL: &str = "txn_seq";
+const TXN_ID_COL: &str = "__txn_id";
+const TXN_SEQ_COL: &str = "__txn_seq";
 const OPKIND_COL: &str = "DOZER_OPKIND";
 const METADATA_TABLE: &str = "__replication_metadata";
 const META_TXN_ID_COL: &str = "txn_id";
 const META_TABLE_COL: &str = "table";
@@ -889,7 +889,7 @@ mod tests {
                 trim_str(
                     r#"
                     MERGE INTO "owner"."tablename" D
-                    USING (SELECT :1 "id", :2 "name", :3 "content", :4 "__txn_id", :5 "__txn_seq", :6 DOZER_OPKIND FROM DUAL) S
+                    USING "owner"."tablename_temp" S
                     ON (D."id" = S."id" AND D."name" = S."name")
                     WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
                     WHEN MATCHED THEN UPDATE SET D."content" = S."content", D."__txn_id" = S."__txn_id", D."__txn_seq" = S."__txn_seq"

From f800a4b6cedfefa798b3364dc9dab3c77d84523b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?=
Date: Fri, 15 Mar 2024 06:46:21 +0200
Subject: [PATCH 5/5] Refactor oracle temp table

---
 dozer-sink-aerospike/src/aerospike.rs  |  5 +-
 dozer-sink-aerospike/src/denorm_dag.rs |  3 +-
 dozer-sink-aerospike/src/lib.rs        |  3 +-
 dozer-sink-oracle/src/lib.rs           | 89 +++++++++-----------------
 4 files changed, 35 insertions(+), 65 deletions(-)

diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs
index b7ca2e010a..0b1c53305c 100644
--- a/dozer-sink-aerospike/src/aerospike.rs
+++ b/dozer-sink-aerospike/src/aerospike.rs
@@ -10,7 +10,7 @@ use std::{
 use itertools::Itertools;
 
 use aerospike_client_sys::*;
-use dozer_types::log::{debug, info};
+use dozer_types::log::debug;
 use dozer_types::{
     chrono::{DateTime, NaiveDate},
     geo::{Coord, Point},
@@ -258,6 +258,8 @@ impl Client {
         &self,
         batch: *mut as_batch_records,
     ) -> Result<(), AerospikeError> {
+        debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size);
+
         let started = Instant::now();
         let policy = self.inner.as_ref().config.policies.batch;
         as_try(|err| {
@@ -323,7 +325,6 @@ impl Client {
         request: &CStr,
         response: &mut *mut i8,
     ) -> Result<(), AerospikeError> {
-        info!("Info");
         as_try(|err| {
             aerospike_info_any(
                 self.inner.as_ptr(),
diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs
index a435259f47..488eae0892 100644
--- a/dozer-sink-aerospike/src/denorm_dag.rs
+++ b/dozer-sink-aerospike/src/denorm_dag.rs
@@ -10,7 +10,7 @@ use dozer_core::petgraph::visit::{
     EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences,
 };
 use dozer_types::indexmap::IndexMap;
-use dozer_types::log::debug;
+
 use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable};
 use dozer_types::thiserror;
 use dozer_types::types::{Field, Record, Schema, TableOperation};
@@ -499,7 +498,6 @@ impl DenormalizationState {
             .sum();
         let batch_size: u32 = batch_size_upper_bound.try_into().unwrap();
 
-        debug!(target: "aerospike_sink", "Writing denorm batch of size {}", batch_size);
batch of size {}", batch_size); let mut write_batch = RecordBatch::new(batch_size, batch_size); for node in self.dag.node_weights_mut() { diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index c3f6f985ac..762436f8a7 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -3,7 +3,7 @@ pub use crate::aerospike::Client; use aerospike_client_sys::*; use denorm_dag::DenormalizationState; use dozer_core::event::EventHub; -use dozer_types::log::{debug, error}; +use dozer_types::log::error; use dozer_types::models::connection::AerospikeConnection; use dozer_types::node::OpIdentifier; use dozer_types::thiserror; @@ -440,7 +440,6 @@ impl AerospikeSinkWorker { // Write denormed tables let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32); - debug!(target: "aerospike_sink", "Sink batch size {batch_size_est}"); for table in denormalized_tables { for (key, record) in table.records { batch.add_write( diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index 78f9fc0224..088b9b7c92 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -98,6 +98,7 @@ struct OracleSink { select_metadata: String, latest_txid: Option, insert_statement: String, + delete_statement: String, } #[derive(Debug)] @@ -270,11 +271,7 @@ impl OracleSinkFactory { temp_table: Option<&Table>, schema: &Schema, ) -> Result<(), Error> { - if self.validate_table(connection, table, schema)? { - return Ok(()); - } - - let mut column_defs = Vec::with_capacity(schema.fields.len() + 2); + let mut column_defs = Vec::with_capacity(schema.fields.len()); for field in &schema.fields { let name = &field.name; let col_type = match field.typ { @@ -300,13 +297,16 @@ impl OracleSinkFactory { if field.nullable { "" } else { " NOT NULL" } )); } - let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n")); - info!("### CREATE TABLE #### \n: {:?}", table_query); - connection.execute(&table_query, &[])?; + + if !(self.validate_table(connection, table, schema)?) 
{ + let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n")); + info!("### CREATE TABLE ####\n{}", table_query); + connection.execute(&table_query, &[])?; + } if let Some(temp_table) = temp_table { - let temp_table_query = format!("CREATE GLOBAL TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT DELETE ROWS", column_defs.join(",\n")); - info!("### CREATE TEMPORARY TABLE #### \n: {:?}", temp_table_query); + let temp_table_query = format!("CREATE PRIVATE TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT PRESERVE DEFINITION", column_defs.join(",\n")).replace("NOT NULL", ""); + info!("### CREATE TEMPORARY TABLE ####\n{}", temp_table_query); connection.execute(&temp_table_query, &[])?; } @@ -357,44 +357,8 @@ impl OracleSinkFactory { Ok(()) } - - fn create_pk( - &self, - connection: &Connection, - table: &Table, - schema: &Schema, - ) -> Result<(), Error> { - let mut columns = schema - .primary_index - .iter() - .map(|ix| schema.fields[*ix].name.clone()) - .collect::>(); - - columns.iter_mut().for_each(|col| { - *col = col.to_uppercase(); - }); - - let index_name = - format!("{}_{}_{}_PK", table.owner, table.name, columns.join("_")).replace('#', ""); - - let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2"; - info!("Index check query {query}"); - - let mut index_exist = connection.query(query, &[&table.name, &table.owner])?; - if index_exist.next().is_some() { - info!("Index {index_name} already exist"); - } else { - let query = format!( - "ALTER TABLE {table} ADD CONSTRAINT {index_name} PRIMARY KEY ({})", - columns.join(", ") - ); - info!("### CREATE PK #### \n: {index_name}. Query: {query}"); - connection.execute(&query, &[])?; - } - - Ok(()) - } } + fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) -> String { let field_names = schema .fields @@ -461,12 +425,6 @@ fn generate_insert_statement(table: &Table, schema: &Schema) -> String { .map(|field| field.name.as_str()) .chain([TXN_ID_COL, TXN_SEQ_COL, OPKIND_COL]); - let destination_columns = field_names - .clone() - .map(|name| format!("\"{name}\"")) - .collect::>() - .join(", "); - let mut parameter_index = 1usize..; let input_fields = field_names .clone() @@ -480,13 +438,16 @@ fn generate_insert_statement(table: &Table, schema: &Schema) -> String { // If the record exists, but the txid is higher than the operation's txid, // do nothing (if the op is INSERT, format!( - r#"INSERT INTO {table} ({destination_columns}) + r#"INSERT INTO {table} SELECT * FROM (SELECT {input_fields} FROM DUAL) "# ) } +fn generate_delete_statement(table: &Table) -> String { + format!(r#"DELETE FROM {table}"#) +} #[derive(Debug, Clone)] struct Table { @@ -550,7 +511,7 @@ impl SinkFactory for OracleSinkFactory { let temp_table = Table { owner: self.table.owner.clone(), - name: format!("{}_TEMP", &self.table.name), + name: format!("ORA$PTT_{}", &self.table.name), }; self.validate_or_create_table( @@ -560,7 +521,6 @@ impl SinkFactory for OracleSinkFactory { &amended_schema, )?; self.create_index(&connection, &self.table, &amended_schema)?; - self.create_pk(&connection, &temp_table, &amended_schema)?; let meta_table = Table { owner: self.table.owner.clone(), name: METADATA_TABLE.to_owned(), @@ -602,11 +562,21 @@ impl SinkFactory for OracleSinkFactory { let field_types = schema.fields.iter().map(|field| field.typ).collect(); + let merge_statement = generate_merge_statement(&self.table, &temp_table, &schema); + info!(target: "oracle_sink", "Merge 
statement {}", merge_statement); + + let insert_statement = generate_insert_statement(&temp_table, &schema); + info!(target: "oracle_sink", "Insert statement {}", insert_statement); + + let delete_statement = generate_delete_statement(&temp_table); + info!(target: "oracle_sink", "Delete statement {}", delete_statement); + Ok(Box::new(OracleSink { conn: connection, insert_append, - merge_statement: generate_merge_statement(&self.table, &temp_table, &schema), - insert_statement: generate_insert_statement(&temp_table, &schema), + merge_statement, + insert_statement, + delete_statement, field_types, pk: schema.primary_index, batch_params: Vec::new(), @@ -710,7 +680,7 @@ impl OracleSink { self.conn.execute(&self.merge_statement, &[])?; - self.conn.commit()?; + self.conn.execute(&self.delete_statement, &[])?; debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed()); Ok(()) @@ -739,6 +709,7 @@ impl Sink for OracleSink { &mut self, _epoch_details: &dozer_core::epoch::Epoch, ) -> Result<(), dozer_types::errors::internal::BoxedError> { + // Ok(self.conn.commit()?) Ok(()) }