Skip to content

Commit

Permalink
implement example for connectors (#2141)
Browse files Browse the repository at this point in the history
* implement example for connectors

* address reviews

* address reviews

---------

Co-authored-by: aaryaattrey <[email protected]>
  • Loading branch information
2 people authored and v3g42 committed Oct 9, 2023
1 parent ff8c2c4 commit b6ed058
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 1 deletion.
142 changes: 141 additions & 1 deletion dozer-types/src/models/ingestion_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};

use crate::{
helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64},
models::connection::SchemaExample,
node::OpIdentifier,
types::Operation,
};
Expand Down Expand Up @@ -46,6 +47,8 @@ pub struct EthFilter {
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct GrpcConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
Expand Down Expand Up @@ -77,7 +80,9 @@ pub enum GrpcConfigSchemas {
Path(String),
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)]
#[schemars(example = "Self::example")]

pub struct EthConfig {
pub provider: EthProviderConfig,
}
Expand Down Expand Up @@ -150,6 +155,8 @@ pub struct EthContract {
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct KafkaConfig {
pub broker: String,

Expand All @@ -171,6 +178,8 @@ impl KafkaConfig {
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct SnowflakeConfig {
pub server: String,

Expand Down Expand Up @@ -300,6 +309,7 @@ pub struct S3Details {
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]
pub struct S3Storage {
pub details: S3Details,

Expand All @@ -316,13 +326,36 @@ impl S3Storage {
)
}
}
impl SchemaExample for S3Storage {
fn example() -> Self {
let s3_details = S3Details {
access_key_id: "".to_owned(),
secret_access_key: "".to_owned(),
region: "".to_owned(),
bucket_name: "".to_owned(),
};
Self {
details: s3_details,
tables: vec![Table {
config: Some(TableConfig::CSV(CsvConfig {
path: "path/to/file".to_owned(),
extension: ".csv".to_owned(),
marker_extension: None,
})),
name: "table_name".to_owned(),
}],
}
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
pub struct LocalDetails {
pub path: String,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct LocalStorage {
pub details: LocalDetails,

Expand All @@ -349,16 +382,22 @@ impl DeltaTable {
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct DeltaLakeConfig {
pub tables: Vec<DeltaTable>,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct MongodbConfig {
pub connection_string: String,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
#[schemars(example = "Self::example")]

pub struct MySQLConfig {
pub url: String,

Expand Down Expand Up @@ -399,3 +438,104 @@ pub fn default_buffer_size() -> u32 {
pub fn default_snowflake_poll_interval() -> Duration {
Duration::from_secs(60)
}

impl SchemaExample for MongodbConfig {
fn example() -> Self {
Self {
connection_string: "mongodb://localhost:27017/db_name".to_owned(),
}
}
}

impl SchemaExample for MySQLConfig {
fn example() -> Self {
Self {
url: "mysql://root:1234@localhost:3306/db_name".to_owned(),
server_id: Some((1).to_owned()),
}
}
}

impl SchemaExample for GrpcConfig {
fn example() -> Self {
Self {
host: Some("localhost".to_owned()),
port: Some(50051),
schemas: GrpcConfigSchemas::Path("schema.json".to_owned()),
adapter: Some("arrow".to_owned()),
}
}
}

impl SchemaExample for KafkaConfig {
fn example() -> Self {
Self {
broker: "".to_owned(),
schema_registry_url: Some("".to_owned()),
}
}
}

impl SchemaExample for DeltaLakeConfig {
fn example() -> Self {
Self {
tables: vec![DeltaTable {
path: "".to_owned(),
name: "".to_owned(),
}],
}
}
}

impl SchemaExample for LocalStorage {
fn example() -> Self {
Self {
details: LocalDetails {
path: "path".to_owned(),
},
tables: vec![Table {
config: Some(TableConfig::CSV(CsvConfig {
path: "path/to/table".to_owned(),
extension: ".csv".to_owned(),
marker_extension: None,
})),
name: "table_name".to_owned(),
}],
}
}
}

impl SchemaExample for SnowflakeConfig {
fn example() -> Self {
Self {
server: "<account_name>.<region_id>.snowflakecomputing.com".to_owned(),
port: "443".to_owned(),
user: "bob".to_owned(),
password: "password".to_owned(),
database: "database".to_owned(),
schema: "schema".to_owned(),
warehouse: "warehouse".to_owned(),
driver: Some("SnowflakeDSIIDriver".to_owned()),
role: "role".to_owned(),
poll_interval_seconds: None,
}
}
}

impl SchemaExample for EthConfig {
fn example() -> Self {
let eth_filter = EthFilter {
from_block: Some(0),
to_block: None,
addresses: vec![],
topics: vec![],
};
Self {
provider: EthProviderConfig::Log(EthLogConfig {
wss_url: "".to_owned(),
filter: Some(eth_filter),
contracts: vec![],
}),
}
}
}
105 changes: 105 additions & 0 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,16 @@
}
},
"DeltaLakeConfig": {
"examples": [
{
"tables": [
{
"name": "",
"path": ""
}
]
}
],
"type": "object",
"required": [
"tables"
Expand Down Expand Up @@ -712,6 +722,22 @@
"additionalProperties": false
},
"EthConfig": {
"examples": [
{
"provider": {
"Log": {
"contracts": [],
"filter": {
"addresses": [],
"from_block": 0,
"to_block": null,
"topics": []
},
"wss_url": ""
}
}
}
],
"type": "object",
"required": [
"provider"
Expand Down Expand Up @@ -962,6 +988,16 @@
"additionalProperties": false
},
"GrpcConfig": {
"examples": [
{
"adapter": "arrow",
"host": "localhost",
"port": 50051,
"schemas": {
"Path": "schema.json"
}
}
],
"type": "object",
"required": [
"schemas"
Expand Down Expand Up @@ -1021,6 +1057,12 @@
]
},
"KafkaConfig": {
"examples": [
{
"broker": "",
"schema_registry_url": ""
}
],
"type": "object",
"required": [
"broker"
Expand Down Expand Up @@ -1049,6 +1091,24 @@
}
},
"LocalStorage": {
"examples": [
{
"details": {
"path": "path"
},
"tables": [
{
"config": {
"CSV": {
"extension": ".csv",
"path": "path/to/table"
}
},
"name": "table_name"
}
]
}
],
"type": "object",
"required": [
"details",
Expand Down Expand Up @@ -1097,6 +1157,11 @@
"additionalProperties": false
},
"MongodbConfig": {
"examples": [
{
"connection_string": "mongodb://localhost:27017/db_name"
}
],
"type": "object",
"required": [
"connection_string"
Expand All @@ -1108,6 +1173,12 @@
}
},
"MySQLConfig": {
"examples": [
{
"server_id": 1,
"url": "mysql://root:1234@localhost:3306/db_name"
}
],
"type": "object",
"required": [
"url"
Expand Down Expand Up @@ -1393,6 +1464,27 @@
}
},
"S3Storage": {
"examples": [
{
"details": {
"access_key_id": "",
"bucket_name": "",
"region": "",
"secret_access_key": ""
},
"tables": [
{
"config": {
"CSV": {
"extension": ".csv",
"path": "path/to/file"
}
},
"name": "table_name"
}
]
}
],
"type": "object",
"required": [
"details",
Expand Down Expand Up @@ -1473,6 +1565,19 @@
"additionalProperties": false
},
"SnowflakeConfig": {
"examples": [
{
"database": "database",
"driver": "SnowflakeDSIIDriver",
"password": "password",
"port": "443",
"role": "role",
"schema": "schema",
"server": "<account_name>.<region_id>.snowflakecomputing.com",
"user": "bob",
"warehouse": "warehouse"
}
],
"type": "object",
"required": [
"database",
Expand Down

0 comments on commit b6ed058

Please sign in to comment.