Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use user provided type in Oracle source #2473

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions dozer-ingestion/oracle/src/connector/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ pub struct TableColumn {
}

impl TableColumn {
pub fn list(connection: &Connection, schemas: &[String]) -> Result<Vec<TableColumn>, Error> {
pub fn list(
connection: &Connection,
string_collection_type_name: &str,
schemas: &[String],
) -> Result<Vec<TableColumn>, Error> {
assert!(!schemas.is_empty());
let sql = "
SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE, DATA_PRECISION, DATA_SCALE
FROM ALL_TAB_COLUMNS
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let schemas = super::string_collection(connection, schemas)?;
let schemas = super::string_collection(connection, string_collection_type_name, schemas)?;
debug!("{}, {}", sql, schemas);
let rows = connection.query_as::<(
String,
Expand Down Expand Up @@ -63,6 +67,7 @@ pub struct ConstraintColumn {
impl ConstraintColumn {
pub fn list(
connection: &Connection,
string_collection_type_name: &str,
schemas: &[String],
) -> Result<Vec<ConstraintColumn>, Error> {
assert!(!schemas.is_empty());
Expand All @@ -75,7 +80,7 @@ impl ConstraintColumn {
FROM ALL_CONS_COLUMNS
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let schemas = super::string_collection(connection, schemas)?;
let schemas = super::string_collection(connection, string_collection_type_name, schemas)?;
debug!("{}, {}", sql, schemas);
let rows =
connection.query_as::<(String, String, String, Option<String>)>(sql, &[&schemas])?;
Expand All @@ -102,7 +107,11 @@ pub struct Constraint {
}

impl Constraint {
pub fn list(connection: &Connection, schemas: &[String]) -> Result<Vec<Constraint>, Error> {
pub fn list(
connection: &Connection,
string_collection_type_name: &str,
schemas: &[String],
) -> Result<Vec<Constraint>, Error> {
assert!(!schemas.is_empty());
let sql = "
SELECT
Expand All @@ -114,7 +123,7 @@ impl Constraint {
AND
CONSTRAINT_TYPE = 'P'
";
let schemas = super::string_collection(connection, schemas)?;
let schemas = super::string_collection(connection, string_collection_type_name, schemas)?;
debug!("{}, {}", sql, schemas);
let rows = connection.query_as::<(Option<String>, Option<String>)>(sql, &[&schemas])?;

Expand Down
79 changes: 40 additions & 39 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ use dozer_ingestion_connector::{
},
Ingestor, SourceSchema, TableIdentifier, TableInfo,
};
use oracle::{
sql_type::{Collection, ObjectType},
Connection,
};
use oracle::{sql_type::Collection, Connection};

#[derive(Debug, Clone)]
pub struct Connector {
connection_name: String,
connection: Arc<Connection>,
username: String,
string_collection_type_name: String,
batch_size: usize,
replicator: OracleReplicator,
}
Expand Down Expand Up @@ -101,6 +99,7 @@ impl Connector {
username: String,
password: &str,
connect_string: &str,
string_collection_type_name: String,
batch_size: usize,
replicator: OracleReplicator,
) -> Result<Self, Error> {
Expand All @@ -110,6 +109,7 @@ impl Connector {
connection_name,
connection: Arc::new(connection),
username,
string_collection_type_name,
batch_size,
replicator,
})
Expand All @@ -136,7 +136,8 @@ impl Connector {
FROM ALL_TABLES
WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2))
";
let owners = string_collection(&self.connection, schemas)?;
let owners =
string_collection(&self.connection, &self.string_collection_type_name, schemas)?;
debug!("{}, {}", sql, owners);
self.connection
.query_as::<(String, String)>(sql, &[&owners])?
Expand Down Expand Up @@ -166,8 +167,11 @@ impl Connector {
.unwrap_or_else(|| self.username.clone())
})
.collect::<HashSet<_>>();
let table_columns =
listing::TableColumn::list(&self.connection, &schemas.into_iter().collect::<Vec<_>>())?;
let table_columns = listing::TableColumn::list(
&self.connection,
&self.string_collection_type_name,
&schemas.into_iter().collect::<Vec<_>>(),
)?;
let mut table_to_columns = HashMap::<(String, String), Vec<String>>::new();
for table_column in table_columns {
let table_pair = (table_column.owner, table_column.table_name);
Expand Down Expand Up @@ -214,10 +218,21 @@ impl Connector {
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let table_columns = listing::TableColumn::list(&self.connection, &schemas)?;
let constraint_columns =
listing::ConstraintColumn::list(&self.connection, &schemas).unwrap();
let constraints = listing::Constraint::list(&self.connection, &schemas).unwrap();
let table_columns = listing::TableColumn::list(
&self.connection,
&self.string_collection_type_name,
&schemas,
)?;
let constraint_columns = listing::ConstraintColumn::list(
&self.connection,
&self.string_collection_type_name,
&schemas,
)?;
let constraints = listing::Constraint::list(
&self.connection,
&self.string_collection_type_name,
&schemas,
)?;
let table_columns =
join::join_columns_constraints(table_columns, constraint_columns, constraints);

Expand Down Expand Up @@ -415,31 +430,13 @@ mod listing;
mod mapping;
mod replicate;

const TEMP_DOZER_TYPE_NAME: &str = "TEMP_DOZER_TYPE";

fn temp_varray_of_vchar2(
fn string_collection(
connection: &Connection,
num_strings: usize,
max_num_chars: usize,
) -> Result<ObjectType, Error> {
let sql = format!(
"CREATE OR REPLACE TYPE {} AS VARRAY({}) OF VARCHAR2({})",
TEMP_DOZER_TYPE_NAME, num_strings, max_num_chars
);
debug!("{}", sql);
connection.execute(&sql, &[])?;
connection
.object_type(TEMP_DOZER_TYPE_NAME)
.map_err(Into::into)
}

fn string_collection(connection: &Connection, strings: &[String]) -> Result<Collection, Error> {
let temp_type = temp_varray_of_vchar2(
connection,
strings.len(),
strings.iter().map(|s| s.len()).max().unwrap(),
)?;
let mut collection = temp_type.new_collection()?;
string_collection_type_name: &str,
strings: &[String],
) -> Result<Collection, Error> {
let object_type = connection.object_type(string_collection_type_name)?;
let mut collection = object_type.new_collection()?;
for string in strings {
collection.push(&str_to_sql!(*string))?;
}
Expand Down Expand Up @@ -497,16 +494,17 @@ mod tests {

env_logger::init();

let replicate_user = "DOZER";
let data_user = "DOZER";
let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com";
let sid = "ORCL";
let replicate_user = "C##DOZER";
let data_user = "CHUBEI";
let host = "localhost";
let sid = "ORCLPDB1";

let mut connector = super::Connector::new(
"oracle".into(),
replicate_user.into(),
"123",
&format!("{}:{}/{}", host, 1521, sid),
"SYS.DOZER_VARRAY_OF_VARCHAR2".to_string(),
100_000,
OracleReplicator::DozerLogReader,
)
Expand All @@ -525,11 +523,14 @@ mod tests {
estimate_throughput(iterator);
let checkpoint = handle.join().unwrap().unwrap();

let sid = "ORCLCDB";

let mut connector = super::Connector::new(
"oracle".into(),
replicate_user.into(),
"123",
&format!("{}:{}/{}", host, 1521, sid),
"SYS.DOZER_VARRAY_OF_VARCHAR2".to_string(),
1,
OracleReplicator::LogMiner {
poll_interval_in_milliseconds: 1000,
Expand Down
4 changes: 3 additions & 1 deletion dozer-ingestion/oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl OracleConnector {
config.user.clone(),
&config.password,
&root_connect_string,
config.string_collection_type_name.clone(),
batch_size,
config.replicator,
)?;
Expand All @@ -61,9 +62,10 @@ impl OracleConnector {
let pdb_connect_string = format!("{}:{}/{}", config.host, config.port, pdb);
let pdb_connector = connector::Connector::new(
connection_name,
config.user.clone(),
config.user,
&config.password,
&pdb_connect_string,
config.string_collection_type_name,
batch_size,
config.replicator,
)?;
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ mod tests {
sid: "ORCLCDB".into(),
pdb: Some("ORCLPDB1".into()),
schemas: vec![],
string_collection_type_name: "DOZER_VAARY_OF_VARCHAR2".to_string(),
batch_size: None,
replicator: dozer_types::models::ingestion_types::OracleReplicator::LogMiner {
poll_interval_in_milliseconds: 0,
Expand Down
2 changes: 2 additions & 0 deletions dozer-types/src/models/ingestion_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ pub struct OracleConfig {
/// The schemas to consider when listing tables. If empty, will list all schemas, which can be slow.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub schemas: Vec<String>,
/// A `VARRAY OF VARCHAR2` type that can be used by dozer.
pub string_collection_type_name: String,
/// Batch size during snapshotting
pub batch_size: Option<usize>,
pub replicator: OracleReplicator,
Expand Down
5 changes: 5 additions & 0 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,7 @@
"port",
"replicator",
"sid",
"string_collection_type_name",
"user"
],
"properties": {
Expand Down Expand Up @@ -1422,6 +1423,10 @@
"sid": {
"type": "string"
},
"string_collection_type_name": {
"description": "A `VARRAY OF VARCHAR2` type that can be used by dozer.",
"type": "string"
},
"user": {
"type": "string"
}
Expand Down
Loading