From 7fb445ff91db2ec21cb8253659e1d85a591967cc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= <karolisgud@gmail.com>
Date: Sun, 25 Feb 2024 10:17:17 +0200
Subject: [PATCH 1/2] chore: Add oracle instant client library to docker image
 (#2421)

---
 ci/Dockerfile | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/ci/Dockerfile b/ci/Dockerfile
index dbd61d5ac8..e5d97d456e 100644
--- a/ci/Dockerfile
+++ b/ci/Dockerfile
@@ -6,7 +6,8 @@ RUN apt-get update \
       odbcinst \
       unixodbc \
       curl \
-      unzip
+      unzip \
+      libaio1
 
 # INSTALL PROTOBUF
 RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v3.18.1/protoc-3.18.1-linux-x86_64.zip 

From afb1b629fe77b84f10ad7299b92d442d0e3734e1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= <karolisgud@gmail.com>
Date: Mon, 26 Feb 2024 13:14:24 +0200
Subject: [PATCH 2/2] feat: Support integer on oracle ingestion/sink (#2419)

---
 dozer-ingestion/oracle/src/connector/join.rs  |  4 ++++
 .../oracle/src/connector/listing.rs           | 22 +++++++++++------
 .../oracle/src/connector/mapping.rs           | 19 ++++++++++++++-
 dozer-ingestion/oracle/src/connector/mod.rs   |  4 ++++
 .../connector/replicate/log_miner/mapping.rs  | 10 ++++++++
 dozer-sink-oracle/src/lib.rs                  | 24 ++++++++++++-------
 6 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/dozer-ingestion/oracle/src/connector/join.rs b/dozer-ingestion/oracle/src/connector/join.rs
index ad04a9c948..0073463449 100644
--- a/dozer-ingestion/oracle/src/connector/join.rs
+++ b/dozer-ingestion/oracle/src/connector/join.rs
@@ -8,6 +8,8 @@ pub struct Column {
     pub data_type: Option<String>,
     pub nullable: Option<String>,
     pub is_primary_key: bool,
+    pub precision: Option<i64>,
+    pub scale: Option<i64>,
 }
 
 pub fn join_columns_constraints(
@@ -47,6 +49,8 @@ pub fn join_columns_constraints(
             data_type: table_column.data_type,
             nullable: table_column.nullable,
             is_primary_key,
+            precision: table_column.precision,
+            scale: table_column.scale,
         };
         let table_pair = (column_triple.0, column_triple.1);
         table_to_columns.entry(table_pair).or_default().push(column);
diff --git a/dozer-ingestion/oracle/src/connector/listing.rs b/dozer-ingestion/oracle/src/connector/listing.rs
index 089089d81b..77f449136d 100644
--- a/dozer-ingestion/oracle/src/connector/listing.rs
+++ b/dozer-ingestion/oracle/src/connector/listing.rs
@@ -9,32 +9,40 @@ pub struct TableColumn {
     pub column_name: String,
     pub data_type: Option<String>,
     pub nullable: Option<String>,
+    pub precision: Option<i64>,
+    pub scale: Option<i64>,
 }
 
 impl TableColumn {
     pub fn list(connection: &Connection, schemas: &[String]) -> Result<Vec<TableColumn>, Error> {
         assert!(!schemas.is_empty());
         let sql = "
-        SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE
+        SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE, DATA_PRECISION, DATA_SCALE
         FROM ALL_TAB_COLUMNS
         WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
         ";
         let schemas = super::string_collection(connection, schemas)?;
-        let rows = connection
-            .query_as::<(String, String, String, Option<String>, Option<String>)>(
-                sql,
-                &[&schemas],
-            )?;
+        let rows = connection.query_as::<(
+            String,
+            String,
+            String,
+            Option<String>,
+            Option<String>,
+            Option<i64>,
+            Option<i64>,
+        )>(sql, &[&schemas])?;
 
         let mut columns = Vec::new();
         for row in rows {
-            let (owner, table_name, column_name, data_type, nullable) = row?;
+            let (owner, table_name, column_name, data_type, nullable, precision, scale) = row?;
             let column = TableColumn {
                 owner,
                 table_name,
                 column_name,
                 data_type,
                 nullable,
+                precision,
+                scale,
             };
             columns.push(column);
         }
diff --git a/dozer-ingestion/oracle/src/connector/mapping.rs b/dozer-ingestion/oracle/src/connector/mapping.rs
index 89a43451f7..0e8a545bb9 100644
--- a/dozer-ingestion/oracle/src/connector/mapping.rs
+++ b/dozer-ingestion/oracle/src/connector/mapping.rs
@@ -39,6 +39,8 @@ fn map_data_type(
     column_name: &str,
     data_type: Option<&str>,
     nullable: Option<&str>,
+    precision: Option<i64>,
+    scale: Option<i64>,
 ) -> Result<MappedColumn, DataTypeError> {
     let data_type = data_type.ok_or_else(|| DataTypeError::ColumnDataTypeIsNull {
         schema: schema.to_string(),
@@ -51,7 +53,12 @@ fn map_data_type(
         match data_type {
             "VARCHAR2" => Ok(FieldType::String),
             "NVARCHAR2" => unimplemented!("convert NVARCHAR2 to String"),
-            "NUMBER" => Ok(FieldType::Decimal),
+            "INTEGER" => Ok(FieldType::I128),
+            "NUMBER" => match (precision, scale) {
+                (Some(precision), Some(0)) if precision <= 19 => Ok(FieldType::Int),
+                (_, Some(0)) => Ok(FieldType::I128),
+                _ => Ok(FieldType::Decimal),
+            },
             "FLOAT" => Ok(FieldType::Float),
             "DATE" => Ok(FieldType::Date),
             "BINARY_FLOAT" => Ok(FieldType::Float),
@@ -89,6 +96,14 @@ pub fn map_row(schema: &Schema, row: Row) -> Result<Record, Error> {
 
 fn map_field(index: usize, field: &FieldDefinition, row: &Row) -> Result<Field, Error> {
     Ok(match (field.typ, field.nullable) {
+        (FieldType::Int, true) => row
+            .get::<_, Option<i64>>(index)?
+            .map_or(Field::Null, Field::Int),
+        (FieldType::Int, false) => Field::Int(row.get(index)?),
+        (FieldType::UInt, true) => row
+            .get::<_, Option<u64>>(index)?
+            .map_or(Field::Null, Field::UInt),
+        (FieldType::UInt, false) => Field::UInt(row.get(index)?),
         (FieldType::Float, true) => row
             .get::<_, Option<f64>>(index)?
             .map_or(Field::Null, |value| Field::Float(OrderedFloat(value))),
@@ -153,6 +168,8 @@ fn map_columns(schema: &str, table_name: &str, columns: Vec<Column>) -> ColumnMa
                 &column.name,
                 column.data_type.as_deref(),
                 column.nullable.as_deref(),
+                column.precision,
+                column.scale,
             );
             (
                 column.name,
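
Note (not part of the patch): a minimal sketch of how the NUMBER(p, s) rule added above in map_data_type resolves; Mapped is a hypothetical stand-in for dozer's FieldType.

// Illustrative sketch only, assuming the precision/scale rule shown in map_data_type.
#[derive(Debug, PartialEq)]
enum Mapped {
    Int,     // stands in for FieldType::Int (i64)
    I128,    // stands in for FieldType::I128
    Decimal, // stands in for FieldType::Decimal
}

fn map_number(precision: Option<i64>, scale: Option<i64>) -> Mapped {
    match (precision, scale) {
        // NUMBER(p, 0) with p <= 19 digits maps to a 64-bit Int.
        (Some(p), Some(0)) if p <= 19 => Mapped::Int,
        // Integral columns wider than 19 digits fall back to I128.
        (_, Some(0)) => Mapped::I128,
        // Fractional or unspecified scale stays Decimal.
        _ => Mapped::Decimal,
    }
}

fn main() {
    assert_eq!(map_number(Some(10), Some(0)), Mapped::Int);  // NUMBER(10,0)
    assert_eq!(map_number(Some(38), Some(0)), Mapped::I128); // NUMBER(38,0)
    assert_eq!(map_number(None, None), Mapped::Decimal);     // bare NUMBER
}
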
diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs
index e680aad915..079366513c 100644
--- a/dozer-ingestion/oracle/src/connector/mod.rs
+++ b/dozer-ingestion/oracle/src/connector/mod.rs
@@ -69,6 +69,10 @@ pub enum Error {
     ParseDateTime(#[source] chrono::ParseError, String),
     #[error("got overflow float number {0}")]
     FloatOverflow(Decimal),
+    #[error("got error when parsing uint {0}")]
+    ParseUIntFailed(Decimal),
+    #[error("got error when parsing int {0}")]
+    ParseIntFailed(Decimal),
     #[error("type mismatch for {field}, expected {expected:?}, actual {actual:?}")]
     TypeMismatch {
         field: String,
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log_miner/mapping.rs b/dozer-ingestion/oracle/src/connector/replicate/log_miner/mapping.rs
index 4bae5c974e..1803f19c02 100644
--- a/dozer-ingestion/oracle/src/connector/replicate/log_miner/mapping.rs
+++ b/dozer-ingestion/oracle/src/connector/replicate/log_miner/mapping.rs
@@ -42,6 +42,16 @@ fn map_value(
         ))),
         (ParsedValue::String(string), FieldType::Decimal, _) => Ok(Field::Decimal(string.parse()?)),
         (ParsedValue::Number(number), FieldType::Decimal, _) => Ok(Field::Decimal(number)),
+        (ParsedValue::Number(number), FieldType::Int, _) => Ok(Field::Int(
+            number
+                .to_i64()
+                .ok_or_else(|| Error::ParseIntFailed(number))?,
+        )),
+        (ParsedValue::Number(number), FieldType::UInt, _) => Ok(Field::UInt(
+            number
+                .to_u64()
+                .ok_or_else(|| Error::ParseUIntFailed(number))?,
+        )),
         (ParsedValue::String(string), FieldType::String, _) => Ok(Field::String(string)),
         (ParsedValue::Number(_), FieldType::String, _) => Err(Error::TypeMismatch {
             field: name.to_string(),
diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index ac58ae8705..d665b7c73b 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -246,10 +246,10 @@ impl OracleSinkFactory {
         for field in &schema.fields {
             let name = &field.name;
             let col_type = match field.typ {
-                dozer_types::types::FieldType::UInt => "NUMBER",
-                dozer_types::types::FieldType::U128 => unimplemented!(),
-                dozer_types::types::FieldType::Int => "NUMBER",
-                dozer_types::types::FieldType::I128 => unimplemented!(),
+                dozer_types::types::FieldType::UInt => "INTEGER",
+                dozer_types::types::FieldType::U128 => "INTEGER",
+                dozer_types::types::FieldType::Int => "INTEGER",
+                dozer_types::types::FieldType::I128 => "INTEGER",
                 // Should this be BINARY_DOUBLE?
                 dozer_types::types::FieldType::Float => "NUMBER",
                 dozer_types::types::FieldType::Boolean => "NUMBER",
@@ -325,9 +325,13 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
     }
 
     let opkind_idx = parameter_index.next().unwrap();
+
     let opid_select = format!(
-        r#"COALESCE(S."{TXN_ID_COL}" > D."{TXN_ID_COL}" OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"), TRUE) = TRUE"#
+        r#"(D."{TXN_ID_COL}" IS NULL
+        OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
+        OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"#
     );
+
     // Match on PK and txn_id.
     // If the record does not exist and the op is INSERT, do the INSERT
     // If the record exists, but the txid is higher than the operation's txid,
@@ -513,8 +517,8 @@ impl OracleSink {
             .conn
             .batch(&self.merge_statement, self.batch_params.len())
             .build()?;
-        let mut bind_idx = 1..;
         for params in self.batch_params.drain(..) {
+            let mut bind_idx = 1..;
             for ((field, typ), i) in params
                 .params
                 .values
@@ -709,8 +713,12 @@ mod tests {
                 ON (D."id" = S."id" AND D."name" = S."name")
                 WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
                 WHEN MATCHED THEN UPDATE SET D."content" = S."content", D."__txn_id" = S."__txn_id", D."__txn_seq" = S."__txn_seq"
-                WHERE S.DOZER_OPKIND = 1 AND COALESCE(S."__txn_id" > D."__txn_id" OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"), TRUE) = TRUE
-                DELETE WHERE S.DOZER_OPKIND = 2 AND COALESCE(S."__txn_id" > D."__txn_id" OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"), TRUE) = TRUE
+                WHERE S.DOZER_OPKIND = 1 AND (D."__txn_id" IS NULL
+                    OR S."__txn_id" > D."__txn_id"
+                    OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"))
+                DELETE WHERE S.DOZER_OPKIND = 2 AND (D."__txn_id" IS NULL
+                    OR S."__txn_id" > D."__txn_id"
+                    OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"))
 "#
             )
         )
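
Note (not part of the patch): a minimal sketch of the guard that the rewritten merge condition encodes, modelled over hypothetical (txn_id, txn_seq) pairs; dest is None when the destination row has no __txn_id yet.

// Illustrative sketch only: apply an UPDATE/DELETE when the destination row
// carries no transaction id, or when the source operation is strictly newer.
fn should_apply(dest: Option<(u64, u64)>, src: (u64, u64)) -> bool {
    match dest {
        None => true, // D."__txn_id" IS NULL
        Some((d_id, d_seq)) => src.0 > d_id || (src.0 == d_id && src.1 > d_seq),
    }
}

fn main() {
    assert!(should_apply(None, (1, 1)));          // fresh destination row
    assert!(should_apply(Some((1, 1)), (1, 2)));  // same txn, later sequence
    assert!(!should_apply(Some((2, 0)), (1, 9))); // stale source transaction
}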