From f800a4b6cedfefa798b3364dc9dab3c77d84523b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= Date: Fri, 15 Mar 2024 06:46:21 +0200 Subject: [PATCH] Refactor oracle temp table --- dozer-sink-aerospike/src/aerospike.rs | 5 +- dozer-sink-aerospike/src/denorm_dag.rs | 3 +- dozer-sink-aerospike/src/lib.rs | 3 +- dozer-sink-oracle/src/lib.rs | 89 +++++++++----------------- 4 files changed, 35 insertions(+), 65 deletions(-) diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs index b7ca2e010a..0b1c53305c 100644 --- a/dozer-sink-aerospike/src/aerospike.rs +++ b/dozer-sink-aerospike/src/aerospike.rs @@ -10,7 +10,7 @@ use std::{ use itertools::Itertools; use aerospike_client_sys::*; -use dozer_types::log::{debug, info}; +use dozer_types::log::debug; use dozer_types::{ chrono::{DateTime, NaiveDate}, geo::{Coord, Point}, @@ -258,6 +258,8 @@ impl Client { &self, batch: *mut as_batch_records, ) -> Result<(), AerospikeError> { + debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size); + let started = Instant::now(); let policy = self.inner.as_ref().config.policies.batch; as_try(|err| { @@ -323,7 +325,6 @@ impl Client { request: &CStr, response: &mut *mut i8, ) -> Result<(), AerospikeError> { - info!("Info"); as_try(|err| { aerospike_info_any( self.inner.as_ptr(), diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs index a435259f47..488eae0892 100644 --- a/dozer-sink-aerospike/src/denorm_dag.rs +++ b/dozer-sink-aerospike/src/denorm_dag.rs @@ -10,7 +10,7 @@ use dozer_core::petgraph::visit::{ EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences, }; use dozer_types::indexmap::IndexMap; -use dozer_types::log::debug; + use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable}; use dozer_types::thiserror; use dozer_types::types::{Field, Record, Schema, TableOperation}; @@ -499,7 +499,6 @@ impl DenormalizationState { .sum(); let batch_size: u32 = batch_size_upper_bound.try_into().unwrap(); - debug!(target: "aerospike_sink", "Writing denorm batch of size {}", batch_size); let mut write_batch = RecordBatch::new(batch_size, batch_size); for node in self.dag.node_weights_mut() { diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index c3f6f985ac..762436f8a7 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -3,7 +3,7 @@ pub use crate::aerospike::Client; use aerospike_client_sys::*; use denorm_dag::DenormalizationState; use dozer_core::event::EventHub; -use dozer_types::log::{debug, error}; +use dozer_types::log::error; use dozer_types::models::connection::AerospikeConnection; use dozer_types::node::OpIdentifier; use dozer_types::thiserror; @@ -440,7 +440,6 @@ impl AerospikeSinkWorker { // Write denormed tables let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32); - debug!(target: "aerospike_sink", "Sink batch size {batch_size_est}"); for table in denormalized_tables { for (key, record) in table.records { batch.add_write( diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index 78f9fc0224..088b9b7c92 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -98,6 +98,7 @@ struct OracleSink { select_metadata: String, latest_txid: Option, insert_statement: String, + delete_statement: String, } #[derive(Debug)] @@ -270,11 +271,7 @@ impl OracleSinkFactory { temp_table: Option<&Table>, schema: &Schema, ) -> Result<(), Error> { - if self.validate_table(connection, table, schema)? { - return Ok(()); - } - - let mut column_defs = Vec::with_capacity(schema.fields.len() + 2); + let mut column_defs = Vec::with_capacity(schema.fields.len()); for field in &schema.fields { let name = &field.name; let col_type = match field.typ { @@ -300,13 +297,16 @@ impl OracleSinkFactory { if field.nullable { "" } else { " NOT NULL" } )); } - let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n")); - info!("### CREATE TABLE #### \n: {:?}", table_query); - connection.execute(&table_query, &[])?; + + if !(self.validate_table(connection, table, schema)?) { + let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n")); + info!("### CREATE TABLE ####\n{}", table_query); + connection.execute(&table_query, &[])?; + } if let Some(temp_table) = temp_table { - let temp_table_query = format!("CREATE GLOBAL TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT DELETE ROWS", column_defs.join(",\n")); - info!("### CREATE TEMPORARY TABLE #### \n: {:?}", temp_table_query); + let temp_table_query = format!("CREATE PRIVATE TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT PRESERVE DEFINITION", column_defs.join(",\n")).replace("NOT NULL", ""); + info!("### CREATE TEMPORARY TABLE ####\n{}", temp_table_query); connection.execute(&temp_table_query, &[])?; } @@ -357,44 +357,8 @@ impl OracleSinkFactory { Ok(()) } - - fn create_pk( - &self, - connection: &Connection, - table: &Table, - schema: &Schema, - ) -> Result<(), Error> { - let mut columns = schema - .primary_index - .iter() - .map(|ix| schema.fields[*ix].name.clone()) - .collect::>(); - - columns.iter_mut().for_each(|col| { - *col = col.to_uppercase(); - }); - - let index_name = - format!("{}_{}_{}_PK", table.owner, table.name, columns.join("_")).replace('#', ""); - - let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2"; - info!("Index check query {query}"); - - let mut index_exist = connection.query(query, &[&table.name, &table.owner])?; - if index_exist.next().is_some() { - info!("Index {index_name} already exist"); - } else { - let query = format!( - "ALTER TABLE {table} ADD CONSTRAINT {index_name} PRIMARY KEY ({})", - columns.join(", ") - ); - info!("### CREATE PK #### \n: {index_name}. Query: {query}"); - connection.execute(&query, &[])?; - } - - Ok(()) - } } + fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) -> String { let field_names = schema .fields @@ -461,12 +425,6 @@ fn generate_insert_statement(table: &Table, schema: &Schema) -> String { .map(|field| field.name.as_str()) .chain([TXN_ID_COL, TXN_SEQ_COL, OPKIND_COL]); - let destination_columns = field_names - .clone() - .map(|name| format!("\"{name}\"")) - .collect::>() - .join(", "); - let mut parameter_index = 1usize..; let input_fields = field_names .clone() @@ -480,13 +438,16 @@ fn generate_insert_statement(table: &Table, schema: &Schema) -> String { // If the record exists, but the txid is higher than the operation's txid, // do nothing (if the op is INSERT, format!( - r#"INSERT INTO {table} ({destination_columns}) + r#"INSERT INTO {table} SELECT * FROM (SELECT {input_fields} FROM DUAL) "# ) } +fn generate_delete_statement(table: &Table) -> String { + format!(r#"DELETE FROM {table}"#) +} #[derive(Debug, Clone)] struct Table { @@ -550,7 +511,7 @@ impl SinkFactory for OracleSinkFactory { let temp_table = Table { owner: self.table.owner.clone(), - name: format!("{}_TEMP", &self.table.name), + name: format!("ORA$PTT_{}", &self.table.name), }; self.validate_or_create_table( @@ -560,7 +521,6 @@ impl SinkFactory for OracleSinkFactory { &amended_schema, )?; self.create_index(&connection, &self.table, &amended_schema)?; - self.create_pk(&connection, &temp_table, &amended_schema)?; let meta_table = Table { owner: self.table.owner.clone(), name: METADATA_TABLE.to_owned(), @@ -602,11 +562,21 @@ impl SinkFactory for OracleSinkFactory { let field_types = schema.fields.iter().map(|field| field.typ).collect(); + let merge_statement = generate_merge_statement(&self.table, &temp_table, &schema); + info!(target: "oracle_sink", "Merge statement {}", merge_statement); + + let insert_statement = generate_insert_statement(&temp_table, &schema); + info!(target: "oracle_sink", "Insert statement {}", insert_statement); + + let delete_statement = generate_delete_statement(&temp_table); + info!(target: "oracle_sink", "Delete statement {}", delete_statement); + Ok(Box::new(OracleSink { conn: connection, insert_append, - merge_statement: generate_merge_statement(&self.table, &temp_table, &schema), - insert_statement: generate_insert_statement(&temp_table, &schema), + merge_statement, + insert_statement, + delete_statement, field_types, pk: schema.primary_index, batch_params: Vec::new(), @@ -710,7 +680,7 @@ impl OracleSink { self.conn.execute(&self.merge_statement, &[])?; - self.conn.commit()?; + self.conn.execute(&self.delete_statement, &[])?; debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed()); Ok(()) @@ -739,6 +709,7 @@ impl Sink for OracleSink { &mut self, _epoch_details: &dozer_core::epoch::Epoch, ) -> Result<(), dozer_types::errors::internal::BoxedError> { + // Ok(self.conn.commit()?) Ok(()) }