From 9323374ab538be6da739c571c708ee99bdaf4ba8 Mon Sep 17 00:00:00 2001 From: VG Date: Tue, 10 Oct 2023 01:42:18 +0800 Subject: [PATCH] chore: init schema option at connection level --- dozer-cli/src/cli/init.rs | 1 + dozer-ingestion/src/connectors/mod.rs | 5 ++-- .../src/connectors/postgres/connector.rs | 6 ++++- .../src/connectors/postgres/iterator.rs | 4 ++++ .../src/connectors/postgres/schema/helper.rs | 24 +++++++++++++++---- .../src/connectors/postgres/schema/tests.rs | 8 +++---- .../src/connectors/postgres/snapshotter.rs | 6 ++++- .../tests/continue_replication_tests.rs | 2 ++ .../tests/test_suite/connectors/postgres.rs | 1 + dozer-types/src/models/connection.rs | 7 ++++++ .../src/tests/postgres_yaml_deserialize.rs | 6 +++++ json_schemas/connections.json | 10 ++++++-- json_schemas/dozer.json | 10 ++++++-- 13 files changed, 74 insertions(+), 16 deletions(-) diff --git a/dozer-cli/src/cli/init.rs b/dozer-cli/src/cli/init.rs index bedc000470..2e71e22907 100644 --- a/dozer-cli/src/cli/init.rs +++ b/dozer-cli/src/cli/init.rs @@ -159,6 +159,7 @@ pub fn generate_connection(connection_name: &str) -> Connection { database: Some("users".to_owned()), sslmode: None, connection_url: None, + schema: None, }; let connection: Connection = Connection { name: "postgres".to_owned(), diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs index bc08d6b0b3..f1a1877cd7 100644 --- a/dozer-ingestion/src/connectors/mod.rs +++ b/dozer-ingestion/src/connectors/mod.rs @@ -202,12 +202,13 @@ impl TableToIngest { pub fn get_connector(connection: Connection) -> Result, ConnectorError> { let config = connection.config; - match config { - ConnectionConfig::Postgres(_) => { + match config.clone() { + ConnectionConfig::Postgres(c) => { let config = map_connection_config(&config)?; let postgres_config = PostgresConfig { name: connection.name, config, + schema: c.schema, }; if let Some(dbname) = postgres_config.config.get_dbname() { diff --git 
a/dozer-ingestion/src/connectors/postgres/connector.rs b/dozer-ingestion/src/connectors/postgres/connector.rs index 00c36890df..ee6f47035d 100644 --- a/dozer-ingestion/src/connectors/postgres/connector.rs +++ b/dozer-ingestion/src/connectors/postgres/connector.rs @@ -24,6 +24,7 @@ use super::connection::helper; pub struct PostgresConfig { pub name: String, pub config: Config, + pub schema: Option, } #[derive(Debug)] @@ -32,6 +33,7 @@ pub struct PostgresConnector { replication_conn_config: Config, conn_config: Config, schema_helper: SchemaHelper, + pub schema: Option, } #[derive(Debug)] @@ -47,7 +49,7 @@ impl PostgresConnector { let mut replication_conn_config = config.config.clone(); replication_conn_config.replication_mode(ReplicationMode::Logical); - let helper = SchemaHelper::new(config.config.clone()); + let helper = SchemaHelper::new(config.config.clone(), config.schema.clone()); // conn_str - replication_conn_config // conn_str_plain- conn_config @@ -57,6 +59,7 @@ impl PostgresConnector { conn_config: config.config, replication_conn_config, schema_helper: helper, + schema: config.schema, } } @@ -198,6 +201,7 @@ impl Connector for PostgresConnector { self.replication_conn_config.clone(), ingestor, self.conn_config.clone(), + self.schema.clone(), ); iterator.start(lsn).await } diff --git a/dozer-ingestion/src/connectors/postgres/iterator.rs b/dozer-ingestion/src/connectors/postgres/iterator.rs index da4c5b1f03..50fdb088bf 100644 --- a/dozer-ingestion/src/connectors/postgres/iterator.rs +++ b/dozer-ingestion/src/connectors/postgres/iterator.rs @@ -27,6 +27,7 @@ pub struct Details { tables: Vec, replication_conn_config: tokio_postgres::Config, conn_config: tokio_postgres::Config, + schema: Option, } #[derive(Debug, Clone, Copy)] @@ -51,6 +52,7 @@ impl<'a> PostgresIterator<'a> { replication_conn_config: tokio_postgres::Config, ingestor: &'a Ingestor, conn_config: tokio_postgres::Config, + schema: Option, ) -> Self { let details = Arc::new(Details { name, @@ -59,6 
+61,7 @@ impl<'a> PostgresIterator<'a> { tables, replication_conn_config, conn_config, + schema, }); PostgresIterator { details, ingestor } } @@ -160,6 +163,7 @@ impl<'a> PostgresIteratorHandler<'a> { let snapshotter = PostgresSnapshotter { conn_config: details.conn_config.to_owned(), ingestor: self.ingestor, + schema: details.schema.clone(), }; let tables = details .tables diff --git a/dozer-ingestion/src/connectors/postgres/schema/helper.rs b/dozer-ingestion/src/connectors/postgres/schema/helper.rs index d369062477..61f2a1fd9e 100644 --- a/dozer-ingestion/src/connectors/postgres/schema/helper.rs +++ b/dozer-ingestion/src/connectors/postgres/schema/helper.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use crate::connectors::{CdcType, ListOrFilterColumns, SourceSchema, SourceSchemaResult}; use crate::errors::{ConnectorError, PostgresConnectorError, PostgresSchemaError}; +use dozer_types::models::connection::PostgresConfig; use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition}; use crate::connectors::postgres::connection::helper; @@ -18,6 +19,8 @@ use PostgresSchemaError::TableTypeNotFound; #[derive(Debug)] pub struct SchemaHelper { conn_config: tokio_postgres::Config, + // Postgres schema + schema: Option, } struct PostgresTableRow { @@ -84,8 +87,11 @@ pub struct PostgresTableInfo { type RowsWithColumnsMap = (Vec, HashMap>); impl SchemaHelper { - pub fn new(conn_config: tokio_postgres::Config) -> SchemaHelper { - Self { conn_config } + pub fn new(conn_config: tokio_postgres::Config, schema: Option) -> SchemaHelper { + Self { + conn_config, + schema, + } } pub async fn get_tables( @@ -185,8 +191,18 @@ impl SchemaHelper { ); client.query(&sql, &[&schemas, &table_names]).await } else { - let sql = str::replace(SQL, ":tables_name_condition", "t.table_type = 'BASE TABLE'"); - client.query(&sql, &[]).await + if let Some(schema) = &self.schema { + let sql = str::replace( + SQL, + ":tables_name_condition", + "t.table_schema = $1 AND t.table_type 
= 'BASE TABLE'", + ); + client.query(&sql, &[&schema]).await + } else { + let sql = + str::replace(SQL, ":tables_name_condition", "t.table_type = 'BASE TABLE'"); + client.query(&sql, &[]).await + } }; query diff --git a/dozer-ingestion/src/connectors/postgres/schema/tests.rs b/dozer-ingestion/src/connectors/postgres/schema/tests.rs index 655601a861..1bd5a2b009 100644 --- a/dozer-ingestion/src/connectors/postgres/schema/tests.rs +++ b/dozer-ingestion/src/connectors/postgres/schema/tests.rs @@ -34,7 +34,7 @@ async fn test_connector_get_tables() { client.create_schema(&schema).await; client.create_simple_table(&schema, &table_name).await; - let schema_helper = SchemaHelper::new(client.postgres_config.clone()); + let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None); let result = schema_helper.get_tables(None).await.unwrap(); let table = result.get(0).unwrap(); @@ -69,7 +69,7 @@ async fn test_connector_get_schema_with_selected_columns() { client.create_schema(&schema).await; client.create_simple_table(&schema, &table_name).await; - let schema_helper = SchemaHelper::new(client.postgres_config.clone()); + let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None); let table_info = ListOrFilterColumns { schema: Some(schema.clone()), name: table_name.clone(), @@ -104,7 +104,7 @@ async fn test_connector_get_schema_without_selected_columns() { client.create_schema(&schema).await; client.create_simple_table(&schema, &table_name).await; - let schema_helper = SchemaHelper::new(client.postgres_config.clone()); + let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None); let table_info = ListOrFilterColumns { name: table_name.clone(), schema: Some(schema.clone()), @@ -146,7 +146,7 @@ async fn test_connector_view_cannot_be_used() { client.create_simple_table(&schema, &table_name).await; client.create_view(&schema, &table_name, &view_name).await; - let schema_helper = SchemaHelper::new(client.postgres_config.clone()); + let 
schema_helper = SchemaHelper::new(client.postgres_config.clone(), None); let table_info = ListOrFilterColumns { name: view_name, schema: Some(schema.clone()), diff --git a/dozer-ingestion/src/connectors/postgres/snapshotter.rs b/dozer-ingestion/src/connectors/postgres/snapshotter.rs index 64f6eb1f8d..933cb56801 100644 --- a/dozer-ingestion/src/connectors/postgres/snapshotter.rs +++ b/dozer-ingestion/src/connectors/postgres/snapshotter.rs @@ -21,6 +21,7 @@ use tokio::task::JoinSet; pub struct PostgresSnapshotter<'a> { pub conn_config: tokio_postgres::Config, pub ingestor: &'a Ingestor, + pub schema: Option, } impl<'a> PostgresSnapshotter<'a> { @@ -28,7 +29,7 @@ impl<'a> PostgresSnapshotter<'a> { &self, tables: &[ListOrFilterColumns], ) -> Result, ConnectorError> { - let helper = SchemaHelper::new(self.conn_config.clone()); + let helper = SchemaHelper::new(self.conn_config.clone(), self.schema.clone()); helper .get_schemas(tables) .await @@ -197,6 +198,7 @@ mod tests { let snapshotter = PostgresSnapshotter { conn_config, ingestor: &ingestor, + schema: None, }; let actual = snapshotter.sync_tables(&input_tables).await; @@ -248,6 +250,7 @@ mod tests { let snapshotter = PostgresSnapshotter { conn_config, ingestor: &ingestor, + schema: None, }; let actual = snapshotter.sync_tables(&input_tables).await; @@ -288,6 +291,7 @@ mod tests { let snapshotter = PostgresSnapshotter { conn_config, ingestor: &ingestor, + schema: None, }; let actual = snapshotter.sync_tables(&input_tables).await; diff --git a/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs b/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs index 795ff6be6f..e51a1239b3 100644 --- a/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs +++ b/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs @@ -26,6 +26,7 @@ mod tests { let postgres_config = PostgresConfig { name: "test".to_string(), config: conn_config.clone(), + 
schema: None, }; let connector = PostgresConnector::new(postgres_config); @@ -81,6 +82,7 @@ mod tests { let postgres_config = PostgresConfig { name: connector_name, config: conn_config.clone(), + schema: None, }; let connector = PostgresConnector::new(postgres_config); diff --git a/dozer-ingestion/tests/test_suite/connectors/postgres.rs b/dozer-ingestion/tests/test_suite/connectors/postgres.rs index 019c474fa1..10ecad3ec1 100644 --- a/dozer-ingestion/tests/test_suite/connectors/postgres.rs +++ b/dozer-ingestion/tests/test_suite/connectors/postgres.rs @@ -131,6 +131,7 @@ async fn create_postgres_server() -> (Client, PostgresConnectorTest, PostgresCon let connector = PostgresConnector::new(PostgresConfig { name: "postgres_connector_test".to_string(), config: config.clone(), + schema: None, }); let client = connect(config.clone()).await.unwrap(); diff --git a/dozer-types/src/models/connection.rs b/dozer-types/src/models/connection.rs index 9345b9a63a..182eb888bc 100644 --- a/dozer-types/src/models/connection.rs +++ b/dozer-types/src/models/connection.rs @@ -47,10 +47,16 @@ pub struct PostgresConfig { pub database: Option, /// The sslmode to use for the connection (disable, prefer, require) + #[serde(skip_serializing_if = "Option::is_none")] pub sslmode: Option, /// The connection url to use + #[serde(skip_serializing_if = "Option::is_none")] pub connection_url: Option, + + /// The Postgres schema to discover tables in; all schemas are searched when unset + #[serde(skip_serializing_if = "Option::is_none")] + pub schema: Option, } impl SchemaExample for PostgresConfig { @@ -61,6 +67,7 @@ impl SchemaExample for PostgresConfig { host: Some("localhost".to_string()), port: Some(5432), database: Some("postgres".to_string()), + schema: Some("public".to_string()), ..Default::default() } } diff --git a/dozer-types/src/tests/postgres_yaml_deserialize.rs b/dozer-types/src/tests/postgres_yaml_deserialize.rs index 28565cb09b..0cb03f6ce9 100644 --- a/dozer-types/src/tests/postgres_yaml_deserialize.rs +++ 
b/dozer-types/src/tests/postgres_yaml_deserialize.rs @@ -18,6 +18,7 @@ fn standard() { database: Some("users".to_string()), sslmode: None, connection_url: None, + schema: None, }; let expected = ConnectionConfig::Postgres(postgres_auth); assert_eq!(expected, deserializer_result); @@ -51,6 +52,7 @@ fn standard_with_ssl_mode() { database: Some("users".to_string()), sslmode: Some("require".to_string()), connection_url: None, + schema: None, }; let expected = ConnectionConfig::Postgres(postgres_auth); assert_eq!(expected, deserializer_result); @@ -79,6 +81,7 @@ fn standard_url() { database: None, sslmode: None, connection_url: Some("postgres://postgres:postgres@ep-silent-bread-370191.ap-southeast-1.aws.neon.tech:5432/neondb?sslmode=prefer".to_string()), + schema: None, }; let expected = ConnectionConfig::Postgres(postgres_auth); assert_eq!(expected, deserializer_result); @@ -108,6 +111,7 @@ fn standard_url_missing_user() { database: None, sslmode: None, connection_url: Some("postgresql://localhost:5432/stocks?sslmode=prefer".to_string()), + schema: None, }; let expected = ConnectionConfig::Postgres(postgres_auth); assert_eq!(expected, deserializer_result); @@ -138,6 +142,7 @@ fn standard_url_missing_password() { database: None, sslmode: None, connection_url: Some("postgresql://localhost:5432/stocks?sslmode=prefer".to_string()), + schema: None, }; let expected = ConnectionConfig::Postgres(postgres_auth); assert_eq!(expected, deserializer_result); @@ -168,6 +173,7 @@ fn standard_url_2() { database: None, sslmode: None, connection_url: Some("postgresql://localhost:5432/stocks?sslmode=prefer".to_string()), + schema: None, }; let expected = ConnectionConfig::Postgres(postgres_auth); assert_eq!(expected, deserializer_result); diff --git a/json_schemas/connections.json b/json_schemas/connections.json index 42ed688795..417e77bd38 100644 --- a/json_schemas/connections.json +++ b/json_schemas/connections.json @@ -7,12 +7,11 @@ "description": "Configuration for a Postgres 
connection", "examples": [ { - "connection_url": null, "database": "postgres", "host": "localhost", "password": "postgres", "port": 5432, - "sslmode": null, + "schema": "public", "user": "postgres" } ], @@ -55,6 +54,13 @@ "format": "uint32", "minimum": 0.0 }, + "schema": { + "description": "The Postgres schema to discover tables in; all schemas are searched when unset", + "type": [ + "string", + "null" + ] + }, "sslmode": { "description": "The sslmode to use for the connection (disable, prefer, require)", "type": [ diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index fe61635330..f7c810f7fb 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -1298,12 +1298,11 @@ "description": "Configuration for a Postgres connection", "examples": [ { - "connection_url": null, "database": "postgres", "host": "localhost", "password": "postgres", "port": 5432, - "sslmode": null, + "schema": "public", "user": "postgres" } ], @@ -1346,6 +1345,13 @@ "format": "uint32", "minimum": 0.0 }, + "schema": { + "description": "The Postgres schema to discover tables in; all schemas are searched when unset", + "type": [ + "string", + "null" + ] + }, "sslmode": { "description": "The sslmode to use for the connection (disable, prefer, require)", "type": [