Skip to content

Commit

Permalink
feat(pipeline): transform support on_failure (#4123)
Browse files Browse the repository at this point in the history
* chore: add log http ingester scaffold

* chore: add some example code

* chore: add log inserter

* chore: add log handler file

* chore: add pipeline lib

* chore: import log handler

* chore: add pipeline http handler

* chore: add pipeline private table

* chore: add pipeline API

* chore: improve error handling

* chore: merge main

* chore: add multi content type support for log handler

* refactor: remove servers dep on pipeline

* refactor: move define_into_tonic_status to common-error

* refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4

* chore: fix typo

* refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c

* chore: fix typo and license header

* refactor: move http event handler to a separate file

* chore: add test for pipeline

* chore: fmt

* refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93

* refactor: move `pipeline_operator` to `pipeline` crate

* chore: minor update

* refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4

* chore: add log

* chore: add log

* chore: remove open hook

* chore: minor update

* chore: fix fmt

* chore: minor update

* chore: rename desc for pipeline table

* refactor: remove updated_at in pipelines

* chore: add more content type support for log inserter api

* chore: introduce pipeline crate

* chore: update upload pipeline api

* chore: fix by pr commit

* chore: add some doc for pub fn/struct

* chore: some minor fix

* chore: add pipeline version support

* chore: impl log pipeline version

* transform on_failure

* chore: add test

* chore: move test to a separate file

* chore: add comment

---------

Co-authored-by: paomian <[email protected]>
Co-authored-by: shuiyisong <[email protected]>
  • Loading branch information
3 people authored Jun 17, 2024
1 parent 558272d commit 0aceebf
Show file tree
Hide file tree
Showing 6 changed files with 520 additions and 34 deletions.
65 changes: 63 additions & 2 deletions src/pipeline/src/etl/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const TRANSFORM_FIELDS: &str = "fields";
const TRANSFORM_TYPE: &str = "type";
const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";

pub use transformer::greptime::GreptimeTransformer;
// pub use transformer::noop::NoopTransformer;
Expand All @@ -38,6 +39,38 @@ pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static {
fn transform(&self, val: crate::etl::value::Value) -> Result<Self::Output, String>;
}

/// Behavior applied when a transform fails to coerce a value.
///
/// The enum is a plain, fieldless policy flag, so it is `Copy` and comparable
/// with `==`, which lets callers pass it by value and assert on it directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnFailure {
    /// Return `None` if the transform fails.
    #[default]
    Ignore,
    /// Return the default value of the field if the transform fails.
    ///
    /// The default value depends on the type of the field, or is explicitly
    /// set by the user.
    Default,
}

impl std::str::FromStr for OnFailure {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"ignore" => Ok(OnFailure::Ignore),
"default" => Ok(OnFailure::Default),
_ => Err(format!("invalid transform on_failure value: {}", s)),
}
}
}

impl std::fmt::Display for OnFailure {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
OnFailure::Ignore => write!(f, "ignore"),
OnFailure::Default => write!(f, "default"),
}
}
}

#[derive(Debug, Default, Clone)]
pub struct Transforms {
transforms: Vec<Transform>,
Expand Down Expand Up @@ -97,6 +130,8 @@ pub struct Transform {
pub default: Option<Value>,

pub index: Option<Index>,

pub on_failure: Option<OnFailure>,
}

impl std::fmt::Display for Transform {
Expand All @@ -107,10 +142,21 @@ impl std::fmt::Display for Transform {
"".to_string()
};

let fields = format!("field(s): {}", self.fields);
let type_ = format!("type: {}", self.type_);
let fields = format!("field(s): {}", self.fields);
let default = if let Some(default) = &self.default {
format!(", default: {}", default)
} else {
"".to_string()
};

let on_failure = if let Some(on_failure) = &self.on_failure {
format!(", on_failure: {}", on_failure)
} else {
"".to_string()
};

write!(f, "{type_}{index}, {fields}")
write!(f, "{type_}{index}, {fields}{default}{on_failure}",)
}
}

Expand All @@ -121,6 +167,7 @@ impl Default for Transform {
type_: Value::Null,
default: None,
index: None,
on_failure: None,
}
}
}
Expand Down Expand Up @@ -155,9 +202,17 @@ impl Transform {
self.index = Some(index);
}

/// Sets the failure policy of this transform, overwriting any value that was
/// stored before.
fn with_on_failure(&mut self, on_failure: OnFailure) {
self.on_failure = Some(on_failure);
}

/// Returns a reference to the value stored in the `default` field, or `None`
/// when no default has been set.
pub(crate) fn get_default(&self) -> Option<&Value> {
self.default.as_ref()
}

/// Returns a reference to the transform's `type_` value.
///
/// String coercion uses this as the fallback when `on_failure` is
/// `OnFailure::Default` and no explicit default is set.
// NOTE(review): presumably `type_` carries the zero value of the target type
// (e.g. `Value::Int32(0)`), which is what makes it usable as a default — confirm.
pub(crate) fn get_type_matched_default_val(&self) -> &Value {
&self.type_
}
}

impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
Expand Down Expand Up @@ -192,6 +247,12 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
TRANSFORM_DEFAULT => {
default_opt = Some(Value::try_from(v)?);
}

TRANSFORM_ON_FAILURE => {
let on_failure = yaml_string(v, TRANSFORM_ON_FAILURE)?;
transform.with_on_failure(on_failure.parse()?);
}

_ => {}
}
}
Expand Down
230 changes: 203 additions & 27 deletions src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};

use crate::etl::transform::index::Index;
use crate::etl::transform::Transform;
use crate::etl::transform::{OnFailure, Transform};
use crate::etl::value::{Epoch, Time, Value};

impl TryFrom<Value> for ValueData {
Expand Down Expand Up @@ -177,8 +177,20 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
Value::Boolean(_) => ValueData::BoolValue(b),
Value::String(_) => ValueData::StringValue(b.to_string()),

Value::Time(_) => return Err("Boolean type not supported for Time".to_string()),
Value::Epoch(_) => return Err("Boolean type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Boolean type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Boolean type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Expand Down Expand Up @@ -207,8 +219,21 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),

Value::Time(_) => return Err("Integer type not supported for Time".to_string()),
Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Integer type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Integer type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Expand Down Expand Up @@ -237,8 +262,21 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),

Value::Time(_) => return Err("Integer type not supported for Time".to_string()),
Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Integer type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Integer type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Expand Down Expand Up @@ -267,8 +305,21 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0.0),
Value::String(_) => ValueData::StringValue(n.to_string()),

Value::Time(_) => return Err("Float type not supported for Time".to_string()),
Value::Epoch(_) => return Err("Float type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Float type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Float type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
Expand All @@ -280,31 +331,156 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
}

fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>, String> {
let val = match transform.type_ {
Value::Int8(_) => ValueData::I8Value(s.parse::<i32>().map_err(|e| e.to_string())?),
Value::Int16(_) => ValueData::I16Value(s.parse::<i32>().map_err(|e| e.to_string())?),
Value::Int32(_) => ValueData::I32Value(s.parse::<i32>().map_err(|e| e.to_string())?),
Value::Int64(_) => ValueData::I64Value(s.parse::<i64>().map_err(|e| e.to_string())?),
match transform.type_ {
Value::Int8(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I8Value(s.parse().unwrap())))
}
Value::Int16(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I16Value(s.parse().unwrap())))
}
Value::Int32(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I32Value(s.parse().unwrap())))
}
Value::Int64(_) if s.parse::<i64>().is_ok() => {
Ok(Some(ValueData::I64Value(s.parse().unwrap())))
}

Value::Uint8(_) => ValueData::U8Value(s.parse::<u32>().map_err(|e| e.to_string())?),
Value::Uint16(_) => ValueData::U16Value(s.parse::<u32>().map_err(|e| e.to_string())?),
Value::Uint32(_) => ValueData::U32Value(s.parse::<u32>().map_err(|e| e.to_string())?),
Value::Uint64(_) => ValueData::U64Value(s.parse::<u64>().map_err(|e| e.to_string())?),
Value::Uint8(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U8Value(s.parse().unwrap())))
}
Value::Uint16(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U16Value(s.parse().unwrap())))
}
Value::Uint32(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U32Value(s.parse().unwrap())))
}
Value::Uint64(_) if s.parse::<u64>().is_ok() => {
Ok(Some(ValueData::U64Value(s.parse().unwrap())))
}

Value::Float32(_) => ValueData::F32Value(s.parse::<f32>().map_err(|e| e.to_string())?),
Value::Float64(_) => ValueData::F64Value(s.parse::<f64>().map_err(|e| e.to_string())?),
Value::Float32(_) if s.parse::<f32>().is_ok() => {
Ok(Some(ValueData::F32Value(s.parse().unwrap())))
}
Value::Float64(_) if s.parse::<f64>().is_ok() => {
Ok(Some(ValueData::F64Value(s.parse().unwrap())))
}

Value::Boolean(_) => ValueData::BoolValue(s.parse::<bool>().map_err(|e| e.to_string())?),
Value::String(_) => ValueData::StringValue(s.to_string()),
Value::Boolean(_) if s.parse::<bool>().is_ok() => {
Ok(Some(ValueData::BoolValue(s.parse().unwrap())))
}

Value::Time(_) => return Err("String type not supported for Time".to_string()),
Value::Epoch(_) => return Err("String type not supported for Epoch".to_string()),
// on_failure
Value::Int8(_)
| Value::Int16(_)
| Value::Int32(_)
| Value::Int64(_)
| Value::Uint8(_)
| Value::Uint16(_)
| Value::Uint32(_)
| Value::Uint64(_)
| Value::Float32(_)
| Value::Float64(_)
| Value::Boolean(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => match transform.get_default() {
Some(default) => coerce_value(default, transform),
None => coerce_value(transform.get_type_matched_default_val(), transform),
},
None => Err(format!(
"failed to coerce string value '{s}' to type '{}'",
transform.type_.to_str_type()
)),
},

Value::String(_) => Ok(Some(ValueData::StringValue(s.to_string()))),

Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Time".to_string()),
None => Err("String type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Epoch".to_string()),
None => Err("String type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),

Value::Null => return Ok(None),
};
Value::Null => Ok(None),
}
}

Ok(Some(val))
#[cfg(test)]
mod tests {
    use super::*;
    use crate::etl::field::Fields;

    /// Builds an `Int32` transform with no index and no explicit default,
    /// varying only the failure policy under test.
    fn int32_transform(on_failure: Option<OnFailure>) -> Transform {
        Transform {
            fields: Fields::default(),
            type_: Value::Int32(0),
            default: None,
            index: None,
            on_failure,
        }
    }

    /// Wraps `raw` in a string value and runs it through `coerce_value`.
    fn coerce_str(raw: &str, transform: &Transform) -> Result<Option<ValueData>, String> {
        coerce_value(&Value::String(raw.to_string()), transform)
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = int32_transform(None);

        // A numeric string is parsed into the target integer type.
        let parsed = coerce_str("123", &transform).unwrap();
        assert_eq!(parsed, Some(ValueData::I32Value(123)));

        // Without a failure policy, an unparsable string surfaces as an error.
        let rejected = coerce_str("hello", &transform);
        assert!(rejected.is_err());
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = int32_transform(Some(OnFailure::Ignore));

        // `Ignore` turns the failed coercion into an absent value.
        let outcome = coerce_str("hello", &transform).unwrap();
        assert_eq!(outcome, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = int32_transform(Some(OnFailure::Default));

        // No explicit default: the value carried by `type_` is used.
        let fallback = coerce_str("hello", &transform).unwrap();
        assert_eq!(fallback, Some(ValueData::I32Value(0)));

        // An explicit default takes precedence over the type-derived one.
        transform.default = Some(Value::Int32(42));
        let explicit = coerce_str("hello", &transform).unwrap();
        assert_eq!(explicit, Some(ValueData::I32Value(42)));
    }
}
Loading

0 comments on commit 0aceebf

Please sign in to comment.