Skip to content

Commit

Permalink
Fix snapshot inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Feb 27, 2024
1 parent afb1b62 commit 52d250c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 44 deletions.
2 changes: 2 additions & 0 deletions dozer-ingestion/oracle/src/connector/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use dozer_ingestion_connector::{
use oracle::Row;

use super::{join::Column, Error};
use crate::info;

#[derive(Debug, Clone, Copy)]
pub struct MappedColumn {
Expand Down Expand Up @@ -50,6 +51,7 @@ fn map_data_type(
let typ = if data_type.starts_with("TIMESTAMP") {
FieldType::Timestamp
} else {
info!("{:?} {:?} {:?}", data_type, precision, scale);
match data_type {
"VARCHAR2" => Ok(FieldType::String),
"NVARCHAR2" => unimplemented!("convert NVARCHAR2 to String"),
Expand Down
57 changes: 13 additions & 44 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl From<oracle::Error> for Error {

#[derive(Debug)]
struct BatchedOperation {
op_id: Option<OpIdentifier>,
op_kind: OpKind,
params: Record,
}
Expand Down Expand Up @@ -201,8 +200,10 @@ impl OracleSinkFactory {
FieldType::String | FieldType::Text,
OracleType::Varchar2(_) | OracleType::NVarchar2(_),
) => {}
(FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0))
(FieldType::U128 | FieldType::I128, OracleType::Number(precision, 0))
if precision >= 20 => {}
(FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0))
if precision < 20 => {}
(FieldType::Float, OracleType::Number(38, 0) | OracleType::BinaryDouble) => {}
(FieldType::Boolean, OracleType::Number(_, 0)) => {}
(FieldType::Binary, OracleType::Raw(_)) => {}
Expand Down Expand Up @@ -246,9 +247,9 @@ impl OracleSinkFactory {
for field in &schema.fields {
let name = &field.name;
let col_type = match field.typ {
dozer_types::types::FieldType::UInt => "INTEGER",
dozer_types::types::FieldType::UInt => "NUMBER(19)",
dozer_types::types::FieldType::U128 => "INTEGER",
dozer_types::types::FieldType::Int => "INTEGER",
dozer_types::types::FieldType::Int => "NUMBER(19)",
dozer_types::types::FieldType::I128 => "INTEGER",
// Should this be BINARY_DOUBLE?
dozer_types::types::FieldType::Float => "NUMBER",
Expand Down Expand Up @@ -326,24 +327,17 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {

let opkind_idx = parameter_index.next().unwrap();

let opid_select = format!(
r#"(D."{TXN_ID_COL}" IS NULL
OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"#
);

// Match on PK and txn_id.
// If the record does not exist and the op is INSERT, do the INSERT
// If the record exists, but the txid is higher than the operation's txid,
// do nothing (if the op is INSERT,
format!(
r#"MERGE INTO "{table_name}" D
USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
ON ({pk_select})
WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select}
DELETE WHERE S.DOZER_OPKIND = 2 AND {opid_select}
"#
ON (S.DOZER_OPKIND > 0)
WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values})
WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE {pk_select}
DELETE WHERE S.DOZER_OPKIND = 2"#
)
}

Expand Down Expand Up @@ -381,27 +375,8 @@ impl SinkFactory for OracleSinkFactory {
let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();

let table_name = &self.table;
let mut amended_schema = schema.clone();
amended_schema.field(
dozer_types::types::FieldDefinition {
name: TXN_ID_COL.to_owned(),
typ: FieldType::UInt,
nullable: true,
source: dozer_types::types::SourceDefinition::Dynamic,
},
false,
);
amended_schema.field(
dozer_types::types::FieldDefinition {
name: TXN_SEQ_COL.to_owned(),
typ: FieldType::UInt,
nullable: true,
source: dozer_types::types::SourceDefinition::Dynamic,
},
false,
);

self.validate_or_create_table(&connection, table_name, &amended_schema)?;
self.validate_or_create_table(&connection, table_name, &schema)?;
self.validate_or_create_table(
&connection,
METADATA_TABLE,
Expand Down Expand Up @@ -429,7 +404,7 @@ impl SinkFactory for OracleSinkFactory {
let insert_append = format!(
//"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
"INSERT INTO \"{table_name}\" VALUES ({})",
(1..=amended_schema.fields.len())
(1..=schema.fields.len())
.map(|i| format!(":{i}"))
.collect::<Vec<_>>()
.join(", ")
Expand Down Expand Up @@ -528,9 +503,6 @@ impl OracleSink {
{
batch.set(i, &OraField(field, *typ))?;
}
let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip();
batch.set(bind_idx.next().unwrap(), &txid)?;
batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?;
batch.append_row(&[])?;
}
Expand All @@ -540,12 +512,11 @@ impl OracleSink {

fn batch(
&mut self,
op_id: Option<OpIdentifier>,
_op_id: Option<OpIdentifier>,
kind: OpKind,
record: Record,
) -> oracle::Result<()> {
self.batch_params.push(BatchedOperation {
op_id,
op_kind: kind,
params: record,
});
Expand Down Expand Up @@ -620,9 +591,7 @@ impl Sink for OracleSink {
{
batch.set(i, &OraField(field, *typ))?;
}
let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip();
batch.set(bind_idx.next().unwrap(), &txid)?;
batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
batch.append_row(&[])?;
}
batch.execute()?;
}
Expand Down

0 comments on commit 52d250c

Please sign in to comment.