Skip to content

Commit

Permalink
refactor: Use default and Option in a more principled way
Browse files Browse the repository at this point in the history
Also add `#[serde(deny_unknown_fields)]`
  • Loading branch information
chubei committed Sep 22, 2023
1 parent c2ec780 commit b600f96
Show file tree
Hide file tree
Showing 77 changed files with 1,269 additions and 1,935 deletions.
2 changes: 1 addition & 1 deletion dozer-api/src/generator/oapi/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<'a> OpenApiGenerator<'a> {
parameters: vec![ReferenceOr::Item(Parameter::Path {
parameter_data: ParameterData {
name: "id".to_owned(),
description: Some(format!("Primary key of the document - {} ", self.endpoint.index.as_ref().map_or(String::new(), |index| index.primary_key.join(", ")))),
description: Some(format!("Primary key of the document - {} ", self.endpoint.index.primary_key.join(", "))),
required: true,
format: ParameterSchemaOrContent::Schema(ReferenceOr::Item(Schema {
schema_data: SchemaData {
Expand Down
21 changes: 12 additions & 9 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use dozer_types::grpc_types::{
common::common_grpc_service_server::CommonGrpcServiceServer,
health::health_grpc_service_server::HealthGrpcServiceServer,
};
use dozer_types::models::api_config::{default_grpc_port, default_host};
use dozer_types::models::flags::default_dynamic;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tracing::Level;
Expand Down Expand Up @@ -62,7 +64,7 @@ impl ApiServer {
let security = get_api_security(self.security.to_owned());

// Service handling dynamic gRPC requests.
let typed_service = if self.flags.dynamic {
let typed_service = if self.flags.dynamic.unwrap_or_else(default_dynamic) {
Some(TypedService::new(
cache_endpoints,
operations_receiver,
Expand All @@ -78,8 +80,8 @@ impl ApiServer {

pub fn new(grpc_config: GrpcApiOptions, security: Option<ApiSecurity>, flags: Flags) -> Self {
Self {
port: grpc_config.port as u16,
host: grpc_config.host,
port: grpc_config.port.unwrap_or_else(default_grpc_port),
host: grpc_config.host.unwrap_or_else(default_host),
security,
flags,
}
Expand All @@ -95,22 +97,23 @@ impl ApiServer {
default_max_num_records: usize,
) -> Result<impl Future<Output = Result<(), dozer_types::tonic::transport::Error>>, ApiInitError>
{
let grpc_web = self.flags.grpc_web.unwrap_or(true);
// Create our services.
let common_service = CommonGrpcServiceServer::new(CommonService::new(
cache_endpoints.clone(),
operations_receiver.as_ref().map(|r| r.resubscribe()),
default_max_num_records,
));
let common_service = enable_grpc_web(common_service, self.flags.grpc_web);
let common_service = enable_grpc_web(common_service, grpc_web);

let (typed_service, reflection_service) = self.get_dynamic_service(
cache_endpoints,
operations_receiver,
default_max_num_records,
)?;
let typed_service =
typed_service.map(|typed_service| enable_grpc_web(typed_service, self.flags.grpc_web));
let reflection_service = enable_grpc_web(reflection_service, self.flags.grpc_web);
typed_service.map(|typed_service| enable_grpc_web(typed_service, grpc_web));
let reflection_service = enable_grpc_web(reflection_service, grpc_web);

let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
service_map.insert("".to_string(), ServingStatus::Serving);
Expand All @@ -123,7 +126,7 @@ impl ApiServer {
let health_service = HealthGrpcServiceServer::new(HealthService {
serving_status: service_map,
});
let health_service = enable_grpc_web(health_service, self.flags.grpc_web);
let health_service = enable_grpc_web(health_service, grpc_web);

// Auth middleware.
let security = get_api_security(self.security.to_owned());
Expand All @@ -134,7 +137,7 @@ impl ApiServer {
let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
let mut authenticated_reflection_service = None;
let mut unauthenticated_reflection_service = None;
if self.flags.authenticate_server_reflection {
if self.flags.authenticate_server_reflection.unwrap_or(false) {
authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
} else {
unauthenticated_reflection_service = Some(reflection_service);
Expand All @@ -146,7 +149,7 @@ impl ApiServer {
if security.is_some() {
let service = enable_grpc_web(
AuthGrpcServiceServer::new(AuthService::new(security.to_owned())),
self.flags.grpc_web,
grpc_web,
);
auth_service = Some(auth_middleware.layer(service));
}
Expand Down
10 changes: 8 additions & 2 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use dozer_types::grpc_types::internal::{
EndpointResponse, EndpointsResponse, LogRequest, LogResponse, StorageRequest, StorageResponse,
};
use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_config::{
default_app_grpc_host, default_app_grpc_port, AppGrpcOptions,
};
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
Expand Down Expand Up @@ -180,7 +182,11 @@ pub async fn start_internal_pipeline_server(
let server = InternalPipelineServer::new(endpoints);

// Start listening.
let addr = format!("{}:{}", options.host, options.port);
let addr = format!(
"{}:{}",
options.host.clone().unwrap_or_else(default_app_grpc_host),
options.port.unwrap_or_else(default_app_grpc_port)
);
info!("Starting Internal Server on {addr}");
let addr = addr
.parse()
Expand Down
17 changes: 7 additions & 10 deletions dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ impl CacheEndpoint {
cache_labels(endpoint.name.clone(), log_reader_builder.build_name.clone());
cache_labels.extend(labels.labels().clone());
let schema = log_reader_builder.schema.clone();
let conflict_resolution = endpoint.conflict_resolution.unwrap_or_default();
let conflict_resolution = endpoint.conflict_resolution;
let write_options = CacheWriteOptions {
insert_resolution: conflict_resolution.on_insert.unwrap_or_default(),
delete_resolution: conflict_resolution.on_delete.unwrap_or_default(),
update_resolution: conflict_resolution.on_update.unwrap_or_default(),
insert_resolution: conflict_resolution.on_insert,
delete_resolution: conflict_resolution.on_delete,
update_resolution: conflict_resolution.on_update,
..Default::default()
};
let cache = open_or_create_cache(
Expand Down Expand Up @@ -154,18 +154,15 @@ fn get_log_reader_options(endpoint: &ApiEndpoint) -> LogReaderOptions {
endpoint: endpoint.name.clone(),
batch_size: endpoint
.log_reader_options
.as_ref()
.and_then(|options| options.batch_size)
.batch_size
.unwrap_or_else(default_log_reader_batch_size),
timeout_in_millis: endpoint
.log_reader_options
.as_ref()
.and_then(|options| options.timeout_in_millis)
.timeout_in_millis
.unwrap_or_else(default_log_reader_timeout_in_millis),
buffer_size: endpoint
.log_reader_options
.as_ref()
.and_then(|options| options.buffer_size)
.buffer_size
.unwrap_or_else(default_log_reader_buffer_size),
}
}
Expand Down
5 changes: 3 additions & 2 deletions dozer-api/src/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use actix_web::{
};
use actix_web_httpauth::middleware::HttpAuthentication;
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_config::{default_host, default_rest_port};
use dozer_types::{log::info, models::api_config::RestApiOptions};
use dozer_types::{
models::api_security::ApiSecurity,
Expand Down Expand Up @@ -71,10 +72,10 @@ impl ApiServer {
) -> Self {
Self {
shutdown_timeout: 0,
port: rest_config.port as u16,
port: rest_config.port.unwrap_or_else(default_rest_port),
cors: CorsOptions::Permissive,
security,
host: rest_config.host,
host: rest_config.host.unwrap_or_else(default_host),
default_max_num_records,
}
}
Expand Down
10 changes: 5 additions & 5 deletions dozer-api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ pub fn get_endpoint() -> ApiEndpoint {
ApiEndpoint {
name: "films".to_string(),
path: "/films".to_string(),
index: Some(ApiIndex {
index: ApiIndex {
primary_key: vec!["film_id".to_string()],
secondary: None,
}),
secondary: Default::default(),
},
table_name: "film".to_string(),
conflict_resolution: None,
log_reader_options: None,
conflict_resolution: Default::default(),
log_reader_options: Default::default(),
version: None,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use super::RwMainEnvironment;
fn init_env(conflict_resolution: ConflictResolution) -> (RwMainEnvironment, Schema) {
let schema = schema_multi_indices();
let write_options = CacheWriteOptions {
insert_resolution: conflict_resolution.on_insert.unwrap_or_default(),
delete_resolution: conflict_resolution.on_delete.unwrap_or_default(),
update_resolution: conflict_resolution.on_update.unwrap_or_default(),
insert_resolution: conflict_resolution.on_insert,
delete_resolution: conflict_resolution.on_delete,
update_resolution: conflict_resolution.on_update,
..Default::default()
};
let main_env =
Expand All @@ -25,9 +25,9 @@ fn init_env(conflict_resolution: ConflictResolution) -> (RwMainEnvironment, Sche
#[test]
fn ignore_insert_error_when_type_nothing() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: Some(OnInsertResolutionTypes::Nothing(())),
on_update: None,
on_delete: None,
on_insert: OnInsertResolutionTypes::Nothing,
on_update: Default::default(),
on_delete: Default::default(),
});

let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];
Expand Down Expand Up @@ -56,9 +56,9 @@ fn ignore_insert_error_when_type_nothing() {
#[test]
fn update_after_insert_error_when_type_update() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: Some(OnInsertResolutionTypes::Update(())),
on_update: None,
on_delete: None,
on_insert: OnInsertResolutionTypes::Update,
on_update: Default::default(),
on_delete: Default::default(),
});

let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];
Expand Down Expand Up @@ -102,9 +102,9 @@ fn update_after_insert_error_when_type_update() {
#[test]
fn return_insert_error_when_type_panic() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: Some(OnInsertResolutionTypes::Panic(())),
on_update: None,
on_delete: None,
on_insert: OnInsertResolutionTypes::Panic,
on_update: Default::default(),
on_delete: Default::default(),
});

let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];
Expand All @@ -129,9 +129,9 @@ fn return_insert_error_when_type_panic() {
#[test]
fn ignore_update_error_when_type_nothing() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: None,
on_update: Some(OnUpdateResolutionTypes::Nothing(())),
on_delete: None,
on_insert: Default::default(),
on_update: OnUpdateResolutionTypes::Nothing,
on_delete: Default::default(),
});

let initial_values = vec![Field::Int(1), Field::Null];
Expand Down Expand Up @@ -159,9 +159,9 @@ fn ignore_update_error_when_type_nothing() {
#[test]
fn update_after_update_error_when_type_upsert() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: None,
on_update: Some(OnUpdateResolutionTypes::Upsert(())),
on_delete: None,
on_insert: Default::default(),
on_update: OnUpdateResolutionTypes::Upsert,
on_delete: Default::default(),
});

let initial_values = vec![Field::Int(1), Field::Null];
Expand Down Expand Up @@ -191,9 +191,9 @@ fn update_after_update_error_when_type_upsert() {
#[test]
fn return_update_error_when_type_panic() {
let (mut env, _) = init_env(ConflictResolution {
on_insert: None,
on_update: Some(OnUpdateResolutionTypes::Panic(())),
on_delete: None,
on_insert: Default::default(),
on_update: OnUpdateResolutionTypes::Panic,
on_delete: Default::default(),
});

let initial_values = vec![Field::Int(1), Field::Null];
Expand All @@ -219,9 +219,9 @@ fn return_update_error_when_type_panic() {
#[test]
fn ignore_delete_error_when_type_nothing() {
let (mut env, _) = init_env(ConflictResolution {
on_insert: None,
on_update: None,
on_delete: Some(OnDeleteResolutionTypes::Nothing(())),
on_insert: Default::default(),
on_update: Default::default(),
on_delete: OnDeleteResolutionTypes::Nothing,
});

let initial_values = vec![Field::Int(1), Field::Null];
Expand All @@ -242,9 +242,9 @@ fn ignore_delete_error_when_type_nothing() {
#[test]
fn return_delete_error_when_type_panic() {
let (mut env, _) = init_env(ConflictResolution {
on_insert: None,
on_update: None,
on_delete: Some(OnDeleteResolutionTypes::Panic(())),
on_insert: Default::default(),
on_update: Default::default(),
on_delete: OnDeleteResolutionTypes::Panic,
});

let initial_values = vec![Field::Int(1), Field::Null];
Expand Down
21 changes: 10 additions & 11 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ impl RwMainEnvironment {
} else {
// The record does not exist. Resolve the conflict.
match self.write_options.delete_resolution {
OnDeleteResolutionTypes::Nothing(()) => {
OnDeleteResolutionTypes::Nothing => {
warn!("Record (Key: {:?}) not found, ignoring delete", key);
Ok(None)
}
OnDeleteResolutionTypes::Panic(()) => Err(CacheError::PrimaryKeyNotFound),
OnDeleteResolutionTypes::Panic => Err(CacheError::PrimaryKeyNotFound),
}
}
}
Expand Down Expand Up @@ -320,8 +320,7 @@ impl RwMainEnvironment {
meta,
}) => {
// Case 5, 6, 7.
if self.write_options.update_resolution
== OnUpdateResolutionTypes::Nothing(())
if self.write_options.update_resolution == OnUpdateResolutionTypes::Nothing
{
// Case 5.
warn!("Old record (Key: {:?}) and new record (Key: {:?}) both exist, ignoring update", get_key_fields(old, schema), get_key_fields(new, schema));
Expand Down Expand Up @@ -367,22 +366,22 @@ impl RwMainEnvironment {
} else {
// Case 2, 3, 4, 9, 10, 11, 12, 13.
match self.write_options.update_resolution {
OnUpdateResolutionTypes::Nothing(()) => {
OnUpdateResolutionTypes::Nothing => {
// Case 2, 9, 12.
warn!("Old record (Key: {:?}) not found, ignoring update", old_key);
Ok(UpsertResult::Ignored)
}
OnUpdateResolutionTypes::Upsert(()) => {
OnUpdateResolutionTypes::Upsert => {
// Case 3, 10, 13.
insert_impl(
operation_log,
txn,
&self.common.schema.0,
new,
OnInsertResolutionTypes::Panic(()),
OnInsertResolutionTypes::Panic,
)
}
OnUpdateResolutionTypes::Panic(()) => {
OnUpdateResolutionTypes::Panic => {
// Case 4, 11, 14.
Err(CacheError::PrimaryKeyNotFound)
}
Expand Down Expand Up @@ -501,19 +500,19 @@ fn insert_impl(
} else {
// Resolve the conflict.
match insert_resolution {
OnInsertResolutionTypes::Nothing(()) => {
OnInsertResolutionTypes::Nothing => {
warn!(
"Record (Key: {:?}) already exist, ignoring insert",
get_key_fields(record, schema)
);
Ok(UpsertResult::Ignored)
}
OnInsertResolutionTypes::Panic(()) => Err(CacheError::PrimaryKeyExists {
OnInsertResolutionTypes::Panic => Err(CacheError::PrimaryKeyExists {
key: get_key_fields(record, schema),
meta,
insert_operation_id,
}),
OnInsertResolutionTypes::Update(()) => {
OnInsertResolutionTypes::Update => {
let new_meta = operation_log.update(
txn,
key.as_ref(),
Expand Down
Loading

0 comments on commit b600f96

Please sign in to comment.