diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs
index 974c94e123..a3301d83b7 100644
--- a/dozer-cli/src/pipeline/builder.rs
+++ b/dozer-cli/src/pipeline/builder.rs
@@ -300,10 +300,7 @@ impl<'a> PipelineBuilder<'a> {
                     .ok_or_else(|| {
                         OrchestrationError::ConnectionNotFound(config.connection.clone())
                     })?;
-                let sink = Box::new(OracleSinkFactory {
-                    config: connection.clone(),
-                    table: config.table_name.clone(),
-                });
+                let sink = Box::new(OracleSinkFactory::new(connection.clone(), config.clone()));
                 let table_info = get_table_info(&config.table_name)?;
                 add_sink_to_pipeline(
                     &mut pipeline,
diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index 0e62816b4e..3299a52f7a 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -1,4 +1,5 @@
 use dozer_types::{
+    models::sink::OracleSinkConfig,
     thiserror,
     types::{FieldDefinition, Operation, SourceDefinition, TableOperation},
 };
@@ -62,7 +63,7 @@ enum Error {
     UpdatedPrimaryKey { old: Vec<Field>, new: Vec<Field> },
     #[error("Destination table {table} has incompatible schema. \n{inner}")]
     IncompatibleSchema {
-        table: String,
+        table: Table,
         inner: SchemaValidationError,
     },
     #[error("Oracle database error: {0}")]
@@ -99,8 +100,29 @@ struct OracleSink {
 
 #[derive(Debug)]
 pub struct OracleSinkFactory {
-    pub config: OracleConfig,
-    pub table: String,
+    connection_config: OracleConfig,
+    table: Table,
+}
+
+impl std::fmt::Display for Table {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "\"{}\".\"{}\"", &self.owner, &self.name)
+    }
+}
+
+impl OracleSinkFactory {
+    pub fn new(connection_config: OracleConfig, config: OracleSinkConfig) -> Self {
+        let owner = config
+            .owner
+            .unwrap_or_else(|| connection_config.user.clone());
+        Self {
+            connection_config,
+            table: Table {
+                owner,
+                name: config.table_name,
+            },
+        }
+    }
 }
 
 fn parse_oracle_type(
@@ -166,17 +188,17 @@ impl OracleSinkFactory {
     fn validate_table(
         &self,
         connection: &Connection,
-        table_name: &str,
+        table: &Table,
         schema: &Schema,
     ) -> Result<bool, Error> {
         let err = |e| Error::IncompatibleSchema {
-            table: table_name.to_owned(),
+            table: table.clone(),
             inner: e,
         };
 
         let results = connection.query_as::<(String, String, u32, Option<u32>, Option<i32>, String)>(
-            "SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1",
-            &[&table_name],
+            "SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1 AND owner = :2",
+            &[&table.name, &table.owner],
         )?;
 
         let mut cols = HashMap::new();
@@ -242,10 +264,10 @@ impl OracleSinkFactory {
     fn validate_or_create_table(
         &self,
         connection: &Connection,
-        table_name: &str,
+        table: &Table,
         schema: &Schema,
     ) -> Result<(), Error> {
-        if self.validate_table(connection, table_name, schema)? {
+        if self.validate_table(connection, table, schema)? {
             return Ok(());
         }
 
@@ -275,10 +297,7 @@ impl OracleSinkFactory {
                 if field.nullable { "" } else { " NOT NULL" }
             ));
         }
-        let table = format!(
-            "CREATE TABLE \"{table_name}\" ({})",
-            column_defs.join(",\n")
-        );
+        let table = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
         info!("### CREATE TABLE #### \n: {:?}", table);
         connection.execute(&table, &[])?;
 
@@ -288,7 +307,7 @@ impl OracleSinkFactory {
     fn create_index(
         &self,
         connection: &Connection,
-        table_name: &str,
+        table: &Table,
         schema: &Schema,
     ) -> Result<(), Error> {
         let mut columns = schema
@@ -301,21 +320,26 @@ impl OracleSinkFactory {
             *col = col.to_uppercase();
         });
 
-        let index_name =
-            format!("{table_name}_{}_TXN_ID_TXN_SEQ_INDEX", columns.join("_")).replace('#', "");
+        let index_name = format!(
+            "{}_{}_{}_TXN_ID_TXN_SEQ_INDEX",
+            table.owner,
+            table.name,
+            columns.join("_")
+        )
+        .replace('#', "");
 
         columns.push(format!("\"{}\"", TXN_ID_COL));
         columns.push(format!("\"{}\"", TXN_SEQ_COL));
 
-        let query = "SELECT index_name FROM all_indexes WHERE table_name = :1";
+        let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2";
         info!("Index check query {query}");
-        let mut index_exist = connection.query(query, &[&table_name])?;
+        let mut index_exist = connection.query(query, &[&table.name, &table.owner])?;
         if index_exist.next().is_some() {
             info!("Index {index_name} already exist");
         } else {
             let query = format!(
-                "CREATE INDEX {index_name} ON {table_name} ({})",
+                "CREATE INDEX {index_name} ON {table} ({})",
                 columns.join(", ")
             );
             info!("### CREATE INDEX #### \n: {index_name}. \nQuery: {query}");
@@ -325,7 +349,7 @@ impl OracleSinkFactory {
         Ok(())
     }
 }
-fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
+fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
     let field_names = schema
         .fields
         .iter()
@@ -383,7 +407,7 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
     // If the record exists, but the txid is higher than the operation's txid,
     // do nothing (if the op is INSERT,
     format!(
-        r#"MERGE INTO "{table_name}" D
+        r#"MERGE INTO {table} D
         USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
         ON ({pk_select})
         WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
@@ -393,6 +417,12 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
     )
 }
 
+#[derive(Debug, Clone)]
+struct Table {
+    owner: String,
+    name: String,
+}
+
 #[async_trait]
 impl SinkFactory for OracleSinkFactory {
     fn type_name(&self) -> String {
@@ -404,7 +434,7 @@ impl SinkFactory for OracleSinkFactory {
     }
 
     fn get_input_port_name(&self, _port: &PortHandle) -> String {
-        self.table.clone()
+        self.table.name.clone()
     }
 
     fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
@@ -416,7 +446,7 @@ impl SinkFactory for OracleSinkFactory {
         mut input_schemas: HashMap<PortHandle, Schema>,
         _event_hub: EventHub,
     ) -> Result<Box<dyn Sink>, BoxedError> {
-        let config = &self.config;
+        let config = &self.connection_config;
         let root_connect_string = format!(
             "{}:{}/{}",
             config.host,
@@ -427,8 +457,6 @@ impl SinkFactory for OracleSinkFactory {
 
         let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();
 
-        let table_name = &self.table;
-
         let mut amended_schema = schema.clone();
         amended_schema.field(
             dozer_types::types::FieldDefinition {
@@ -449,11 +477,15 @@ impl SinkFactory for OracleSinkFactory {
             false,
         );
 
-        self.validate_or_create_table(&connection, table_name, &amended_schema)?;
-        self.create_index(&connection, table_name, &amended_schema)?;
+        self.validate_or_create_table(&connection, &self.table, &amended_schema)?;
+        self.create_index(&connection, &self.table, &amended_schema)?;
+        let meta_table = Table {
+            owner: self.table.owner.clone(),
+            name: METADATA_TABLE.to_owned(),
+        };
         self.validate_or_create_table(
             &connection,
-            METADATA_TABLE,
+            &meta_table,
             Schema::new()
                 .field(
                     FieldDefinition {
@@ -477,7 +509,8 @@ impl SinkFactory for OracleSinkFactory {
         let insert_append = format!(
             //"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
-            "INSERT INTO \"{table_name}\" VALUES ({})",
+            "INSERT INTO {} VALUES ({})",
+            &self.table,
             (1..=amended_schema.fields.len())
                 .map(|i| format!(":{i}"))
                 .collect::<Vec<_>>()
@@ -488,15 +521,15 @@ impl SinkFactory for OracleSinkFactory {
         Ok(Box::new(OracleSink {
             conn: connection,
             insert_append,
-            merge_statement: generate_merge_statement(table_name, &schema),
+            merge_statement: generate_merge_statement(&self.table, &schema),
             field_types,
             pk: schema.primary_index,
             batch_params: Vec::new(),
             //TODO: make this configurable
             batch_size: 10000,
-            insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{table_name}\"', :1)"),
-            update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{table_name}\"'") ,
-            select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{table_name}\"'"),
+            insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{}_{}\"', :1)", &self.table.owner, &self.table.name),
+            update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name) ,
+            select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
             latest_txid: None,
         }))
     }
@@ -747,12 +780,16 @@ mod tests {
             .field(f("name"), true)
            .field(f("content"), false);
 
-        let stmt = generate_merge_statement("tablename", &schema);
+        let table = Table {
+            owner: "owner".to_owned(),
+            name: "tablename".to_owned(),
+        };
+        let stmt = generate_merge_statement(&table, &schema);
         assert_eq!(
             trim_str(stmt),
             trim_str(
                 r#"
-            MERGE INTO "tablename" D
+            MERGE INTO "owner"."tablename" D
             USING (SELECT :1 "id", :2 "name", :3 "content", :4 "__txn_id", :5 "__txn_seq", :6 DOZER_OPKIND FROM DUAL) S
             ON (D."id" = S."id" AND D."name" = S."name")
             WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index 766d870db8..bcf42ab737 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -183,6 +183,8 @@ pub struct ClickhouseSinkTableOptions {
 pub struct OracleSinkConfig {
     pub connection: String,
     pub table_name: String,
+    #[serde(default)]
+    pub owner: Option<String>,
 }
 
 pub fn default_log_reader_batch_size() -> u32 {
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index a6a8ee2d93..6643cf9b5c 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -1467,6 +1467,13 @@
         "connection": {
           "type": "string"
         },
+        "owner": {
+          "default": null,
+          "type": [
+            "string",
+            "null"
+          ]
+        },
         "table_name": {
           "type": "string"
         }