Skip to content

Commit

Permalink
Aerospike connector: expand data type support and tighten source type…
Browse files Browse the repository at this point in the history
… requirements (#2463)

* Aerospike connector: expand data type support and tighten source type
requirements

* Fix test
  • Loading branch information
Jesse-Bakker authored Mar 21, 2024
1 parent a5f8cba commit 9c13ed6
Show file tree
Hide file tree
Showing 2 changed files with 256 additions and 212 deletions.
314 changes: 208 additions & 106 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::errors::types::DeserializationError;
use dozer_ingestion_connector::dozer_types::event::Event;
use dozer_ingestion_connector::dozer_types::json_types::serde_json_to_json_value;
use dozer_ingestion_connector::dozer_types::log::{debug, error, info, trace, warn};
use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection;
use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
Expand Down Expand Up @@ -31,16 +33,12 @@ use actix_web::HttpRequest;
use actix_web::HttpServer;
use actix_web::{get, HttpResponse};

use dozer_ingestion_connector::dozer_types::ordered_float::OrderedFloat;
use dozer_ingestion_connector::dozer_types::prost::Message;
use dozer_ingestion_connector::dozer_types::rust_decimal::Decimal;
use dozer_ingestion_connector::dozer_types::serde_json;
use dozer_ingestion_connector::dozer_types::serde_json::Value;

use base64::prelude::*;
use dozer_ingestion_connector::dozer_types::chrono::{
DateTime, FixedOffset, NaiveDate, NaiveDateTime, Utc,
};
use dozer_ingestion_connector::dozer_types::chrono::{DateTime, FixedOffset, NaiveDateTime, Utc};

use dozer_ingestion_connector::dozer_types::thiserror::{self, Error};
use dozer_ingestion_connector::schema_parser::SchemaParser;
Expand Down Expand Up @@ -98,6 +96,9 @@ pub enum AerospikeConnectorError {
#[error("Failed timestamp parsing")]
ParsingTimestampFailed,

#[error("Failed point parsing")]
ParsingPointFailed,

#[error("Failed int parsing")]
ParsingIntFailed,

Expand All @@ -107,6 +108,9 @@ pub enum AerospikeConnectorError {
#[error("Failed float parsing")]
ParsingFloatFailed,

#[error("Failed decimal parsing")]
ParsingDecimalFailed(#[from] dozer_types::rust_decimal::Error),

#[error("Schema not found: {0}")]
SchemaNotFound(String),

Expand All @@ -115,6 +119,12 @@ pub enum AerospikeConnectorError {

#[error("Key is neither string or int")]
KeyNotSupported(Value),

#[error("Failed to parse json")]
JsonParsingFailed(#[from] DeserializationError),

#[error("Failed to parse duration")]
ParsingDurationFailed,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -747,113 +757,205 @@ async fn map_record(

pub(crate) fn map_value_to_field(
bin_type: &str,
value: Value,
mut value: Value,
typ: FieldType,
) -> Result<Field, AerospikeConnectorError> {
match value {
Value::Null => Ok(Field::Null),
Value::Bool(b) => match typ {
FieldType::UInt => Ok(Field::UInt(b as u64)),
FieldType::U128 => Ok(Field::U128(b as u128)),
FieldType::Int => Ok(Field::Int(b as i64)),
FieldType::I128 => Ok(Field::I128(b as i128)),
FieldType::Float => Ok(Field::Float(OrderedFloat(if b { 1.0 } else { 0.0 }))),
FieldType::Boolean => Ok(Field::Boolean(b)),
FieldType::String => Ok(Field::String(b.to_string())),
FieldType::Text => Ok(Field::Text(b.to_string())),
FieldType::Binary => Ok(Field::Binary(b.encode_to_vec())),
FieldType::Decimal => Ok(Field::Decimal(Decimal::from(b as i8))),
typ => Err(AerospikeConnectorError::UnsupportedType(typ)),
},
Value::Number(v) => {
match typ {
FieldType::UInt => Ok(Field::UInt(
v.as_u64()
.ok_or(AerospikeConnectorError::ParsingUIntFailed)?,
)),
FieldType::U128 => Ok(Field::U128(
v.as_u64()
.ok_or(AerospikeConnectorError::ParsingUIntFailed)?
as u128,
)),
FieldType::Int => Ok(Field::Int(
v.as_i64()
.ok_or(AerospikeConnectorError::ParsingIntFailed)?,
)),
FieldType::I128 => Ok(Field::I128(
v.as_i64()
.ok_or(AerospikeConnectorError::ParsingIntFailed)?
as i128,
)),
FieldType::Float => Ok(Field::Float(OrderedFloat(
v.as_f64()
.ok_or(AerospikeConnectorError::ParsingFloatFailed)?,
))),
FieldType::Boolean => Ok(Field::Boolean(
v.as_i64()
.ok_or(AerospikeConnectorError::ParsingIntFailed)?
== 1,
)),
FieldType::String => Ok(Field::String(v.to_string())),
FieldType::Text => Ok(Field::Text(v.to_string())),
FieldType::Binary => Ok(Field::Binary(v.to_string().as_bytes().to_vec())),
FieldType::Timestamp => {
// TODO: decide on the format of the timestamp

// Convert the timestamp string into an i64
let timestamp = v
.as_i64()
.ok_or(AerospikeConnectorError::ParsingTimestampFailed)?;

// Create a NaiveDateTime from the timestamp
let naive = NaiveDateTime::from_timestamp_opt(timestamp, 0)
.ok_or(AerospikeConnectorError::InvalidTimestamp(timestamp))?;

// Create a normal DateTime from the NaiveDateTime
let datetime: DateTime<FixedOffset> =
DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc).fixed_offset();
Ok(Field::Timestamp(datetime))
}
FieldType::Date => {
let days = v
.as_i64()
.ok_or(AerospikeConnectorError::ParsingDaysError)?;

let date = NaiveDate::from_num_days_from_ce_opt(days.try_into()?)
.ok_or(AerospikeConnectorError::InvalidDate(days))?;
Ok(Field::Date(date))
}
typ => Err(AerospikeConnectorError::UnsupportedType(typ)),
if value.is_null() {
return Ok(Field::Null);
}
let unsupported_type = || AerospikeConnectorError::UnsupportedTypeForFieldType {
bin_type: bin_type.to_owned(),
field_type: typ,
};
let check_type = |wanted_typ| {
if bin_type == wanted_typ {
Ok(())
} else {
Err(unsupported_type())
}
};
match typ {
FieldType::UInt => {
check_type("int")?;
let number = value.as_number().ok_or_else(unsupported_type)?;
Ok(Field::UInt(number.as_u64().ok_or_else(|| {
AerospikeConnectorError::ParsingUIntFailed
})?))
}
FieldType::Int => {
check_type("int")?;
let number = value.as_number().ok_or_else(unsupported_type)?;
Ok(Field::Int(number.as_i64().ok_or_else(|| {
AerospikeConnectorError::ParsingIntFailed
})?))
}
FieldType::U128 => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::U128(string.parse()?))
}
FieldType::I128 => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::I128(string.parse()?))
}
FieldType::Float => {
check_type("float")?;
let number = value.as_number().ok_or_else(unsupported_type)?;
Ok(Field::Float(
number
.as_f64()
.ok_or(AerospikeConnectorError::ParsingFloatFailed)?
.into(),
))
}
FieldType::Boolean => {
check_type("bool")?;
Ok(Field::Boolean(
value.as_bool().ok_or_else(unsupported_type)?,
))
}
FieldType::String => {
check_type("str")?;
Ok(Field::String(
value.as_str().ok_or_else(unsupported_type)?.to_owned(),
))
}
FieldType::Text => {
check_type("str")?;
Ok(Field::Text(
value.as_str().ok_or_else(unsupported_type)?.to_owned(),
))
}
FieldType::Binary => {
check_type("blob")?;
if bin_type != "blob" {
return Err(unsupported_type());
}
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::Binary(BASE64_STANDARD.decode(string)?))
}
Value::String(s) => {
match typ {
FieldType::UInt => Ok(Field::UInt(s.as_str().parse()?)),
FieldType::U128 => Ok(Field::U128(s.as_str().parse()?)),
FieldType::Int => Ok(Field::Int(s.as_str().parse()?)),
FieldType::I128 => Ok(Field::I128(s.as_str().parse()?)),
FieldType::Float => Ok(Field::Float(OrderedFloat(s.parse()?))),
FieldType::Boolean => Ok(Field::Boolean(s == "true" || s == "1")),
FieldType::String => Ok(Field::String(s)),
FieldType::Text => Ok(Field::Text(s)),
FieldType::Timestamp => Ok(Field::Timestamp(DateTime::parse_from_rfc3339(&s)?)),
FieldType::Date => {
// TODO: decide on the format of the date

Err(AerospikeConnectorError::UnsupportedType(typ))
}
FieldType::Binary => {
let bytes = BASE64_STANDARD.decode(s.as_bytes())?;
Ok(Field::Binary(bytes))
}
typ => Err(AerospikeConnectorError::UnsupportedType(typ)),
FieldType::Decimal => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::Decimal(string.parse()?))
}
FieldType::Timestamp => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::Timestamp(DateTime::parse_from_rfc3339(string)?))
}
FieldType::Date => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::Date(string.parse()?))
}
FieldType::Json => Ok(Field::Json(serde_json_to_json_value(value)?)),
FieldType::Point => {
check_type("geojson")?;
let json = value.as_object_mut().ok_or_else(unsupported_type)?;
if !json.get("type").is_some_and(|type_| type_ == "Point") {
return Err(AerospikeConnectorError::ParsingPointFailed);
}
let Some(Value::Array(coords)) = json.remove("coordinates") else {
return Err(AerospikeConnectorError::ParsingPointFailed);
};
let p: [Value; 2] = coords
.try_into()
.map_err(|_| AerospikeConnectorError::ParsingPointFailed)?;
if let (Some(x), Some(y)) = (p[0].as_f64(), p[1].as_f64()) {
Ok(Field::Point((x, y).into()))
} else {
Err(AerospikeConnectorError::ParsingPointFailed)
}
}
Value::Object(_) | Value::Array(_) => {
Err(AerospikeConnectorError::UnsupportedTypeForFieldType {
bin_type: bin_type.to_string(),
field_type: typ,
})
FieldType::Duration => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
let duration = parse_duration(string)?;
Ok(Field::Duration(dozer_types::types::DozerDuration(
duration,
dozer_types::types::TimeUnit::Nanoseconds,
)))
}
}
}

/// Parses the time part of an ISO 8601 style duration (`PT<h>H<m>M<s>S`) into a [`Duration`].
///
/// Every component is optional, but the ones present must appear in the order hours, minutes,
/// seconds. A component may carry a decimal fraction only when nothing follows it.
///
/// # Errors
///
/// Returns [`AerospikeConnectorError::ParsingDurationFailed`] when the `PT` prefix is missing,
/// a component is not a valid decimal number, unexpected text remains after the last accepted
/// component, or the value cannot be represented as a [`Duration`].
fn parse_duration(string: &str) -> Result<Duration, AerospikeConnectorError> {
    let err = |_| AerospikeConnectorError::ParsingDurationFailed;
    let string = string
        .strip_prefix("PT")
        .ok_or(AerospikeConnectorError::ParsingDurationFailed)?;

    // Converts `number` units of `scale` seconds each into a `Duration`.
    let to_duration = |scale, number: &Decimal| -> Result<Duration, AerospikeConnectorError> {
        // The input is untrusted, so use checked arithmetic and report a parse failure
        // instead of relying on `Decimal`'s `*` operator, which panics on overflow.
        let as_secs: Decimal = number
            .checked_mul(Decimal::new(scale, 0))
            .ok_or(AerospikeConnectorError::ParsingDurationFailed)?;
        let secs = as_secs.try_into().map_err(err)?;
        let frac = as_secs
            .fract()
            .checked_mul(Decimal::new(1_000_000_000, 0))
            .ok_or(AerospikeConnectorError::ParsingDurationFailed)?;
        Ok(Duration::new(secs, frac.try_into().map_err(err)?))
    };
    // `Duration`'s `+` panics on overflow as well; surface it as a parse failure.
    let add = |total: Duration, part: Duration| {
        total
            .checked_add(part)
            .ok_or(AerospikeConnectorError::ParsingDurationFailed)
    };

    let (hours, mut rest) = parse_duration_part(string, 'H')?;
    let mut duration = to_duration(3600, &hours)?;
    // Smaller units are only looked for while every larger unit was a whole number.
    if hours.is_integer() {
        let (mins, after_mins) = parse_duration_part(rest, 'M')?;
        rest = after_mins;
        duration = add(duration, to_duration(60, &mins)?)?;
        if mins.is_integer() {
            let (secs, after_secs) = parse_duration_part(rest, 'S')?;
            rest = after_secs;
            duration = add(duration, to_duration(1, &secs)?)?;
        }
    }
    // Whatever was not consumed above is trailing garbage.
    if !rest.is_empty() {
        return Err(AerospikeConnectorError::ParsingDurationFailed);
    }
    Ok(duration)
}

/// Splits one `<number><delim>` component off the front of a duration string.
///
/// Looks for the first occurrence of `delim`. When found, everything before it is parsed as a
/// [`Decimal`] and returned together with the text after the delimiter. When `delim` does not
/// occur, the component is treated as absent: the value is zero and `string` is returned
/// unchanged.
///
/// # Errors
///
/// Returns [`AerospikeConnectorError::ParsingDurationFailed`] when the text before `delim` is
/// not a valid decimal number.
fn parse_duration_part(
    string: &str,
    delim: char,
) -> Result<(Decimal, &str), AerospikeConnectorError> {
    // `split_once` skips the full encoded width of `delim`, so this also stays panic-free for
    // a multi-byte delimiter (manual `idx + 1` slicing would not).
    match string.split_once(delim) {
        Some((number, rest)) => {
            let value = number
                .parse::<Decimal>()
                .map_err(|_| AerospikeConnectorError::ParsingDurationFailed)?;
            Ok((value, rest))
        }
        None => Ok((Decimal::ZERO, string)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed durations: single components, combined components and fractions.
    #[test]
    fn test_parse_duration() {
        assert_eq!(parse_duration("PT3H").unwrap(), Duration::new(3600 * 3, 0));
        assert_eq!(parse_duration("PT3M").unwrap(), Duration::new(60 * 3, 0));
        assert_eq!(parse_duration("PT3S").unwrap(), Duration::new(3, 0));

        assert_eq!(
            parse_duration("PT3H3S").unwrap(),
            Duration::new(3600 * 3 + 3, 0)
        );
        assert_eq!(
            parse_duration("PT1H2M3.5S").unwrap(),
            Duration::new(3600 + 2 * 60 + 3, 500_000_000)
        );

        // A fraction is allowed on the last component and is carried into smaller units.
        assert_eq!(
            parse_duration("PT3.2H").unwrap(),
            Duration::new(3600 * 3 + 12 * 60, 0)
        );
        assert_eq!(
            parse_duration("PT0.000123S").unwrap(),
            Duration::new(0, 123_000)
        );
    }

    /// Malformed durations must be rejected rather than partially parsed.
    #[test]
    fn test_parse_duration_rejects_malformed_input() {
        // Missing `PT` prefix.
        assert!(parse_duration("").is_err());
        assert!(parse_duration("3H").is_err());
        // A fractional component followed by another component.
        assert!(parse_duration("PT3.2H2M").is_err());
        // Components out of order.
        assert!(parse_duration("PT3S2M").is_err());
        // Trailing text without a unit.
        assert!(parse_duration("PT3H2").is_err());
        // Unit without a number.
        assert!(parse_duration("PTH").is_err());
    }
}
Loading

0 comments on commit 9c13ed6

Please sign in to comment.