From b6ed058faa1dd1d1a52ecabbe01c444ddb88c94d Mon Sep 17 00:00:00 2001 From: Aarya Attrey <74827081+aaryaattrey@users.noreply.github.com> Date: Mon, 9 Oct 2023 20:21:42 +0530 Subject: [PATCH] implement example for connectors (#2141) * implement example for connectors * address reviews * address reviews --------- Co-authored-by: aaryaattrey --- dozer-types/src/models/ingestion_types.rs | 142 +++++++++++++++++++++- json_schemas/dozer.json | 105 ++++++++++++++++ 2 files changed, 246 insertions(+), 1 deletion(-) diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 872b15da75..a4c382f94b 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use crate::{ helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64}, + models::connection::SchemaExample, node::OpIdentifier, types::Operation, }; @@ -46,6 +47,8 @@ pub struct EthFilter { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct GrpcConfig { #[serde(skip_serializing_if = "Option::is_none")] pub host: Option, @@ -77,7 +80,9 @@ pub enum GrpcConfigSchemas { Path(String), } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] +#[schemars(example = "Self::example")] + pub struct EthConfig { pub provider: EthProviderConfig, } @@ -150,6 +155,8 @@ pub struct EthContract { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct KafkaConfig { pub broker: String, @@ -171,6 +178,8 @@ impl KafkaConfig { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct SnowflakeConfig { pub server: String, @@ -300,6 +309,7 @@ pub struct S3Details { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] pub struct S3Storage { pub details: S3Details, @@ -316,6 +326,27 @@ impl S3Storage { ) } } +impl SchemaExample for S3Storage { + fn example() -> Self { + let s3_details = S3Details { + access_key_id: "".to_owned(), + secret_access_key: "".to_owned(), + region: "".to_owned(), + bucket_name: "".to_owned(), + }; + Self { + details: s3_details, + tables: vec![Table { + config: Some(TableConfig::CSV(CsvConfig { + path: "path/to/file".to_owned(), + extension: ".csv".to_owned(), + marker_extension: None, + })), + name: "table_name".to_owned(), + }], + } + } +} #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct LocalDetails { @@ -323,6 +354,8 @@ pub struct LocalDetails { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct LocalStorage { pub details: LocalDetails, @@ -349,16 +382,22 @@ impl DeltaTable { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct DeltaLakeConfig { pub tables: Vec, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct MongodbConfig { pub connection_string: String, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] +#[schemars(example = "Self::example")] + pub struct MySQLConfig { pub url: String, @@ -399,3 +438,104 @@ pub fn default_buffer_size() -> u32 { pub fn default_snowflake_poll_interval() -> Duration { Duration::from_secs(60) } + +impl SchemaExample for MongodbConfig { + fn example() -> Self { + Self { + connection_string: "mongodb://localhost:27017/db_name".to_owned(), + } + } +} + +impl SchemaExample for MySQLConfig { + fn example() -> Self { + Self { + url: "mysql://root:1234@localhost:3306/db_name".to_owned(), + server_id: Some((1).to_owned()), + } + } +} + +impl SchemaExample for GrpcConfig { + fn example() -> Self { + Self { + host: Some("localhost".to_owned()), + port: Some(50051), + schemas: GrpcConfigSchemas::Path("schema.json".to_owned()), + adapter: Some("arrow".to_owned()), + } + } +} + +impl SchemaExample for KafkaConfig { + fn example() -> Self { + Self { + broker: "".to_owned(), + schema_registry_url: Some("".to_owned()), + } + } +} + +impl SchemaExample for DeltaLakeConfig { + fn example() -> Self { + Self { + tables: vec![DeltaTable { + path: "".to_owned(), + name: "".to_owned(), + }], + } + } +} + +impl SchemaExample for LocalStorage { + fn example() -> Self { + Self { + details: LocalDetails { + path: "path".to_owned(), + }, + tables: vec![Table { + config: Some(TableConfig::CSV(CsvConfig { + path: "path/to/table".to_owned(), + extension: ".csv".to_owned(), + marker_extension: None, + })), + name: "table_name".to_owned(), + }], + } + } +} + +impl SchemaExample for SnowflakeConfig { + fn example() -> Self { + Self { + server: "..snowflakecomputing.com".to_owned(), + port: "443".to_owned(), + user: "bob".to_owned(), + password: "password".to_owned(), + database: "database".to_owned(), + schema: "schema".to_owned(), + warehouse: "warehouse".to_owned(), + driver: Some("SnowflakeDSIIDriver".to_owned()), + role: "role".to_owned(), + poll_interval_seconds: None, + } + } +} + +impl SchemaExample for EthConfig { + fn example() -> Self { + let eth_filter = EthFilter { + from_block: Some(0), + to_block: None, + addresses: vec![], + topics: vec![], + }; + Self { + provider: EthProviderConfig::Log(EthLogConfig { + wss_url: "".to_owned(), + filter: Some(eth_filter), + contracts: vec![], + }), + } + } +} diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 9f67d6522c..e03358b6e6 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -630,6 +630,16 @@ } }, "DeltaLakeConfig": { + "examples": [ + { + "tables": [ + { + "name": "", + "path": "" + } + ] + } + ], "type": "object", "required": [ "tables" @@ -712,6 +722,22 @@ "additionalProperties": false }, "EthConfig": { + "examples": [ + { + "provider": { + "Log": { + "contracts": [], + "filter": { + "addresses": [], + "from_block": 0, + "to_block": null, + "topics": [] + }, + "wss_url": "" + } + } + } + ], "type": "object", "required": [ "provider" @@ -962,6 +988,16 @@ "additionalProperties": false }, "GrpcConfig": { + "examples": [ + { + "adapter": "arrow", + "host": "localhost", + "port": 50051, + "schemas": { + "Path": "schema.json" + } + } + ], "type": "object", "required": [ "schemas" @@ -1021,6 +1057,12 @@ ] }, "KafkaConfig": { + "examples": [ + { + "broker": "", + "schema_registry_url": "" + } + ], "type": "object", "required": [ "broker" @@ -1049,6 +1091,24 @@ } }, "LocalStorage": { + "examples": [ + { + "details": { + "path": "path" + }, + "tables": [ + { + "config": { + "CSV": { + "extension": ".csv", + "path": "path/to/table" + } + }, + "name": "table_name" + } + ] + } + ], "type": "object", "required": [ "details", @@ -1097,6 +1157,11 @@ "additionalProperties": false }, "MongodbConfig": { + "examples": [ + { + "connection_string": "mongodb://localhost:27017/db_name" + } + ], "type": "object", "required": [ "connection_string" @@ -1108,6 +1173,12 @@ } }, "MySQLConfig": { + "examples": [ + { + "server_id": 1, + "url": "mysql://root:1234@localhost:3306/db_name" + } + ], "type": "object", "required": [ "url" @@ -1393,6 +1464,27 @@ } }, "S3Storage": { + "examples": [ + { + "details": { + "access_key_id": "", + "bucket_name": "", + "region": "", + "secret_access_key": "" + }, + "tables": [ + { + "config": { + "CSV": { + "extension": ".csv", + "path": "path/to/file" + } + }, + "name": "table_name" + } + ] + } + ], "type": "object", "required": [ "details", @@ -1473,6 +1565,19 @@ "additionalProperties": false }, "SnowflakeConfig": { + "examples": [ + { + "database": "database", + "driver": "SnowflakeDSIIDriver", + "password": "password", + "port": "443", + "role": "role", + "schema": "schema", + "server": "..snowflakecomputing.com", + "user": "bob", + "warehouse": "warehouse" + } + ], "type": "object", "required": [ "database",