Skip to content

Commit

Permalink
Oracle sink: Allow custom unique_key and fix index creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Apr 2, 2024
1 parent df196a6 commit 173efb8
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 22 deletions.
135 changes: 113 additions & 22 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_types::{
log::warn,
models::sink::OracleSinkConfig,
thiserror,
types::{FieldDefinition, Operation, SourceDefinition, TableOperation},
Expand Down Expand Up @@ -123,6 +124,7 @@ impl OracleSinkFactory {
table: Table {
owner,
name: config.table_name,
unique_key: config.unique_key,
},
}
}
Expand Down Expand Up @@ -325,20 +327,26 @@ impl OracleSinkFactory {
.map(|ix| schema.fields[*ix].name.clone())
.collect::<Vec<_>>();

columns.iter_mut().for_each(|col| {
*col = col.to_uppercase();
});

let index_name = format!(
"{}_{}_{}_TXN_ID_TXN_SEQ_INDEX",
table.owner,
table.name,
columns.join("_")
)
.replace('#', "");
table
.owner
.to_ascii_uppercase()
.strip_prefix("C##")
.unwrap_or(&table.owner),
table.name.to_ascii_uppercase(),
columns
.iter()
.map(|col| col
.chars()
.filter_map(|c| c.is_ascii_alphabetic().then_some(c.to_ascii_uppercase()))
.collect::<String>())
.collect::<Vec<_>>()
.join("_")
);

columns.push(format!("\"{}\"", TXN_ID_COL));
columns.push(format!("\"{}\"", TXN_SEQ_COL));
columns.push(TXN_ID_COL.to_owned());
columns.push(TXN_SEQ_COL.to_owned());

let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2";
info!("Index check query {query}");
Expand All @@ -349,8 +357,13 @@ impl OracleSinkFactory {
} else {
let query = format!(
"CREATE INDEX {index_name} ON {table} ({})",
columns.join(", ")
columns
.into_iter()
.map(|col| format!("\"{col}\""))
.collect::<Vec<_>>()
.join(", ")
);
dbg!(&query);
info!("### CREATE INDEX #### \n: {index_name}. Query: {query}");
connection.execute(&query, &[])?;
}
Expand Down Expand Up @@ -386,16 +399,22 @@ fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema)
.collect::<Vec<_>>()
.join(", ");

let mut pk_select = schema
.primary_index
.iter()
.map(|ix| &schema.fields[*ix].name)
.map(|name| format!("D.\"{name}\" = S.\"{name}\""))
.collect::<Vec<_>>()
.join(" AND ");
if pk_select.is_empty() {
pk_select = "1 = 1".to_owned();
}
let unique_fields = if !table.unique_key.is_empty() {
table.unique_key.clone()
} else {
schema
.primary_index
.iter()
.map(|ix| &schema.fields[*ix].name)
.map(|name| format!("D.\"{name}\" = S.\"{name}\""))
.collect::<Vec<_>>()
};
let pk_select = if !unique_fields.is_empty() {
unique_fields.join(" AND ")
} else {
warn!("No unique key defined for oracle sink table {table}. Table will be append-only");
"1 = 0".to_owned()
};

let opid_select = format!(
r#"(D."{TXN_ID_COL}" IS NULL
Expand Down Expand Up @@ -453,6 +472,7 @@ fn generate_delete_statement(table: &Table) -> String {
/// Describes one Oracle table the sink works with: who owns it, what it is
/// called, and which columns (if any) were configured as its unique key.
struct Table {
/// Owner of the table.
owner: String,
/// Name of the table.
name: String,
/// Unique-key column names. The sink's main table gets these from
/// `OracleSinkConfig::unique_key`; the temporary and metadata tables are
/// built with an empty list. When non-empty, `generate_merge_statement`
/// uses this list in place of the schema's primary index.
unique_key: Vec<String>,
}

#[async_trait]
Expand Down Expand Up @@ -512,6 +532,7 @@ impl SinkFactory for OracleSinkFactory {
let temp_table = Table {
owner: self.table.owner.clone(),
name: format!("ORA$PTT_{}", &self.table.name),
unique_key: vec![],
};

self.validate_or_create_table(
Expand All @@ -524,6 +545,7 @@ impl SinkFactory for OracleSinkFactory {
let meta_table = Table {
owner: self.table.owner.clone(),
name: METADATA_TABLE.to_owned(),
unique_key: vec![],
};
self.validate_or_create_table(
&connection,
Expand Down Expand Up @@ -826,6 +848,7 @@ impl Sink for OracleSink {
#[cfg(test)]
mod tests {
use super::*;
use dozer_core::tokio;

fn trim_str(s: impl AsRef<str>) -> String {
s.as_ref()
Expand All @@ -847,11 +870,13 @@ mod tests {
let table = Table {
owner: "owner".to_owned(),
name: "tablename".to_owned(),
unique_key: vec![],
};

let temp_table = Table {
owner: "owner".to_owned(),
name: "tablename_temp".to_owned(),
unique_key: vec![],
};

let stmt = generate_merge_statement(&table, &temp_table, &schema);
Expand All @@ -875,6 +900,72 @@ mod tests {
)
}

/// End-to-end check that rows with a two-column key are all written.
///
/// Builds a sink for table `test` over a schema with two non-nullable `UInt`
/// fields, `ida` and `idb` (both registered with `true` as the second argument
/// to `Schema::field` — presumably marking them as primary-key columns; confirm
/// against `Schema`). It then inserts the four `(ida, idb)` combinations of
/// `0` and `1`, flushes the batch, and asserts that the table holds four rows.
///
/// Ignored by default: it connects to an Oracle database on `localhost:1521`
/// using the `C##DOZER` account.
#[tokio::test]
#[ignore = "Needs oracle database"]
async fn test_insert_composite() {
    const USER: &str = "C##DOZER";
    const PASSWORD: &str = "123";

    let connection_config = OracleConfig {
        user: USER.into(),
        password: PASSWORD.into(),
        host: "localhost".into(),
        port: 1521,
        sid: "ORCLCDB".into(),
        pdb: Some("ORCLPDB1".into()),
        schemas: vec![],
        batch_size: None,
        replicator: dozer_types::models::ingestion_types::OracleReplicator::LogMiner {
            poll_interval_in_milliseconds: 0,
        },
    };
    let sink_config = OracleSinkConfig {
        connection: "".into(),
        table_name: "test".into(),
        owner: None,
        unique_key: vec![],
    };
    let factory = OracleSinkFactory::new(connection_config, sink_config);

    // Both columns share the same definition apart from their name.
    let key_field = |name: &str| FieldDefinition {
        name: name.into(),
        typ: FieldType::UInt,
        nullable: false,
        source: SourceDefinition::Dynamic,
    };
    let mut schema = Schema::new();
    for column in ["ida", "idb"] {
        schema.field(key_field(column), true);
    }

    let schemas = HashMap::from_iter([(DEFAULT_PORT_HANDLE, schema)]);
    let mut sink = factory.build(schemas, EventHub::new(100)).await.unwrap();

    // Every combination of 0 and 1, in the order a nested 0..2 loop would visit them.
    for (first, second) in [(0, 0), (0, 1), (1, 0), (1, 1)] {
        let insert = TableOperation {
            id: None,
            op: Operation::Insert {
                new: Record::new(vec![Field::UInt(first), Field::UInt(second)]),
            },
            port: 0,
        };
        sink.process(insert).unwrap();
    }
    sink.flush_batch().unwrap();

    // Read the row count back over an independent connection.
    let conn = Connection::connect(USER, PASSWORD, "localhost/ORCLPDB1").unwrap();
    let row_count = conn
        .query_row_as::<usize>("SELECT COUNT(*) FROM \"test\"", &[])
        .unwrap();
    assert_eq!(row_count, 4)
}

fn f(name: &str) -> FieldDefinition {
FieldDefinition {
name: name.to_owned(),
Expand Down
2 changes: 2 additions & 0 deletions dozer-types/src/models/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ pub struct OracleSinkConfig {
pub connection: String,
pub table_name: String,
#[serde(default)]
pub unique_key: Vec<String>,
#[serde(default)]
pub owner: Option<String>,
}

Expand Down
7 changes: 7 additions & 0 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,13 @@
},
"table_name": {
"type": "string"
},
"unique_key": {
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false
Expand Down

0 comments on commit 173efb8

Please sign in to comment.