chore: include errors if table schemas are unavailable #2143

Merged · 3 commits · Oct 9, 2023
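
The diff below extends `SchemasResponse` (both the protobuf message and the Rust struct) with an `errors` map keyed by table name, so schemas that cannot be resolved are reported alongside the ones that can. A minimal sketch of how a caller might consume the new field follows; the `Schema` placeholder, the `report` helper, and the table names are illustrative only, not part of this PR:

```rust
use std::collections::HashMap;

// Placeholder for the generated `Schema` type; the real one lives in dozer_types.
#[derive(Debug)]
struct Schema;

// Mirrors the extended response: resolved schemas plus per-table error messages.
struct SchemasResponse {
    schemas: HashMap<String, Schema>,
    errors: HashMap<String, String>,
}

fn report(response: &SchemasResponse) {
    for (table, schema) in &response.schemas {
        println!("{table}: schema available ({schema:?})");
    }
    // Failures are now surfaced per table instead of being silently dropped.
    for (table, error) in &response.errors {
        eprintln!("{table}: schema unavailable: {error}");
    }
}

fn main() {
    let mut response = SchemasResponse {
        schemas: HashMap::new(),
        errors: HashMap::new(),
    };
    response.schemas.insert("users".to_owned(), Schema);
    response
        .errors
        .insert("orders".to_owned(), "table not found in source".to_owned());
    report(&response);
}
```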
9 changes: 7 additions & 2 deletions dozer-cli/src/live/state.rs
@@ -1,4 +1,4 @@
-use std::{sync::Arc, thread::JoinHandle};
+use std::{collections::HashMap, sync::Arc, thread::JoinHandle};
 
 use clap::Parser;
 
@@ -204,6 +204,7 @@ impl LiveState {
         let contract = get_contract(&dozer)?;
         Ok(SchemasResponse {
             schemas: contract.get_endpoints_schemas(),
+            errors: HashMap::new(),
         })
     }
     pub async fn get_source_schemas(
@@ -217,7 +218,10 @@ impl LiveState {
         contract
             .get_source_schemas(&connection_name)
             .ok_or(LiveError::ConnectionNotFound(connection_name))
-            .map(|schemas| SchemasResponse { schemas })
+            .map(|schemas| SchemasResponse {
+                schemas,
+                errors: HashMap::new(),
+            })
     }
 
     pub async fn get_graph_schemas(&self) -> Result<SchemasResponse, LiveError> {
@@ -227,6 +231,7 @@ impl LiveState {
 
         Ok(SchemasResponse {
             schemas: contract.get_graph_schemas(),
+            errors: HashMap::new(),
         })
     }
 
4 changes: 3 additions & 1 deletion dozer-ingestion/src/connectors/mod.rs
@@ -27,7 +27,7 @@ use crate::ingestion::Ingestor;
 use dozer_types::log::debug;
 use dozer_types::models::connection::Connection;
 use dozer_types::models::connection::ConnectionConfig;
-use dozer_types::models::ingestion_types::{default_grpc_adapter, EthProviderConfig};
+use dozer_types::models::ingestion_types::default_grpc_adapter;
 use dozer_types::node::OpIdentifier;
 use dozer_types::tonic::async_trait;
 
@@ -45,6 +45,8 @@ pub mod snowflake;
 use self::dozer::NestedDozerConnector;
 #[cfg(feature = "ethereum")]
 use self::ethereum::{EthLogConnector, EthTraceConnector};
+#[cfg(feature = "ethereum")]
+use dozer_types::models::ingestion_types::EthProviderConfig;
 
 use self::grpc::connector::GrpcConnector;
 use self::grpc::{ArrowAdapter, DefaultAdapter};
1 change: 1 addition & 0 deletions dozer-types/protos/types.proto
@@ -136,6 +136,7 @@ message Value {
 }
 message SchemasResponse {
   map<string, Schema> schemas = 1;
+  map<string, string> errors = 2;
 }
 
 message Schema {
5 changes: 2 additions & 3 deletions dozer-types/src/models/connection.rs
@@ -57,12 +57,11 @@ impl SchemaExample for PostgresConfig {
     fn example() -> Self {
         Self {
             user: Some("postgres".to_string()),
-            password: None,
+            password: Some("postgres".to_string()),
             host: Some("localhost".to_string()),
             port: Some(5432),
             database: Some("postgres".to_string()),
-            sslmode: None,
-            connection_url: None,
+            ..Default::default()
         }
     }
 }
140 changes: 140 additions & 0 deletions dozer-types/src/models/ingestion_types.rs
@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
 
 use crate::{
     helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64},
+    models::connection::SchemaExample,
     node::OpIdentifier,
     types::Operation,
 };
@@ -46,6 +47,8 @@ pub struct EthFilter {
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct GrpcConfig {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub host: Option<String>,
@@ -78,6 +81,8 @@ pub enum GrpcConfigSchemas {
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct EthConfig {
     pub provider: EthProviderConfig,
 }
@@ -150,6 +155,8 @@ pub struct EthContract {
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct KafkaConfig {
     pub broker: String,
 
@@ -171,6 +178,8 @@ impl KafkaConfig {
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct SnowflakeConfig {
     pub server: String,
 
@@ -300,6 +309,7 @@ pub struct S3Details {
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
 pub struct S3Storage {
     pub details: S3Details,
 
@@ -316,13 +326,36 @@ impl S3Storage {
         )
     }
 }
+impl SchemaExample for S3Storage {
+    fn example() -> Self {
+        let s3_details = S3Details {
+            access_key_id: "".to_owned(),
+            secret_access_key: "".to_owned(),
+            region: "".to_owned(),
+            bucket_name: "".to_owned(),
+        };
+        Self {
+            details: s3_details,
+            tables: vec![Table {
+                config: TableConfig::CSV(CsvConfig {
+                    path: "path/to/file".to_owned(),
+                    extension: ".csv".to_owned(),
+                    marker_extension: None,
+                }),
+                name: "table_name".to_owned(),
+            }],
+        }
+    }
+}
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
 pub struct LocalDetails {
     pub path: String,
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct LocalStorage {
     pub details: LocalDetails,
 
@@ -349,16 +382,22 @@ impl DeltaTable {
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct DeltaLakeConfig {
     pub tables: Vec<DeltaTable>,
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct MongodbConfig {
     pub connection_string: String,
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+
 pub struct MySQLConfig {
     pub url: String,
 
@@ -399,3 +438,104 @@ pub fn default_buffer_size() -> u32 {
 pub fn default_snowflake_poll_interval() -> Duration {
     Duration::from_secs(60)
 }
+
+impl SchemaExample for MongodbConfig {
+    fn example() -> Self {
+        Self {
+            connection_string: "mongodb://localhost:27017/db_name".to_owned(),
+        }
+    }
+}
+
+impl SchemaExample for MySQLConfig {
+    fn example() -> Self {
+        Self {
+            url: "mysql://root:1234@localhost:3306/db_name".to_owned(),
+            server_id: Some((1).to_owned()),
+        }
+    }
+}
+
+impl SchemaExample for GrpcConfig {
+    fn example() -> Self {
+        Self {
+            host: Some("localhost".to_owned()),
+            port: Some(50051),
+            schemas: GrpcConfigSchemas::Path("schema.json".to_owned()),
+            adapter: Some("arrow".to_owned()),
+        }
+    }
+}
+
+impl SchemaExample for KafkaConfig {
+    fn example() -> Self {
+        Self {
+            broker: "".to_owned(),
+            schema_registry_url: Some("".to_owned()),
+        }
+    }
+}
+
+impl SchemaExample for DeltaLakeConfig {
+    fn example() -> Self {
+        Self {
+            tables: vec![DeltaTable {
+                path: "".to_owned(),
+                name: "".to_owned(),
+            }],
+        }
+    }
+}
+
+impl SchemaExample for LocalStorage {
+    fn example() -> Self {
+        Self {
+            details: LocalDetails {
+                path: "path".to_owned(),
+            },
+            tables: vec![Table {
+                config: TableConfig::CSV(CsvConfig {
+                    path: "path/to/table".to_owned(),
+                    extension: ".csv".to_owned(),
+                    marker_extension: None,
+                }),
+                name: "table_name".to_owned(),
+            }],
+        }
+    }
+}
+
+impl SchemaExample for SnowflakeConfig {
+    fn example() -> Self {
+        Self {
+            server: "<account_name>.<region_id>.snowflakecomputing.com".to_owned(),
+            port: "443".to_owned(),
+            user: "bob".to_owned(),
+            password: "password".to_owned(),
+            database: "database".to_owned(),
+            schema: "schema".to_owned(),
+            warehouse: "warehouse".to_owned(),
+            driver: Some("SnowflakeDSIIDriver".to_owned()),
+            role: "role".to_owned(),
+            poll_interval_seconds: None,
+        }
+    }
+}
+
+impl SchemaExample for EthConfig {
+    fn example() -> Self {
+        let eth_filter = EthFilter {
+            from_block: Some(0),
+            to_block: None,
+            addresses: vec![],
+            topics: vec![],
+        };
+        Self {
+            provider: EthProviderConfig::Log(EthLogConfig {
+                wss_url: "".to_owned(),
+                filter: Some(eth_filter),
+                contracts: vec![],
+            }),
+        }
+    }
+}
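
Most of the changes in this last file attach `#[schemars(example = "Self::example")]` to the connector config structs and back it with `SchemaExample` impls, so the generated JSON schema for each config embeds a sample value. A standalone sketch of that pattern, assuming `schemars` 0.8, `serde`, and `serde_json` as dependencies (the trait and struct here are simplified stand-ins for dozer's types):

```rust
use schemars::{schema_for, JsonSchema};
use serde::Serialize;

// Simplified stand-in for dozer's SchemaExample trait.
trait SchemaExample {
    fn example() -> Self;
}

// Simplified stand-in for dozer_types::models::ingestion_types::MongodbConfig.
#[derive(Serialize, JsonSchema)]
#[schemars(example = "Self::example")]
struct MongodbConfig {
    connection_string: String,
}

impl SchemaExample for MongodbConfig {
    fn example() -> Self {
        Self {
            connection_string: "mongodb://localhost:27017/db_name".to_owned(),
        }
    }
}

fn main() {
    // The example value shows up under the schema's `examples` metadata.
    let schema = schema_for!(MongodbConfig);
    println!("{}", serde_json::to_string_pretty(&schema).unwrap());
}
```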