Skip to content

Commit

Permalink
Fix: Revert oracle resumability and fix snapshot data insert (#2425)
Browse files Browse the repository at this point in the history
* Fix snapshot inserts

* remove log

* Remove import

* Fix field names

* Fix data types mapping

* Remove unused consts

* Mark U128 and I128 as unimplemented
  • Loading branch information
karolisg authored Feb 28, 2024
1 parent 62ea8f0 commit e2baca3
Showing 1 changed file with 15 additions and 52 deletions.
67 changes: 15 additions & 52 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use oracle::{
Connection,
};

const TXN_ID_COL: &str = "__txn_id";
const TXN_SEQ_COL: &str = "__txn_seq";
const METADATA_TABLE: &str = "__replication_metadata";
const META_TXN_ID_COL: &str = "txn_id";
const META_TABLE_COL: &str = "table";
Expand Down Expand Up @@ -73,7 +71,6 @@ impl From<oracle::Error> for Error {

#[derive(Debug)]
struct BatchedOperation {
op_id: Option<OpIdentifier>,
op_kind: OpKind,
params: Record,
}
Expand Down Expand Up @@ -201,6 +198,8 @@ impl OracleSinkFactory {
FieldType::String | FieldType::Text,
OracleType::Varchar2(_) | OracleType::NVarchar2(_),
) => {}
(FieldType::U128 | FieldType::I128, OracleType::Number(precision, 0))
if precision >= 39 => {}
(FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0))
if precision >= 20 => {}
(FieldType::Float, OracleType::Number(38, 0) | OracleType::BinaryDouble) => {}
Expand Down Expand Up @@ -246,10 +245,10 @@ impl OracleSinkFactory {
for field in &schema.fields {
let name = &field.name;
let col_type = match field.typ {
dozer_types::types::FieldType::UInt => "INTEGER",
dozer_types::types::FieldType::U128 => "INTEGER",
dozer_types::types::FieldType::Int => "INTEGER",
dozer_types::types::FieldType::I128 => "INTEGER",
dozer_types::types::FieldType::UInt => "NUMBER(20)",
dozer_types::types::FieldType::U128 => unimplemented!(),
dozer_types::types::FieldType::Int => "NUMBER(20)",
dozer_types::types::FieldType::I128 => unimplemented!(),
// Should this be BINARY_DOUBLE?
dozer_types::types::FieldType::Float => "NUMBER",
dozer_types::types::FieldType::Boolean => "NUMBER",
Expand Down Expand Up @@ -280,11 +279,7 @@ impl OracleSinkFactory {
}

fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
let field_names = schema
.fields
.iter()
.map(|field| field.name.as_str())
.chain([TXN_ID_COL, TXN_SEQ_COL]);
let field_names = schema.fields.iter().map(|field| &field.name);

let mut parameter_index = 1usize..;
let input_fields = field_names
Expand Down Expand Up @@ -326,24 +321,17 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {

let opkind_idx = parameter_index.next().unwrap();

let opid_select = format!(
r#"(D."{TXN_ID_COL}" IS NULL
OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"#
);

// Match on PK and txn_id.
// If the record does not exist and the op is INSERT, do the INSERT
// If the record exists, but the txid is higher than the operation's txid,
// do nothing (if the op is INSERT,
format!(
r#"MERGE INTO "{table_name}" D
USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
ON ({pk_select})
WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select}
DELETE WHERE S.DOZER_OPKIND = 2 AND {opid_select}
"#
ON (S.DOZER_OPKIND > 0)
WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values})
WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE {pk_select}
DELETE WHERE S.DOZER_OPKIND = 2"#
)
}

Expand Down Expand Up @@ -381,27 +369,8 @@ impl SinkFactory for OracleSinkFactory {
let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();

let table_name = &self.table;
let mut amended_schema = schema.clone();
amended_schema.field(
dozer_types::types::FieldDefinition {
name: TXN_ID_COL.to_owned(),
typ: FieldType::UInt,
nullable: true,
source: dozer_types::types::SourceDefinition::Dynamic,
},
false,
);
amended_schema.field(
dozer_types::types::FieldDefinition {
name: TXN_SEQ_COL.to_owned(),
typ: FieldType::UInt,
nullable: true,
source: dozer_types::types::SourceDefinition::Dynamic,
},
false,
);

self.validate_or_create_table(&connection, table_name, &amended_schema)?;
self.validate_or_create_table(&connection, table_name, &schema)?;
self.validate_or_create_table(
&connection,
METADATA_TABLE,
Expand Down Expand Up @@ -429,7 +398,7 @@ impl SinkFactory for OracleSinkFactory {
let insert_append = format!(
//"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
"INSERT INTO \"{table_name}\" VALUES ({})",
(1..=amended_schema.fields.len())
(1..=schema.fields.len())
.map(|i| format!(":{i}"))
.collect::<Vec<_>>()
.join(", ")
Expand Down Expand Up @@ -528,9 +497,6 @@ impl OracleSink {
{
batch.set(i, &OraField(field, *typ))?;
}
let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip();
batch.set(bind_idx.next().unwrap(), &txid)?;
batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?;
batch.append_row(&[])?;
}
Expand All @@ -540,12 +506,11 @@ impl OracleSink {

fn batch(
&mut self,
op_id: Option<OpIdentifier>,
_op_id: Option<OpIdentifier>,
kind: OpKind,
record: Record,
) -> oracle::Result<()> {
self.batch_params.push(BatchedOperation {
op_id,
op_kind: kind,
params: record,
});
Expand Down Expand Up @@ -620,9 +585,7 @@ impl Sink for OracleSink {
{
batch.set(i, &OraField(field, *typ))?;
}
let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip();
batch.set(bind_idx.next().unwrap(), &txid)?;
batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
batch.append_row(&[])?;
}
batch.execute()?;
}
Expand Down

0 comments on commit e2baca3

Please sign in to comment.