From 67f1b8a3388ddded55e52d77402254caff0d2573 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Thu, 21 Sep 2023 23:11:59 +0800 Subject: [PATCH 1/8] fix: Add change in #2062 that was pushed after merge queue (#2067) Signed-off-by: Bei Chu <914745487@qq.com> --- dozer-types/protos/cloud_notification.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dozer-types/protos/cloud_notification.proto b/dozer-types/protos/cloud_notification.proto index 51ee5c37d4..d5c01054a6 100644 --- a/dozer-types/protos/cloud_notification.proto +++ b/dozer-types/protos/cloud_notification.proto @@ -6,7 +6,7 @@ message Notification { string id = 1; google.protobuf.Timestamp created_at = 2; string namespace = 3; - optional string app_name = 4; + string app_name = 4; Level level = 5; bool is_read = 6; oneof kind { From 556ce6bc177680a2cffe0a3ad5b2625c89a51cd4 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 22 Sep 2023 00:59:52 +0800 Subject: [PATCH 2/8] fix: Remove a `unwrap` in grpc arrow adapter (#2068) --- dozer-ingestion/src/connectors/grpc/adapter/arrow.rs | 2 +- dozer-ingestion/src/errors.rs | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs index 5989a2f496..94c1ccdcc6 100644 --- a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs +++ b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs @@ -137,7 +137,7 @@ fn map_record_batch( ) -> Result, ConnectorError> { let mut buf = Bytes::from(req.records).reader(); // read stream back - let mut reader = StreamReader::try_new(&mut buf, None).unwrap(); + let mut reader = StreamReader::try_new(&mut buf, None)?; let mut records = Vec::new(); while let Some(Ok(batch)) = reader.next() { let b_recs = map_record_batch_to_dozer_records(batch, schema) diff --git a/dozer-ingestion/src/errors.rs b/dozer-ingestion/src/errors.rs index f732f37236..32b0425704 100644 --- a/dozer-ingestion/src/errors.rs +++ b/dozer-ingestion/src/errors.rs @@ -1,5 +1,6 @@ #![allow(clippy::enum_variant_names)] +use deltalake::arrow::error::ArrowError; use dozer_log::errors::{ReaderBuilderError, ReaderError}; use dozer_types::errors::internal::BoxedError; use dozer_types::errors::types::{DeserializationError, SerializationError, TypeError}; @@ -52,6 +53,8 @@ pub enum ConnectorError { #[error("Unsupported grpc adapter: {0} {1}")] UnsupportedGrpcAdapter(String, String), + #[error("Arrow error: {0}")] + Arrow(#[from] ArrowError), #[error("Table not found: {0}")] TableNotFound(String), From 4aebc23dcba4bb13dc84d54fd78f547d8937ee70 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 22 Sep 2023 12:20:56 +0800 Subject: [PATCH 3/8] fix: Don't create checkpoint if source is not restarable (#2070) --- dozer-core/src/epoch/manager.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dozer-core/src/epoch/manager.rs b/dozer-core/src/epoch/manager.rs index 3b6ebfb129..ddd7d2af15 100644 --- a/dozer-core/src/epoch/manager.rs +++ b/dozer-core/src/epoch/manager.rs @@ -192,12 +192,16 @@ impl EpochManager { let instant = SystemTime::now(); let action = if *should_commit { let num_records = self.record_store().num_records(); - if num_records - state.next_record_index_to_persist + if source_states.values().all(|table_states| { + table_states + .values() + .all(|&state| state != TableState::NonRestartable) + }) && (num_records - state.next_record_index_to_persist >= self.options.max_num_records_before_persist || instant .duration_since(state.last_persisted_epoch_decision_instant) .unwrap_or(Duration::from_secs(0)) - >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds) + >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)) { state.next_record_index_to_persist = num_records; state.last_persisted_epoch_decision_instant = instant; From 8aedacb56169a03a9a08759048af684a4ccfc3bd Mon Sep 17 00:00:00 2001 From: VG Date: Fri, 22 Sep 2023 13:40:33 +0800 Subject: [PATCH 4/8] feat: implment connections json schema (#2069) * chore: create json schema from connection types * chore: remove redudant code * chore: fix tests * chore: fix grpc tests --- Cargo.lock | 36 + dozer-cli/Cargo.toml | 3 + dozer-cli/build.rs | 22 + dozer-cli/src/simple/cloud_orchestrator.rs | 4 +- dozer-types/Cargo.toml | 1 + dozer-types/build.rs | 58 - dozer-types/protos/cloud.proto | 28 +- dozer-types/protos/cloud_types.proto | 185 -- dozer-types/src/ingestion_types.rs | 189 +- dozer-types/src/models/api_config.rs | 35 +- dozer-types/src/models/api_endpoint.rs | 65 +- dozer-types/src/models/api_security.rs | 4 +- dozer-types/src/models/app_config.rs | 25 +- dozer-types/src/models/cloud.rs | 27 +- dozer-types/src/models/config.rs | 24 +- dozer-types/src/models/connection.rs | 71 +- dozer-types/src/models/flags.rs | 33 +- dozer-types/src/models/json_schema_helper.rs | 54 + dozer-types/src/models/mod.rs | 2 + dozer-types/src/models/source.rs | 25 +- dozer-types/src/models/telemetry.rs | 26 +- dozer-types/src/models/udf_config.rs | 13 +- dozer-types/src/{tests.rs => tests/mod.rs} | 0 json_schemas/connections.json | 852 ++++++++ json_schemas/dozer.json | 1941 ++++++++++++++++++ 25 files changed, 3181 insertions(+), 542 deletions(-) create mode 100644 dozer-cli/build.rs create mode 100644 dozer-types/src/models/json_schema_helper.rs rename dozer-types/src/{tests.rs => tests/mod.rs} (100%) create mode 100644 json_schemas/connections.json create mode 100644 json_schemas/dozer.json diff --git a/Cargo.lock b/Cargo.lock index b96e81270b..5944557aa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2929,6 +2929,7 @@ dependencies = [ "prost-types 0.12.0", "pyo3", "rust_decimal", + "schemars", "serde", "serde_bytes", "serde_json", @@ -7235,6 +7236,30 @@ dependencies = [ "serde_json", ] +[[package]] +name = "schemars" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f7b0ce13155372a76ee2e1c5ffba1fe61ede73fbea5630d61eee6fac4929c0c" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e85e2a16b12bdb763244c69ab79363d71db2b4b918a2def53f80b02e0574b13c" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 1.0.109", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -7386,6 +7411,17 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "serde_derive_internals" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "serde_json" version = "1.0.107" diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index 3215ad6ea5..c2f780eb31 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -49,6 +49,9 @@ tempfile = "3.8" actix-files = "0.6.2" prometheus-parse = "0.2.4" +[build-dependencies] +dozer-types = { path = "../dozer-types" } + [[bin]] edition = "2021" name = "dozer" diff --git a/dozer-cli/build.rs b/dozer-cli/build.rs new file mode 100644 index 0000000000..a0e4b4202c --- /dev/null +++ b/dozer-cli/build.rs @@ -0,0 +1,22 @@ +use std::fs::File; +use std::io::Write; +use std::path::Path; + +fn main() { + let schema_path = Path::new("../json_schemas"); + // Define the path to the file we want to create or overwrite + let connection_path = schema_path.join("connections.json"); + let dozer_path = schema_path.join("dozer.json"); + + let mut file = File::create(connection_path).expect("Failed to create connections.json"); + let schemas = dozer_types::models::get_connection_schemas().unwrap(); + write!(file, "{}", schemas).expect("Unable to write file"); + + let mut dozer_schema_file = File::create(dozer_path).expect("Failed to create dozer.json"); + let schema = dozer_types::models::get_dozer_schema().unwrap(); + write!(dozer_schema_file, "{}", schema).expect("Unable to write file"); + + // Print a message to indicate the file has been written + println!("cargo:rerun-if-changed=build.rs"); + println!("Written to {:?}", schema_path.display()); +} diff --git a/dozer-cli/src/simple/cloud_orchestrator.rs b/dozer-cli/src/simple/cloud_orchestrator.rs index d2f71a6236..4eff819ab9 100644 --- a/dozer-cli/src/simple/cloud_orchestrator.rs +++ b/dozer-cli/src/simple/cloud_orchestrator.rs @@ -193,9 +193,7 @@ impl CloudOrchestrator for SimpleOrchestrator { let mut table = table!(); for app in response.apps { - if let Some(app_data) = app.app { - table.add_row(row![app.app_id, app_data.convert_to_table()]); - } + table.add_row(row![app.app_id, app.app_name]); } table.printstd(); diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index 223b4e7a81..6be2ea21af 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -37,6 +37,7 @@ tokio-postgres = { version = "0.7.7", features = [ ] } serde_bytes = "0.11.12" arbitrary = { version = "1", features = ["derive"], optional = true } +schemars = "0.8.15" [build-dependencies] tonic-build = "0.10.0" diff --git a/dozer-types/build.rs b/dozer-types/build.rs index 99d5fa6416..2931e33163 100644 --- a/dozer-types/build.rs +++ b/dozer-types/build.rs @@ -49,64 +49,6 @@ fn main() -> Result<(), Box> { // Cloud Service & Types tonic_build::configure() .protoc_arg("--experimental_allow_proto3_optional") - .extern_path( - ".dozer.cloud.Endpoint", - "crate::models::api_endpoint::ApiEndpoint", - ) - .extern_path(".dozer.cloud.Source", "crate::models::source::Source") - .extern_path(".dozer.cloud.AppConfig", "crate::models::config::Config") - .extern_path( - ".dozer.cloud.ConnectionConfig", - "crate::models::connection::ConnectionConfig", - ) - .extern_path( - ".dozer.cloud.Connection", - "crate::models::connection::Connection", - ) - .extern_path( - ".dozer.cloud.EthContract", - "crate::ingestion_types::EthContract", - ) - .extern_path( - ".dozer.cloud.EthereumFilter", - "crate::ingestion_types::EthereumFilter", - ) - .extern_path( - ".dozer.cloud.DeltaLakeConfig", - "crate::ingestion_types::DeltaLakeConfig", - ) - .extern_path( - ".dozer.cloud.LocalStorage", - "crate::ingestion_types::LocalStorage", - ) - .extern_path( - ".dozer.cloud.S3Storage", - "crate::ingestion_types::S3Storage", - ) - .extern_path( - ".dozer.cloud.KafkaConfig", - "crate::ingestion_types::KafkaConfig", - ) - .extern_path( - ".dozer.cloud.SnowflakeConfig", - "crate::ingestion_types::SnowflakeConfig", - ) - .extern_path( - ".dozer.cloud::grpc_config::Schemas", - "crate::ingestion_types::GrpcConfigSchemas", - ) - .extern_path( - ".dozer.cloud.GrpcConfig", - "crate::ingestion_types::GrpcConfig", - ) - .extern_path( - ".dozer.cloud.EthereumConfig", - "crate::ingestion_types::EthConfig", - ) - .extern_path( - ".dozer.cloud.PostgresConfig", - "crate::models::connection::PostgresConfig", - ) .file_descriptor_set_path(out_dir.join("cloud.bin")) .compile(&["protos/cloud.proto"], &["protos"]) .unwrap(); diff --git a/dozer-types/protos/cloud.proto b/dozer-types/protos/cloud.proto index 1abb0ad2a1..3026e204d9 100644 --- a/dozer-types/protos/cloud.proto +++ b/dozer-types/protos/cloud.proto @@ -6,7 +6,7 @@ import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; service DozerCloud { - + rpc validate_connection(ConnectionRequest) returns (ValidateConnectionResponse); rpc create_connection(ConnectionRequest) returns (ConnectionResponse); @@ -23,11 +23,11 @@ service DozerCloud { rpc deploy_application(DeployAppRequest) returns (DeployAppResponse); rpc update_application(UpdateAppRequest) returns (AppResponse); rpc delete_application(DeleteAppRequest) returns (DeleteAppResponse); - rpc get_application(GetAppRequest) returns (AppResponse); + rpc get_application(GetAppRequest) returns (AppResponse); rpc stop_dozer(StopRequest) returns (StopResponse); rpc get_status(GetStatusRequest) returns (GetStatusResponse); rpc list_deployments(ListDeploymentRequest) returns (ListDeploymentResponse); - rpc list_versions(ListVersionsRequest) returns (ListVersionsResponse); + rpc list_versions(ListVersionsRequest) returns (ListVersionsResponse); rpc upsert_version(UpsertVersionRequest) returns (UpsertVersionResponse); rpc set_current_version(SetCurrentVersionRequest) returns (SetCurrentVersionResponse); rpc list_files(ListFilesRequest) returns (ListFilesResponse); @@ -92,7 +92,7 @@ message DeployAppResponse { message AppResponse { string app_id = 1; - AppConfig app = 2; + string app_name = 2; repeated DeploymentInfo deployments = 4; google.protobuf.Timestamp created_at = 5; google.protobuf.Timestamp updated_at = 6; @@ -151,7 +151,7 @@ message DeploymentResource { // api, app string typ = 2; string created_at = 3; - + optional int32 desired = 4; optional int32 available = 5; @@ -224,17 +224,22 @@ message SetCurrentVersionRequest { message SetCurrentVersionResponse {} -message ConnectionRequest { Connection connection = 1; } +message ConnectionRequest { + string yaml_content = 2; +} message GetConnectionRequest { string connection_id = 1; } message ValidateConnectionResponse { bool success = 1; } message ConnectionResponse { string id = 1; - Connection connection = 2; + string name = 2; + string yaml_content = 3; +} +message Connection { + string name = 1; string yaml_content = 3; } - message GetTablesRequest { string connection_id = 2; } message GetTablesResponse { @@ -251,8 +256,9 @@ message GetAllConnectionResponse { Pagination pagination = 2; } message UpdateConnectionRequest { - Connection connection = 1; - string connection_id = 3; + string id = 1; + string name = 2; + string yaml_content = 3; } message ListFilesRequest { string app_id = 1; @@ -345,4 +351,4 @@ message ListSecretsRequest { message ListSecretsResponse { repeated string secrets = 1; -} \ No newline at end of file +} diff --git a/dozer-types/protos/cloud_types.proto b/dozer-types/protos/cloud_types.proto index d88fc1bbce..435705d386 100644 --- a/dozer-types/protos/cloud_types.proto +++ b/dozer-types/protos/cloud_types.proto @@ -1,142 +1,6 @@ syntax = "proto3"; package dozer.cloud; -message AppConfig { - string app_name = 2; - string home_dir = 3; - string cache_dir = 4; - repeated Connection connections = 5; - repeated Source sources = 6; - repeated Endpoint endpoints = 7; - ApiConfig api = 8; - optional string sql = 9; - Flags flags = 10; - optional uint64 cache_max_map_size = 11; -} -message Flags { - bool dynamic = 1; - bool grpc_web = 2; - bool push_events = 3; - bool authenticate_server_reflection = 4; - EnableProbabilisticOptimizations enable_probabilistic_optimizations = 5; -} - -message EnableProbabilisticOptimizations { - bool in_sets = 1; - bool in_joins = 2; - bool in_aggregations = 3; -} - -message Connection { - oneof config { - PostgresConfig Postgres = 1; - EthereumConfig Ethereum = 2; - GrpcConfig Grpc = 3; - SnowflakeConfig Snowflake = 4; - KafkaConfig Kafka = 5; - S3Storage S3Storage = 6; - LocalStorage LocalStorage = 7; - DeltaLakeConfig DeltaLake = 8; - } - string name = 9; -} - -message ConnectionConfig { - oneof config { - PostgresConfig Postgres = 1; - EthereumConfig Ethereum = 2; - GrpcConfig Grpc = 3; - SnowflakeConfig Snowflake = 4; - KafkaConfig Kafka = 5; - S3Storage S3Storage = 6; - LocalStorage LocalStorage = 7; - DeltaLakeConfig DeltaLake = 8; - } -} -message DeltaLakeConfig { - repeated Table tables = 1; -} -message S3Storage { - S3Details details = 1; - repeated Table tables = 2; -} - -message S3Details { - string access_key_id = 1; - string secret_access_key = 2; - string region = 3; - string bucket_name = 4; -} - -message LocalStorage { - LocalDetails details = 1; - repeated Table tables = 2; -} - -message LocalDetails { - string path = 1; -} - -message Table { - string name = 1; - string prefix = 2; - string file_type = 3; - string extension = 4; -} - -message SnowflakeConfig { - string server = 1; - string port = 2; - string user = 3; - string password = 4; - string database = 5; - string schema = 6; - string warehouse = 7; - optional string driver = 8; -} -message PostgresConfig { - string user = 1; - string password = 2; - string host = 3; - uint32 port = 4; - string database = 5; -} - -message GrpcConfig { - string host = 1; - uint32 port = 2; - oneof schemas { - string Inline = 3; - string Path = 4; - } - string adapter = 5; -} - -message KafkaConfig { - string broker = 1; - string topic = 2; - optional string schema_registry_url = 3; -} -message EventsConfig { string database = 1; } - -message EthereumConfig { - EthereumFilter filter = 1; - string wss_url = 2; - string name = 3; - repeated EthContract contracts = 4; -} - -message EthereumFilter { - optional uint64 from_block = 1; - repeated string addresses = 2; - repeated string topics = 3; -} -message EthContract { - string name = 1; - string address = 2; - string abi = 3; -} - message TableInfo { string table_name = 1; repeated ColumnInfo columns = 2; @@ -146,55 +10,6 @@ message ColumnInfo { bool is_nullable = 2; } -message Endpoint { - string name = 1; - string table_name = 2; - string path = 3; - ApiIndex index = 4; -} - -message ApiIndex { repeated string primary_key = 1; } - -message Source { - string name = 1; - string table_name = 2; - repeated string columns = 3; - string connection = 4; - optional string schema = 5; - RefreshConfig refresh_config = 7; -} - -message ApiConfig { - oneof ApiSecurity { string Jwt = 1; } - RestApiOptions rest = 2; - GrpcApiOptions grpc = 3; - GrpcApiOptions app_grpc = 4; -} - -message RestApiOptions { - uint32 port = 1; - string url = 2; - bool cors = 3; -} -message GrpcApiOptions { - uint32 port = 1; - string url = 2; - bool cors = 3; - bool web = 4; -} -message RefreshConfig { - oneof config { - RefreshConfigHour hour = 1; - RefreshConfigDay day = 2; - RefreshConfigCronExpression cron_expression = 3; - RefreshConfigRealTime realtime = 4; - } -} -message RefreshConfigHour { uint32 minute = 1; } -message RefreshConfigDay { string time = 1; } -message RefreshConfigCronExpression { string expression = 1; } -message RefreshConfigRealTime {} - message File { string name = 1; string content = 2; diff --git a/dozer-types/src/ingestion_types.rs b/dozer-types/src/ingestion_types.rs index b88c21ce9b..cdd6f08b19 100644 --- a/dozer-types/src/ingestion_types.rs +++ b/dozer-types/src/ingestion_types.rs @@ -1,4 +1,5 @@ use prettytable::Table as PrettyTable; +use schemars::JsonSchema; use std::fmt::Debug; use serde::{Deserialize, Serialize}; @@ -24,35 +25,43 @@ pub enum IngestionMessage { SnapshottingDone, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] pub struct EthFilter { // Starting block - #[prost(uint64, optional, tag = "1")] pub from_block: Option, - #[prost(uint64, optional, tag = "2")] + pub to_block: Option, - #[prost(string, repeated, tag = "3")] + #[serde(default)] pub addresses: Vec, - #[prost(string, repeated, tag = "4")] + #[serde(default)] pub topics: Vec, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct GrpcConfig { - #[prost(string, tag = "1", default = "0.0.0.0")] #[serde(default = "default_ingest_host")] pub host: String, - #[prost(uint32, tag = "2", default = "8085")] + #[serde(default = "default_ingest_port")] pub port: u32, - #[prost(oneof = "GrpcConfigSchemas", tags = "3,4")] + pub schemas: Option, - #[prost(string, tag = "5", default = "default")] + #[serde(default = "default_grpc_adapter")] pub adapter: String, } +impl Default for GrpcConfig { + fn default() -> Self { + Self { + host: default_ingest_host(), + port: default_ingest_port(), + schemas: None, + adapter: default_grpc_adapter(), + } + } +} fn default_grpc_adapter() -> String { "default".to_owned() @@ -66,49 +75,43 @@ fn default_ingest_port() -> u32 { 8085 } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub enum GrpcConfigSchemas { - #[prost(string, tag = "3")] Inline(String), - #[prost(string, tag = "4")] + Path(String), } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] pub struct EthConfig { - #[prost(oneof = "EthProviderConfig", tags = "2,3")] pub provider: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub enum EthProviderConfig { - #[prost(message, tag = "2")] Log(EthLogConfig), - #[prost(message, tag = "3")] + Trace(EthTraceConfig), } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] pub struct EthLogConfig { - #[prost(string, tag = "1")] pub wss_url: String, - #[prost(message, optional, tag = "2")] + pub filter: Option, - #[prost(message, repeated, tag = "3")] + #[serde(default)] pub contracts: Vec, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] pub struct EthTraceConfig { - #[prost(string, tag = "1")] pub https_url: String, // Starting block - #[prost(uint64, tag = "2")] pub from_block: u64, - #[prost(uint64, optional, tag = "3")] + pub to_block: Option, - #[prost(uint64, tag = "4", default = "3")] + #[serde(default = "default_batch_size")] pub batch_size: u64, } @@ -143,21 +146,19 @@ impl EthConfig { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct EthContract { - #[prost(string, tag = "1")] pub name: String, - #[prost(string, tag = "2")] + pub address: String, - #[prost(string, tag = "3")] + pub abi: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct KafkaConfig { - #[prost(string, tag = "1")] pub broker: String, - #[prost(string, optional, tag = "3")] + pub schema_registry_url: Option, } @@ -175,25 +176,24 @@ impl KafkaConfig { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct SnowflakeConfig { - #[prost(string, tag = "1")] pub server: String, - #[prost(string, tag = "2")] + pub port: String, - #[prost(string, tag = "3")] + pub user: String, - #[prost(string, tag = "4")] + pub password: String, - #[prost(string, tag = "5")] + pub database: String, - #[prost(string, tag = "6")] + pub schema: String, - #[prost(string, tag = "7")] + pub warehouse: String, - #[prost(string, optional, tag = "8")] + pub driver: Option, - #[prost(string, default = "PUBLIC", tag = "9")] + pub role: String, } @@ -213,15 +213,14 @@ impl SnowflakeConfig { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct DataFusionConfig { - #[prost(string, tag = "1")] pub access_key_id: String, - #[prost(string, tag = "2")] + pub secret_access_key: String, - #[prost(string, tag = "3")] + pub region: String, - #[prost(string, tag = "4")] + pub bucket_name: String, } @@ -236,87 +235,80 @@ impl DataFusionConfig { } } -// #[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +// #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] // pub struct Table { -// #[prost(string, tag = "1")] +// // pub name: String, -// #[prost(string, tag = "2")] +// // pub prefix: String, -// #[prost(string, tag = "3")] +// // pub file_type: String, -// #[prost(string, tag = "4")] +// // pub extension: String, // } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct Table { - #[prost(oneof = "TableConfig", tags = "1,2,3")] pub config: Option, - #[prost(string, tag = "4")] + pub name: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub enum TableConfig { - #[prost(message, tag = "1")] CSV(CsvConfig), - #[prost(message, tag = "2")] + Delta(DeltaConfig), - #[prost(message, tag = "3")] + Parquet(ParquetConfig), } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct CsvConfig { - #[prost(string, tag = "1")] pub path: String, - #[prost(string, tag = "2")] + pub extension: String, - #[prost(bool, tag = "3")] + #[serde(default = "default_false")] pub marker_file: bool, - #[prost(string, tag = "4")] + #[serde(default = "default_marker")] pub marker_extension: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct DeltaConfig { - #[prost(string, tag = "1")] pub path: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct ParquetConfig { - #[prost(string, tag = "1")] pub path: String, - #[prost(string, tag = "2")] + pub extension: String, - #[prost(bool, tag = "3")] + #[serde(default = "default_false")] pub marker_file: bool, - #[prost(string, tag = "4")] + #[serde(default = "default_marker")] pub marker_extension: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct S3Details { - #[prost(string, tag = "1")] pub access_key_id: String, - #[prost(string, tag = "2")] + pub secret_access_key: String, - #[prost(string, tag = "3")] + pub region: String, - #[prost(string, tag = "4")] + pub bucket_name: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct S3Storage { - #[prost(message, optional, tag = "1")] pub details: Option, - #[prost(message, repeated, tag = "2")] + pub tables: Vec, } @@ -336,17 +328,15 @@ impl S3Storage { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct LocalDetails { - #[prost(string, tag = "1")] pub path: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct LocalStorage { - #[prost(message, optional, tag = "1")] pub details: Option, - #[prost(message, repeated, tag = "2")] + pub tables: Vec
, } @@ -358,11 +348,10 @@ impl LocalStorage { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct DeltaTable { - #[prost(string, tag = "1")] pub path: String, - #[prost(string, tag = "2")] + pub name: String, } @@ -372,23 +361,20 @@ impl DeltaTable { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct DeltaLakeConfig { - #[prost(message, repeated, tag = "1")] pub tables: Vec, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct MongodbConfig { - #[prost(string, tag = "1")] pub connection_string: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct MySQLConfig { - #[prost(string, tag = "1")] pub url: String, - #[prost(uint32, optional, tag = "2")] + pub server_id: Option, } @@ -400,24 +386,21 @@ fn default_marker() -> String { String::from(".marker") } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct NestedDozerConfig { - #[prost(message, tag = "1")] pub grpc: Option, - #[prost(message, tag = "2")] pub log_options: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct NestedDozerLogOptions { - #[prost(uint32, tag = "2")] #[serde(default = "default_log_batch_size")] pub batch_size: u32, - #[prost(uint32, tag = "3")] + #[serde(default = "default_timeout")] pub timeout_in_millis: u32, - #[prost(uint32, tag = "4")] + #[serde(default = "default_buffer_size")] pub buffer_size: u32, } diff --git a/dozer-types/src/models/api_config.rs b/dozer-types/src/models/api_config.rs index e02b8413d6..6a56f48177 100644 --- a/dozer-types/src/models/api_config.rs +++ b/dozer-types/src/models/api_config.rs @@ -1,69 +1,64 @@ use crate::constants::DEFAULT_DEFAULT_MAX_NUM_RECORDS; use super::api_security::ApiSecurity; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, prost::Message)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, JsonSchema, Default)] pub struct ApiConfig { - #[prost(oneof = "ApiSecurity", tags = "1")] #[serde(skip_serializing_if = "Option::is_none")] /// The security configuration for the API; Default: None pub api_security: Option, - #[prost(message, tag = "2")] + #[serde(skip_serializing_if = "Option::is_none")] pub rest: Option, - #[prost(message, tag = "3")] + #[serde(skip_serializing_if = "Option::is_none")] pub grpc: Option, - #[prost(message, tag = "4")] #[serde(skip_serializing_if = "Option::is_none")] pub app_grpc: Option, - #[prost(uint32, tag = "5")] #[serde(default = "default_default_max_num_records")] // max records to be returned from the endpoints pub default_max_num_records: u32, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, prost::Message)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, JsonSchema, Default)] pub struct RestApiOptions { - #[prost(uint32, tag = "1")] #[serde(default = "default_rest_port")] pub port: u32, - #[prost(string, tag = "2")] + #[serde(default = "default_host")] pub host: String, - #[prost(bool, tag = "3")] + #[serde(default = "default_cors")] pub cors: bool, - #[prost(bool, tag = "4")] + #[serde(default = "default_enabled")] pub enabled: bool, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, prost::Message)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, JsonSchema, Default)] pub struct GrpcApiOptions { - #[prost(uint32, tag = "1")] #[serde(default = "default_grpc_port")] pub port: u32, - #[prost(string, tag = "2")] + #[serde(default = "default_host")] pub host: String, - #[prost(bool, tag = "3")] + #[serde(default = "default_cors")] pub cors: bool, - #[prost(bool, tag = "4")] + #[serde(default = "default_enable_web")] pub web: bool, - #[prost(bool, tag = "5")] + #[serde(default = "default_enabled")] pub enabled: bool, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, prost::Message, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] pub struct AppGrpcOptions { - #[prost(uint32)] #[serde(default = "default_app_grpc_port")] pub port: u32, - #[prost(string)] + #[serde(default = "default_app_grpc_host")] pub host: String, } diff --git a/dozer-types/src/models/api_endpoint.rs b/dozer-types/src/models/api_endpoint.rs index cc186696e2..b127ab88d4 100644 --- a/dozer-types/src/models/api_endpoint.rs +++ b/dozer-types/src/models/api_endpoint.rs @@ -1,58 +1,52 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct ApiIndex { - #[prost(string, repeated, tag = "1")] #[serde(default, skip_serializing_if = "Vec::is_empty")] pub primary_key: Vec, - #[prost(message, tag = "2")] + #[serde(default, skip_serializing_if = "Option::is_none")] pub secondary: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct SecondaryIndexConfig { - #[prost(string, repeated, tag = "1")] #[serde(default, skip_serializing_if = "Vec::is_empty")] pub skip_default: Vec, - #[prost(message, repeated, tag = "2")] + #[serde(default, skip_serializing_if = "Vec::is_empty")] pub create: Vec, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct CreateSecondaryIndex { - #[prost(oneof = "SecondaryIndex", tags = "1,2")] pub index: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone)] pub enum SecondaryIndex { - #[prost(message, tag = "1")] SortedInverted(SortedInverted), - #[prost(message, tag = "2")] + FullText(FullText), } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct SortedInverted { - #[prost(string, repeated, tag = "1")] pub fields: Vec, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct FullText { - #[prost(string, tag = "1")] pub field: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Copy, ::prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone, Copy)] pub enum OnInsertResolutionTypes { - #[prost(message, tag = "1")] Nothing(()), - #[prost(message, tag = "2")] + Update(()), - #[prost(message, tag = "3")] + Panic(()), } @@ -62,13 +56,12 @@ impl Default for OnInsertResolutionTypes { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Copy, ::prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone, Copy)] pub enum OnUpdateResolutionTypes { - #[prost(message, tag = "1")] Nothing(()), - #[prost(message, tag = "2")] + Upsert(()), - #[prost(message, tag = "3")] + Panic(()), } @@ -78,11 +71,10 @@ impl Default for OnUpdateResolutionTypes { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Copy, ::prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone, Copy)] pub enum OnDeleteResolutionTypes { - #[prost(message, tag = "1")] Nothing(()), - #[prost(message, tag = "2")] + Panic(()), } @@ -92,56 +84,45 @@ impl Default for OnDeleteResolutionTypes { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Copy, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone, Copy)] pub struct ConflictResolution { - #[prost(oneof = "OnInsertResolutionTypes", tags = "1,2,3")] pub on_insert: Option, - #[prost(oneof = "OnUpdateResolutionTypes", tags = "4,5,6")] pub on_update: Option, - #[prost(oneof = "OnDeleteResolutionTypes", tags = "7,8")] pub on_delete: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct LogReaderOptions { - #[prost(optional, uint32)] #[serde(skip_serializing_if = "Option::is_none")] pub batch_size: Option, - #[prost(optional, uint32)] #[serde(skip_serializing_if = "Option::is_none")] pub timeout_in_millis: Option, - #[prost(optional, uint32)] #[serde(skip_serializing_if = "Option::is_none")] pub buffer_size: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct ApiEndpoint { - #[prost(string)] pub name: String, - #[prost(string)] + /// name of the table in source database; Type: String pub table_name: String, - #[prost(string)] /// path of endpoint - e.g: /stocks pub path: String, - #[prost(message)] + pub index: Option, - #[prost(message)] #[serde(skip_serializing_if = "Option::is_none")] pub conflict_resolution: Option, - #[prost(optional, uint32)] #[serde(skip_serializing_if = "Option::is_none")] pub version: Option, - #[prost(optional, message)] #[serde(skip_serializing_if = "Option::is_none")] pub log_reader_options: Option, } diff --git a/dozer-types/src/models/api_security.rs b/dozer-types/src/models/api_security.rs index 20d996b069..d0026a4924 100644 --- a/dozer-types/src/models/api_security.rs +++ b/dozer-types/src/models/api_security.rs @@ -1,8 +1,8 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[doc = r"The security model option for the API"] -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub enum ApiSecurity { /// Initialize with a JWT_SECRET - #[prost(string, tag = "1")] Jwt(String), } diff --git a/dozer-types/src/models/app_config.rs b/dozer-types/src/models/app_config.rs index 710a937fb1..ce4fc992ac 100644 --- a/dozer-types/src/models/app_config.rs +++ b/dozer-types/src/models/app_config.rs @@ -1,60 +1,55 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, prost::Message)] +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, PartialEq, Eq, Default)] pub struct AppConfig { /// Pipeline buffer size - #[prost(uint32, optional)] + #[serde(skip_serializing_if = "Option::is_none")] pub app_buffer_size: Option, /// Commit size - #[prost(uint32, optional)] + #[serde(skip_serializing_if = "Option::is_none")] pub commit_size: Option, /// Commit timeout - #[prost(uint64, optional)] + #[serde(skip_serializing_if = "Option::is_none")] pub commit_timeout: Option, - #[prost(uint32, optional)] #[serde(skip_serializing_if = "Option::is_none")] pub persist_queue_capacity: Option, /// The storage to use for the log. - #[prost(oneof = "DataStorage", tags = "7,8")] + #[serde(skip_serializing_if = "Option::is_none")] pub data_storage: Option, - #[prost(uint32, optional)] /// How many errors we can tolerate before bringing down the app. #[serde(skip_serializing_if = "Option::is_none")] pub error_threshold: Option, - #[prost(uint64, optional)] #[serde(skip_serializing_if = "Option::is_none")] /// The maximum unpersisted number of records in the processor record store. A checkpoint will be created when this number is reached. pub max_num_records_before_persist: Option, - #[prost(uint64, optional)] #[serde(skip_serializing_if = "Option::is_none")] /// The maximum time in seconds before a new checkpoint is created. If there're no new records, no checkpoint will be created. pub max_interval_before_persist_in_seconds: Option, } -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, prost::Oneof)] +#[derive(Debug, JsonSchema, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum DataStorage { - #[prost(message, tag = "7")] Local(()), - #[prost(message, tag = "8")] + S3(S3Storage), } -#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, prost::Message)] +#[derive(Debug, JsonSchema, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct S3Storage { - #[prost(string, tag = "1")] pub region: String, - #[prost(string, tag = "2")] + pub bucket_name: String, } diff --git a/dozer-types/src/models/cloud.rs b/dozer-types/src/models/cloud.rs index 01f048e19f..bbd8193df9 100644 --- a/dozer-types/src/models/cloud.rs +++ b/dozer-types/src/models/cloud.rs @@ -1,53 +1,50 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, prost::Message)] +#[derive(Debug, JsonSchema, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct Cloud { - #[prost(oneof = "UpdateCurrentVersionStrategy", tags = "1,2")] #[serde(skip_serializing_if = "Option::is_none")] pub update_current_version_strategy: Option, - #[prost(optional, string, tag = "3")] + #[serde(skip_serializing_if = "Option::is_none")] pub app_id: Option, - #[prost(optional, string, tag = "4")] + #[serde(skip_serializing_if = "Option::is_none")] pub profile: Option, - #[prost(message, optional, tag = "5")] + #[serde(skip_serializing_if = "Option::is_none")] pub app: Option, - #[prost(message, optional, tag = "6")] + #[serde(skip_serializing_if = "Option::is_none")] pub api: Option, } -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, prost::Message)] +#[derive(Debug, JsonSchema, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct AppInstance { - #[prost(optional, string, tag = "1")] #[serde(skip_serializing_if = "Option::is_none")] pub instance_type: Option, } -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, prost::Message)] +#[derive(Debug, JsonSchema, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct ApiInstance { - #[prost(optional, uint32, tag = "1")] #[serde( default = "default_num_api_instances", skip_serializing_if = "Option::is_none" )] pub instances_count: Option, - #[prost(optional, string, tag = "2")] + #[serde(skip_serializing_if = "Option::is_none")] pub instance_type: Option, - #[prost(optional, uint32, tag = "3")] + /// The size of the volume in GB #[serde(skip_serializing_if = "Option::is_none")] pub volume_size: Option, } -#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize, prost::Oneof)] +#[derive(Debug, JsonSchema, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum UpdateCurrentVersionStrategy { - #[prost(message, tag = "1")] OnCreate(()), - #[prost(message, tag = "2")] + Manual(()), } diff --git a/dozer-types/src/models/config.rs b/dozer-types/src/models/config.rs index 719967be98..5c10aea594 100644 --- a/dozer-types/src/models/config.rs +++ b/dozer-types/src/models/config.rs @@ -7,24 +7,21 @@ use super::{ use crate::constants::DEFAULT_HOME_DIR; use crate::models::udf_config::UdfConfig; use prettytable::Table as PrettyTable; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, JsonSchema)] /// The configuration for the app pub struct Config { - #[prost(uint32, tag = "1")] pub version: u32, - #[prost(string, tag = "2")] /// name of the app pub app_name: String, - #[prost(string, tag = "3")] #[serde(skip_serializing_if = "String::is_empty", default = "default_home_dir")] ///directory for all process; Default: ./.dozer pub home_dir: String, - #[prost(string, tag = "4")] #[serde( skip_serializing_if = "String::is_empty", default = "default_cache_dir" @@ -33,54 +30,49 @@ pub struct Config { pub cache_dir: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] - #[prost(message, repeated, tag = "5")] + /// connections to databases: Eg: Postgres, Snowflake, etc pub connections: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] - #[prost(message, repeated, tag = "6")] + /// sources to ingest data related to particular connection pub sources: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] - #[prost(message, repeated, tag = "7")] + /// api endpoints to expose pub endpoints: Vec, - #[prost(message, optional, tag = "8")] /// Api server config related: port, host, etc #[serde(skip_serializing_if = "Option::is_none")] pub api: Option, - #[prost(string, optional, tag = "9")] #[serde(skip_serializing_if = "Option::is_none")] /// transformations to apply to source data in SQL format as multiple queries pub sql: Option, #[serde(skip_serializing_if = "Option::is_none")] - #[prost(message, optional, tag = "10")] + /// flags to enable/disable features pub flags: Option, /// Cache lmdb max map size - #[prost(uint64, optional, tag = "11")] + #[serde(skip_serializing_if = "Option::is_none")] pub cache_max_map_size: Option, - #[prost(message, optional, tag = "12")] /// App runtime config: behaviour of pipeline and log #[serde(skip_serializing_if = "Option::is_none")] pub app: Option, - #[prost(message, optional, tag = "13")] /// Instrument using Dozer #[serde(skip_serializing_if = "Option::is_none")] pub telemetry: Option, - #[prost(message, optional, tag = "14")] + /// Dozer Cloud specific configuration #[serde(skip_serializing_if = "Option::is_none")] pub cloud: Option, - #[prost(message, repeated, tag = "15")] /// UDF specific configuration (eg. !Onnx) #[serde(default, skip_serializing_if = "Vec::is_empty")] pub udfs: Vec, diff --git a/dozer-types/src/models/connection.rs b/dozer-types/src/models/connection.rs index 445514ed15..6deff9f598 100644 --- a/dozer-types/src/models/connection.rs +++ b/dozer-types/src/models/connection.rs @@ -2,6 +2,7 @@ use crate::ingestion_types::{ DeltaLakeConfig, EthConfig, GrpcConfig, KafkaConfig, LocalStorage, MongodbConfig, MySQLConfig, NestedDozerConfig, S3Storage, SnowflakeConfig, }; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::str::FromStr; @@ -14,33 +15,58 @@ use prettytable::Table; use tokio_postgres::config::SslMode; use tokio_postgres::Config; -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +pub trait SchemaExample { + fn example() -> Self; +} + +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] pub struct Connection { - #[prost(oneof = "ConnectionConfig", tags = "1,2,3,4,5,6,7,8")] /// authentication config - depends on db_type pub config: Option, - #[prost(string, tag = "9")] + pub name: String, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)] +/// Configuration for a Postgres connection +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] +#[schemars(example = "Self::example")] pub struct PostgresConfig { - #[prost(string, optional, tag = "1")] + /// The username to use for authentication pub user: Option, - #[prost(string, optional, tag = "2")] + + /// The password to use for authentication pub password: Option, - #[prost(string, optional, tag = "3")] + + /// The host to connect to (IP or DNS name) pub host: Option, - #[prost(uint32, optional, tag = "4")] + + /// The port to connect to (default: 5432) pub port: Option, - #[prost(string, optional, tag = "5")] + + /// The database to connect to (default: postgres) pub database: Option, - #[prost(string, optional, tag = "6")] + + /// The sslmode to use for the connection (disable, prefer, require) pub sslmode: Option, - #[prost(string, optional, tag = "7")] + + /// The connection url to use pub connection_url: Option, } +impl SchemaExample for PostgresConfig { + fn example() -> Self { + Self { + user: Some("postgres".to_string()), + password: None, + host: Some("localhost".to_string()), + port: Some(5432), + database: Some("postgres".to_string()), + sslmode: None, + connection_url: None, + } + } +} + #[derive(Eq, PartialEq, Clone, Debug, Hash)] pub struct PostgresConfigReplenished { pub user: String, @@ -165,39 +191,38 @@ fn get_sslmode(mode: String) -> Result { } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub enum ConnectionConfig { - #[prost(message, tag = "1")] /// In yaml, present as tag: `!Postgres` Postgres(PostgresConfig), - #[prost(message, tag = "2")] + /// In yaml, present as tag: `!Ethereum` Ethereum(EthConfig), - #[prost(message, tag = "3")] + /// In yaml, present as tag: `!Grpc` Grpc(GrpcConfig), - #[prost(message, tag = "4")] + /// In yaml, present as tag: `!Snowflake` Snowflake(SnowflakeConfig), - #[prost(message, tag = "5")] + /// In yaml, present as tag: `!Kafka` Kafka(KafkaConfig), - #[prost(message, tag = "6")] + /// In yaml, present as tag: `!ObjectStore` S3Storage(S3Storage), - #[prost(message, tag = "7")] + /// In yaml, present as tag: `!ObjectStore` LocalStorage(LocalStorage), - #[prost(message, tag = "8")] + /// In yaml, present as tag" `!DeltaLake` DeltaLake(DeltaLakeConfig), - #[prost(message, tag = "9")] + /// In yaml, present as tag: `!MongoDB` MongoDB(MongodbConfig), - #[prost(message, tag = "10")] + /// In yaml, present as tag" `!MySQL` MySQL(MySQLConfig), - #[prost(message, tag = "11")] + /// In yaml, present as tag" `!Dozer` Dozer(NestedDozerConfig), } diff --git a/dozer-types/src/models/flags.rs b/dozer-types/src/models/flags.rs index 95cf9f234f..d5f958d6d5 100644 --- a/dozer-types/src/models/flags.rs +++ b/dozer-types/src/models/flags.rs @@ -1,56 +1,61 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, PartialEq, Eq, Clone)] pub struct Flags { /// dynamic grpc enabled; Default: true - #[prost(bool, tag = "1", default = true)] #[serde(default = "default_true")] pub dynamic: bool, /// http1 + web support for grpc. This is required for browser clients.; Default: true - #[prost(bool, tag = "2", default = true)] #[serde(default = "default_true")] pub grpc_web: bool, /// push events enabled.; Default: true - #[prost(bool, tag = "3", default = true)] #[serde(default = "default_push_events")] pub push_events: bool, /// require authentication to access grpc server reflection service if true.; Default: false - #[prost(bool, tag = "4", default = false)] + #[serde(default = "default_false")] pub authenticate_server_reflection: bool, /// probablistic optimizations reduce memory consumption at the expense of accuracy. #[serde(skip_serializing_if = "Option::is_none")] - #[prost(message, optional, tag = "5")] pub enable_probabilistic_optimizations: Option, } +impl Default for Flags { + fn default() -> Self { + Self { + dynamic: true, + grpc_web: true, + push_events: true, + authenticate_server_reflection: false, + enable_probabilistic_optimizations: Default::default(), + } + } +} -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, PartialEq, Eq, Clone)] pub struct EnableProbabilisticOptimizations { /// enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false - #[prost(bool, tag = "1", default = false)] #[serde(default = "default_false")] pub in_sets: bool, /// enable probabilistic optimizations in JOIN operations; Default: false - #[prost(bool, tag = "2", default = false)] #[serde(default = "default_false")] pub in_joins: bool, /// enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false - #[prost(bool, tag = "3", default = false)] #[serde(default = "default_false")] pub in_aggregations: bool, } -pub fn default_push_events() -> bool { - true -} - fn default_true() -> bool { true } fn default_false() -> bool { false } + +pub fn default_push_events() -> bool { + true +} diff --git a/dozer-types/src/models/json_schema_helper.rs b/dozer-types/src/models/json_schema_helper.rs new file mode 100644 index 0000000000..58dab6f147 --- /dev/null +++ b/dozer-types/src/models/json_schema_helper.rs @@ -0,0 +1,54 @@ +use schemars::{schema::RootSchema, schema_for}; +use serde::{Deserialize, Serialize}; + +use crate::ingestion_types; + +use super::{config::Config, connection::PostgresConfig}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Schema { + pub name: String, + pub schema: RootSchema, +} +pub fn get_dozer_schema() -> Result { + let schema = schema_for!(Config); + let schema_json = serde_json::to_string_pretty(&schema)?; + Ok(schema_json) +} + +pub fn get_connection_schemas() -> Result { + let mut schemas = vec![]; + + let configs = [ + ("postgres", schema_for!(PostgresConfig)), + ("ethereum", schema_for!(ingestion_types::EthConfig)), + ("grpc", schema_for!(ingestion_types::GrpcConfig)), + ("snowflake", schema_for!(ingestion_types::SnowflakeConfig)), + ("kafka", schema_for!(ingestion_types::KafkaConfig)), + ("s3", schema_for!(ingestion_types::S3Storage)), + ("local_storage", schema_for!(ingestion_types::LocalStorage)), + ("deltalake", schema_for!(ingestion_types::DeltaLakeConfig)), + ("mongodb", schema_for!(ingestion_types::MongodbConfig)), + ("mysql", schema_for!(ingestion_types::MySQLConfig)), + ("dozer", schema_for!(ingestion_types::NestedDozerConfig)), + ]; + for (name, schema) in configs.iter() { + schemas.push(Schema { + name: name.to_string(), + schema: schema.clone(), + }); + } + let schema_json = serde_json::to_string_pretty(&schemas)?; + Ok(schema_json) +} + +#[cfg(test)] +mod tests { + use super::get_connection_schemas; + + #[test] + fn get_schemas() { + let schemas = get_connection_schemas(); + assert!(schemas.is_ok()); + } +} diff --git a/dozer-types/src/models/mod.rs b/dozer-types/src/models/mod.rs index 234d592741..413f8514fc 100644 --- a/dozer-types/src/models/mod.rs +++ b/dozer-types/src/models/mod.rs @@ -6,6 +6,8 @@ pub mod cloud; pub mod config; pub mod connection; pub mod flags; +mod json_schema_helper; pub mod source; pub mod telemetry; pub mod udf_config; +pub use json_schema_helper::{get_connection_schemas, get_dozer_schema}; diff --git a/dozer-types/src/models/source.rs b/dozer-types/src/models/source.rs index 2c9ab0ecb7..1c56adc49d 100644 --- a/dozer-types/src/models/source.rs +++ b/dozer-types/src/models/source.rs @@ -1,25 +1,25 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct Source { - #[prost(string, tag = "1")] /// name of the source - to distinguish between multiple sources; Type: String pub name: String, - #[prost(string, tag = "2")] + /// name of the table in source database; Type: String pub table_name: String, - #[prost(string, repeated, tag = "3")] + #[serde(default, skip_serializing_if = "Vec::is_empty")] /// list of columns gonna be used in the source table; Type: String[] pub columns: Vec, - #[prost(string, tag = "4")] + /// reference to pre-defined connection name; Type: String pub connection: String, /// name of schema source database; Type: String - #[prost(string, optional, tag = "5")] + #[serde(default)] pub schema: Option, - #[prost(oneof = "RefreshConfig", tags = "7")] + #[serde(default = "default_refresh_config")] #[serde(skip_serializing_if = "Option::is_none")] /// setting for how to refresh the data; Default: RealTime @@ -35,12 +35,12 @@ pub enum Value { Ref(String), } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone)] pub enum HistoryType { Master(MasterHistoryConfig), Transactional(TransactionalHistoryConfig), } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] pub enum MasterHistoryConfig { AppendOnly { unique_key_field: String, @@ -50,19 +50,18 @@ pub enum MasterHistoryConfig { Overwrite, } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] pub enum TransactionalHistoryConfig { RetainPartial { timestamp_field: String, retention_period: u32, }, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone)] pub enum RefreshConfig { // Hour { minute: u32 }, // Day { time: String }, // CronExpression { expression: String }, - #[prost(message, tag = "7")] RealTime(RealTimeConfig), } impl Default for RefreshConfig { @@ -70,5 +69,5 @@ impl Default for RefreshConfig { RefreshConfig::RealTime(RealTimeConfig {}) } } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct RealTimeConfig {} diff --git a/dozer-types/src/models/telemetry.rs b/dozer-types/src/models/telemetry.rs index 2dca314b20..a2926fe078 100644 --- a/dozer-types/src/models/telemetry.rs +++ b/dozer-types/src/models/telemetry.rs @@ -1,38 +1,35 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, PartialEq, Eq, Clone)] pub struct TelemetryConfig { - #[prost(oneof = "TelemetryTraceConfig", tags = "1, 2")] pub trace: Option, - #[prost(oneof = "TelemetryMetricsConfig", tags = "3")] + pub metrics: Option, } -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, PartialEq, Eq, Clone)] pub enum TelemetryTraceConfig { - #[prost(message, tag = "1")] Dozer(DozerTelemetryConfig), - #[prost(message, tag = "2")] + XRay(XRayConfig), } -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, PartialEq, Eq, Clone)] pub struct DozerTelemetryConfig { - #[prost(string, tag = "1", default = "0.0.0.0:7006")] #[serde(default = "default_ingest_address")] pub endpoint: String, - #[prost(string, tag = "2", default = "default")] + #[serde(default = "default_grpc_adapter")] pub adapter: String, - #[prost(uint32, tag = "3")] + #[serde(default = "default_sample_ratio")] pub sample_percent: u32, } -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, PartialEq, Eq, Clone)] pub struct XRayConfig { - #[prost(string, tag = "1")] pub endpoint: String, - #[prost(uint64, tag = "2")] + pub timeout_in_seconds: u64, } @@ -48,8 +45,7 @@ fn default_sample_ratio() -> u32 { 10 } -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, PartialEq, Eq, Clone)] pub enum TelemetryMetricsConfig { - #[prost(message, tag = "1")] Prometheus(()), } diff --git a/dozer-types/src/models/udf_config.rs b/dozer-types/src/models/udf_config.rs index aa285e226f..aabe7b41d9 100644 --- a/dozer-types/src/models/udf_config.rs +++ b/dozer-types/src/models/udf_config.rs @@ -1,25 +1,24 @@ +use schemars::JsonSchema; + use crate::serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct UdfConfig { - #[prost(string, tag = "1")] /// name of the model function pub name: String, - #[prost(oneof = "UdfType", tags = "2")] + #[serde(skip_serializing_if = "Option::is_none")] /// setting for what type of udf to use; Default: Onnx pub config: Option, } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof)] +#[derive(Debug, Serialize, JsonSchema, Deserialize, Eq, PartialEq, Clone)] pub enum UdfType { - #[prost(message, tag = "2")] Onnx(OnnxConfig), } -#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message)] +#[derive(Debug, Serialize, JsonSchema, Default, Deserialize, Eq, PartialEq, Clone)] pub struct OnnxConfig { - #[prost(string)] /// path to the model file pub path: String, } diff --git a/dozer-types/src/tests.rs b/dozer-types/src/tests/mod.rs similarity index 100% rename from dozer-types/src/tests.rs rename to dozer-types/src/tests/mod.rs diff --git a/json_schemas/connections.json b/json_schemas/connections.json new file mode 100644 index 0000000000..8e981e8640 --- /dev/null +++ b/json_schemas/connections.json @@ -0,0 +1,852 @@ +[ + { + "name": "postgres", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "PostgresConfig", + "description": "Configuration for a Postgres connection", + "examples": [ + { + "connection_url": null, + "database": "postgres", + "host": "localhost", + "password": null, + "port": 5432, + "sslmode": null, + "user": "postgres" + } + ], + "type": "object", + "properties": { + "connection_url": { + "description": "The connection url to use", + "type": [ + "string", + "null" + ] + }, + "database": { + "description": "The database to connect to (default: postgres)", + "type": [ + "string", + "null" + ] + }, + "host": { + "description": "The host to connect to (IP or DNS name)", + "type": [ + "string", + "null" + ] + }, + "password": { + "description": "The password to use for authentication", + "type": [ + "string", + "null" + ] + }, + "port": { + "description": "The port to connect to (default: 5432)", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "sslmode": { + "description": "The sslmode to use for the connection (disable, prefer, require)", + "type": [ + "string", + "null" + ] + }, + "user": { + "description": "The username to use for authentication", + "type": [ + "string", + "null" + ] + } + } + } + }, + { + "name": "ethereum", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "EthConfig", + "type": "object", + "properties": { + "provider": { + "anyOf": [ + { + "$ref": "#/definitions/EthProviderConfig" + }, + { + "type": "null" + } + ] + } + }, + "definitions": { + "EthContract": { + "type": "object", + "required": [ + "abi", + "address", + "name" + ], + "properties": { + "abi": { + "type": "string" + }, + "address": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "EthFilter": { + "type": "object", + "properties": { + "addresses": { + "default": [], + "type": "array", + "items": { + "type": "string" + } + }, + "from_block": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "to_block": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "topics": { + "default": [], + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "EthLogConfig": { + "type": "object", + "required": [ + "wss_url" + ], + "properties": { + "contracts": { + "default": [], + "type": "array", + "items": { + "$ref": "#/definitions/EthContract" + } + }, + "filter": { + "anyOf": [ + { + "$ref": "#/definitions/EthFilter" + }, + { + "type": "null" + } + ] + }, + "wss_url": { + "type": "string" + } + } + }, + "EthProviderConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "Log" + ], + "properties": { + "Log": { + "$ref": "#/definitions/EthLogConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Trace" + ], + "properties": { + "Trace": { + "$ref": "#/definitions/EthTraceConfig" + } + }, + "additionalProperties": false + } + ] + }, + "EthTraceConfig": { + "type": "object", + "required": [ + "from_block", + "https_url" + ], + "properties": { + "batch_size": { + "default": 3, + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + "from_block": { + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + "https_url": { + "type": "string" + }, + "to_block": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + } + } + } + } + } + }, + { + "name": "grpc", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "GrpcConfig", + "type": "object", + "properties": { + "adapter": { + "default": "default", + "type": "string" + }, + "host": { + "default": "0.0.0.0", + "type": "string" + }, + "port": { + "default": 8085, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "schemas": { + "anyOf": [ + { + "$ref": "#/definitions/GrpcConfigSchemas" + }, + { + "type": "null" + } + ] + } + }, + "definitions": { + "GrpcConfigSchemas": { + "oneOf": [ + { + "type": "object", + "required": [ + "Inline" + ], + "properties": { + "Inline": { + "type": "string" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Path" + ], + "properties": { + "Path": { + "type": "string" + } + }, + "additionalProperties": false + } + ] + } + } + } + }, + { + "name": "snowflake", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SnowflakeConfig", + "type": "object", + "required": [ + "database", + "password", + "port", + "role", + "schema", + "server", + "user", + "warehouse" + ], + "properties": { + "database": { + "type": "string" + }, + "driver": { + "type": [ + "string", + "null" + ] + }, + "password": { + "type": "string" + }, + "port": { + "type": "string" + }, + "role": { + "type": "string" + }, + "schema": { + "type": "string" + }, + "server": { + "type": "string" + }, + "user": { + "type": "string" + }, + "warehouse": { + "type": "string" + } + } + } + }, + { + "name": "kafka", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "KafkaConfig", + "type": "object", + "required": [ + "broker" + ], + "properties": { + "broker": { + "type": "string" + }, + "schema_registry_url": { + "type": [ + "string", + "null" + ] + } + } + } + }, + { + "name": "s3", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "S3Storage", + "type": "object", + "required": [ + "tables" + ], + "properties": { + "details": { + "anyOf": [ + { + "$ref": "#/definitions/S3Details" + }, + { + "type": "null" + } + ] + }, + "tables": { + "type": "array", + "items": { + "$ref": "#/definitions/Table" + } + } + }, + "definitions": { + "CsvConfig": { + "type": "object", + "required": [ + "extension", + "path" + ], + "properties": { + "extension": { + "type": "string" + }, + "marker_extension": { + "default": ".marker", + "type": "string" + }, + "marker_file": { + "default": false, + "type": "boolean" + }, + "path": { + "type": "string" + } + } + }, + "DeltaConfig": { + "type": "object", + "required": [ + "path" + ], + "properties": { + "path": { + "type": "string" + } + } + }, + "ParquetConfig": { + "type": "object", + "required": [ + "extension", + "path" + ], + "properties": { + "extension": { + "type": "string" + }, + "marker_extension": { + "default": ".marker", + "type": "string" + }, + "marker_file": { + "default": false, + "type": "boolean" + }, + "path": { + "type": "string" + } + } + }, + "S3Details": { + "type": "object", + "required": [ + "access_key_id", + "bucket_name", + "region", + "secret_access_key" + ], + "properties": { + "access_key_id": { + "type": "string" + }, + "bucket_name": { + "type": "string" + }, + "region": { + "type": "string" + }, + "secret_access_key": { + "type": "string" + } + } + }, + "Table": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/TableConfig" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + } + } + }, + "TableConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "CSV" + ], + "properties": { + "CSV": { + "$ref": "#/definitions/CsvConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Delta" + ], + "properties": { + "Delta": { + "$ref": "#/definitions/DeltaConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Parquet" + ], + "properties": { + "Parquet": { + "$ref": "#/definitions/ParquetConfig" + } + }, + "additionalProperties": false + } + ] + } + } + } + }, + { + "name": "local_storage", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "LocalStorage", + "type": "object", + "required": [ + "tables" + ], + "properties": { + "details": { + "anyOf": [ + { + "$ref": "#/definitions/LocalDetails" + }, + { + "type": "null" + } + ] + }, + "tables": { + "type": "array", + "items": { + "$ref": "#/definitions/Table" + } + } + }, + "definitions": { + "CsvConfig": { + "type": "object", + "required": [ + "extension", + "path" + ], + "properties": { + "extension": { + "type": "string" + }, + "marker_extension": { + "default": ".marker", + "type": "string" + }, + "marker_file": { + "default": false, + "type": "boolean" + }, + "path": { + "type": "string" + } + } + }, + "DeltaConfig": { + "type": "object", + "required": [ + "path" + ], + "properties": { + "path": { + "type": "string" + } + } + }, + "LocalDetails": { + "type": "object", + "required": [ + "path" + ], + "properties": { + "path": { + "type": "string" + } + } + }, + "ParquetConfig": { + "type": "object", + "required": [ + "extension", + "path" + ], + "properties": { + "extension": { + "type": "string" + }, + "marker_extension": { + "default": ".marker", + "type": "string" + }, + "marker_file": { + "default": false, + "type": "boolean" + }, + "path": { + "type": "string" + } + } + }, + "Table": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/TableConfig" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + } + } + }, + "TableConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "CSV" + ], + "properties": { + "CSV": { + "$ref": "#/definitions/CsvConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Delta" + ], + "properties": { + "Delta": { + "$ref": "#/definitions/DeltaConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Parquet" + ], + "properties": { + "Parquet": { + "$ref": "#/definitions/ParquetConfig" + } + }, + "additionalProperties": false + } + ] + } + } + } + }, + { + "name": "deltalake", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DeltaLakeConfig", + "type": "object", + "required": [ + "tables" + ], + "properties": { + "tables": { + "type": "array", + "items": { + "$ref": "#/definitions/DeltaTable" + } + } + }, + "definitions": { + "DeltaTable": { + "type": "object", + "required": [ + "name", + "path" + ], + "properties": { + "name": { + "type": "string" + }, + "path": { + "type": "string" + } + } + } + } + } + }, + { + "name": "mongodb", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "MongodbConfig", + "type": "object", + "required": [ + "connection_string" + ], + "properties": { + "connection_string": { + "type": "string" + } + } + } + }, + { + "name": "mysql", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "MySQLConfig", + "type": "object", + "required": [ + "url" + ], + "properties": { + "server_id": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "url": { + "type": "string" + } + } + } + }, + { + "name": "dozer", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "NestedDozerConfig", + "type": "object", + "properties": { + "grpc": { + "anyOf": [ + { + "$ref": "#/definitions/AppGrpcOptions" + }, + { + "type": "null" + } + ] + }, + "log_options": { + "anyOf": [ + { + "$ref": "#/definitions/NestedDozerLogOptions" + }, + { + "type": "null" + } + ] + } + }, + "definitions": { + "AppGrpcOptions": { + "type": "object", + "properties": { + "host": { + "default": "0.0.0.0", + "type": "string" + }, + "port": { + "default": 50053, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + }, + "NestedDozerLogOptions": { + "type": "object", + "properties": { + "batch_size": { + "default": 30, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "buffer_size": { + "default": 1000, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "timeout_in_millis": { + "default": 1000, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + } + } + } + } +] \ No newline at end of file diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json new file mode 100644 index 0000000000..c64ad02024 --- /dev/null +++ b/json_schemas/dozer.json @@ -0,0 +1,1941 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Config", + "description": "The configuration for the app", + "type": "object", + "required": [ + "app_name", + "version" + ], + "properties": { + "api": { + "description": "Api server config related: port, host, etc", + "anyOf": [ + { + "$ref": "#/definitions/ApiConfig" + }, + { + "type": "null" + } + ] + }, + "app": { + "description": "App runtime config: behaviour of pipeline and log", + "anyOf": [ + { + "$ref": "#/definitions/AppConfig" + }, + { + "type": "null" + } + ] + }, + "app_name": { + "description": "name of the app", + "type": "string" + }, + "cache_dir": { + "description": "directory for cache. Default: ./.dozer/cache", + "default": "./.dozer/cache", + "type": "string" + }, + "cache_max_map_size": { + "description": "Cache lmdb max map size", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "cloud": { + "description": "Dozer Cloud specific configuration", + "anyOf": [ + { + "$ref": "#/definitions/Cloud" + }, + { + "type": "null" + } + ] + }, + "connections": { + "description": "connections to databases: Eg: Postgres, Snowflake, etc", + "type": "array", + "items": { + "$ref": "#/definitions/Connection" + } + }, + "endpoints": { + "description": "api endpoints to expose", + "type": "array", + "items": { + "$ref": "#/definitions/ApiEndpoint" + } + }, + "flags": { + "description": "flags to enable/disable features", + "anyOf": [ + { + "$ref": "#/definitions/Flags" + }, + { + "type": "null" + } + ] + }, + "home_dir": { + "description": "directory for all process; Default: ./.dozer", + "default": "./.dozer", + "type": "string" + }, + "sources": { + "description": "sources to ingest data related to particular connection", + "type": "array", + "items": { + "$ref": "#/definitions/Source" + } + }, + "sql": { + "description": "transformations to apply to source data in SQL format as multiple queries", + "type": [ + "string", + "null" + ] + }, + "telemetry": { + "description": "Instrument using Dozer", + "anyOf": [ + { + "$ref": "#/definitions/TelemetryConfig" + }, + { + "type": "null" + } + ] + }, + "udfs": { + "description": "UDF specific configuration (eg. !Onnx)", + "type": "array", + "items": { + "$ref": "#/definitions/UdfConfig" + } + }, + "version": { + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + }, + "definitions": { + "ApiConfig": { + "type": "object", + "properties": { + "api_security": { + "description": "The security configuration for the API; Default: None", + "anyOf": [ + { + "$ref": "#/definitions/ApiSecurity" + }, + { + "type": "null" + } + ] + }, + "app_grpc": { + "anyOf": [ + { + "$ref": "#/definitions/AppGrpcOptions" + }, + { + "type": "null" + } + ] + }, + "default_max_num_records": { + "default": 50, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "grpc": { + "anyOf": [ + { + "$ref": "#/definitions/GrpcApiOptions" + }, + { + "type": "null" + } + ] + }, + "rest": { + "anyOf": [ + { + "$ref": "#/definitions/RestApiOptions" + }, + { + "type": "null" + } + ] + } + } + }, + "ApiEndpoint": { + "type": "object", + "required": [ + "name", + "path", + "table_name" + ], + "properties": { + "conflict_resolution": { + "anyOf": [ + { + "$ref": "#/definitions/ConflictResolution" + }, + { + "type": "null" + } + ] + }, + "index": { + "anyOf": [ + { + "$ref": "#/definitions/ApiIndex" + }, + { + "type": "null" + } + ] + }, + "log_reader_options": { + "anyOf": [ + { + "$ref": "#/definitions/LogReaderOptions" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + }, + "path": { + "description": "path of endpoint - e.g: /stocks", + "type": "string" + }, + "table_name": { + "description": "name of the table in source database; Type: String", + "type": "string" + }, + "version": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + } + } + }, + "ApiIndex": { + "type": "object", + "properties": { + "primary_key": { + "type": "array", + "items": { + "type": "string" + } + }, + "secondary": { + "anyOf": [ + { + "$ref": "#/definitions/SecondaryIndexConfig" + }, + { + "type": "null" + } + ] + } + } + }, + "ApiInstance": { + "type": "object", + "properties": { + "instance_type": { + "type": [ + "string", + "null" + ] + }, + "instances_count": { + "default": 2, + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "volume_size": { + "description": "The size of the volume in GB", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + } + } + }, + "ApiSecurity": { + "description": "The security model option for the API", + "oneOf": [ + { + "description": "Initialize with a JWT_SECRET", + "type": "object", + "required": [ + "Jwt" + ], + "properties": { + "Jwt": { + "type": "string" + } + }, + "additionalProperties": false + } + ] + }, + "AppConfig": { + "type": "object", + "properties": { + "app_buffer_size": { + "description": "Pipeline buffer size", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "commit_size": { + "description": "Commit size", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "commit_timeout": { + "description": "Commit timeout", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "data_storage": { + "description": "The storage to use for the log.", + "anyOf": [ + { + "$ref": "#/definitions/DataStorage" + }, + { + "type": "null" + } + ] + }, + "error_threshold": { + "description": "How many errors we can tolerate before bringing down the app.", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "max_interval_before_persist_in_seconds": { + "description": "The maximum time in seconds before a new checkpoint is created. If there're no new records, no checkpoint will be created.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "max_num_records_before_persist": { + "description": "The maximum unpersisted number of records in the processor record store. A checkpoint will be created when this number is reached.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "persist_queue_capacity": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + } + } + }, + "AppGrpcOptions": { + "type": "object", + "properties": { + "host": { + "default": "0.0.0.0", + "type": "string" + }, + "port": { + "default": 50053, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + }, + "AppInstance": { + "type": "object", + "properties": { + "instance_type": { + "type": [ + "string", + "null" + ] + } + } + }, + "Cloud": { + "type": "object", + "properties": { + "api": { + "anyOf": [ + { + "$ref": "#/definitions/ApiInstance" + }, + { + "type": "null" + } + ] + }, + "app": { + "anyOf": [ + { + "$ref": "#/definitions/AppInstance" + }, + { + "type": "null" + } + ] + }, + "app_id": { + "type": [ + "string", + "null" + ] + }, + "profile": { + "type": [ + "string", + "null" + ] + }, + "update_current_version_strategy": { + "anyOf": [ + { + "$ref": "#/definitions/UpdateCurrentVersionStrategy" + }, + { + "type": "null" + } + ] + } + } + }, + "ConflictResolution": { + "type": "object", + "properties": { + "on_delete": { + "anyOf": [ + { + "$ref": "#/definitions/OnDeleteResolutionTypes" + }, + { + "type": "null" + } + ] + }, + "on_insert": { + "anyOf": [ + { + "$ref": "#/definitions/OnInsertResolutionTypes" + }, + { + "type": "null" + } + ] + }, + "on_update": { + "anyOf": [ + { + "$ref": "#/definitions/OnUpdateResolutionTypes" + }, + { + "type": "null" + } + ] + } + } + }, + "Connection": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "description": "authentication config - depends on db_type", + "anyOf": [ + { + "$ref": "#/definitions/ConnectionConfig" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + } + } + }, + "ConnectionConfig": { + "oneOf": [ + { + "description": "In yaml, present as tag: `!Postgres`", + "type": "object", + "required": [ + "Postgres" + ], + "properties": { + "Postgres": { + "$ref": "#/definitions/PostgresConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!Ethereum`", + "type": "object", + "required": [ + "Ethereum" + ], + "properties": { + "Ethereum": { + "$ref": "#/definitions/EthConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!Grpc`", + "type": "object", + "required": [ + "Grpc" + ], + "properties": { + "Grpc": { + "$ref": "#/definitions/GrpcConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!Snowflake`", + "type": "object", + "required": [ + "Snowflake" + ], + "properties": { + "Snowflake": { + "$ref": "#/definitions/SnowflakeConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!Kafka`", + "type": "object", + "required": [ + "Kafka" + ], + "properties": { + "Kafka": { + "$ref": "#/definitions/KafkaConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!ObjectStore`", + "type": "object", + "required": [ + "S3Storage" + ], + "properties": { + "S3Storage": { + "$ref": "#/definitions/S3Storage" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!ObjectStore`", + "type": "object", + "required": [ + "LocalStorage" + ], + "properties": { + "LocalStorage": { + "$ref": "#/definitions/LocalStorage" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag\" `!DeltaLake`", + "type": "object", + "required": [ + "DeltaLake" + ], + "properties": { + "DeltaLake": { + "$ref": "#/definitions/DeltaLakeConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag: `!MongoDB`", + "type": "object", + "required": [ + "MongoDB" + ], + "properties": { + "MongoDB": { + "$ref": "#/definitions/MongodbConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag\" `!MySQL`", + "type": "object", + "required": [ + "MySQL" + ], + "properties": { + "MySQL": { + "$ref": "#/definitions/MySQLConfig" + } + }, + "additionalProperties": false + }, + { + "description": "In yaml, present as tag\" `!Dozer`", + "type": "object", + "required": [ + "Dozer" + ], + "properties": { + "Dozer": { + "$ref": "#/definitions/NestedDozerConfig" + } + }, + "additionalProperties": false + } + ] + }, + "CreateSecondaryIndex": { + "type": "object", + "properties": { + "index": { + "anyOf": [ + { + "$ref": "#/definitions/SecondaryIndex" + }, + { + "type": "null" + } + ] + } + } + }, + "CsvConfig": { + "type": "object", + "required": [ + "extension", + "path" + ], + "properties": { + "extension": { + "type": "string" + }, + "marker_extension": { + "default": ".marker", + "type": "string" + }, + "marker_file": { + "default": false, + "type": "boolean" + }, + "path": { + "type": "string" + } + } + }, + "DataStorage": { + "oneOf": [ + { + "type": "object", + "required": [ + "Local" + ], + "properties": { + "Local": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "S3" + ], + "properties": { + "S3": { + "$ref": "#/definitions/S3Storage2" + } + }, + "additionalProperties": false + } + ] + }, + "DeltaConfig": { + "type": "object", + "required": [ + "path" + ], + "properties": { + "path": { + "type": "string" + } + } + }, + "DeltaLakeConfig": { + "type": "object", + "required": [ + "tables" + ], + "properties": { + "tables": { + "type": "array", + "items": { + "$ref": "#/definitions/DeltaTable" + } + } + } + }, + "DeltaTable": { + "type": "object", + "required": [ + "name", + "path" + ], + "properties": { + "name": { + "type": "string" + }, + "path": { + "type": "string" + } + } + }, + "DozerTelemetryConfig": { + "type": "object", + "properties": { + "adapter": { + "default": "arrow", + "type": "string" + }, + "endpoint": { + "default": "0.0.0.0:7006", + "type": "string" + }, + "sample_percent": { + "default": 10, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + }, + "EnableProbabilisticOptimizations": { + "type": "object", + "properties": { + "in_aggregations": { + "description": "enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false", + "default": false, + "type": "boolean" + }, + "in_joins": { + "description": "enable probabilistic optimizations in JOIN operations; Default: false", + "default": false, + "type": "boolean" + }, + "in_sets": { + "description": "enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false", + "default": false, + "type": "boolean" + } + } + }, + "EthConfig": { + "type": "object", + "properties": { + "provider": { + "anyOf": [ + { + "$ref": "#/definitions/EthProviderConfig" + }, + { + "type": "null" + } + ] + } + } + }, + "EthContract": { + "type": "object", + "required": [ + "abi", + "address", + "name" + ], + "properties": { + "abi": { + "type": "string" + }, + "address": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "EthFilter": { + "type": "object", + "properties": { + "addresses": { + "default": [], + "type": "array", + "items": { + "type": "string" + } + }, + "from_block": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "to_block": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "topics": { + "default": [], + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "EthLogConfig": { + "type": "object", + "required": [ + "wss_url" + ], + "properties": { + "contracts": { + "default": [], + "type": "array", + "items": { + "$ref": "#/definitions/EthContract" + } + }, + "filter": { + "anyOf": [ + { + "$ref": "#/definitions/EthFilter" + }, + { + "type": "null" + } + ] + }, + "wss_url": { + "type": "string" + } + } + }, + "EthProviderConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "Log" + ], + "properties": { + "Log": { + "$ref": "#/definitions/EthLogConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Trace" + ], + "properties": { + "Trace": { + "$ref": "#/definitions/EthTraceConfig" + } + }, + "additionalProperties": false + } + ] + }, + "EthTraceConfig": { + "type": "object", + "required": [ + "from_block", + "https_url" + ], + "properties": { + "batch_size": { + "default": 3, + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + "from_block": { + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + "https_url": { + "type": "string" + }, + "to_block": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + } + } + }, + "Flags": { + "type": "object", + "properties": { + "authenticate_server_reflection": { + "description": "require authentication to access grpc server reflection service if true.; Default: false", + "default": false, + "type": "boolean" + }, + "dynamic": { + "description": "dynamic grpc enabled; Default: true", + "default": true, + "type": "boolean" + }, + "enable_probabilistic_optimizations": { + "description": "probablistic optimizations reduce memory consumption at the expense of accuracy.", + "anyOf": [ + { + "$ref": "#/definitions/EnableProbabilisticOptimizations" + }, + { + "type": "null" + } + ] + }, + "grpc_web": { + "description": "http1 + web support for grpc. This is required for browser clients.; Default: true", + "default": true, + "type": "boolean" + }, + "push_events": { + "description": "push events enabled.; Default: true", + "default": true, + "type": "boolean" + } + } + }, + "FullText": { + "type": "object", + "required": [ + "field" + ], + "properties": { + "field": { + "type": "string" + } + } + }, + "GrpcApiOptions": { + "type": "object", + "properties": { + "cors": { + "default": true, + "type": "boolean" + }, + "enabled": { + "default": true, + "type": "boolean" + }, + "host": { + "default": "0.0.0.0", + "type": "string" + }, + "port": { + "default": 50051, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "web": { + "default": true, + "type": "boolean" + } + } + }, + "GrpcConfig": { + "type": "object", + "properties": { + "adapter": { + "default": "default", + "type": "string" + }, + "host": { + "default": "0.0.0.0", + "type": "string" + }, + "port": { + "default": 8085, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "schemas": { + "anyOf": [ + { + "$ref": "#/definitions/GrpcConfigSchemas" + }, + { + "type": "null" + } + ] + } + } + }, + "GrpcConfigSchemas": { + "oneOf": [ + { + "type": "object", + "required": [ + "Inline" + ], + "properties": { + "Inline": { + "type": "string" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Path" + ], + "properties": { + "Path": { + "type": "string" + } + }, + "additionalProperties": false + } + ] + }, + "KafkaConfig": { + "type": "object", + "required": [ + "broker" + ], + "properties": { + "broker": { + "type": "string" + }, + "schema_registry_url": { + "type": [ + "string", + "null" + ] + } + } + }, + "LocalDetails": { + "type": "object", + "required": [ + "path" + ], + "properties": { + "path": { + "type": "string" + } + } + }, + "LocalStorage": { + "type": "object", + "required": [ + "tables" + ], + "properties": { + "details": { + "anyOf": [ + { + "$ref": "#/definitions/LocalDetails" + }, + { + "type": "null" + } + ] + }, + "tables": { + "type": "array", + "items": { + "$ref": "#/definitions/Table" + } + } + } + }, + "LogReaderOptions": { + "type": "object", + "properties": { + "batch_size": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "buffer_size": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "timeout_in_millis": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + } + } + }, + "MongodbConfig": { + "type": "object", + "required": [ + "connection_string" + ], + "properties": { + "connection_string": { + "type": "string" + } + } + }, + "MySQLConfig": { + "type": "object", + "required": [ + "url" + ], + "properties": { + "server_id": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "url": { + "type": "string" + } + } + }, + "NestedDozerConfig": { + "type": "object", + "properties": { + "grpc": { + "anyOf": [ + { + "$ref": "#/definitions/AppGrpcOptions" + }, + { + "type": "null" + } + ] + }, + "log_options": { + "anyOf": [ + { + "$ref": "#/definitions/NestedDozerLogOptions" + }, + { + "type": "null" + } + ] + } + } + }, + "NestedDozerLogOptions": { + "type": "object", + "properties": { + "batch_size": { + "default": 30, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "buffer_size": { + "default": 1000, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + }, + "timeout_in_millis": { + "default": 1000, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + }, + "OnDeleteResolutionTypes": { + "oneOf": [ + { + "type": "object", + "required": [ + "Nothing" + ], + "properties": { + "Nothing": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Panic" + ], + "properties": { + "Panic": { + "type": "null" + } + }, + "additionalProperties": false + } + ] + }, + "OnInsertResolutionTypes": { + "oneOf": [ + { + "type": "object", + "required": [ + "Nothing" + ], + "properties": { + "Nothing": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Update" + ], + "properties": { + "Update": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Panic" + ], + "properties": { + "Panic": { + "type": "null" + } + }, + "additionalProperties": false + } + ] + }, + "OnUpdateResolutionTypes": { + "oneOf": [ + { + "type": "object", + "required": [ + "Nothing" + ], + "properties": { + "Nothing": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Upsert" + ], + "properties": { + "Upsert": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Panic" + ], + "properties": { + "Panic": { + "type": "null" + } + }, + "additionalProperties": false + } + ] + }, + "OnnxConfig": { + "type": "object", + "required": [ + "path" + ], + "properties": { + "path": { + "description": "path to the model file", + "type": "string" + } + } + }, + "ParquetConfig": { + "type": "object", + "required": [ + "extension", + "path" + ], + "properties": { + "extension": { + "type": "string" + }, + "marker_extension": { + "default": ".marker", + "type": "string" + }, + "marker_file": { + "default": false, + "type": "boolean" + }, + "path": { + "type": "string" + } + } + }, + "PostgresConfig": { + "description": "Configuration for a Postgres connection", + "examples": [ + { + "connection_url": null, + "database": "postgres", + "host": "localhost", + "password": null, + "port": 5432, + "sslmode": null, + "user": "postgres" + } + ], + "type": "object", + "properties": { + "connection_url": { + "description": "The connection url to use", + "type": [ + "string", + "null" + ] + }, + "database": { + "description": "The database to connect to (default: postgres)", + "type": [ + "string", + "null" + ] + }, + "host": { + "description": "The host to connect to (IP or DNS name)", + "type": [ + "string", + "null" + ] + }, + "password": { + "description": "The password to use for authentication", + "type": [ + "string", + "null" + ] + }, + "port": { + "description": "The port to connect to (default: 5432)", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + }, + "sslmode": { + "description": "The sslmode to use for the connection (disable, prefer, require)", + "type": [ + "string", + "null" + ] + }, + "user": { + "description": "The username to use for authentication", + "type": [ + "string", + "null" + ] + } + } + }, + "RealTimeConfig": { + "type": "object" + }, + "RefreshConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "RealTime" + ], + "properties": { + "RealTime": { + "$ref": "#/definitions/RealTimeConfig" + } + }, + "additionalProperties": false + } + ] + }, + "RestApiOptions": { + "type": "object", + "properties": { + "cors": { + "default": true, + "type": "boolean" + }, + "enabled": { + "default": true, + "type": "boolean" + }, + "host": { + "default": "0.0.0.0", + "type": "string" + }, + "port": { + "default": 8080, + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + }, + "S3Details": { + "type": "object", + "required": [ + "access_key_id", + "bucket_name", + "region", + "secret_access_key" + ], + "properties": { + "access_key_id": { + "type": "string" + }, + "bucket_name": { + "type": "string" + }, + "region": { + "type": "string" + }, + "secret_access_key": { + "type": "string" + } + } + }, + "S3Storage": { + "type": "object", + "required": [ + "tables" + ], + "properties": { + "details": { + "anyOf": [ + { + "$ref": "#/definitions/S3Details" + }, + { + "type": "null" + } + ] + }, + "tables": { + "type": "array", + "items": { + "$ref": "#/definitions/Table" + } + } + } + }, + "S3Storage2": { + "type": "object", + "required": [ + "bucket_name", + "region" + ], + "properties": { + "bucket_name": { + "type": "string" + }, + "region": { + "type": "string" + } + } + }, + "SecondaryIndex": { + "oneOf": [ + { + "type": "object", + "required": [ + "SortedInverted" + ], + "properties": { + "SortedInverted": { + "$ref": "#/definitions/SortedInverted" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "FullText" + ], + "properties": { + "FullText": { + "$ref": "#/definitions/FullText" + } + }, + "additionalProperties": false + } + ] + }, + "SecondaryIndexConfig": { + "type": "object", + "properties": { + "create": { + "type": "array", + "items": { + "$ref": "#/definitions/CreateSecondaryIndex" + } + }, + "skip_default": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "SnowflakeConfig": { + "type": "object", + "required": [ + "database", + "password", + "port", + "role", + "schema", + "server", + "user", + "warehouse" + ], + "properties": { + "database": { + "type": "string" + }, + "driver": { + "type": [ + "string", + "null" + ] + }, + "password": { + "type": "string" + }, + "port": { + "type": "string" + }, + "role": { + "type": "string" + }, + "schema": { + "type": "string" + }, + "server": { + "type": "string" + }, + "user": { + "type": "string" + }, + "warehouse": { + "type": "string" + } + } + }, + "SortedInverted": { + "type": "object", + "required": [ + "fields" + ], + "properties": { + "fields": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "Source": { + "type": "object", + "required": [ + "connection", + "name", + "table_name" + ], + "properties": { + "columns": { + "description": "list of columns gonna be used in the source table; Type: String[]", + "type": "array", + "items": { + "type": "string" + } + }, + "connection": { + "description": "reference to pre-defined connection name; Type: String", + "type": "string" + }, + "name": { + "description": "name of the source - to distinguish between multiple sources; Type: String", + "type": "string" + }, + "refresh_config": { + "description": "setting for how to refresh the data; Default: RealTime", + "default": { + "RealTime": {} + }, + "anyOf": [ + { + "$ref": "#/definitions/RefreshConfig" + }, + { + "type": "null" + } + ] + }, + "schema": { + "description": "name of schema source database; Type: String", + "default": null, + "type": [ + "string", + "null" + ] + }, + "table_name": { + "description": "name of the table in source database; Type: String", + "type": "string" + } + } + }, + "Table": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/TableConfig" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + } + } + }, + "TableConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "CSV" + ], + "properties": { + "CSV": { + "$ref": "#/definitions/CsvConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Delta" + ], + "properties": { + "Delta": { + "$ref": "#/definitions/DeltaConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Parquet" + ], + "properties": { + "Parquet": { + "$ref": "#/definitions/ParquetConfig" + } + }, + "additionalProperties": false + } + ] + }, + "TelemetryConfig": { + "type": "object", + "properties": { + "metrics": { + "anyOf": [ + { + "$ref": "#/definitions/TelemetryMetricsConfig" + }, + { + "type": "null" + } + ] + }, + "trace": { + "anyOf": [ + { + "$ref": "#/definitions/TelemetryTraceConfig" + }, + { + "type": "null" + } + ] + } + } + }, + "TelemetryMetricsConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "Prometheus" + ], + "properties": { + "Prometheus": { + "type": "null" + } + }, + "additionalProperties": false + } + ] + }, + "TelemetryTraceConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "Dozer" + ], + "properties": { + "Dozer": { + "$ref": "#/definitions/DozerTelemetryConfig" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "XRay" + ], + "properties": { + "XRay": { + "$ref": "#/definitions/XRayConfig" + } + }, + "additionalProperties": false + } + ] + }, + "UdfConfig": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "description": "setting for what type of udf to use; Default: Onnx", + "anyOf": [ + { + "$ref": "#/definitions/UdfType" + }, + { + "type": "null" + } + ] + }, + "name": { + "description": "name of the model function", + "type": "string" + } + } + }, + "UdfType": { + "oneOf": [ + { + "type": "object", + "required": [ + "Onnx" + ], + "properties": { + "Onnx": { + "$ref": "#/definitions/OnnxConfig" + } + }, + "additionalProperties": false + } + ] + }, + "UpdateCurrentVersionStrategy": { + "oneOf": [ + { + "type": "object", + "required": [ + "OnCreate" + ], + "properties": { + "OnCreate": { + "type": "null" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Manual" + ], + "properties": { + "Manual": { + "type": "null" + } + }, + "additionalProperties": false + } + ] + }, + "XRayConfig": { + "type": "object", + "required": [ + "endpoint", + "timeout_in_seconds" + ], + "properties": { + "endpoint": { + "type": "string" + }, + "timeout_in_seconds": { + "type": "integer", + "format": "uint64", + "minimum": 0.0 + } + } + } + } +} \ No newline at end of file From 195177648eb0b2f1871405da3536124bc33ce633 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 22 Sep 2023 16:11:29 +0800 Subject: [PATCH 5/8] chore: Disable e2e test (#2071) Signed-off-by: Bei Chu <914745487@qq.com> --- .github/workflows/e2e.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index dd89d77efa..198b9aa1e6 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -2,7 +2,6 @@ name: Dozer E2E Test on: workflow_dispatch: - merge_group: pull_request_target: branches: [main] From 451c62d2d14d9f579f2095b05f9d7f8196afc59e Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 22 Sep 2023 16:32:24 +0800 Subject: [PATCH 6/8] chore: Disable dozer connector checkpointing (#2072) --- dozer-ingestion/src/connectors/dozer/connector.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dozer-ingestion/src/connectors/dozer/connector.rs b/dozer-ingestion/src/connectors/dozer/connector.rs index bc76955c4c..c11e64b22b 100644 --- a/dozer-ingestion/src/connectors/dozer/connector.rs +++ b/dozer-ingestion/src/connectors/dozer/connector.rs @@ -14,7 +14,6 @@ use dozer_types::{ ingestion_types::{ default_log_options, IngestionMessage, NestedDozerConfig, NestedDozerLogOptions, }, - node::OpIdentifier, serde_json, types::{Operation, Record, Schema}, }; @@ -265,7 +264,7 @@ async fn read_table( .send(IngestionMessage::OperationEvent { table_index, op, - id: Some(OpIdentifier::new(0, op_and_pos.pos)), + id: None, }) .await; } From 72458bfb27c33bce0ec219a0a5ee5494df4e7452 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 22 Sep 2023 18:20:21 +0800 Subject: [PATCH 7/8] fix: Use same CORS headers with traefik to work around https://github.com/traefik/traefik/issues/5567 (#2076) --- dozer-api/src/grpc/grpc_web_middleware.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dozer-api/src/grpc/grpc_web_middleware.rs b/dozer-api/src/grpc/grpc_web_middleware.rs index 76cb718056..ba1f0c6795 100644 --- a/dozer-api/src/grpc/grpc_web_middleware.rs +++ b/dozer-api/src/grpc/grpc_web_middleware.rs @@ -93,7 +93,7 @@ where { if enabled { let service = GrpcWebLayer::new().layer(service); - let service = CorsLayer::very_permissive().layer(service); + let service = CorsLayer::permissive().layer(service); MaybeGrpcWebService::GrpcWeb(service) } else { MaybeGrpcWebService::NoGrpcWeb(service) From 24d413a9db46fc8e318060cbef45702ec1ad1bd4 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 22 Sep 2023 20:28:49 +0800 Subject: [PATCH 8/8] fix: Don't use blocking `Mutex` in async context (#2073) --- Cargo.lock | 11 +-- dozer-api/Cargo.toml | 1 + .../grpc/internal/internal_pipeline_server.rs | 54 ++++++++------ dozer-cli/src/pipeline/builder.rs | 2 +- dozer-cli/src/pipeline/log_sink.rs | 50 ++++++------- dozer-cli/src/simple/executor.rs | 2 +- dozer-log/src/replication/mod.rs | 15 ++-- dozer-log/src/replication/tests.rs | 70 +++++++++++-------- 8 files changed, 110 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5944557aa0..ba4a79503a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -721,9 +721,9 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad445822218ce64be7a341abfb0b1ea43b5c23aa83902542a4542e78309d8e5e" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ "async-stream-impl", "futures-core", @@ -732,13 +732,13 @@ dependencies = [ [[package]] name = "async-stream-impl" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4655ae1a7b0cdf149156f780c5bf3f1352bc53cbd9e0a361a7ef7b22947e965" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.29", ] [[package]] @@ -2598,6 +2598,7 @@ dependencies = [ "actix-web", "actix-web-httpauth", "arc-swap", + "async-stream", "async-trait", "bytes", "dozer-cache", diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index d0446b3c02..894eb9c158 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -50,6 +50,7 @@ http-body = "0.4.5" bytes = "1.4.0" http = "0.2.9" pin-project = "1.1.3" +async-stream = "0.3.5" [dev-dependencies] tempdir = "0.3.7" diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs index f09053f2a6..1e7b4d9745 100644 --- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs +++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs @@ -1,3 +1,4 @@ +use async_stream::stream; use dozer_cache::dozer_log::home_dir::BuildId; use dozer_cache::dozer_log::replication::{Log, LogResponseFuture}; use dozer_types::bincode; @@ -11,16 +12,15 @@ use dozer_types::grpc_types::internal::{ use dozer_types::log::info; use dozer_types::models::api_config::AppGrpcOptions; use dozer_types::models::api_endpoint::ApiEndpoint; -use dozer_types::parking_lot::Mutex; use dozer_types::tonic::transport::server::TcpIncoming; use dozer_types::tonic::transport::Server; use dozer_types::tonic::{self, Request, Response, Status, Streaming}; -use futures_util::future::Either; use futures_util::stream::BoxStream; -use futures_util::{Future, StreamExt, TryStreamExt}; +use futures_util::{Future, StreamExt}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use crate::errors::GrpcError; use crate::grpc::run_server; @@ -52,7 +52,7 @@ impl InternalPipelineService for InternalPipelineServer { ) -> Result, Status> { let endpoint = request.into_inner().endpoint; let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log; - let storage = log.lock().describe_storage(); + let storage = log.lock().await.describe_storage(); Ok(Response::new(StorageResponse { storage: Some(storage), })) @@ -103,26 +103,34 @@ impl InternalPipelineService for InternalPipelineServer { requests: Request>, ) -> Result, Status> { let endpoints = self.endpoints.clone(); - Ok(Response::new( - requests - .into_inner() - .and_then(move |request| { - let log = &match find_log_endpoint(&endpoints, &request.endpoint) { - Ok(log) => log, - Err(e) => return Either::Left(std::future::ready(Err(e))), + let requests = requests.into_inner(); + let stream = stream! { + for await request in requests { + let request = match request { + Ok(request) => request, + Err(e) => { + yield Err(e); + continue; } - .log; - - let response = log.lock().read( - request.start as usize..request.end as usize, - Duration::from_millis(request.timeout_in_millis as u64), - log.clone(), - ); - - Either::Right(serialize_log_response(response)) - }) - .boxed(), - )) + }; + + let log = &match find_log_endpoint(&endpoints, &request.endpoint) { + Ok(log) => log, + Err(e) => { + yield Err(e); + continue; + } + }.log; + + let response = log.lock().await.read( + request.start as usize..request.end as usize, + Duration::from_millis(request.timeout_in_millis as u64), + log.clone(), + ).await; + yield serialize_log_response(response).await; + } + }; + Ok(Response::new(stream.boxed())) } } diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index ce393ae261..4229804321 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -18,9 +18,9 @@ use dozer_types::models::connection::Connection; use dozer_types::models::flags::Flags; use dozer_types::models::source::Source; use dozer_types::models::udf_config::UdfConfig; -use dozer_types::parking_lot::Mutex; use std::hash::Hash; use tokio::runtime::Runtime; +use tokio::sync::Mutex; use crate::pipeline::dummy_sink::DummySinkFactory; use crate::pipeline::LogSinkFactory; diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs index f04bf00952..06fafc9169 100644 --- a/dozer-cli/src/pipeline/log_sink.rs +++ b/dozer-cli/src/pipeline/log_sink.rs @@ -12,10 +12,10 @@ use dozer_core::{ }; use dozer_recordstore::ProcessorRecordStore; use dozer_tracing::LabelsAndProgress; +use dozer_types::errors::internal::BoxedError; use dozer_types::indicatif::ProgressBar; use dozer_types::types::Schema; -use dozer_types::{errors::internal::BoxedError, parking_lot::Mutex}; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::Mutex}; #[derive(Debug)] pub struct LogSinkFactory { @@ -69,7 +69,6 @@ pub struct LogSink { runtime: Arc, log: Arc>, pb: ProgressBar, - counter: u64, } impl LogSink { @@ -80,18 +79,7 @@ impl LogSink { labels: LabelsAndProgress, ) -> Self { let pb = labels.create_progress_bar(endpoint_name); - let counter = log.lock().end() as u64; - Self { - runtime, - log, - pb, - counter, - } - } - - fn update_counter(&mut self) { - self.counter += 1; - self.pb.set_position(self.counter); + Self { runtime, log, pb } } } @@ -102,35 +90,39 @@ impl Sink for LogSink { record_store: &ProcessorRecordStore, op: ProcessorOperation, ) -> Result<(), BoxedError> { - self.log - .lock() - .write(dozer_cache::dozer_log::replication::LogOperation::Op { + let end = self.runtime.block_on(self.log.lock()).write( + dozer_cache::dozer_log::replication::LogOperation::Op { op: op.load(record_store)?, - }); - self.update_counter(); + }, + ); + self.pb.set_position(end as u64); Ok(()) } fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError> { - self.log.lock().write(LogOperation::Commit { - decision_instant: epoch_details.decision_instant, - }); - self.update_counter(); + let end = self + .runtime + .block_on(self.log.lock()) + .write(LogOperation::Commit { + decision_instant: epoch_details.decision_instant, + }); + self.pb.set_position(end as u64); Ok(()) } fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError> { - self.log - .lock() + self.runtime + .block_on(self.log.lock()) .persist(queue, self.log.clone(), &self.runtime)?; Ok(()) } fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> { - self.log - .lock() + let end = self + .runtime + .block_on(self.log.lock()) .write(LogOperation::SnapshottingDone { connection_name }); - self.update_counter(); + self.pb.set_position(end as u64); Ok(()) } } diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 30de921be0..09b12d89d3 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -6,8 +6,8 @@ use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, Option use dozer_tracing::LabelsAndProgress; use dozer_types::models::api_endpoint::ApiEndpoint; use dozer_types::models::flags::Flags; -use dozer_types::parking_lot::Mutex; use tokio::runtime::Runtime; +use tokio::sync::Mutex; use std::sync::{atomic::AtomicBool, Arc}; diff --git a/dozer-log/src/replication/mod.rs b/dozer-log/src/replication/mod.rs index 6d59d4d924..c6e27588d7 100644 --- a/dozer-log/src/replication/mod.rs +++ b/dozer-log/src/replication/mod.rs @@ -5,13 +5,13 @@ use std::time::{Duration, SystemTime}; use dozer_types::grpc_types::internal::storage_response; use dozer_types::log::{debug, error}; -use dozer_types::parking_lot::Mutex; use dozer_types::serde::{Deserialize, Serialize}; use dozer_types::types::Operation; use dozer_types::{bincode, thiserror}; use pin_project::pin_project; use tokio::runtime::Runtime; use tokio::sync::oneshot::error::RecvError; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use crate::storage::{Queue, Storage}; @@ -120,7 +120,8 @@ impl Log { self.in_memory.end() } - pub fn write(&mut self, op: LogOperation) { + /// Returns the log length after this write. + pub fn write(&mut self, op: LogOperation) -> usize { // Record operation. self.in_memory.ops.push(op); @@ -138,6 +139,8 @@ impl Log { true } }); + + self.end() } pub fn persist( @@ -169,7 +172,7 @@ impl Log { } }; - let mut this = this.lock(); + let mut this = this.lock().await; let this = this.deref_mut(); debug_assert!(persisted_log_entries_end(&this.persisted).unwrap_or(0) == range.start); debug_assert!(this.in_memory.start == range.start); @@ -198,7 +201,9 @@ impl Log { } /// Returned `LogResponse` is guaranteed to contain `request.start`, but may actually contain less then `request.end`. - pub fn read( + /// + /// Function is marked as `async` because it needs to run in a tokio runtime. + pub async fn read( &mut self, request: Range, timeout: Duration, @@ -230,7 +235,7 @@ impl Log { tokio::spawn(async move { // Try to trigger watcher when timeout. tokio::time::sleep(timeout).await; - let mut this = this.lock(); + let mut this = this.lock().await; let this = this.deref_mut(); // Find the watcher. It may have been triggered by slice fulfillment or persisting. if let Some((index, watcher)) = this diff --git a/dozer-log/src/replication/tests.rs b/dozer-log/src/replication/tests.rs index dd7fc47be5..5b315d67af 100644 --- a/dozer-log/src/replication/tests.rs +++ b/dozer-log/src/replication/tests.rs @@ -1,8 +1,7 @@ use std::{sync::Arc, time::Duration}; -use dozer_types::parking_lot::Mutex; use tempdir::TempDir; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::Mutex}; use crate::{ replication::{Log, LogOperation, LogResponse}, @@ -24,7 +23,6 @@ fn create_runtime() -> Arc { .into() } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn write_read() { let (_temp_dir, log, _) = create_test_log().await; @@ -41,26 +39,30 @@ async fn write_read() { }, ]; - let mut log_mut = log.lock(); + let mut log_mut = log.lock().await; for op in &ops { log_mut.write(op.clone()); } let range = 1..ops.len(); - let ops_read_future = log_mut.read(range.clone(), Duration::from_secs(1), log.clone()); + let ops_read_future = log_mut + .read(range.clone(), Duration::from_secs(1), log.clone()) + .await; drop(log_mut); let ops_read = ops_read_future.await.unwrap(); assert_eq!(ops_read, LogResponse::Operations(ops[range].to_vec())); } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn watch_write() { let (_temp_dir, log, _) = create_test_log().await; let range = 1..3; - let mut log_mut = log.lock(); - let handle = tokio::spawn(log_mut.read(range.clone(), Duration::from_secs(1), log.clone())); + let mut log_mut = log.lock().await; + let ops_read_future = log_mut + .read(range.clone(), Duration::from_secs(1), log.clone()) + .await; + let handle = tokio::spawn(ops_read_future); let ops = vec![ LogOperation::SnapshottingDone { @@ -82,16 +84,14 @@ async fn watch_write() { assert_eq!(ops_read, LogResponse::Operations(ops[range].to_vec())); } -#[allow(clippy::async_yields_async)] #[test] fn watch_partial() { let runtime = create_runtime(); let (_temp_dir, log, queue) = runtime.block_on(create_test_log()); - let mut log_mut = log.lock(); - let future = - runtime.block_on(async { log_mut.read(1..3, Duration::from_secs(1), log.clone()) }); + let mut log_mut = runtime.block_on(log.lock()); + let future = runtime.block_on(log_mut.read(1..3, Duration::from_secs(1), log.clone())); let handle = runtime.spawn(future); let ops = vec![ @@ -109,7 +109,8 @@ fn watch_partial() { // Persist must be called outside of tokio runtime. let runtime_clone = runtime.clone(); std::thread::spawn(move || { - log.lock() + runtime_clone + .block_on(log.lock()) .persist(&queue, log.clone(), &runtime_clone) .unwrap(); }) @@ -120,7 +121,6 @@ fn watch_partial() { assert_eq!(ops_read, LogResponse::Operations(ops[1..].to_vec())); } -#[allow(clippy::async_yields_async)] #[test] fn watch_out_of_range() { let runtime = create_runtime(); @@ -128,9 +128,8 @@ fn watch_out_of_range() { let (_temp_dir, log, queue) = runtime.block_on(create_test_log()); let range = 2..3; - let mut log_mut = log.lock(); - let future = runtime - .block_on(async { log_mut.read(range.clone(), Duration::from_secs(1), log.clone()) }); + let mut log_mut = runtime.block_on(log.lock()); + let future = runtime.block_on(log_mut.read(range.clone(), Duration::from_secs(1), log.clone())); let handle = runtime.spawn(future); let ops = vec![ @@ -153,15 +152,15 @@ fn watch_out_of_range() { // Persist must be called outside of tokio runtime. let runtime_clone = runtime.clone(); std::thread::spawn(move || { - log_clone - .lock() + runtime_clone + .block_on(log_clone.lock()) .persist(&queue, log_clone.clone(), &runtime_clone) .unwrap(); }) .join() .unwrap(); - log.lock().write(ops[2].clone()); + runtime.block_on(log.lock()).write(ops[2].clone()); let ops_read = runtime.block_on(handle).unwrap().unwrap(); assert_eq!(ops_read, LogResponse::Operations(ops[range].to_vec())); @@ -184,7 +183,7 @@ fn in_memory_log_should_shrink_after_persist() { connection_name: "2".to_string(), }, ]; - let mut log_mut = log.lock(); + let mut log_mut = runtime.block_on(log.lock()); log_mut.write(ops[0].clone()); log_mut.write(ops[1].clone()); drop(log_mut); @@ -193,31 +192,39 @@ fn in_memory_log_should_shrink_after_persist() { // Persist must be called outside of tokio runtime. let runtime_clone = runtime.clone(); let handle = std::thread::spawn(move || { - log_clone - .lock() + runtime_clone + .block_on(log_clone.lock()) .persist(&queue, log_clone.clone(), &runtime_clone) .unwrap() }) .join() .unwrap(); - log.lock().write(ops[2].clone()); + runtime.block_on(log.lock()).write(ops[2].clone()); runtime.block_on(handle).unwrap().unwrap(); assert!(matches!( runtime - .block_on(log.lock().read(0..1, Duration::from_secs(1), log.clone())) + .block_on(async move { + log.lock() + .await + .read(0..1, Duration::from_secs(1), log.clone()) + .await + .await + }) .unwrap(), LogResponse::Persisted(_) )); } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn watch_partial_timeout() { let (_temp_dir, log, _) = create_test_log().await; - let mut log_mut = log.lock(); - let handle = tokio::spawn(log_mut.read(0..2, Duration::from_secs(0), log.clone())); + let mut log_mut = log.lock().await; + let ops_read_future = log_mut + .read(0..2, Duration::from_secs(0), log.clone()) + .await; + let handle = tokio::spawn(ops_read_future); let op = LogOperation::SnapshottingDone { connection_name: "0".to_string(), @@ -229,7 +236,6 @@ async fn watch_partial_timeout() { assert_eq!(ops_read, LogResponse::Operations(vec![op])); } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn write_watch_partial_timeout() { let (_temp_dir, log, _) = create_test_log().await; @@ -237,10 +243,12 @@ async fn write_watch_partial_timeout() { let op = LogOperation::SnapshottingDone { connection_name: "0".to_string(), }; - let mut log_mut = log.lock(); + let mut log_mut = log.lock().await; log_mut.write(op.clone()); - let ops_read_future = log_mut.read(0..2, Duration::from_secs(0), log.clone()); + let ops_read_future = log_mut + .read(0..2, Duration::from_secs(0), log.clone()) + .await; drop(log_mut); let ops_read = ops_read_future.await.unwrap(); assert_eq!(ops_read, LogResponse::Operations(vec![op]));