Skip to content

Commit

Permalink
fix: Use user provided type in Oracle source
Browse files Browse the repository at this point in the history
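Use a user-provided `VARRAY OF VARCHAR2` type instead of creating a temporary one, so that the connector no longer requires the CREATE TYPE permission.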
  • Loading branch information
chubei committed Apr 3, 2024
1 parent 7660863 commit 875a9a1
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 45 deletions.
19 changes: 14 additions & 5 deletions dozer-ingestion/oracle/src/connector/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ pub struct TableColumn {
}

impl TableColumn {
pub fn list(connection: &Connection, schemas: &[String]) -> Result<Vec<TableColumn>, Error> {
pub fn list(
connection: &Connection,
string_collection_type_name: &str,
schemas: &[String],
) -> Result<Vec<TableColumn>, Error> {
assert!(!schemas.is_empty());
let sql = "
SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE, DATA_PRECISION, DATA_SCALE
FROM ALL_TAB_COLUMNS
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let schemas = super::string_collection(connection, schemas)?;
let schemas = super::string_collection(connection, string_collection_type_name, schemas)?;
debug!("{}, {}", sql, schemas);
let rows = connection.query_as::<(
String,
Expand Down Expand Up @@ -63,6 +67,7 @@ pub struct ConstraintColumn {
impl ConstraintColumn {
pub fn list(
connection: &Connection,
string_collection_type_name: &str,
schemas: &[String],
) -> Result<Vec<ConstraintColumn>, Error> {
assert!(!schemas.is_empty());
Expand All @@ -75,7 +80,7 @@ impl ConstraintColumn {
FROM ALL_CONS_COLUMNS
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let schemas = super::string_collection(connection, schemas)?;
let schemas = super::string_collection(connection, string_collection_type_name, schemas)?;
debug!("{}, {}", sql, schemas);
let rows =
connection.query_as::<(String, String, String, Option<String>)>(sql, &[&schemas])?;
Expand All @@ -102,7 +107,11 @@ pub struct Constraint {
}

impl Constraint {
pub fn list(connection: &Connection, schemas: &[String]) -> Result<Vec<Constraint>, Error> {
pub fn list(
connection: &Connection,
string_collection_type_name: &str,
schemas: &[String],
) -> Result<Vec<Constraint>, Error> {
assert!(!schemas.is_empty());
let sql = "
SELECT
Expand All @@ -114,7 +123,7 @@ impl Constraint {
AND
CONSTRAINT_TYPE = 'P'
";
let schemas = super::string_collection(connection, schemas)?;
let schemas = super::string_collection(connection, string_collection_type_name, schemas)?;
debug!("{}, {}", sql, schemas);
let rows = connection.query_as::<(Option<String>, Option<String>)>(sql, &[&schemas])?;

Expand Down
79 changes: 40 additions & 39 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ use dozer_ingestion_connector::{
},
Ingestor, SourceSchema, TableIdentifier, TableInfo,
};
use oracle::{
sql_type::{Collection, ObjectType},
Connection,
};
use oracle::{sql_type::Collection, Connection};

#[derive(Debug, Clone)]
pub struct Connector {
connection_name: String,
connection: Arc<Connection>,
username: String,
string_collection_type_name: String,
batch_size: usize,
replicator: OracleReplicator,
}
Expand Down Expand Up @@ -101,6 +99,7 @@ impl Connector {
username: String,
password: &str,
connect_string: &str,
string_collection_type_name: String,
batch_size: usize,
replicator: OracleReplicator,
) -> Result<Self, Error> {
Expand All @@ -110,6 +109,7 @@ impl Connector {
connection_name,
connection: Arc::new(connection),
username,
string_collection_type_name,
batch_size,
replicator,
})
Expand All @@ -136,7 +136,8 @@ impl Connector {
FROM ALL_TABLES
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let owners = string_collection(&self.connection, schemas)?;
let owners =
string_collection(&self.connection, &self.string_collection_type_name, schemas)?;
debug!("{}, {}", sql, owners);
self.connection
.query_as::<(String, String)>(sql, &[&owners])?
Expand Down Expand Up @@ -166,8 +167,11 @@ impl Connector {
.unwrap_or_else(|| self.username.clone())
})
.collect::<HashSet<_>>();
let table_columns =
listing::TableColumn::list(&self.connection, &schemas.into_iter().collect::<Vec<_>>())?;
let table_columns = listing::TableColumn::list(
&self.connection,
&self.string_collection_type_name,
&schemas.into_iter().collect::<Vec<_>>(),
)?;
let mut table_to_columns = HashMap::<(String, String), Vec<String>>::new();
for table_column in table_columns {
let table_pair = (table_column.owner, table_column.table_name);
Expand Down Expand Up @@ -214,10 +218,21 @@ impl Connector {
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let table_columns = listing::TableColumn::list(&self.connection, &schemas)?;
let constraint_columns =
listing::ConstraintColumn::list(&self.connection, &schemas).unwrap();
let constraints = listing::Constraint::list(&self.connection, &schemas).unwrap();
let table_columns = listing::TableColumn::list(
&self.connection,
&self.string_collection_type_name,
&schemas,
)?;
let constraint_columns = listing::ConstraintColumn::list(
&self.connection,
&self.string_collection_type_name,
&schemas,
)?;
let constraints = listing::Constraint::list(
&self.connection,
&self.string_collection_type_name,
&schemas,
)?;
let table_columns =
join::join_columns_constraints(table_columns, constraint_columns, constraints);

Expand Down Expand Up @@ -415,31 +430,13 @@ mod listing;
mod mapping;
mod replicate;

const TEMP_DOZER_TYPE_NAME: &str = "TEMP_DOZER_TYPE";

fn temp_varray_of_vchar2(
fn string_collection(
connection: &Connection,
num_strings: usize,
max_num_chars: usize,
) -> Result<ObjectType, Error> {
let sql = format!(
"CREATE OR REPLACE TYPE {} AS VARRAY({}) OF VARCHAR2({})",
TEMP_DOZER_TYPE_NAME, num_strings, max_num_chars
);
debug!("{}", sql);
connection.execute(&sql, &[])?;
connection
.object_type(TEMP_DOZER_TYPE_NAME)
.map_err(Into::into)
}

fn string_collection(connection: &Connection, strings: &[String]) -> Result<Collection, Error> {
let temp_type = temp_varray_of_vchar2(
connection,
strings.len(),
strings.iter().map(|s| s.len()).max().unwrap(),
)?;
let mut collection = temp_type.new_collection()?;
string_collection_type_name: &str,
strings: &[String],
) -> Result<Collection, Error> {
let object_type = connection.object_type(string_collection_type_name)?;
let mut collection = object_type.new_collection()?;
for string in strings {
collection.push(&str_to_sql!(*string))?;
}
Expand Down Expand Up @@ -497,16 +494,17 @@ mod tests {

env_logger::init();

let replicate_user = "DOZER";
let data_user = "DOZER";
let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com";
let sid = "ORCL";
let replicate_user = "C##DOZER";
let data_user = "CHUBEI";
let host = "localhost";
let sid = "ORCLPDB1";

let mut connector = super::Connector::new(
"oracle".into(),
replicate_user.into(),
"123",
&format!("{}:{}/{}", host, 1521, sid),
"SYS.DOZER_VARRAY_OF_VARCHAR2".to_string(),
100_000,
OracleReplicator::DozerLogReader,
)
Expand All @@ -525,11 +523,14 @@ mod tests {
estimate_throughput(iterator);
let checkpoint = handle.join().unwrap().unwrap();

let sid = "ORCLCDB";

let mut connector = super::Connector::new(
"oracle".into(),
replicate_user.into(),
"123",
&format!("{}:{}/{}", host, 1521, sid),
"SYS.DOZER_VARRAY_OF_VARCHAR2".to_string(),
1,
OracleReplicator::LogMiner {
poll_interval_in_milliseconds: 1000,
Expand Down
4 changes: 3 additions & 1 deletion dozer-ingestion/oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl OracleConnector {
config.user.clone(),
&config.password,
&root_connect_string,
config.string_collection_type_name.clone(),
batch_size,
config.replicator,
)?;
Expand All @@ -61,9 +62,10 @@ impl OracleConnector {
let pdb_connect_string = format!("{}:{}/{}", config.host, config.port, pdb);
let pdb_connector = connector::Connector::new(
connection_name,
config.user.clone(),
config.user,
&config.password,
&pdb_connect_string,
config.string_collection_type_name,
batch_size,
config.replicator,
)?;
Expand Down
2 changes: 2 additions & 0 deletions dozer-types/src/models/ingestion_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ pub struct OracleConfig {
/// The schemas to consider when listing tables. If empty, will list all schemas, which can be slow.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub schemas: Vec<String>,
/// A `VARRAY OF VARCHAR2` type that can be used by dozer.
pub string_collection_type_name: String,
/// Batch size during snapshotting
pub batch_size: Option<usize>,
pub replicator: OracleReplicator,
Expand Down
5 changes: 5 additions & 0 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,7 @@
"port",
"replicator",
"sid",
"string_collection_type_name",
"user"
],
"properties": {
Expand Down Expand Up @@ -1422,6 +1423,10 @@
"sid": {
"type": "string"
},
"string_collection_type_name": {
"description": "A `VARRAY OF VARCHAR2` type that can be used by dozer.",
"type": "string"
},
"user": {
"type": "string"
}
Expand Down

0 comments on commit 875a9a1

Please sign in to comment.