Skip to content

Commit

Permalink
chore: init schema option at connection level
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 committed Oct 9, 2023
1 parent 60c88ab commit 9323374
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 16 deletions.
1 change: 1 addition & 0 deletions dozer-cli/src/cli/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub fn generate_connection(connection_name: &str) -> Connection {
database: Some("users".to_owned()),
sslmode: None,
connection_url: None,
schema: None,
};
let connection: Connection = Connection {
name: "postgres".to_owned(),
Expand Down
5 changes: 3 additions & 2 deletions dozer-ingestion/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ impl TableToIngest {

pub fn get_connector(connection: Connection) -> Result<Box<dyn Connector>, ConnectorError> {
let config = connection.config;
match config {
ConnectionConfig::Postgres(_) => {
match config.clone() {
ConnectionConfig::Postgres(c) => {
let config = map_connection_config(&config)?;
let postgres_config = PostgresConfig {
name: connection.name,
config,
schema: c.schema,
};

if let Some(dbname) = postgres_config.config.get_dbname() {
Expand Down
6 changes: 5 additions & 1 deletion dozer-ingestion/src/connectors/postgres/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::connection::helper;
pub struct PostgresConfig {
pub name: String,
pub config: Config,
pub schema: Option<String>,
}

#[derive(Debug)]
Expand All @@ -32,6 +33,7 @@ pub struct PostgresConnector {
replication_conn_config: Config,
conn_config: Config,
schema_helper: SchemaHelper,
pub schema: Option<String>,
}

#[derive(Debug)]
Expand All @@ -47,7 +49,7 @@ impl PostgresConnector {
let mut replication_conn_config = config.config.clone();
replication_conn_config.replication_mode(ReplicationMode::Logical);

let helper = SchemaHelper::new(config.config.clone());
let helper = SchemaHelper::new(config.config.clone(), config.schema.clone());

// conn_str - replication_conn_config
// conn_str_plain- conn_config
Expand All @@ -57,6 +59,7 @@ impl PostgresConnector {
conn_config: config.config,
replication_conn_config,
schema_helper: helper,
schema: config.schema,
}
}

Expand Down Expand Up @@ -198,6 +201,7 @@ impl Connector for PostgresConnector {
self.replication_conn_config.clone(),
ingestor,
self.conn_config.clone(),
self.schema.clone(),
);
iterator.start(lsn).await
}
Expand Down
4 changes: 4 additions & 0 deletions dozer-ingestion/src/connectors/postgres/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Details {
tables: Vec<PostgresTableInfo>,
replication_conn_config: tokio_postgres::Config,
conn_config: tokio_postgres::Config,
schema: Option<String>,
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -51,6 +52,7 @@ impl<'a> PostgresIterator<'a> {
replication_conn_config: tokio_postgres::Config,
ingestor: &'a Ingestor,
conn_config: tokio_postgres::Config,
schema: Option<String>,
) -> Self {
let details = Arc::new(Details {
name,
Expand All @@ -59,6 +61,7 @@ impl<'a> PostgresIterator<'a> {
tables,
replication_conn_config,
conn_config,
schema,
});
PostgresIterator { details, ingestor }
}
Expand Down Expand Up @@ -160,6 +163,7 @@ impl<'a> PostgresIteratorHandler<'a> {
let snapshotter = PostgresSnapshotter {
conn_config: details.conn_config.to_owned(),
ingestor: self.ingestor,
schema: details.schema.clone(),
};
let tables = details
.tables
Expand Down
24 changes: 20 additions & 4 deletions dozer-ingestion/src/connectors/postgres/schema/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use crate::connectors::{CdcType, ListOrFilterColumns, SourceSchema, SourceSchemaResult};
use crate::errors::{ConnectorError, PostgresConnectorError, PostgresSchemaError};
use dozer_types::models::connection::PostgresConfig;
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};

use crate::connectors::postgres::connection::helper;
Expand All @@ -18,6 +19,8 @@ use PostgresSchemaError::TableTypeNotFound;
#[derive(Debug)]
pub struct SchemaHelper {
conn_config: tokio_postgres::Config,
// Postgres schema
schema: Option<String>,
}

struct PostgresTableRow {
Expand Down Expand Up @@ -84,8 +87,11 @@ pub struct PostgresTableInfo {
type RowsWithColumnsMap = (Vec<Row>, HashMap<SchemaTableIdentifier, Vec<String>>);

impl SchemaHelper {
pub fn new(conn_config: tokio_postgres::Config) -> SchemaHelper {
Self { conn_config }
pub fn new(conn_config: tokio_postgres::Config, schema: Option<String>) -> SchemaHelper {
Self {
conn_config,
schema,
}
}

pub async fn get_tables(
Expand Down Expand Up @@ -185,8 +191,18 @@ impl SchemaHelper {
);
client.query(&sql, &[&schemas, &table_names]).await
} else {
let sql = str::replace(SQL, ":tables_name_condition", "t.table_type = 'BASE TABLE'");
client.query(&sql, &[]).await
if let Some(schema) = &self.schema {
let sql = str::replace(
SQL,
":tables_name_condition",
"t.table_schema = $1 AND t.table_type = 'BASE TABLE'",
);
client.query(&sql, &[&schema]).await
} else {
let sql =
str::replace(SQL, ":tables_name_condition", "t.table_type = 'BASE TABLE'");
client.query(&sql, &[]).await
}
};

query
Expand Down
8 changes: 4 additions & 4 deletions dozer-ingestion/src/connectors/postgres/schema/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn test_connector_get_tables() {
client.create_schema(&schema).await;
client.create_simple_table(&schema, &table_name).await;

let schema_helper = SchemaHelper::new(client.postgres_config.clone());
let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None);
let result = schema_helper.get_tables(None).await.unwrap();

let table = result.get(0).unwrap();
Expand Down Expand Up @@ -69,7 +69,7 @@ async fn test_connector_get_schema_with_selected_columns() {
client.create_schema(&schema).await;
client.create_simple_table(&schema, &table_name).await;

let schema_helper = SchemaHelper::new(client.postgres_config.clone());
let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None);
let table_info = ListOrFilterColumns {
schema: Some(schema.clone()),
name: table_name.clone(),
Expand Down Expand Up @@ -104,7 +104,7 @@ async fn test_connector_get_schema_without_selected_columns() {
client.create_schema(&schema).await;
client.create_simple_table(&schema, &table_name).await;

let schema_helper = SchemaHelper::new(client.postgres_config.clone());
let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None);
let table_info = ListOrFilterColumns {
name: table_name.clone(),
schema: Some(schema.clone()),
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn test_connector_view_cannot_be_used() {
client.create_simple_table(&schema, &table_name).await;
client.create_view(&schema, &table_name, &view_name).await;

let schema_helper = SchemaHelper::new(client.postgres_config.clone());
let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None);
let table_info = ListOrFilterColumns {
name: view_name,
schema: Some(schema.clone()),
Expand Down
6 changes: 5 additions & 1 deletion dozer-ingestion/src/connectors/postgres/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ use tokio::task::JoinSet;
pub struct PostgresSnapshotter<'a> {
pub conn_config: tokio_postgres::Config,
pub ingestor: &'a Ingestor,
pub schema: Option<String>,
}

impl<'a> PostgresSnapshotter<'a> {
pub async fn get_tables(
&self,
tables: &[ListOrFilterColumns],
) -> Result<Vec<SourceSchemaResult>, ConnectorError> {
let helper = SchemaHelper::new(self.conn_config.clone());
let helper = SchemaHelper::new(self.conn_config.clone(), self.schema.clone());
helper
.get_schemas(tables)
.await
Expand Down Expand Up @@ -197,6 +198,7 @@ mod tests {
let snapshotter = PostgresSnapshotter {
conn_config,
ingestor: &ingestor,
schema: None,
};

let actual = snapshotter.sync_tables(&input_tables).await;
Expand Down Expand Up @@ -248,6 +250,7 @@ mod tests {
let snapshotter = PostgresSnapshotter {
conn_config,
ingestor: &ingestor,
schema: None,
};

let actual = snapshotter.sync_tables(&input_tables).await;
Expand Down Expand Up @@ -288,6 +291,7 @@ mod tests {
let snapshotter = PostgresSnapshotter {
conn_config,
ingestor: &ingestor,
schema: None,
};

let actual = snapshotter.sync_tables(&input_tables).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod tests {
let postgres_config = PostgresConfig {
name: "test".to_string(),
config: conn_config.clone(),
schema: None,
};

let connector = PostgresConnector::new(postgres_config);
Expand Down Expand Up @@ -81,6 +82,7 @@ mod tests {
let postgres_config = PostgresConfig {
name: connector_name,
config: conn_config.clone(),
schema: None,
};

let connector = PostgresConnector::new(postgres_config);
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/tests/test_suite/connectors/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async fn create_postgres_server() -> (Client, PostgresConnectorTest, PostgresCon
let connector = PostgresConnector::new(PostgresConfig {
name: "postgres_connector_test".to_string(),
config: config.clone(),
schema: None,
});

let client = connect(config.clone()).await.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions dozer-types/src/models/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,16 @@ pub struct PostgresConfig {
pub database: Option<String>,

/// The sslmode to use for the connection (disable, prefer, require)
#[serde(skip_serializing_if = "Option::is_none")]
pub sslmode: Option<String>,

/// The connection url to use
#[serde(skip_serializing_if = "Option::is_none")]
pub connection_url: Option<String>,

/// The connection url to use
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<String>,
}

impl SchemaExample for PostgresConfig {
Expand All @@ -61,6 +67,7 @@ impl SchemaExample for PostgresConfig {
host: Some("localhost".to_string()),
port: Some(5432),
database: Some("postgres".to_string()),
schema: Some("public".to_string()),
..Default::default()
}
}
Expand Down
6 changes: 6 additions & 0 deletions dozer-types/src/tests/postgres_yaml_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ fn standard() {
database: Some("users".to_string()),
sslmode: None,
connection_url: None,
schema: None,
};
let expected = ConnectionConfig::Postgres(postgres_auth);
assert_eq!(expected, deserializer_result);
Expand Down Expand Up @@ -51,6 +52,7 @@ fn standard_with_ssl_mode() {
database: Some("users".to_string()),
sslmode: Some("require".to_string()),
connection_url: None,
schema: None,
};
let expected = ConnectionConfig::Postgres(postgres_auth);
assert_eq!(expected, deserializer_result);
Expand Down Expand Up @@ -79,6 +81,7 @@ fn standard_url() {
database: None,
sslmode: None,
connection_url: Some("postgres://postgres:[email protected]:5432/neondb?sslmode=prefer".to_string()),
schema: None,
};
let expected = ConnectionConfig::Postgres(postgres_auth);
assert_eq!(expected, deserializer_result);
Expand Down Expand Up @@ -108,6 +111,7 @@ fn standard_url_missing_user() {
database: None,
sslmode: None,
connection_url: Some("postgresql://localhost:5432/stocks?sslmode=prefer".to_string()),
schema: None,
};
let expected = ConnectionConfig::Postgres(postgres_auth);
assert_eq!(expected, deserializer_result);
Expand Down Expand Up @@ -138,6 +142,7 @@ fn standard_url_missing_password() {
database: None,
sslmode: None,
connection_url: Some("postgresql://localhost:5432/stocks?sslmode=prefer".to_string()),
schema: None,
};
let expected = ConnectionConfig::Postgres(postgres_auth);
assert_eq!(expected, deserializer_result);
Expand Down Expand Up @@ -168,6 +173,7 @@ fn standard_url_2() {
database: None,
sslmode: None,
connection_url: Some("postgresql://localhost:5432/stocks?sslmode=prefer".to_string()),
schema: None,
};
let expected = ConnectionConfig::Postgres(postgres_auth);
assert_eq!(expected, deserializer_result);
Expand Down
10 changes: 8 additions & 2 deletions json_schemas/connections.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
"description": "Configuration for a Postgres connection",
"examples": [
{
"connection_url": null,
"database": "postgres",
"host": "localhost",
"password": "postgres",
"port": 5432,
"sslmode": null,
"schema": "public",
"user": "postgres"
}
],
Expand Down Expand Up @@ -55,6 +54,13 @@
"format": "uint32",
"minimum": 0.0
},
"schema": {
"description": "The connection url to use",
"type": [
"string",
"null"
]
},
"sslmode": {
"description": "The sslmode to use for the connection (disable, prefer, require)",
"type": [
Expand Down
10 changes: 8 additions & 2 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1298,12 +1298,11 @@
"description": "Configuration for a Postgres connection",
"examples": [
{
"connection_url": null,
"database": "postgres",
"host": "localhost",
"password": "postgres",
"port": 5432,
"sslmode": null,
"schema": "public",
"user": "postgres"
}
],
Expand Down Expand Up @@ -1346,6 +1345,13 @@
"format": "uint32",
"minimum": 0.0
},
"schema": {
"description": "The connection url to use",
"type": [
"string",
"null"
]
},
"sslmode": {
"description": "The sslmode to use for the connection (disable, prefer, require)",
"type": [
Expand Down

0 comments on commit 9323374

Please sign in to comment.