Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oracle sink: Add configuration for destination table owner #2442

Merged
merged 1 commit
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,7 @@ impl<'a> PipelineBuilder<'a> {
.ok_or_else(|| {
OrchestrationError::ConnectionNotFound(config.connection.clone())
})?;
let sink = Box::new(OracleSinkFactory {
config: connection.clone(),
table: config.table_name.clone(),
});
let sink = Box::new(OracleSinkFactory::new(connection.clone(), config.clone()));
let table_info = get_table_info(&config.table_name)?;
add_sink_to_pipeline(
&mut pipeline,
Expand Down
107 changes: 72 additions & 35 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_types::{
models::sink::OracleSinkConfig,
thiserror,
types::{FieldDefinition, Operation, SourceDefinition, TableOperation},
};
Expand Down Expand Up @@ -62,7 +63,7 @@ enum Error {
UpdatedPrimaryKey { old: Vec<Field>, new: Vec<Field> },
#[error("Destination table {table} has incompatible schema. {inner}")]
IncompatibleSchema {
table: String,
table: Table,
inner: SchemaValidationError,
},
#[error("Oracle database error: {0}")]
Expand Down Expand Up @@ -99,8 +100,29 @@ struct OracleSink {

#[derive(Debug)]
pub struct OracleSinkFactory {
pub config: OracleConfig,
pub table: String,
connection_config: OracleConfig,
table: Table,
}

impl std::fmt::Display for Table {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "\"{}\".\"{}\"", &self.owner, &self.name)
}
}

impl OracleSinkFactory {
    /// Builds a sink factory from the connection settings and the sink's own
    /// configuration. When the sink config does not specify a table owner,
    /// the connection's user name is used as the owner (Oracle's default
    /// schema for a session is the connecting user).
    pub fn new(connection_config: OracleConfig, config: OracleSinkConfig) -> Self {
        let owner = match config.owner {
            Some(owner) => owner,
            // Fall back to the connecting user's schema.
            None => connection_config.user.clone(),
        };
        let table = Table {
            owner,
            name: config.table_name,
        };
        Self {
            connection_config,
            table,
        }
    }
}

fn parse_oracle_type(
Expand Down Expand Up @@ -166,17 +188,17 @@ impl OracleSinkFactory {
fn validate_table(
&self,
connection: &Connection,
table_name: &str,
table: &Table,
schema: &Schema,
) -> Result<bool, Error> {
let err = |e| Error::IncompatibleSchema {
table: table_name.to_owned(),
table: table.clone(),
inner: e,
};

let results = connection.query_as::<(String, String, u32, Option<u8>, Option<i8>, String)>(
"SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1",
&[&table_name],
"SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1 AND owner = :2",
&[&table.name, &table.owner],
)?;

let mut cols = HashMap::new();
Expand Down Expand Up @@ -242,10 +264,10 @@ impl OracleSinkFactory {
fn validate_or_create_table(
&self,
connection: &Connection,
table_name: &str,
table: &Table,
schema: &Schema,
) -> Result<(), Error> {
if self.validate_table(connection, table_name, schema)? {
if self.validate_table(connection, table, schema)? {
return Ok(());
}

Expand Down Expand Up @@ -275,10 +297,7 @@ impl OracleSinkFactory {
if field.nullable { "" } else { " NOT NULL" }
));
}
let table = format!(
"CREATE TABLE \"{table_name}\" ({})",
column_defs.join(",\n")
);
let table = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
info!("### CREATE TABLE #### \n: {:?}", table);
connection.execute(&table, &[])?;

Expand All @@ -288,7 +307,7 @@ impl OracleSinkFactory {
fn create_index(
&self,
connection: &Connection,
table_name: &str,
table: &Table,
schema: &Schema,
) -> Result<(), Error> {
let mut columns = schema
Expand All @@ -301,21 +320,26 @@ impl OracleSinkFactory {
*col = col.to_uppercase();
});

let index_name =
format!("{table_name}_{}_TXN_ID_TXN_SEQ_INDEX", columns.join("_")).replace('#', "");
let index_name = format!(
"{}_{}_{}_TXN_ID_TXN_SEQ_INDEX",
table.owner,
table.name,
columns.join("_")
)
.replace('#', "");

columns.push(format!("\"{}\"", TXN_ID_COL));
columns.push(format!("\"{}\"", TXN_SEQ_COL));

let query = "SELECT index_name FROM all_indexes WHERE table_name = :1";
let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2";
info!("Index check query {query}");

let mut index_exist = connection.query(query, &[&table_name])?;
let mut index_exist = connection.query(query, &[&table.name, &table.owner])?;
if index_exist.next().is_some() {
info!("Index {index_name} already exist");
} else {
let query = format!(
"CREATE INDEX {index_name} ON {table_name} ({})",
"CREATE INDEX {index_name} ON {table} ({})",
columns.join(", ")
);
info!("### CREATE INDEX #### \n: {index_name}. Query: {query}");
Expand All @@ -325,7 +349,7 @@ impl OracleSinkFactory {
Ok(())
}
}
fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
fn generate_merge_statement(table: &Table, schema: &Schema) -> String {
let field_names = schema
.fields
.iter()
Expand Down Expand Up @@ -383,7 +407,7 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
// If the record exists, but the txid is higher than the operation's txid,
// do nothing (if the op is INSERT,
format!(
r#"MERGE INTO "{table_name}" D
r#"MERGE INTO {table} D
USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
ON ({pk_select})
WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
Expand All @@ -393,6 +417,12 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
)
}

#[derive(Debug, Clone)]
/// A fully-qualified Oracle table reference.
struct Table {
    // Schema (owner) the table lives in; defaults to the connection user
    // when not configured explicitly (see `OracleSinkFactory::new`).
    owner: String,
    // Unqualified table name.
    name: String,
}

#[async_trait]
impl SinkFactory for OracleSinkFactory {
fn type_name(&self) -> String {
Expand All @@ -404,7 +434,7 @@ impl SinkFactory for OracleSinkFactory {
}

fn get_input_port_name(&self, _port: &PortHandle) -> String {
self.table.clone()
self.table.name.clone()
}

fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
Expand All @@ -416,7 +446,7 @@ impl SinkFactory for OracleSinkFactory {
mut input_schemas: HashMap<PortHandle, Schema>,
_event_hub: EventHub,
) -> Result<Box<dyn dozer_core::node::Sink>, BoxedError> {
let config = &self.config;
let config = &self.connection_config;
let root_connect_string = format!(
"{}:{}/{}",
config.host,
Expand All @@ -427,8 +457,6 @@ impl SinkFactory for OracleSinkFactory {

let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();

let table_name = &self.table;

let mut amended_schema = schema.clone();
amended_schema.field(
dozer_types::types::FieldDefinition {
Expand All @@ -449,11 +477,15 @@ impl SinkFactory for OracleSinkFactory {
false,
);

self.validate_or_create_table(&connection, table_name, &amended_schema)?;
self.create_index(&connection, table_name, &amended_schema)?;
self.validate_or_create_table(&connection, &self.table, &amended_schema)?;
self.create_index(&connection, &self.table, &amended_schema)?;
let meta_table = Table {
owner: self.table.owner.clone(),
name: METADATA_TABLE.to_owned(),
};
self.validate_or_create_table(
&connection,
METADATA_TABLE,
&meta_table,
Schema::new()
.field(
FieldDefinition {
Expand All @@ -477,7 +509,8 @@ impl SinkFactory for OracleSinkFactory {

let insert_append = format!(
//"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
"INSERT INTO \"{table_name}\" VALUES ({})",
"INSERT INTO {} VALUES ({})",
&self.table,
(1..=amended_schema.fields.len())
.map(|i| format!(":{i}"))
.collect::<Vec<_>>()
Expand All @@ -488,15 +521,15 @@ impl SinkFactory for OracleSinkFactory {
Ok(Box::new(OracleSink {
conn: connection,
insert_append,
merge_statement: generate_merge_statement(table_name, &schema),
merge_statement: generate_merge_statement(&self.table, &schema),
field_types,
pk: schema.primary_index,
batch_params: Vec::new(),
//TODO: make this configurable
batch_size: 10000,
insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{table_name}\"', :1)"),
update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{table_name}\"'") ,
select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{table_name}\"'"),
insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{}_{}\"', :1)", &self.table.owner, &self.table.name),
update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name) ,
select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
latest_txid: None,
}))
}
Expand Down Expand Up @@ -747,12 +780,16 @@ mod tests {
.field(f("name"), true)
.field(f("content"), false);

let stmt = generate_merge_statement("tablename", &schema);
let table = Table {
owner: "owner".to_owned(),
name: "tablename".to_owned(),
};
let stmt = generate_merge_statement(&table, &schema);
assert_eq!(
trim_str(stmt),
trim_str(
r#"
MERGE INTO "tablename" D
MERGE INTO "owner"."tablename" D
USING (SELECT :1 "id", :2 "name", :3 "content", :4 "__txn_id", :5 "__txn_seq", :6 DOZER_OPKIND FROM DUAL) S
ON (D."id" = S."id" AND D."name" = S."name")
WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
Expand Down
2 changes: 2 additions & 0 deletions dozer-types/src/models/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ pub struct ClickhouseSinkTableOptions {
pub struct OracleSinkConfig {
    // Name of the connection (defined elsewhere in the dozer config) used to
    // reach the Oracle database.
    pub connection: String,
    // Destination table name (unqualified).
    pub table_name: String,
    #[serde(default)]
    // Optional owner (schema) of the destination table. When omitted, the
    // sink falls back to the connection's user name.
    pub owner: Option<String>,
}

pub fn default_log_reader_batch_size() -> u32 {
Expand Down
7 changes: 7 additions & 0 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,13 @@
"connection": {
"type": "string"
},
"owner": {
"default": null,
"type": [
"string",
"null"
]
},
"table_name": {
"type": "string"
}
Expand Down
Loading