From 8ab6136d1cb94f00ce7b7332b42918e376be0ff3 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Thu, 18 Jul 2024 11:32:26 +0800 Subject: [PATCH] chore: support `pattern` as pipeline key name (#4368) * chore: add pattern to processor key name * fix: typo * refactor: test --- src/pipeline/src/etl/processor/dissect.rs | 8 +- src/pipeline/src/etl/processor/regex.rs | 36 ++++++- src/pipeline/tests/common.rs | 16 ++- src/pipeline/tests/dissect.rs | 113 ++++++++++++++++++++++ src/pipeline/tests/gsub.rs | 14 ++- src/pipeline/tests/join.rs | 24 ++--- src/pipeline/tests/on_failure.rs | 110 +++++++++------------ src/pipeline/tests/regex.rs | 109 +++++++++++++++++++++ 8 files changed, 339 insertions(+), 91 deletions(-) create mode 100644 src/pipeline/tests/dissect.rs create mode 100644 src/pipeline/tests/regex.rs diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index 005b104f5b14..adb416c843f0 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -19,8 +19,8 @@ use itertools::Itertools; use crate::etl::field::{Field, Fields}; use crate::etl::processor::{ - yaml_bool, yaml_field, yaml_fields, yaml_parse_strings, yaml_string, Processor, FIELDS_NAME, - FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, + yaml_bool, yaml_field, yaml_fields, yaml_parse_string, yaml_parse_strings, yaml_string, + Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME, }; use crate::etl::value::{Map, Value}; @@ -559,6 +559,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor { match key { FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)), FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?), + PATTERN_NAME => { + let pattern: Pattern = yaml_parse_string(v, PATTERN_NAME)?; + processor.with_patterns(vec![pattern]); + } PATTERNS_NAME => { let patterns = yaml_parse_strings(v, PATTERNS_NAME)?; processor.with_patterns(patterns); diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index 8aba43436155..7474b78db06d 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -23,8 +23,8 @@ use regex::Regex; use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_field, yaml_fields, yaml_strings, Field, Processor, FIELDS_NAME, FIELD_NAME, - IGNORE_MISSING_NAME, + yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings, Field, Processor, FIELDS_NAME, + FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::{Map, Value}; @@ -157,6 +157,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor { FIELDS_NAME => { processor.with_fields(yaml_fields(v, FIELDS_NAME)?); } + PATTERN_NAME => { + processor.try_with_patterns(vec![yaml_string(v, PATTERN_NAME)?])?; + } PATTERNS_NAME => { processor.try_with_patterns(yaml_strings(v, PATTERNS_NAME)?)?; } @@ -210,6 +213,35 @@ mod tests { use crate::etl::processor::Processor; use crate::etl::value::{Map, Value}; + #[test] + fn test_simple_parse() { + let mut processor = RegexProcessor::default(); + + // single field (with prefix), multiple patterns + let f = ["a"].iter().map(|f| f.parse().unwrap()).collect(); + processor.with_fields(Fields::new(f).unwrap()); + + let ar = "(?\\d)"; + + let patterns = [ar].iter().map(|p| p.to_string()).collect(); + processor.try_with_patterns(patterns).unwrap(); + + let mut map = Map::default(); + map.insert("a", Value::String("123".to_string())); + let processed_val = processor.exec_map(map).unwrap(); + + let v = Value::Map(Map { + values: vec![ + ("a_ar".to_string(), Value::String("1".to_string())), + ("a".to_string(), Value::String("123".to_string())), + ] + .into_iter() + .collect(), + }); + + assert_eq!(v, processed_val); + } + #[test] fn test_process() { let mut processor = RegexProcessor::default(); diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index cf75fd773b3b..7e1a44112eb4 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use greptime_proto::v1::Rows; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType}; use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; /// test util function to parse and execute pipeline @@ -28,3 +28,17 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { pipeline.exec(input_value).expect("failed to exec pipeline") } + +/// test util function to create column schema +pub fn make_column_schema( + column_name: String, + datatype: ColumnDataType, + semantic_type: SemanticType, +) -> ColumnSchema { + ColumnSchema { + column_name, + datatype: datatype.into(), + semantic_type: semantic_type.into(), + ..Default::default() + } +} diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs new file mode 100644 index 000000000000..bc9ca263ca40 --- /dev/null +++ b/src/pipeline/tests/dissect.rs @@ -0,0 +1,113 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; + +use greptime_proto::v1::value::ValueData::StringValue; +use greptime_proto::v1::{ColumnDataType, SemanticType}; + +#[test] +fn test_dissect_pattern() { + let input_value_str = r#" + [ + { + "str": "123 456" + } + ] +"#; + + let pipeline_yaml = r#" +processors: + - dissect: + field: str + pattern: "%{a} %{b}" + +transform: + - fields: + - a + - b + type: string +"#; + + let output = common::parse_and_exec(input_value_str, pipeline_yaml); + + let expected_schema = vec![ + common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field), + common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("123".to_string())) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(StringValue("456".to_string())) + ); +} + +#[test] +fn test_dissect_patterns() { + let input_value_str = r#" + [ + { + "str": "123 456" + } + ] +"#; + + let pipeline_yaml = r#" +processors: + - dissect: + field: str + patterns: + - "%{a} %{b}" + +transform: + - fields: + - a + - b + type: string +"#; + + let output = common::parse_and_exec(input_value_str, pipeline_yaml); + + let expected_schema = vec![ + common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field), + common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("123".to_string())) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(StringValue("456".to_string())) + ); +} diff --git a/src/pipeline/tests/gsub.rs b/src/pipeline/tests/gsub.rs index 0c527b87ce70..2f336923e8b6 100644 --- a/src/pipeline/tests/gsub.rs +++ b/src/pipeline/tests/gsub.rs @@ -13,7 +13,7 @@ // limitations under the License. use greptime_proto::v1::value::ValueData::TimestampMillisecondValue; -use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use greptime_proto::v1::{ColumnDataType, SemanticType}; mod common; @@ -49,13 +49,11 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); - let expected_schema = vec![ColumnSchema { - column_name: "reqTimeSec".to_string(), - datatype: ColumnDataType::TimestampMillisecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }]; + let expected_schema = vec![common::make_column_schema( + "reqTimeSec".to_string(), + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )]; assert_eq!(output.schema, expected_schema); assert_eq!( diff --git a/src/pipeline/tests/join.rs b/src/pipeline/tests/join.rs index 302da13c79fd..9ffa35909c76 100644 --- a/src/pipeline/tests/join.rs +++ b/src/pipeline/tests/join.rs @@ -32,20 +32,16 @@ transform: lazy_static! { pub static ref EXPECTED_SCHEMA: Vec = vec![ - ColumnSchema { - column_name: "join_test".to_string(), - datatype: ColumnDataType::String.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: "greptime_timestamp".to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }, + common::make_column_schema( + "join_test".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), ]; } diff --git a/src/pipeline/tests/on_failure.rs b/src/pipeline/tests/on_failure.rs index 199f8a1606db..db72a0b7fc63 100644 --- a/src/pipeline/tests/on_failure.rs +++ b/src/pipeline/tests/on_failure.rs @@ -13,7 +13,7 @@ // limitations under the License. use greptime_proto::v1::value::ValueData::{U16Value, U8Value}; -use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use greptime_proto::v1::{ColumnDataType, SemanticType}; mod common; @@ -40,20 +40,16 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ - ColumnSchema { - column_name: "version".to_string(), - datatype: ColumnDataType::Uint8.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: "greptime_timestamp".to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }, + common::make_column_schema( + "version".to_string(), + ColumnDataType::Uint8, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), ]; assert_eq!(output.schema, expected_schema); @@ -85,20 +81,16 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ - ColumnSchema { - column_name: "version".to_string(), - datatype: ColumnDataType::Uint8.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: "greptime_timestamp".to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }, + common::make_column_schema( + "version".to_string(), + ColumnDataType::Uint8, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), ]; assert_eq!(output.schema, expected_schema); @@ -125,20 +117,16 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ - ColumnSchema { - column_name: "version".to_string(), - datatype: ColumnDataType::Uint8.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: "greptime_timestamp".to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }, + common::make_column_schema( + "version".to_string(), + ColumnDataType::Uint8, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), ]; assert_eq!(output.schema, expected_schema); @@ -176,27 +164,21 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ - ColumnSchema { - column_name: "version".to_string(), - datatype: ColumnDataType::Uint8.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: "spec_version".to_string(), - datatype: ColumnDataType::Uint16.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: "greptime_timestamp".to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }, + common::make_column_schema( + "version".to_string(), + ColumnDataType::Uint8, + SemanticType::Field, + ), + common::make_column_schema( + "spec_version".to_string(), + ColumnDataType::Uint16, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), ]; assert_eq!(output.schema, expected_schema); diff --git a/src/pipeline/tests/regex.rs b/src/pipeline/tests/regex.rs new file mode 100644 index 000000000000..5519c613951f --- /dev/null +++ b/src/pipeline/tests/regex.rs @@ -0,0 +1,109 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; + +use greptime_proto::v1::value::ValueData::StringValue; +use greptime_proto::v1::{ColumnDataType, SemanticType}; + +#[test] +fn test_regex_pattern() { + let input_value_str = r#" + [ + { + "str": "123 456" + } + ] +"#; + + let pipeline_yaml = r#" +processors: + - regex: + fields: + - str + pattern: "(?\\d+)" + +transform: + - field: str_id + type: string +"#; + + let output = common::parse_and_exec(input_value_str, pipeline_yaml); + + let expected_schema = vec![ + common::make_column_schema( + "str_id".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("123".to_string())) + ); +} + +#[test] +fn test_regex_patterns() { + let input_value_str = r#" + [ + { + "str": "123 456" + } + ] +"#; + + let pipeline_yaml = r#" +processors: + - regex: + fields: + - str + patterns: + - "(?\\d+)" + +transform: + - field: str_id + type: string +"#; + + let output = common::parse_and_exec(input_value_str, pipeline_yaml); + + let expected_schema = vec![ + common::make_column_schema( + "str_id".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("123".to_string())) + ); +}