From ff8c2c4d03191b463218277af7f8a299fbbdf6a1 Mon Sep 17 00:00:00 2001 From: VG Date: Mon, 9 Oct 2023 22:58:06 +0800 Subject: [PATCH 1/3] chore: include errors if table schemas are unavailable --- dozer-cli/src/live/state.rs | 9 +- dozer-ingestion/src/connectors/mod.rs | 2 +- dozer-types/protos/types.proto | 1 + dozer-types/src/models/connection.rs | 5 +- json_schemas/connections.json | 137 ++++++++++++++++++++++++-- json_schemas/dozer.json | 2 +- 6 files changed, 143 insertions(+), 13 deletions(-) diff --git a/dozer-cli/src/live/state.rs b/dozer-cli/src/live/state.rs index 6044597b90..08bf2cdce5 100644 --- a/dozer-cli/src/live/state.rs +++ b/dozer-cli/src/live/state.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, thread::JoinHandle}; +use std::{collections::HashMap, sync::Arc, thread::JoinHandle}; use clap::Parser; @@ -204,6 +204,7 @@ impl LiveState { let contract = get_contract(&dozer)?; Ok(SchemasResponse { schemas: contract.get_endpoints_schemas(), + errors: HashMap::new(), }) } pub async fn get_source_schemas( @@ -217,7 +218,10 @@ impl LiveState { contract .get_source_schemas(&connection_name) .ok_or(LiveError::ConnectionNotFound(connection_name)) - .map(|schemas| SchemasResponse { schemas }) + .map(|schemas| SchemasResponse { + schemas, + errors: HashMap::new(), + }) } pub async fn get_graph_schemas(&self) -> Result { @@ -227,6 +231,7 @@ impl LiveState { Ok(SchemasResponse { schemas: contract.get_graph_schemas(), + errors: HashMap::new(), }) } diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs index f8f808ee66..4510f94c30 100644 --- a/dozer-ingestion/src/connectors/mod.rs +++ b/dozer-ingestion/src/connectors/mod.rs @@ -27,7 +27,7 @@ use crate::ingestion::Ingestor; use dozer_types::log::debug; use dozer_types::models::connection::Connection; use dozer_types::models::connection::ConnectionConfig; -use dozer_types::models::ingestion_types::{default_grpc_adapter, EthProviderConfig}; +use dozer_types::models::ingestion_types::{default_grpc_adapter}; use dozer_types::node::OpIdentifier; use dozer_types::tonic::async_trait; diff --git a/dozer-types/protos/types.proto b/dozer-types/protos/types.proto index bfa5f15328..fc16c7e539 100644 --- a/dozer-types/protos/types.proto +++ b/dozer-types/protos/types.proto @@ -136,6 +136,7 @@ message Value { } message SchemasResponse { map schemas = 1; + map errors = 2; } message Schema { diff --git a/dozer-types/src/models/connection.rs b/dozer-types/src/models/connection.rs index ab9e98345f..9345b9a63a 100644 --- a/dozer-types/src/models/connection.rs +++ b/dozer-types/src/models/connection.rs @@ -57,12 +57,11 @@ impl SchemaExample for PostgresConfig { fn example() -> Self { Self { user: Some("postgres".to_string()), - password: None, + password: Some("postgres".to_string()), host: Some("localhost".to_string()), port: Some(5432), database: Some("postgres".to_string()), - sslmode: None, - connection_url: None, + ..Default::default() } } } diff --git a/json_schemas/connections.json b/json_schemas/connections.json index e4d3cee2c2..8ad7be70de 100644 --- a/json_schemas/connections.json +++ b/json_schemas/connections.json @@ -10,7 +10,7 @@ "connection_url": null, "database": "postgres", "host": "localhost", - "password": null, + "password": "postgres", "port": 5432, "sslmode": null, "user": "postgres" @@ -78,6 +78,22 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "EthConfig", + "examples": [ + { + "provider": { + "Log": { + "contracts": [], + "filter": { + "addresses": [], + "from_block": 0, + "to_block": null, + "topics": [] + }, + "wss_url": "" + } + } + } + ], "type": "object", "required": [ "provider" @@ -111,6 +127,7 @@ "type": "object", "properties": { "addresses": { + "default": [], "type": "array", "items": { "type": "string" @@ -133,6 +150,7 @@ "minimum": 0.0 }, "topics": { + "default": [], "type": "array", "items": { "type": "string" @@ -147,6 +165,7 @@ ], "properties": { "contracts": { + "default": [], "type": "array", "items": { "$ref": "#/definitions/EthContract" @@ -236,6 +255,16 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "GrpcConfig", + "examples": [ + { + "adapter": "arrow", + "host": "localhost", + "port": 50051, + "schemas": { + "Path": "schema.json" + } + } + ], "type": "object", "required": [ "schemas" @@ -302,6 +331,19 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "SnowflakeConfig", + "examples": [ + { + "database": "database", + "driver": "SnowflakeDSIIDriver", + "password": "password", + "port": "443", + "role": "role", + "schema": "schema", + "server": "..snowflakecomputing.com", + "user": "bob", + "warehouse": "warehouse" + } + ], "type": "object", "required": [ "database", @@ -356,6 +398,12 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "KafkaConfig", + "examples": [ + { + "broker": "", + "schema_registry_url": "" + } + ], "type": "object", "required": [ "broker" @@ -378,6 +426,27 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "S3Storage", + "examples": [ + { + "details": { + "access_key_id": "", + "bucket_name": "", + "region": "", + "secret_access_key": "" + }, + "tables": [ + { + "config": { + "CSV": { + "extension": ".csv", + "path": "path/to/file" + } + }, + "name": "table_name" + } + ] + } + ], "type": "object", "required": [ "details", @@ -474,12 +543,18 @@ "Table": { "type": "object", "required": [ - "config", "name" ], "properties": { "config": { - "$ref": "#/definitions/TableConfig" + "anyOf": [ + { + "$ref": "#/definitions/TableConfig" + }, + { + "type": "null" + } + ] }, "name": { "type": "string" @@ -534,6 +609,24 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "LocalStorage", + "examples": [ + { + "details": { + "path": "path" + }, + "tables": [ + { + "config": { + "CSV": { + "extension": ".csv", + "path": "path/to/table" + } + }, + "name": "table_name" + } + ] + } + ], "type": "object", "required": [ "details", @@ -618,12 +711,18 @@ "Table": { "type": "object", "required": [ - "config", "name" ], "properties": { "config": { - "$ref": "#/definitions/TableConfig" + "anyOf": [ + { + "$ref": "#/definitions/TableConfig" + }, + { + "type": "null" + } + ] }, "name": { "type": "string" @@ -678,6 +777,16 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "DeltaLakeConfig", + "examples": [ + { + "tables": [ + { + "name": "", + "path": "" + } + ] + } + ], "type": "object", "required": [ "tables" @@ -714,6 +823,11 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "MongodbConfig", + "examples": [ + { + "connection_string": "mongodb://localhost:27017/db_name" + } + ], "type": "object", "required": [ "connection_string" @@ -730,6 +844,12 @@ "schema": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "MySQLConfig", + "examples": [ + { + "server_id": 1, + "url": "mysql://root:1234@localhost:3306/db_name" + } + ], "type": "object", "required": [ "url" @@ -760,7 +880,12 @@ ], "properties": { "log_options": { - "$ref": "#/definitions/NestedDozerLogOptions" + "default": {}, + "allOf": [ + { + "$ref": "#/definitions/NestedDozerLogOptions" + } + ] }, "url": { "type": "string" diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 1e2b653da7..9f67d6522c 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -1233,7 +1233,7 @@ "connection_url": null, "database": "postgres", "host": "localhost", - "password": null, + "password": "postgres", "port": 5432, "sslmode": null, "user": "postgres" From b6ed058faa1dd1d1a52ecabbe01c444ddb88c94d Mon Sep 17 00:00:00 2001 From: Aarya Attrey <74827081+aaryaattrey@users.noreply.github.com> Date: Mon, 9 Oct 2023 20:21:42 +0530 Subject: [PATCH 2/3] implement example for connectors (#2141) * implement example for connectors * address reviews * address reviews --------- Co-authored-by: aaryaattrey --- dozer-types/src/models/ingestion_types.rs | 142 +++++++++++++++++++++- json_schemas/dozer.json | 105 ++++++++++++++++ 2 files changed, 246 insertions(+), 1 deletion(-) diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 872b15da75..a4c382f94b 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use crate::{ helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64}, + models::connection::SchemaExample, node::OpIdentifier, types::Operation, }; @@ -46,6 +47,8 @@ pub struct EthFilter { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct GrpcConfig { #[serde(skip_serializing_if = "Option::is_none")] pub host: Option, @@ -77,7 +80,9 @@ pub enum GrpcConfigSchemas { Path(String), } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] +#[schemars(example = "Self::example")] + pub struct EthConfig { pub provider: EthProviderConfig, } @@ -150,6 +155,8 @@ pub struct EthContract { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct KafkaConfig { pub broker: String, @@ -171,6 +178,8 @@ impl KafkaConfig { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct SnowflakeConfig { pub server: String, @@ -300,6 +309,7 @@ pub struct S3Details { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] pub struct S3Storage { pub details: S3Details, @@ -316,6 +326,27 @@ impl S3Storage { ) } } +impl SchemaExample for S3Storage { + fn example() -> Self { + let s3_details = S3Details { + access_key_id: "".to_owned(), + secret_access_key: "".to_owned(), + region: "".to_owned(), + bucket_name: "".to_owned(), + }; + Self { + details: s3_details, + tables: vec![Table { + config: Some(TableConfig::CSV(CsvConfig { + path: "path/to/file".to_owned(), + extension: ".csv".to_owned(), + marker_extension: None, + })), + name: "table_name".to_owned(), + }], + } + } +} #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct LocalDetails { @@ -323,6 +354,8 @@ pub struct LocalDetails { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct LocalStorage { pub details: LocalDetails, @@ -349,16 +382,22 @@ impl DeltaTable { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct DeltaLakeConfig { pub tables: Vec, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct MongodbConfig { pub connection_string: String, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct MySQLConfig { pub url: String, @@ -399,3 +438,104 @@ pub fn default_buffer_size() -> u32 { pub fn default_snowflake_poll_interval() -> Duration { Duration::from_secs(60) } + +impl SchemaExample for MongodbConfig { + fn example() -> Self { + Self { + connection_string: "mongodb://localhost:27017/db_name".to_owned(), + } + } +} + +impl SchemaExample for MySQLConfig { + fn example() -> Self { + Self { + url: "mysql://root:1234@localhost:3306/db_name".to_owned(), + server_id: Some((1).to_owned()), + } + } +} + +impl SchemaExample for GrpcConfig { + fn example() -> Self { + Self { + host: Some("localhost".to_owned()), + port: Some(50051), + schemas: GrpcConfigSchemas::Path("schema.json".to_owned()), + adapter: Some("arrow".to_owned()), + } + } +} + +impl SchemaExample for KafkaConfig { + fn example() -> Self { + Self { + broker: "".to_owned(), + schema_registry_url: Some("".to_owned()), + } + } +} + +impl SchemaExample for DeltaLakeConfig { + fn example() -> Self { + Self { + tables: vec![DeltaTable { + path: "".to_owned(), + name: "".to_owned(), + }], + } + } +} + +impl SchemaExample for LocalStorage { + fn example() -> Self { + Self { + details: LocalDetails { + path: "path".to_owned(), + }, + tables: vec![Table { + config: Some(TableConfig::CSV(CsvConfig { + path: "path/to/table".to_owned(), + extension: ".csv".to_owned(), + marker_extension: None, + })), + name: "table_name".to_owned(), + }], + } + } +} + +impl SchemaExample for SnowflakeConfig { + fn example() -> Self { + Self { + server: "..snowflakecomputing.com".to_owned(), + port: "443".to_owned(), + user: "bob".to_owned(), + password: "password".to_owned(), + database: "database".to_owned(), + schema: "schema".to_owned(), + warehouse: "warehouse".to_owned(), + driver: Some("SnowflakeDSIIDriver".to_owned()), + role: "role".to_owned(), + poll_interval_seconds: None, + } + } +} + +impl SchemaExample for EthConfig { + fn example() -> Self { + let eth_filter = EthFilter { + from_block: Some(0), + to_block: None, + addresses: vec![], + topics: vec![], + }; + Self { + provider: EthProviderConfig::Log(EthLogConfig { + wss_url: "".to_owned(), + filter: Some(eth_filter), + contracts: vec![], + }), + } + } +} diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 9f67d6522c..e03358b6e6 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -630,6 +630,16 @@ } }, "DeltaLakeConfig": { + "examples": [ + { + "tables": [ + { + "name": "", + "path": "" + } + ] + } + ], "type": "object", "required": [ "tables" @@ -712,6 +722,22 @@ "additionalProperties": false }, "EthConfig": { + "examples": [ + { + "provider": { + "Log": { + "contracts": [], + "filter": { + "addresses": [], + "from_block": 0, + "to_block": null, + "topics": [] + }, + "wss_url": "" + } + } + } + ], "type": "object", "required": [ "provider" @@ -962,6 +988,16 @@ "additionalProperties": false }, "GrpcConfig": { + "examples": [ + { + "adapter": "arrow", + "host": "localhost", + "port": 50051, + "schemas": { + "Path": "schema.json" + } + } + ], "type": "object", "required": [ "schemas" @@ -1021,6 +1057,12 @@ ] }, "KafkaConfig": { + "examples": [ + { + "broker": "", + "schema_registry_url": "" + } + ], "type": "object", "required": [ "broker" @@ -1049,6 +1091,24 @@ } }, "LocalStorage": { + "examples": [ + { + "details": { + "path": "path" + }, + "tables": [ + { + "config": { + "CSV": { + "extension": ".csv", + "path": "path/to/table" + } + }, + "name": "table_name" + } + ] + } + ], "type": "object", "required": [ "details", @@ -1097,6 +1157,11 @@ "additionalProperties": false }, "MongodbConfig": { + "examples": [ + { + "connection_string": "mongodb://localhost:27017/db_name" + } + ], "type": "object", "required": [ "connection_string" @@ -1108,6 +1173,12 @@ } }, "MySQLConfig": { + "examples": [ + { + "server_id": 1, + "url": "mysql://root:1234@localhost:3306/db_name" + } + ], "type": "object", "required": [ "url" @@ -1393,6 +1464,27 @@ } }, "S3Storage": { + "examples": [ + { + "details": { + "access_key_id": "", + "bucket_name": "", + "region": "", + "secret_access_key": "" + }, + "tables": [ + { + "config": { + "CSV": { + "extension": ".csv", + "path": "path/to/file" + } + }, + "name": "table_name" + } + ] + } + ], "type": "object", "required": [ "details", @@ -1473,6 +1565,19 @@ "additionalProperties": false }, "SnowflakeConfig": { + "examples": [ + { + "database": "database", + "driver": "SnowflakeDSIIDriver", + "password": "password", + "port": "443", + "role": "role", + "schema": "schema", + "server": "..snowflakecomputing.com", + "user": "bob", + "warehouse": "warehouse" + } + ], "type": "object", "required": [ "database", From 1bacdd60c6e7931c1608a1df85932548f542e9f4 Mon Sep 17 00:00:00 2001 From: VG Date: Tue, 10 Oct 2023 00:43:58 +0800 Subject: [PATCH 3/3] chore: fix conflicts --- dozer-ingestion/src/connectors/mod.rs | 4 ++- dozer-types/src/models/ingestion_types.rs | 10 +++---- json_schemas/connections.json | 35 ++++------------------- json_schemas/dozer.json | 5 +--- 4 files changed, 15 insertions(+), 39 deletions(-) diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs index 4510f94c30..bc08d6b0b3 100644 --- a/dozer-ingestion/src/connectors/mod.rs +++ b/dozer-ingestion/src/connectors/mod.rs @@ -27,7 +27,7 @@ use crate::ingestion::Ingestor; use dozer_types::log::debug; use dozer_types::models::connection::Connection; use dozer_types::models::connection::ConnectionConfig; -use dozer_types::models::ingestion_types::{default_grpc_adapter}; +use dozer_types::models::ingestion_types::default_grpc_adapter; use dozer_types::node::OpIdentifier; use dozer_types::tonic::async_trait; @@ -45,6 +45,8 @@ pub mod snowflake; use self::dozer::NestedDozerConnector; #[cfg(feature = "ethereum")] use self::ethereum::{EthLogConnector, EthTraceConnector}; +#[cfg(feature = "ethereum")] +use dozer_types::models::ingestion_types::EthProviderConfig; use self::grpc::connector::GrpcConnector; use self::grpc::{ArrowAdapter, DefaultAdapter}; diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index a4c382f94b..9e304d7b9a 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -80,7 +80,7 @@ pub enum GrpcConfigSchemas { Path(String), } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] #[schemars(example = "Self::example")] pub struct EthConfig { @@ -337,11 +337,11 @@ impl SchemaExample for S3Storage { Self { details: s3_details, tables: vec![Table { - config: Some(TableConfig::CSV(CsvConfig { + config: TableConfig::CSV(CsvConfig { path: "path/to/file".to_owned(), extension: ".csv".to_owned(), marker_extension: None, - })), + }), name: "table_name".to_owned(), }], } @@ -494,11 +494,11 @@ impl SchemaExample for LocalStorage { path: "path".to_owned(), }, tables: vec![Table { - config: Some(TableConfig::CSV(CsvConfig { + config: TableConfig::CSV(CsvConfig { path: "path/to/table".to_owned(), extension: ".csv".to_owned(), marker_extension: None, - })), + }), name: "table_name".to_owned(), }], } diff --git a/json_schemas/connections.json b/json_schemas/connections.json index 8ad7be70de..42ed688795 100644 --- a/json_schemas/connections.json +++ b/json_schemas/connections.json @@ -82,12 +82,9 @@ { "provider": { "Log": { - "contracts": [], "filter": { - "addresses": [], "from_block": 0, - "to_block": null, - "topics": [] + "to_block": null }, "wss_url": "" } @@ -127,7 +124,6 @@ "type": "object", "properties": { "addresses": { - "default": [], "type": "array", "items": { "type": "string" @@ -150,7 +146,6 @@ "minimum": 0.0 }, "topics": { - "default": [], "type": "array", "items": { "type": "string" @@ -165,7 +160,6 @@ ], "properties": { "contracts": { - "default": [], "type": "array", "items": { "$ref": "#/definitions/EthContract" @@ -543,18 +537,12 @@ "Table": { "type": "object", "required": [ + "config", "name" ], "properties": { "config": { - "anyOf": [ - { - "$ref": "#/definitions/TableConfig" - }, - { - "type": "null" - } - ] + "$ref": "#/definitions/TableConfig" }, "name": { "type": "string" @@ -711,18 +699,12 @@ "Table": { "type": "object", "required": [ + "config", "name" ], "properties": { "config": { - "anyOf": [ - { - "$ref": "#/definitions/TableConfig" - }, - { - "type": "null" - } - ] + "$ref": "#/definitions/TableConfig" }, "name": { "type": "string" @@ -880,12 +862,7 @@ ], "properties": { "log_options": { - "default": {}, - "allOf": [ - { - "$ref": "#/definitions/NestedDozerLogOptions" - } - ] + "$ref": "#/definitions/NestedDozerLogOptions" }, "url": { "type": "string" diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index e03358b6e6..fe61635330 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -726,12 +726,9 @@ { "provider": { "Log": { - "contracts": [], "filter": { - "addresses": [], "from_block": 0, - "to_block": null, - "topics": [] + "to_block": null }, "wss_url": "" }