From 75b7372831d23bd444455d7524f2fd27aa977be1 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Wed, 3 Apr 2024 23:36:04 +0800 Subject: [PATCH] fix: Use user provided type in Oracle source We don't want to require CREATE TYPE permission. --- .../oracle/src/connector/listing.rs | 19 +++-- dozer-ingestion/oracle/src/connector/mod.rs | 79 ++++++++++--------- dozer-ingestion/oracle/src/lib.rs | 4 +- dozer-sink-oracle/src/lib.rs | 1 + dozer-types/src/models/ingestion_types.rs | 2 + json_schemas/dozer.json | 5 ++ 6 files changed, 65 insertions(+), 45 deletions(-) diff --git a/dozer-ingestion/oracle/src/connector/listing.rs b/dozer-ingestion/oracle/src/connector/listing.rs index 33abd6f2bc..a5a4c9debb 100644 --- a/dozer-ingestion/oracle/src/connector/listing.rs +++ b/dozer-ingestion/oracle/src/connector/listing.rs @@ -15,14 +15,18 @@ pub struct TableColumn { } impl TableColumn { - pub fn list(connection: &Connection, schemas: &[String]) -> Result, Error> { + pub fn list( + connection: &Connection, + string_collection_type_name: &str, + schemas: &[String], + ) -> Result, Error> { assert!(!schemas.is_empty()); let sql = " SELECT OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE, DATA_PRECISION, DATA_SCALE FROM ALL_TAB_COLUMNS WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2)) "; - let schemas = super::string_collection(connection, schemas)?; + let schemas = super::string_collection(connection, string_collection_type_name, schemas)?; debug!("{}, {}", sql, schemas); let rows = connection.query_as::<( String, @@ -63,6 +67,7 @@ pub struct ConstraintColumn { impl ConstraintColumn { pub fn list( connection: &Connection, + string_collection_type_name: &str, schemas: &[String], ) -> Result, Error> { assert!(!schemas.is_empty()); @@ -75,7 +80,7 @@ impl ConstraintColumn { FROM ALL_CONS_COLUMNS WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2)) "; - let schemas = super::string_collection(connection, schemas)?; + let schemas = super::string_collection(connection, string_collection_type_name, schemas)?; debug!("{}, {}", sql, schemas); let rows = connection.query_as::<(String, String, String, Option)>(sql, &[&schemas])?; @@ -102,7 +107,11 @@ pub struct Constraint { } impl Constraint { - pub fn list(connection: &Connection, schemas: &[String]) -> Result, Error> { + pub fn list( + connection: &Connection, + string_collection_type_name: &str, + schemas: &[String], + ) -> Result, Error> { assert!(!schemas.is_empty()); let sql = " SELECT @@ -114,7 +123,7 @@ impl Constraint { AND CONSTRAINT_TYPE = 'P' "; - let schemas = super::string_collection(connection, schemas)?; + let schemas = super::string_collection(connection, string_collection_type_name, schemas)?; debug!("{}, {}", sql, schemas); let rows = connection.query_as::<(Option, Option)>(sql, &[&schemas])?; diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs index 70ea0b617f..3da590551f 100644 --- a/dozer-ingestion/oracle/src/connector/mod.rs +++ b/dozer-ingestion/oracle/src/connector/mod.rs @@ -18,16 +18,14 @@ use dozer_ingestion_connector::{ }, Ingestor, SourceSchema, TableIdentifier, TableInfo, }; -use oracle::{ - sql_type::{Collection, ObjectType}, - Connection, -}; +use oracle::{sql_type::Collection, Connection}; #[derive(Debug, Clone)] pub struct Connector { connection_name: String, connection: Arc, username: String, + string_collection_type_name: String, batch_size: usize, replicator: OracleReplicator, } @@ -101,6 +99,7 @@ impl Connector { username: String, password: &str, connect_string: &str, + string_collection_type_name: String, batch_size: usize, replicator: OracleReplicator, ) -> Result { @@ -110,6 +109,7 @@ impl Connector { connection_name, connection: Arc::new(connection), username, + string_collection_type_name, batch_size, replicator, }) @@ -136,7 +136,8 @@ impl Connector { FROM ALL_TABLES WHERE OWNER IN (SELECT COLUMN_VALUE FROM TABLE(:2)) "; - let owners = string_collection(&self.connection, schemas)?; + let owners = + string_collection(&self.connection, &self.string_collection_type_name, schemas)?; debug!("{}, {}", sql, owners); self.connection .query_as::<(String, String)>(sql, &[&owners])? @@ -166,8 +167,11 @@ impl Connector { .unwrap_or_else(|| self.username.clone()) }) .collect::>(); - let table_columns = - listing::TableColumn::list(&self.connection, &schemas.into_iter().collect::>())?; + let table_columns = listing::TableColumn::list( + &self.connection, + &self.string_collection_type_name, + &schemas.into_iter().collect::>(), + )?; let mut table_to_columns = HashMap::<(String, String), Vec>::new(); for table_column in table_columns { let table_pair = (table_column.owner, table_column.table_name); @@ -214,10 +218,21 @@ impl Connector { .collect::>() .into_iter() .collect::>(); - let table_columns = listing::TableColumn::list(&self.connection, &schemas)?; - let constraint_columns = - listing::ConstraintColumn::list(&self.connection, &schemas).unwrap(); - let constraints = listing::Constraint::list(&self.connection, &schemas).unwrap(); + let table_columns = listing::TableColumn::list( + &self.connection, + &self.string_collection_type_name, + &schemas, + )?; + let constraint_columns = listing::ConstraintColumn::list( + &self.connection, + &self.string_collection_type_name, + &schemas, + )?; + let constraints = listing::Constraint::list( + &self.connection, + &self.string_collection_type_name, + &schemas, + )?; let table_columns = join::join_columns_constraints(table_columns, constraint_columns, constraints); @@ -415,31 +430,13 @@ mod listing; mod mapping; mod replicate; -const TEMP_DOZER_TYPE_NAME: &str = "TEMP_DOZER_TYPE"; - -fn temp_varray_of_vchar2( +fn string_collection( connection: &Connection, - num_strings: usize, - max_num_chars: usize, -) -> Result { - let sql = format!( - "CREATE OR REPLACE TYPE {} AS VARRAY({}) OF VARCHAR2({})", - TEMP_DOZER_TYPE_NAME, num_strings, max_num_chars - ); - debug!("{}", sql); - connection.execute(&sql, &[])?; - connection - .object_type(TEMP_DOZER_TYPE_NAME) - .map_err(Into::into) -} - -fn string_collection(connection: &Connection, strings: &[String]) -> Result { - let temp_type = temp_varray_of_vchar2( - connection, - strings.len(), - strings.iter().map(|s| s.len()).max().unwrap(), - )?; - let mut collection = temp_type.new_collection()?; + string_collection_type_name: &str, + strings: &[String], +) -> Result { + let object_type = connection.object_type(string_collection_type_name)?; + let mut collection = object_type.new_collection()?; for string in strings { collection.push(&str_to_sql!(*string))?; } @@ -497,16 +494,17 @@ mod tests { env_logger::init(); - let replicate_user = "DOZER"; - let data_user = "DOZER"; - let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com"; - let sid = "ORCL"; + let replicate_user = "C##DOZER"; + let data_user = "CHUBEI"; + let host = "localhost"; + let sid = "ORCLPDB1"; let mut connector = super::Connector::new( "oracle".into(), replicate_user.into(), "123", &format!("{}:{}/{}", host, 1521, sid), + "SYS.DOZER_VARRAY_OF_VARCHAR2".to_string(), 100_000, OracleReplicator::DozerLogReader, ) @@ -525,11 +523,14 @@ mod tests { estimate_throughput(iterator); let checkpoint = handle.join().unwrap().unwrap(); + let sid = "ORCLCDB"; + let mut connector = super::Connector::new( "oracle".into(), replicate_user.into(), "123", &format!("{}:{}/{}", host, 1521, sid), + "SYS.DOZER_VARRAY_OF_VARCHAR2".to_string(), 1, OracleReplicator::LogMiner { poll_interval_in_milliseconds: 1000, diff --git a/dozer-ingestion/oracle/src/lib.rs b/dozer-ingestion/oracle/src/lib.rs index cf398cb097..e9100d651a 100644 --- a/dozer-ingestion/oracle/src/lib.rs +++ b/dozer-ingestion/oracle/src/lib.rs @@ -53,6 +53,7 @@ impl OracleConnector { config.user.clone(), &config.password, &root_connect_string, + config.string_collection_type_name.clone(), batch_size, config.replicator, )?; @@ -61,9 +62,10 @@ impl OracleConnector { let pdb_connect_string = format!("{}:{}/{}", config.host, config.port, pdb); let pdb_connector = connector::Connector::new( connection_name, - config.user.clone(), + config.user, &config.password, &pdb_connect_string, + config.string_collection_type_name, batch_size, config.replicator, )?; diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index 71e9594476..dff723d1c2 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -920,6 +920,7 @@ mod tests { sid: "ORCLCDB".into(), pdb: Some("ORCLPDB1".into()), schemas: vec![], + string_collection_type_name: "DOZER_VAARY_OF_VARCHAR2".to_string(), batch_size: None, replicator: dozer_types::models::ingestion_types::OracleReplicator::LogMiner { poll_interval_in_milliseconds: 0, diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 24832547e3..f8e09279c2 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -656,6 +656,8 @@ pub struct OracleConfig { /// The schemas to consider when listing tables. If empty, will list all schemas, which can be slow. #[serde(default, skip_serializing_if = "Vec::is_empty")] pub schemas: Vec, + /// A `VARRAY OF VARCHAR2` type that can be used by dozer. + pub string_collection_type_name: String, /// Batch size during snapshotting pub batch_size: Option, pub replicator: OracleReplicator, diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 3bac6f286e..18d33ea8a7 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -1379,6 +1379,7 @@ "port", "replicator", "sid", + "string_collection_type_name", "user" ], "properties": { @@ -1422,6 +1423,10 @@ "sid": { "type": "string" }, + "string_collection_type_name": { + "description": "A `VARRAY OF VARCHAR2` type that can be used by dozer.", + "type": "string" + }, "user": { "type": "string" }