diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index 088b9b7c92..5df56fcd2e 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -1,4 +1,5 @@ use dozer_types::{ + log::warn, models::sink::OracleSinkConfig, thiserror, types::{FieldDefinition, Operation, SourceDefinition, TableOperation}, @@ -123,6 +124,7 @@ impl OracleSinkFactory { table: Table { owner, name: config.table_name, + unique_key: config.unique_key, }, } } @@ -325,20 +327,26 @@ impl OracleSinkFactory { .map(|ix| schema.fields[*ix].name.clone()) .collect::>(); - columns.iter_mut().for_each(|col| { - *col = col.to_uppercase(); - }); - let index_name = format!( "{}_{}_{}_TXN_ID_TXN_SEQ_INDEX", - table.owner, - table.name, - columns.join("_") - ) - .replace('#', ""); + table + .owner + .to_ascii_uppercase() + .strip_prefix("C##") + .unwrap_or(&table.owner), + table.name.to_ascii_uppercase(), + columns + .iter() + .map(|col| col + .chars() + .filter_map(|c| c.is_ascii_alphabetic().then_some(c.to_ascii_uppercase())) + .collect::()) + .collect::>() + .join("_") + ); - columns.push(format!("\"{}\"", TXN_ID_COL)); - columns.push(format!("\"{}\"", TXN_SEQ_COL)); + columns.push(TXN_ID_COL.to_owned()); + columns.push(TXN_SEQ_COL.to_owned()); let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2"; info!("Index check query {query}"); @@ -349,8 +357,13 @@ impl OracleSinkFactory { } else { let query = format!( "CREATE INDEX {index_name} ON {table} ({})", - columns.join(", ") + columns + .into_iter() + .map(|col| format!("\"{col}\"")) + .collect::>() + .join(", ") ); + dbg!(&query); info!("### CREATE INDEX #### \n: {index_name}. Query: {query}"); connection.execute(&query, &[])?; } @@ -386,16 +399,22 @@ fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) .collect::>() .join(", "); - let mut pk_select = schema - .primary_index - .iter() - .map(|ix| &schema.fields[*ix].name) - .map(|name| format!("D.\"{name}\" = S.\"{name}\"")) - .collect::>() - .join(" AND "); - if pk_select.is_empty() { - pk_select = "1 = 1".to_owned(); - } + let unique_fields = if !table.unique_key.is_empty() { + table.unique_key.clone() + } else { + schema + .primary_index + .iter() + .map(|ix| &schema.fields[*ix].name) + .map(|name| format!("D.\"{name}\" = S.\"{name}\"")) + .collect::>() + }; + let pk_select = if !unique_fields.is_empty() { + unique_fields.join(" AND ") + } else { + warn!("No unique key defined for oracle sink table {table}. Table will be append-only"); + "1 = 0".to_owned() + }; let opid_select = format!( r#"(D."{TXN_ID_COL}" IS NULL @@ -453,6 +472,7 @@ fn generate_delete_statement(table: &Table) -> String { struct Table { owner: String, name: String, + unique_key: Vec, } #[async_trait] @@ -512,6 +532,7 @@ impl SinkFactory for OracleSinkFactory { let temp_table = Table { owner: self.table.owner.clone(), name: format!("ORA$PTT_{}", &self.table.name), + unique_key: vec![], }; self.validate_or_create_table( @@ -524,6 +545,7 @@ impl SinkFactory for OracleSinkFactory { let meta_table = Table { owner: self.table.owner.clone(), name: METADATA_TABLE.to_owned(), + unique_key: vec![], }; self.validate_or_create_table( &connection, @@ -826,6 +848,7 @@ impl Sink for OracleSink { #[cfg(test)] mod tests { use super::*; + use dozer_core::tokio; fn trim_str(s: impl AsRef) -> String { s.as_ref() @@ -847,11 +870,13 @@ mod tests { let table = Table { owner: "owner".to_owned(), name: "tablename".to_owned(), + unique_key: vec![], }; let temp_table = Table { owner: "owner".to_owned(), name: "tablename_temp".to_owned(), + unique_key: vec![], }; let stmt = generate_merge_statement(&table, &temp_table, &schema); @@ -875,6 +900,72 @@ mod tests { ) } + #[tokio::test] + #[ignore = "Needs oracle database"] + async fn test_insert_composite() { + let factory = OracleSinkFactory::new( + OracleConfig { + user: "C##DOZER".into(), + password: "123".into(), + host: "localhost".into(), + port: 1521, + sid: "ORCLCDB".into(), + pdb: Some("ORCLPDB1".into()), + schemas: vec![], + batch_size: None, + replicator: dozer_types::models::ingestion_types::OracleReplicator::LogMiner { + poll_interval_in_milliseconds: 0, + }, + }, + OracleSinkConfig { + connection: "".into(), + table_name: "test".into(), + owner: None, + unique_key: vec![], + }, + ); + let mut schema = Schema::new(); + schema.field( + FieldDefinition { + name: "ida".into(), + typ: FieldType::UInt, + nullable: false, + source: SourceDefinition::Dynamic, + }, + true, + ); + schema.field( + FieldDefinition { + name: "idb".into(), + typ: FieldType::UInt, + nullable: false, + source: SourceDefinition::Dynamic, + }, + true, + ); + let schemas = HashMap::from_iter([(DEFAULT_PORT_HANDLE, schema)]); + let mut sink = factory.build(schemas, EventHub::new(100)).await.unwrap(); + for id0 in 0..2 { + for id1 in 0..2 { + sink.process(TableOperation { + id: None, + op: Operation::Insert { + new: Record::new(vec![Field::UInt(id0), Field::UInt(id1)]), + }, + port: 0, + }) + .unwrap(); + } + } + sink.flush_batch().unwrap(); + let conn = Connection::connect("C##DOZER", "123", "localhost/ORCLPDB1").unwrap(); + assert_eq!( + conn.query_row_as::("SELECT COUNT(*) FROM \"test\"", &[]) + .unwrap(), + 4 + ) + } + fn f(name: &str) -> FieldDefinition { FieldDefinition { name: name.to_owned(), diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs index 2a2559b1f2..10c1f468d5 100644 --- a/dozer-types/src/models/sink.rs +++ b/dozer-types/src/models/sink.rs @@ -245,6 +245,8 @@ pub struct OracleSinkConfig { pub connection: String, pub table_name: String, #[serde(default)] + pub unique_key: Vec, + #[serde(default)] pub owner: Option, } diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index a2dcd3c8bb..3bac6f286e 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -1478,6 +1478,13 @@ }, "table_name": { "type": "string" + }, + "unique_key": { + "default": [], + "type": "array", + "items": { + "type": "string" + } } }, "additionalProperties": false