Skip to content

Commit

Permalink
feat: json & jsonb and their array type support (#1200)
Browse files Browse the repository at this point in the history
* chore: introduce types

* chore: rename Bson to Json

* chore: rename Bson to Json

* chore: Json arrow conversion (initial attempt)

* merge with the newest master

* merge with the newest master

* merge with the newest master

* merge with the newest master

* merge with the newest master

* merge with the newest master

* merge with the newest master

* merge with the newest master

* merge with the newest master

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json

* supporting json array & jsonb array

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* Update dozer-ingestion/src/errors.rs

Co-authored-by: Bei Chu <[email protected]>
Signed-off-by: Chloe Kim <[email protected]>

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

* addressed comments

---------

Signed-off-by: Chloe Kim <[email protected]>
Co-authored-by: Chloe Kim <[email protected]>
Co-authored-by: Chloe Kim <[email protected]>
Co-authored-by: Bei Chu <[email protected]>
  • Loading branch information
4 people authored Apr 28, 2023
1 parent fbd69ab commit 0d226e3
Show file tree
Hide file tree
Showing 49 changed files with 705 additions and 311 deletions.
2 changes: 1 addition & 1 deletion dozer-api/src/cache_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn generate_secondary_indexes(
}

// Skip creating indexes
FieldType::Text | FieldType::Binary | FieldType::Bson => (),
FieldType::Text | FieldType::Binary | FieldType::Json => (),
}
}

Expand Down
16 changes: 11 additions & 5 deletions dozer-api/src/generator/oapi/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@ impl<'a> OpenApiGenerator<'a> {
FieldType::Float => Value::from(1.1),
FieldType::Boolean => Value::from(true),
FieldType::String => Value::from("foo".to_string()),
FieldType::Binary
| FieldType::Decimal
| FieldType::Timestamp
| FieldType::Bson => Value::Null,

FieldType::Binary | FieldType::Decimal | FieldType::Timestamp => Value::Null,
FieldType::Json => {
json!([{
"name": "John Doe",
"age": 43,
"phones": [
"+44 1234567",
"+44 2345678"
]
}])
}
FieldType::Text => Value::from("lorem ipsum".to_string()),
FieldType::Date => Value::from("2022-11-24"),
FieldType::Point => {
Expand Down
49 changes: 21 additions & 28 deletions dozer-api/src/generator/oapi/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use dozer_types::{
types::{FieldType, DATE_FORMAT},
};
use openapiv3::{
ArrayType, Contact, IntegerFormat, IntegerType, MediaType, NumberFormat, NumberType,
AnySchema, ArrayType, Contact, IntegerFormat, IntegerType, MediaType, NumberFormat, NumberType,
ObjectType, Parameter, ParameterData, ParameterSchemaOrContent, PathStyle, ReferenceOr,
Response, Schema, SchemaData, SchemaKind, StringFormat, StringType, Type,
VariantOrUnknownOrEmpty,
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn convert_cache_to_oapi_schema(
field.name,
ReferenceOr::boxed_item(Schema {
schema_data: Default::default(),
schema_kind: SchemaKind::Type(convert_cache_type_to_schema_type(field.typ)),
schema_kind: convert_cache_type_to_schema_type(field.typ),
}),
);
}
Expand All @@ -101,21 +101,21 @@ pub fn convert_cache_to_oapi_schema(
}

/// Should be consistent with `field_to_json_value`.
fn convert_cache_type_to_schema_type(field_type: dozer_types::types::FieldType) -> Type {
fn convert_cache_type_to_schema_type(field_type: dozer_types::types::FieldType) -> SchemaKind {
match field_type {
FieldType::UInt | FieldType::Int => Type::Integer(IntegerType {
FieldType::UInt | FieldType::Int => SchemaKind::Type(Type::Integer(IntegerType {
format: VariantOrUnknownOrEmpty::Item(IntegerFormat::Int64),
..Default::default()
}),
FieldType::U128 | FieldType::I128 => Type::String(StringType {
})),
FieldType::U128 | FieldType::I128 => SchemaKind::Type(Type::String(StringType {
format: VariantOrUnknownOrEmpty::Empty,
..Default::default()
}),
FieldType::Float => Type::Number(NumberType {
})),
FieldType::Float => SchemaKind::Type(Type::Number(NumberType {
format: VariantOrUnknownOrEmpty::Item(NumberFormat::Double),
..Default::default()
}),
FieldType::Boolean => Type::Boolean {},
})),
FieldType::Boolean => SchemaKind::Type(Type::Boolean {}),
FieldType::String
| FieldType::Text
| FieldType::Decimal
Expand All @@ -134,65 +134,58 @@ fn convert_cache_type_to_schema_type(field_type: dozer_types::types::FieldType)
} else {
(VariantOrUnknownOrEmpty::Empty, None)
};
Type::String(StringType {
SchemaKind::Type(Type::String(StringType {
format,
pattern,
..Default::default()
})
}))
}
FieldType::Binary | FieldType::Bson => Type::Array(ArrayType {
FieldType::Json => SchemaKind::Any(AnySchema::default()),
FieldType::Binary => SchemaKind::Type(Type::Array(ArrayType {
items: Some(ReferenceOr::Item(Box::new(u8_schema()))),
min_items: None,
max_items: None,
unique_items: false,
}),
})),
FieldType::Point => {
let mut properties: IndexMap<String, ReferenceOr<Box<Schema>>> = IndexMap::new();
let required: Vec<String> = vec!["x".to_string(), "y".to_string()];

for key in &required {
properties.insert(
key.clone(),
ReferenceOr::boxed_item(Schema {
schema_data: Default::default(),
schema_kind: SchemaKind::Type(convert_cache_type_to_schema_type(
FieldType::Float,
)),
schema_kind: convert_cache_type_to_schema_type(FieldType::Float),
}),
);
}

Type::Object(ObjectType {
SchemaKind::Type(Type::Object(ObjectType {
properties,
required,
additional_properties: None,
min_properties: None,
max_properties: None,
})
}))
}
FieldType::Duration => {
let mut properties: IndexMap<String, ReferenceOr<Box<Schema>>> = IndexMap::new();
let required: Vec<String> = vec!["value".to_string(), "time_unit".to_string()];

for key in &required {
properties.insert(
key.clone(),
ReferenceOr::boxed_item(Schema {
schema_data: Default::default(),
schema_kind: SchemaKind::Type(convert_cache_type_to_schema_type(
FieldType::String,
)),
schema_kind: convert_cache_type_to_schema_type(FieldType::String),
}),
);
}

Type::Object(ObjectType {
SchemaKind::Type(Type::Object(ObjectType {
properties,
required,
additional_properties: None,
min_properties: None,
max_properties: None,
})
}))
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions dozer-api/src/generator/protoc/generator/implementation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const POINT_TYPE_CLASS: &str = "dozer.types.PointType";
const DURATION_TYPE_CLASS: &str = "dozer.types.DurationType";
const DECIMAL_TYPE_CLASS: &str = "dozer.types.RustDecimal";
const TIMESTAMP_TYPE_CLASS: &str = "google.protobuf.Timestamp";
const JSON_TYPE_CLASS: &str = "google.protobuf.Value";

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(crate = "self::serde")]
Expand Down Expand Up @@ -88,7 +89,7 @@ impl<'a> ProtoGeneratorImpl<'a> {
}

fn libs_by_type(&self) -> Result<Vec<String>, GenerationError> {
let type_need_import_libs = [TIMESTAMP_TYPE_CLASS];
let type_need_import_libs = [TIMESTAMP_TYPE_CLASS, JSON_TYPE_CLASS];
let mut libs_import: Vec<String> = self
.schema
.fields
Expand All @@ -99,6 +100,7 @@ impl<'a> ProtoGeneratorImpl<'a> {
})
.map(|proto_type| match proto_type.as_str() {
TIMESTAMP_TYPE_CLASS => "google/protobuf/timestamp.proto".to_owned(),
JSON_TYPE_CLASS => "google/protobuf/struct.proto".to_owned(),
_ => "".to_owned(),
})
.collect();
Expand Down Expand Up @@ -381,7 +383,7 @@ fn convert_dozer_type_to_proto_type(field_type: FieldType) -> Result<String, Gen
FieldType::Decimal => Ok(DECIMAL_TYPE_CLASS.to_owned()),
FieldType::Timestamp => Ok(TIMESTAMP_TYPE_CLASS.to_owned()),
FieldType::Date => Ok("string".to_owned()),
FieldType::Bson => Ok("bytes".to_owned()),
FieldType::Json => Ok(JSON_TYPE_CLASS.to_owned()),
FieldType::Point => Ok(POINT_TYPE_CLASS.to_owned()),
FieldType::Duration => Ok(DURATION_TYPE_CLASS.to_owned()),
}
Expand Down
1 change: 1 addition & 0 deletions dozer-api/src/grpc/typed/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ fn interval_value_to_pb(
}
GrpcTypes::value::Value::TimestampValue(ts) => Value::Message(ts.transcode_to_dynamic()),
GrpcTypes::value::Value::DateValue(d) => Value::String(d),
GrpcTypes::value::Value::JsonValue(v) => Value::Message(v.transcode_to_dynamic()),
})
}

Expand Down
25 changes: 21 additions & 4 deletions dozer-api/src/grpc/typed/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::env;

use super::super::helper::count_response_to_typed_response;
use super::super::helper::query_response_to_typed_response;
use crate::{
generator::protoc::generator::ProtoGenerator,
test_utils::{self, get_sample_records},
};

use super::super::helper::query_response_to_typed_response;
use std::env;

#[test]
fn test_records_to_typed_response() {
Expand All @@ -23,3 +22,21 @@ fn test_records_to_typed_response() {
.get_field_by_name(service_desc.query.response_desc.records_field.name());
assert!(records.is_some(), "records must be present");
}

/// Verifies that converting a record count into a typed count response
/// stores that count in the response message's count field.
#[test]
fn test_count_records_to_typed_response() {
    // The descriptor fixture is resolved relative to the current working directory.
    let descriptor_path = env::current_dir()
        .unwrap()
        .join("src/grpc/typed/tests/generated_films.bin");

    let (schema, _) = test_utils::get_schema();
    let service_desc = ProtoGenerator::read_schema(&descriptor_path, "films").unwrap();

    // The number of sample records is the value the typed response must carry.
    let expected_count = get_sample_records(schema).len();
    let count_desc = service_desc.count.response_desc.clone();
    let typed_response = count_response_to_typed_response(expected_count, count_desc).unwrap();

    // Read the count back through the field name taken from the descriptor.
    let count_field_name = service_desc.count.response_desc.count_field.name();
    let actual_count = typed_response.message.get_field_by_name(count_field_name);
    assert_eq!(
        expected_count as u64,
        actual_count.unwrap().as_u64().unwrap()
    );
}
7 changes: 4 additions & 3 deletions dozer-api/src/grpc/types_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use dozer_types::grpc_types::types::{
value, DurationType, Operation, OperationType, PointType, Record, RecordWithId, RustDecimal,
Type, Value,
};
use dozer_types::json_types::json_value_to_prost;
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::rust_decimal::Decimal;
use dozer_types::types::{DozerDuration, Field, FieldType, DATE_FORMAT};
Expand Down Expand Up @@ -125,8 +126,8 @@ fn field_to_prost_value(f: Field) -> Value {
nanos: ts.timestamp_subsec_nanos() as i32,
})),
},
Field::Bson(b) => Value {
value: Some(value::Value::BytesValue(b)),
Field::Json(b) => Value {
value: Some(value::Value::JsonValue(json_value_to_prost(b))),
},
Field::Null => Value { value: None },
Field::Date(date) => Value {
Expand Down Expand Up @@ -165,7 +166,7 @@ fn field_type_to_internal_type(typ: FieldType) -> Type {
FieldType::Binary => Type::Binary,
FieldType::Decimal => Type::Decimal,
FieldType::Timestamp => Type::Timestamp,
FieldType::Bson => Type::Bson,
FieldType::Json => Type::Json,
FieldType::Date => Type::String,
FieldType::Point => Type::Point,
FieldType::Duration => Type::Duration,
Expand Down
4 changes: 2 additions & 2 deletions dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use dozer_cache::cache::CacheRecord;
use dozer_cache::CacheReader;
use dozer_types::errors::types::TypeError;
use dozer_types::indexmap::IndexMap;
use dozer_types::json_types::field_to_json_value;
use dozer_types::log::warn;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::types::{Field, Schema};
Expand All @@ -18,6 +17,7 @@ use crate::generator::oapi::generator::OpenApiGenerator;
use crate::CacheEndpoint;
use crate::{auth::Access, errors::ApiError};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::json_types::field_to_json_value;
use dozer_types::serde_json::{json, Value};

fn generate_oapi3(reader: &CacheReader, endpoint: ApiEndpoint) -> Result<OpenAPI, ApiError> {
Expand Down Expand Up @@ -168,7 +168,7 @@ fn record_to_map(
let mut map = IndexMap::new();

for (field_def, field) in schema.fields.iter().zip(record.record.values) {
let val = field_to_json_value(field);
let val = field_to_json_value(field).map_err(TypeError::DeserializationError)?;
map.insert(field_def.name.clone(), val);
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
FieldType::Date => debug_assert!(value.as_date().is_some()),
FieldType::Bson => debug_assert!(value.as_bson().is_some()),
FieldType::Json => debug_assert!(value.as_json().is_some()),
FieldType::Point => debug_assert!(value.as_point().is_some()),
FieldType::Duration => debug_assert!(value.as_duration().is_some()),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ mod tests {
use dozer_storage::{
lmdb::DatabaseFlags, lmdb_sys::mdb_cmp, LmdbEnvironment, RwLmdbEnvironment,
};

use dozer_types::json_types::JsonValue;
use dozer_types::{
chrono::{DateTime, NaiveDate, TimeZone, Utc},
ordered_float::OrderedFloat,
Expand Down Expand Up @@ -232,7 +234,9 @@ mod tests {
Field::Decimal(Decimal::new(i64::MAX, 0)),
Field::Timestamp(DateTime::from(Utc.timestamp_millis_opt(1).unwrap())),
Field::Date(NaiveDate::from_ymd_opt(2020, 1, 2).unwrap()),
Field::Bson(vec![255]),
Field::Json(JsonValue::Array(vec![JsonValue::Number(OrderedFloat(
255_f64,
))])),
];
for a in test_cases.iter() {
check(a);
Expand Down
Loading

0 comments on commit 0d226e3

Please sign in to comment.