// Patch overview. This text sits ahead of the first `diff --git` header and is not part of any hunk.
//
// src/pipeline/src/etl.rs
//   * The itertools import drops `merge`; only `Itertools` is imported.
//   * `ordered_intermediate_keys` is now built by flattening three key lists
//     (`processors_required_keys`, `transforms_required_keys`, `processors_output_keys`),
//     collecting them (the collection's type argument is missing in this copy), sorting,
//     cloning and gathering the result with `collect_vec()`. The removed code merged only
//     the two `*_required_keys` lists.
//     NOTE(review): presumably this lets processor outputs that no transform references
//     still appear among the intermediate keys (cf. `test_unuse_regex_group`) — confirm.
//
// src/pipeline/src/etl/processor/dissect.rs
//   * In the `Some(Value::String(val_str))` arm, the result of `self.process(val_str)` is now
//     propagated with `?`. The removed code logged the error through `warn!` and carried on.
//
// src/pipeline/tests/dissect.rs
//   * Adds `test_parse_failure`: the input string is "key1 key2", the dissect pattern names
//     three keys, and the test asserts that `exec_mut` returns an `Err` equal to
//     "No matching pattern found".
//
// src/pipeline/tests/regex.rs
//   * Adds `test_unuse_regex_group`: a regex with two groups is run on "123 456" while the
//     transform lists only `str_id1`. The test asserts the schema holds `str_id1` and
//     `greptime_timestamp`, and that the first value of the first row is "123".
//
// NOTE(review): this copy has each hunk's line breaks collapsed, and several angle-bracketed
// spans look stripped (`Vec =`, `collect::>()`, `from_str::(input_str)`, `pipeline::Pipeline =`,
// `(?\\d+)`). Restore them from the original commit before applying or compiling.
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index de4c544a011a..f6b3efd6e6f3 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -21,7 +21,7 @@ pub mod value; use ahash::HashSet; use common_telemetry::debug; -use itertools::{merge, Itertools}; +use itertools::Itertools; use processor::{Processor, ProcessorBuilder, Processors}; use transform::{TransformBuilders, Transformer, Transforms}; use value::Value; @@ -91,13 +91,18 @@ where debug!("required_keys: {:?}", required_keys); // intermediate keys are the keys that all processor and transformer required - let ordered_intermediate_keys: Vec = - merge(processors_required_keys, transforms_required_keys) - .cloned() - .collect::>() - .into_iter() - .sorted() - .collect(); + let ordered_intermediate_keys: Vec = [ + processors_required_keys, + transforms_required_keys, + processors_output_keys, + ] + .iter() + .flat_map(|l| l.iter()) + .collect::>() + .into_iter() + .sorted() + .cloned() + .collect_vec(); let mut final_intermediate_keys = Vec::with_capacity(ordered_intermediate_keys.len()); let mut intermediate_keys_exclude_original = diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index 9a4b8a966e66..dca88d38430d 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -817,16 +817,12 @@ impl Processor for DissectProcessor { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { - Some(Value::String(val_str)) => match self.process(val_str) { - Ok(r) => { - for (k, v) in r { - val[k] = v; - } - } - Err(e) => { - warn!("dissect processor: {}", e); + Some(Value::String(val_str)) => { + let r = self.process(val_str)?; + for (k, v) in r { + val[k] = v; } - }, + } Some(Value::Null) | None => { if !self.ignore_missing { return Err(format!( diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index 82ce63399c67..22cf14c46bb3 100644 --- 
a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -247,3 +247,37 @@ transform: Some(StringValue("key1_key2".to_string())) ); } + +#[test] +fn test_parse_failure() { + let input_str = r#" +{ + "str": "key1 key2" +}"#; + + let pipeline_yaml = r#" +processors: + - dissect: + field: str + patterns: + - "%{key1} %{key2} %{key3}" + +transform: + - fields: + - key1 + type: string +"#; + + let input_value = serde_json::from_str::(input_str).unwrap(); + + let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into()); + let pipeline: pipeline::Pipeline = + pipeline::parse(&yaml_content).expect("failed to parse pipeline"); + let mut result = pipeline.init_intermediate_state(); + + pipeline.prepare(input_value, &mut result).unwrap(); + let row = pipeline.exec_mut(&mut result); + + assert!(row.is_err()); + assert_eq!(row.err().unwrap(), "No matching pattern found"); +} diff --git a/src/pipeline/tests/regex.rs b/src/pipeline/tests/regex.rs index 5be60c987525..a8a7daaf5c6f 100644 --- a/src/pipeline/tests/regex.rs +++ b/src/pipeline/tests/regex.rs @@ -122,3 +122,49 @@ transform: assert_eq!(output.rows[0].values[0].value_data, None); } + +#[test] +fn test_unuse_regex_group() { + let input_value_str = r#" + [ + { + "str": "123 456" + } + ] +"#; + + let pipeline_yaml = r#" +processors: +- regex: + fields: + - str + pattern: "(?\\d+) (?\\d+)" + +transform: +- field: str_id1 + type: string +"#; + + let output = common::parse_and_exec(input_value_str, pipeline_yaml); + + assert_eq!( + output.schema, + vec![ + common::make_column_schema( + "str_id1".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ] + ); + + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("123".to_string())) + ); +}