From e2baca33497b97e4f88dca35146fe3a5234c82c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= Date: Wed, 28 Feb 2024 03:03:16 +0200 Subject: [PATCH] Fix: Revert oracle resumability and fix snapshot data insert (#2425) * Fix snapshot inserts * remove log * Remove import * Fix field names * Fix data types mapping * Remove unused consts * Mark U128 and I128 as unimplemented --- dozer-sink-oracle/src/lib.rs | 67 ++++++++---------------------------- 1 file changed, 15 insertions(+), 52 deletions(-) diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index d665b7c73b..17a32ec083 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -20,8 +20,6 @@ use oracle::{ Connection, }; -const TXN_ID_COL: &str = "__txn_id"; -const TXN_SEQ_COL: &str = "__txn_seq"; const METADATA_TABLE: &str = "__replication_metadata"; const META_TXN_ID_COL: &str = "txn_id"; const META_TABLE_COL: &str = "table"; @@ -73,7 +71,6 @@ impl From for Error { #[derive(Debug)] struct BatchedOperation { - op_id: Option, op_kind: OpKind, params: Record, } @@ -201,6 +198,8 @@ impl OracleSinkFactory { FieldType::String | FieldType::Text, OracleType::Varchar2(_) | OracleType::NVarchar2(_), ) => {} + (FieldType::U128 | FieldType::I128, OracleType::Number(precision, 0)) + if precision >= 39 => {} (FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0)) if precision >= 20 => {} (FieldType::Float, OracleType::Number(38, 0) | OracleType::BinaryDouble) => {} @@ -246,10 +245,10 @@ impl OracleSinkFactory { for field in &schema.fields { let name = &field.name; let col_type = match field.typ { - dozer_types::types::FieldType::UInt => "INTEGER", - dozer_types::types::FieldType::U128 => "INTEGER", - dozer_types::types::FieldType::Int => "INTEGER", - dozer_types::types::FieldType::I128 => "INTEGER", + dozer_types::types::FieldType::UInt => "NUMBER(20)", + dozer_types::types::FieldType::U128 => unimplemented!(), + dozer_types::types::FieldType::Int => "NUMBER(20)", + dozer_types::types::FieldType::I128 => unimplemented!(), // Should this be BINARY_DOUBLE? dozer_types::types::FieldType::Float => "NUMBER", dozer_types::types::FieldType::Boolean => "NUMBER", @@ -280,11 +279,7 @@ impl OracleSinkFactory { } fn generate_merge_statement(table_name: &str, schema: &Schema) -> String { - let field_names = schema - .fields - .iter() - .map(|field| field.name.as_str()) - .chain([TXN_ID_COL, TXN_SEQ_COL]); + let field_names = schema.fields.iter().map(|field| &field.name); let mut parameter_index = 1usize..; let input_fields = field_names @@ -326,12 +321,6 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String { let opkind_idx = parameter_index.next().unwrap(); - let opid_select = format!( - r#"(D."{TXN_ID_COL}" IS NULL - OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}" - OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"# - ); - // Match on PK and txn_id. // If the record does not exist and the op is INSERT, do the INSERT // If the record exists, but the txid is higher than the operation's txid, @@ -339,11 +328,10 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String { format!( r#"MERGE INTO "{table_name}" D USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S - ON ({pk_select}) - WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0 - WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select} - DELETE WHERE S.DOZER_OPKIND = 2 AND {opid_select} - "# + ON (S.DOZER_OPKIND > 0) + WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) + WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE {pk_select} + DELETE WHERE S.DOZER_OPKIND = 2"# ) } @@ -381,27 +369,8 @@ impl SinkFactory for OracleSinkFactory { let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap(); let table_name = &self.table; - let mut amended_schema = schema.clone(); - amended_schema.field( - dozer_types::types::FieldDefinition { - name: TXN_ID_COL.to_owned(), - typ: FieldType::UInt, - nullable: true, - source: dozer_types::types::SourceDefinition::Dynamic, - }, - false, - ); - amended_schema.field( - dozer_types::types::FieldDefinition { - name: TXN_SEQ_COL.to_owned(), - typ: FieldType::UInt, - nullable: true, - source: dozer_types::types::SourceDefinition::Dynamic, - }, - false, - ); - self.validate_or_create_table(&connection, table_name, &amended_schema)?; + self.validate_or_create_table(&connection, table_name, &schema)?; self.validate_or_create_table( &connection, METADATA_TABLE, @@ -429,7 +398,7 @@ impl SinkFactory for OracleSinkFactory { let insert_append = format!( //"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})", "INSERT INTO \"{table_name}\" VALUES ({})", - (1..=amended_schema.fields.len()) + (1..=schema.fields.len()) .map(|i| format!(":{i}")) .collect::>() .join(", ") @@ -528,9 +497,6 @@ impl OracleSink { { batch.set(i, &OraField(field, *typ))?; } - let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip(); - batch.set(bind_idx.next().unwrap(), &txid)?; - batch.set(bind_idx.next().unwrap(), &seq_in_tx)?; batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?; batch.append_row(&[])?; } @@ -540,12 +506,11 @@ impl OracleSink { fn batch( &mut self, - op_id: Option, + _op_id: Option, kind: OpKind, record: Record, ) -> oracle::Result<()> { self.batch_params.push(BatchedOperation { - op_id, op_kind: kind, params: record, }); @@ -620,9 +585,7 @@ impl Sink for OracleSink { { batch.set(i, &OraField(field, *typ))?; } - let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip(); - batch.set(bind_idx.next().unwrap(), &txid)?; - batch.set(bind_idx.next().unwrap(), &seq_in_tx)?; + batch.append_row(&[])?; } batch.execute()?; }