Skip to content

Commit

Permalink
Oracle sink: Add configuration for destination table owner (#2442)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker authored Mar 7, 2024
1 parent 9d00e48 commit 9a5e355
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 39 deletions.
5 changes: 1 addition & 4 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,7 @@ impl<'a> PipelineBuilder<'a> {
.ok_or_else(|| {
OrchestrationError::ConnectionNotFound(config.connection.clone())
})?;
let sink = Box::new(OracleSinkFactory {
config: connection.clone(),
table: config.table_name.clone(),
});
let sink = Box::new(OracleSinkFactory::new(connection.clone(), config.clone()));
let table_info = get_table_info(&config.table_name)?;
add_sink_to_pipeline(
&mut pipeline,
Expand Down
107 changes: 72 additions & 35 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_types::{
models::sink::OracleSinkConfig,
thiserror,
types::{FieldDefinition, Operation, SourceDefinition, TableOperation},
};
Expand Down Expand Up @@ -62,7 +63,7 @@ enum Error {
UpdatedPrimaryKey { old: Vec<Field>, new: Vec<Field> },
#[error("Destination table {table} has incompatible schema. {inner}")]
IncompatibleSchema {
table: String,
table: Table,
inner: SchemaValidationError,
},
#[error("Oracle database error: {0}")]
Expand Down Expand Up @@ -99,8 +100,29 @@ struct OracleSink {

#[derive(Debug)]
pub struct OracleSinkFactory {
pub config: OracleConfig,
pub table: String,
connection_config: OracleConfig,
table: Table,
}

impl std::fmt::Display for Table {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "\"{}\".\"{}\"", &self.owner, &self.name)
}
}

impl OracleSinkFactory {
pub fn new(connection_config: OracleConfig, config: OracleSinkConfig) -> Self {
let owner = config
.owner
.unwrap_or_else(|| connection_config.user.clone());
Self {
connection_config,
table: Table {
owner,
name: config.table_name,
},
}
}
}

fn parse_oracle_type(
Expand Down Expand Up @@ -166,17 +188,17 @@ impl OracleSinkFactory {
fn validate_table(
&self,
connection: &Connection,
table_name: &str,
table: &Table,
schema: &Schema,
) -> Result<bool, Error> {
let err = |e| Error::IncompatibleSchema {
table: table_name.to_owned(),
table: table.clone(),
inner: e,
};

let results = connection.query_as::<(String, String, u32, Option<u8>, Option<i8>, String)>(
"SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1",
&[&table_name],
"SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1 AND owner = :2",
&[&table.name, &table.owner],
)?;

let mut cols = HashMap::new();
Expand Down Expand Up @@ -242,10 +264,10 @@ impl OracleSinkFactory {
fn validate_or_create_table(
&self,
connection: &Connection,
table_name: &str,
table: &Table,
schema: &Schema,
) -> Result<(), Error> {
if self.validate_table(connection, table_name, schema)? {
if self.validate_table(connection, table, schema)? {
return Ok(());
}

Expand Down Expand Up @@ -275,10 +297,7 @@ impl OracleSinkFactory {
if field.nullable { "" } else { " NOT NULL" }
));
}
let table = format!(
"CREATE TABLE \"{table_name}\" ({})",
column_defs.join(",\n")
);
let table = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
info!("### CREATE TABLE #### \n: {:?}", table);
connection.execute(&table, &[])?;

Expand All @@ -288,7 +307,7 @@ impl OracleSinkFactory {
fn create_index(
&self,
connection: &Connection,
table_name: &str,
table: &Table,
schema: &Schema,
) -> Result<(), Error> {
let mut columns = schema
Expand All @@ -301,21 +320,26 @@ impl OracleSinkFactory {
*col = col.to_uppercase();
});

let index_name =
format!("{table_name}_{}_TXN_ID_TXN_SEQ_INDEX", columns.join("_")).replace('#', "");
let index_name = format!(
"{}_{}_{}_TXN_ID_TXN_SEQ_INDEX",
table.owner,
table.name,
columns.join("_")
)
.replace('#', "");

columns.push(format!("\"{}\"", TXN_ID_COL));
columns.push(format!("\"{}\"", TXN_SEQ_COL));

let query = "SELECT index_name FROM all_indexes WHERE table_name = :1";
let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2";
info!("Index check query {query}");

let mut index_exist = connection.query(query, &[&table_name])?;
let mut index_exist = connection.query(query, &[&table.name, &table.owner])?;
if index_exist.next().is_some() {
info!("Index {index_name} already exist");
} else {
let query = format!(
"CREATE INDEX {index_name} ON {table_name} ({})",
"CREATE INDEX {index_name} ON {table} ({})",
columns.join(", ")
);
info!("### CREATE INDEX #### \n: {index_name}. Query: {query}");
Expand All @@ -325,7 +349,7 @@ impl OracleSinkFactory {
Ok(())
}
}
fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
let field_names = schema
.fields
.iter()
Expand Down Expand Up @@ -383,7 +407,7 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
// If the record exists, but the txid is higher than the operation's txid,
// do nothing (if the op is INSERT,
format!(
r#"MERGE INTO "{table_name}" D
r#"MERGE INTO {table} D
USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
ON ({pk_select})
WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
Expand All @@ -393,6 +417,12 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
)
}

#[derive(Debug, Clone)]
struct Table {
owner: String,
name: String,
}

#[async_trait]
impl SinkFactory for OracleSinkFactory {
fn type_name(&self) -> String {
Expand All @@ -404,7 +434,7 @@ impl SinkFactory for OracleSinkFactory {
}

fn get_input_port_name(&self, _port: &PortHandle) -> String {
self.table.clone()
self.table.name.clone()
}

fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
Expand All @@ -416,7 +446,7 @@ impl SinkFactory for OracleSinkFactory {
mut input_schemas: HashMap<PortHandle, Schema>,
_event_hub: EventHub,
) -> Result<Box<dyn dozer_core::node::Sink>, BoxedError> {
let config = &self.config;
let config = &self.connection_config;
let root_connect_string = format!(
"{}:{}/{}",
config.host,
Expand All @@ -427,8 +457,6 @@ impl SinkFactory for OracleSinkFactory {

let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();

let table_name = &self.table;

let mut amended_schema = schema.clone();
amended_schema.field(
dozer_types::types::FieldDefinition {
Expand All @@ -449,11 +477,15 @@ impl SinkFactory for OracleSinkFactory {
false,
);

self.validate_or_create_table(&connection, table_name, &amended_schema)?;
self.create_index(&connection, table_name, &amended_schema)?;
self.validate_or_create_table(&connection, &self.table, &amended_schema)?;
self.create_index(&connection, &self.table, &amended_schema)?;
let meta_table = Table {
owner: self.table.owner.clone(),
name: METADATA_TABLE.to_owned(),
};
self.validate_or_create_table(
&connection,
METADATA_TABLE,
&meta_table,
Schema::new()
.field(
FieldDefinition {
Expand All @@ -477,7 +509,8 @@ impl SinkFactory for OracleSinkFactory {

let insert_append = format!(
//"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
"INSERT INTO \"{table_name}\" VALUES ({})",
"INSERT INTO {} VALUES ({})",
&self.table,
(1..=amended_schema.fields.len())
.map(|i| format!(":{i}"))
.collect::<Vec<_>>()
Expand All @@ -488,15 +521,15 @@ impl SinkFactory for OracleSinkFactory {
Ok(Box::new(OracleSink {
conn: connection,
insert_append,
merge_statement: generate_merge_statement(table_name, &schema),
merge_statement: generate_merge_statement(&self.table, &schema),
field_types,
pk: schema.primary_index,
batch_params: Vec::new(),
//TODO: make this configurable
batch_size: 10000,
insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{table_name}\"', :1)"),
update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{table_name}\"'") ,
select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{table_name}\"'"),
insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{}_{}\"', :1)", &self.table.owner, &self.table.name),
update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name) ,
select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
latest_txid: None,
}))
}
Expand Down Expand Up @@ -747,12 +780,16 @@ mod tests {
.field(f("name"), true)
.field(f("content"), false);

let stmt = generate_merge_statement("tablename", &schema);
let table = Table {
owner: "owner".to_owned(),
name: "tablename".to_owned(),
};
let stmt = generate_merge_statement(&table, &schema);
assert_eq!(
trim_str(stmt),
trim_str(
r#"
MERGE INTO "tablename" D
MERGE INTO "owner"."tablename" D
USING (SELECT :1 "id", :2 "name", :3 "content", :4 "__txn_id", :5 "__txn_seq", :6 DOZER_OPKIND FROM DUAL) S
ON (D."id" = S."id" AND D."name" = S."name")
WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
Expand Down
2 changes: 2 additions & 0 deletions dozer-types/src/models/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ pub struct ClickhouseSinkTableOptions {
pub struct OracleSinkConfig {
pub connection: String,
pub table_name: String,
#[serde(default)]
pub owner: Option<String>,
}

pub fn default_log_reader_batch_size() -> u32 {
Expand Down
7 changes: 7 additions & 0 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,13 @@
"connection": {
"type": "string"
},
"owner": {
"default": null,
"type": [
"string",
"null"
]
},
"table_name": {
"type": "string"
}
Expand Down

0 comments on commit 9a5e355

Please sign in to comment.