feat: Support integer on oracle ingestion/sink (#2419)
karolisg authored Feb 26, 2024
1 parent 7fb445f commit afb1b62
Showing 6 changed files with 67 additions and 16 deletions.
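
For orientation: the substance of this commit is a new type-mapping rule for Oracle NUMBER columns. DATA_PRECISION and DATA_SCALE are now read from ALL_TAB_COLUMNS, and integral NUMBER columns (plus Oracle INTEGER columns) map to integer field types instead of always falling back to Decimal. Below is a minimal sketch of that rule, using a simplified stand-in for dozer_types::types::FieldType; the names SimpleFieldType and map_number are illustrative only and not part of the commit.

// Sketch only: mirrors the match arm added to map_data_type in connector/mapping.rs.
#[derive(Debug, PartialEq)]
enum SimpleFieldType {
    Int,     // NUMBER(p, 0) with p <= 19: fits a signed 64-bit integer
    I128,    // wider integral NUMBER columns (and Oracle INTEGER columns)
    Decimal, // fractional scale, or unknown precision/scale
}

fn map_number(precision: Option<i64>, scale: Option<i64>) -> SimpleFieldType {
    match (precision, scale) {
        (Some(p), Some(0)) if p <= 19 => SimpleFieldType::Int,
        (_, Some(0)) => SimpleFieldType::I128,
        _ => SimpleFieldType::Decimal,
    }
}

fn main() {
    assert_eq!(map_number(Some(10), Some(0)), SimpleFieldType::Int);     // NUMBER(10,0)
    assert_eq!(map_number(Some(38), Some(0)), SimpleFieldType::I128);    // NUMBER(38,0)
    assert_eq!(map_number(Some(10), Some(2)), SimpleFieldType::Decimal); // NUMBER(10,2)
    assert_eq!(map_number(None, None), SimpleFieldType::Decimal);        // plain NUMBER
}

The 19-digit cutoff roughly matches the range of a signed 64-bit integer; anything integral but wider goes to I128, and anything with a fractional scale stays Decimal.
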
4 changes: 4 additions & 0 deletions dozer-ingestion/oracle/src/connector/join.rs
@@ -8,6 +8,8 @@ pub struct Column {
pub data_type: Option<String>,
pub nullable: Option<String>,
pub is_primary_key: bool,
pub precision: Option<i64>,
pub scale: Option<i64>,
}

pub fn join_columns_constraints(
@@ -47,6 +49,8 @@ pub fn join_columns_constraints(
data_type: table_column.data_type,
nullable: table_column.nullable,
is_primary_key,
precision: table_column.precision,
scale: table_column.scale,
};
let table_pair = (column_triple.0, column_triple.1);
table_to_columns.entry(table_pair).or_default().push(column);
22 changes: 15 additions & 7 deletions dozer-ingestion/oracle/src/connector/listing.rs
@@ -9,32 +9,40 @@ pub struct TableColumn {
pub column_name: String,
pub data_type: Option<String>,
pub nullable: Option<String>,
pub precision: Option<i64>,
pub scale: Option<i64>,
}

impl TableColumn {
pub fn list(connection: &Connection, schemas: &[String]) -> Result<Vec<TableColumn>, Error> {
assert!(!schemas.is_empty());
let sql = "
SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE
SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE, DATA_PRECISION, DATA_SCALE
FROM ALL_TAB_COLUMNS
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let schemas = super::string_collection(connection, schemas)?;
let rows = connection
.query_as::<(String, String, String, Option<String>, Option<String>)>(
sql,
&[&schemas],
)?;
let rows = connection.query_as::<(
String,
String,
String,
Option<String>,
Option<String>,
Option<i64>,
Option<i64>,
)>(sql, &[&schemas])?;

let mut columns = Vec::new();
for row in rows {
let (owner, table_name, column_name, data_type, nullable) = row?;
let (owner, table_name, column_name, data_type, nullable, precision, scale) = row?;
let column = TableColumn {
owner,
table_name,
column_name,
data_type,
nullable,
precision,
scale,
};
columns.push(column);
}
19 changes: 18 additions & 1 deletion dozer-ingestion/oracle/src/connector/mapping.rs
@@ -39,6 +39,8 @@ fn map_data_type(
column_name: &str,
data_type: Option<&str>,
nullable: Option<&str>,
precision: Option<i64>,
scale: Option<i64>,
) -> Result<MappedColumn, DataTypeError> {
let data_type = data_type.ok_or_else(|| DataTypeError::ColumnDataTypeIsNull {
schema: schema.to_string(),
@@ -51,7 +53,12 @@
match data_type {
"VARCHAR2" => Ok(FieldType::String),
"NVARCHAR2" => unimplemented!("convert NVARCHAR2 to String"),
"NUMBER" => Ok(FieldType::Decimal),
"INTEGER" => Ok(FieldType::I128),
"NUMBER" => match (precision, scale) {
(Some(precision), Some(0)) if precision <= 19 => Ok(FieldType::Int),
(_, Some(0)) => Ok(FieldType::I128),
_ => Ok(FieldType::Decimal),
},
"FLOAT" => Ok(FieldType::Float),
"DATE" => Ok(FieldType::Date),
"BINARY_FLOAT" => Ok(FieldType::Float),
@@ -89,6 +96,14 @@ pub fn map_row(schema: &Schema, row: Row) -> Result<Record, Error> {

fn map_field(index: usize, field: &FieldDefinition, row: &Row) -> Result<Field, Error> {
Ok(match (field.typ, field.nullable) {
(FieldType::Int, true) => row
.get::<_, Option<i64>>(index)?
.map_or(Field::Null, Field::Int),
(FieldType::Int, false) => Field::Int(row.get(index)?),
(FieldType::UInt, true) => row
.get::<_, Option<u64>>(index)?
.map_or(Field::Null, Field::UInt),
(FieldType::UInt, false) => Field::UInt(row.get(index)?),
(FieldType::Float, true) => row
.get::<_, Option<f64>>(index)?
.map_or(Field::Null, |value| Field::Float(OrderedFloat(value))),
@@ -153,6 +168,8 @@ fn map_columns(schema: &str, table_name: &str, columns: Vec<Column>) -> ColumnMa
&column.name,
column.data_type.as_deref(),
column.nullable.as_deref(),
column.precision,
column.scale,
);
(
column.name,
4 changes: 4 additions & 0 deletions dozer-ingestion/oracle/src/connector/mod.rs
@@ -69,6 +69,10 @@ pub enum Error {
ParseDateTime(#[source] chrono::ParseError, String),
#[error("got overflow float number {0}")]
FloatOverflow(Decimal),
#[error("got error when parsing uint {0}")]
ParseUIntFailed(Decimal),
#[error("got error when parsing int {0}")]
ParseIntFailed(Decimal),
#[error("type mismatch for {field}, expected {expected:?}, actual {actual:?}")]
TypeMismatch {
field: String,
@@ -42,6 +42,16 @@ fn map_value(
))),
(ParsedValue::String(string), FieldType::Decimal, _) => Ok(Field::Decimal(string.parse()?)),
(ParsedValue::Number(number), FieldType::Decimal, _) => Ok(Field::Decimal(number)),
(ParsedValue::Number(number), FieldType::Int, _) => Ok(Field::Int(
number
.to_i64()
.ok_or_else(|| Error::ParseIntFailed(number))?,
)),
(ParsedValue::Number(number), FieldType::UInt, _) => Ok(Field::UInt(
number
.to_u64()
.ok_or_else(|| Error::ParseUIntFailed(number))?,
)),
(ParsedValue::String(string), FieldType::String, _) => Ok(Field::String(string)),
(ParsedValue::Number(_), FieldType::String, _) => Err(Error::TypeMismatch {
field: name.to_string(),
24 changes: 16 additions & 8 deletions dozer-sink-oracle/src/lib.rs
@@ -246,10 +246,10 @@ impl OracleSinkFactory {
for field in &schema.fields {
let name = &field.name;
let col_type = match field.typ {
dozer_types::types::FieldType::UInt => "NUMBER",
dozer_types::types::FieldType::U128 => unimplemented!(),
dozer_types::types::FieldType::Int => "NUMBER",
dozer_types::types::FieldType::I128 => unimplemented!(),
dozer_types::types::FieldType::UInt => "INTEGER",
dozer_types::types::FieldType::U128 => "INTEGER",
dozer_types::types::FieldType::Int => "INTEGER",
dozer_types::types::FieldType::I128 => "INTEGER",
// Should this be BINARY_DOUBLE?
dozer_types::types::FieldType::Float => "NUMBER",
dozer_types::types::FieldType::Boolean => "NUMBER",
@@ -325,9 +325,13 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
}

let opkind_idx = parameter_index.next().unwrap();

let opid_select = format!(
r#"COALESCE(S."{TXN_ID_COL}" > D."{TXN_ID_COL}" OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"), TRUE) = TRUE"#
r#"(D."{TXN_ID_COL}" IS NULL
OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"#
);

// Match on PK and txn_id.
// If the record does not exist and the op is INSERT, do the INSERT
// If the record exists, but the txid is higher than the operation's txid,
@@ -513,8 +517,8 @@ impl OracleSink {
.conn
.batch(&self.merge_statement, self.batch_params.len())
.build()?;
let mut bind_idx = 1..;
for params in self.batch_params.drain(..) {
let mut bind_idx = 1..;
for ((field, typ), i) in params
.params
.values
@@ -709,8 +713,12 @@ mod tests {
ON (D."id" = S."id" AND D."name" = S."name")
WHEN NOT MATCHED THEN INSERT (D."id", D."name", D."content", D."__txn_id", D."__txn_seq") VALUES (S."id", S."name", S."content", S."__txn_id", S."__txn_seq") WHERE S.DOZER_OPKIND = 0
WHEN MATCHED THEN UPDATE SET D."content" = S."content", D."__txn_id" = S."__txn_id", D."__txn_seq" = S."__txn_seq"
WHERE S.DOZER_OPKIND = 1 AND COALESCE(S."__txn_id" > D."__txn_id" OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"), TRUE) = TRUE
DELETE WHERE S.DOZER_OPKIND = 2 AND COALESCE(S."__txn_id" > D."__txn_id" OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"), TRUE) = TRUE
WHERE S.DOZER_OPKIND = 1 AND (D."__txn_id" IS NULL
OR S."__txn_id" > D."__txn_id"
OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"))
DELETE WHERE S.DOZER_OPKIND = 2 AND (D."__txn_id" IS NULL
OR S."__txn_id" > D."__txn_id"
OR (S."__txn_id" = D."__txn_id" AND S."__txn_seq" > D."__txn_seq"))
"#
)
)
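
The sink's MERGE statement also changes behavior, not just column types: the previous COALESCE(..., TRUE) = TRUE predicate is replaced by an explicit IS NULL check on the destination's "__txn_id", so a destination row that has never recorded a transaction id always accepts the incoming operation, and otherwise the source wins only with a strictly newer (txn_id, txn_seq) pair. Below is a rough Rust model of that predicate; the function source_wins and the tuple encoding are illustrative, not from the commit.

// Sketch only: models the opid_select predicate built in generate_merge_statement.
// `dest` is (D."__txn_id", D."__txn_seq"), or None when D."__txn_id" IS NULL;
// `src` is (S."__txn_id", S."__txn_seq").
fn source_wins(dest: Option<(u64, u64)>, src: (u64, u64)) -> bool {
    match dest {
        None => true, // destination row has no txn id yet, so the source operation applies
        Some((d_id, d_seq)) => src.0 > d_id || (src.0 == d_id && src.1 > d_seq),
    }
}

fn main() {
    assert!(source_wins(None, (1, 0)));          // fresh destination row
    assert!(source_wins(Some((1, 5)), (2, 0)));  // strictly newer transaction
    assert!(source_wins(Some((2, 3)), (2, 4)));  // same transaction, later sequence
    assert!(!source_wins(Some((3, 0)), (2, 9))); // stale operation is skipped
}
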
