Skip to content

Commit

Permalink
chore: support pattern as pipeline key name (#4368)
Browse files Browse the repository at this point in the history
* chore: add pattern to processor key name

* fix: typo

* refactor: test
  • Loading branch information
shuiyisong authored Jul 18, 2024
1 parent e39f49f commit 8ab6136
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 91 deletions.
8 changes: 6 additions & 2 deletions src/pipeline/src/etl/processor/dissect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use itertools::Itertools;

use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_parse_strings, yaml_string, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME,
yaml_bool, yaml_field, yaml_fields, yaml_parse_string, yaml_parse_strings, yaml_string,
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
};
use crate::etl::value::{Map, Value};

Expand Down Expand Up @@ -559,6 +559,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
match key {
FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)),
FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?),
PATTERN_NAME => {
let pattern: Pattern = yaml_parse_string(v, PATTERN_NAME)?;
processor.with_patterns(vec![pattern]);
}
PATTERNS_NAME => {
let patterns = yaml_parse_strings(v, PATTERNS_NAME)?;
processor.with_patterns(patterns);
Expand Down
36 changes: 34 additions & 2 deletions src/pipeline/src/etl/processor/regex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use regex::Regex;

use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_strings, Field, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings, Field, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::{Map, Value};

Expand Down Expand Up @@ -157,6 +157,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
FIELDS_NAME => {
processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
}
PATTERN_NAME => {
processor.try_with_patterns(vec![yaml_string(v, PATTERN_NAME)?])?;
}
PATTERNS_NAME => {
processor.try_with_patterns(yaml_strings(v, PATTERNS_NAME)?)?;
}
Expand Down Expand Up @@ -210,6 +213,35 @@ mod tests {
use crate::etl::processor::Processor;
use crate::etl::value::{Map, Value};

/// Runs a `RegexProcessor` configured with one field (`a`) and one pattern
/// containing the named group `ar`, and checks that the resulting map holds
/// the capture under `a_ar` next to the untouched `a` entry.
#[test]
fn test_simple_parse() {
    let mut regex_processor = RegexProcessor::default();

    // one source field, one pattern with a single named capture group
    let field_list = ["a"].iter().map(|name| name.parse().unwrap()).collect();
    let fields = Fields::new(field_list).unwrap();
    regex_processor.with_fields(fields);

    let ar = "(?<ar>\\d)";
    let pattern_list = [ar].iter().map(|pat| pat.to_string()).collect();
    regex_processor.try_with_patterns(pattern_list).unwrap();

    // input map: { "a": "123" }
    let mut input = Map::default();
    input.insert("a", Value::String("123".to_string()));
    let processed_val = regex_processor.exec_map(input).unwrap();

    // expected map: the capture is stored as `a_ar`, the original `a` is kept
    let values = vec![
        ("a_ar".to_string(), Value::String("1".to_string())),
        ("a".to_string(), Value::String("123".to_string())),
    ]
    .into_iter()
    .collect();
    let v = Value::Map(Map { values });

    assert_eq!(v, processed_val);
}

#[test]
fn test_process() {
let mut processor = RegexProcessor::default();
Expand Down
16 changes: 15 additions & 1 deletion src/pipeline/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use greptime_proto::v1::Rows;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};

/// test util function to parse and execute pipeline
Expand All @@ -28,3 +28,17 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {

pipeline.exec(input_value).expect("failed to exec pipeline")
}

/// Test helper that builds a [`ColumnSchema`] from a column name, a data type
/// and a semantic type.
///
/// Both enum arguments are converted with `.into()` to match the schema's
/// field types; every remaining field is filled from `Default::default()`.
pub fn make_column_schema(
    column_name: String,
    datatype: ColumnDataType,
    semantic_type: SemanticType,
) -> ColumnSchema {
    // Convert first so the struct literal below can use field shorthand.
    let datatype = datatype.into();
    let semantic_type = semantic_type.into();

    ColumnSchema {
        column_name,
        datatype,
        semantic_type,
        ..Default::default()
    }
}
113 changes: 113 additions & 0 deletions src/pipeline/tests/dissect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;

use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};

/// Pipeline using the singular `pattern` key of the `dissect` processor:
/// the input string "123 456" is expected to come out as two string columns
/// `a` and `b`, followed by the `greptime_timestamp` column in the schema.
#[test]
fn test_dissect_pattern() {
    let input_json = r#"
[
{
"str": "123 456"
}
]
"#;

    // NOTE(review): YAML nesting is defined by the whitespace inside this raw
    // string — keep its contents exactly as they are when editing this test.
    let pipeline_yaml = r#"
processors:
- dissect:
field: str
pattern: "%{a} %{b}"
transform:
- fields:
- a
- b
type: string
"#;

    let result = common::parse_and_exec(input_json, pipeline_yaml);

    // (column name, data type, semantic type) for every expected column
    let expected_schema: Vec<_> = vec![
        ("a", ColumnDataType::String, SemanticType::Field),
        ("b", ColumnDataType::String, SemanticType::Field),
        (
            "greptime_timestamp",
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        ),
    ]
    .into_iter()
    .map(|(name, datatype, semantic_type)| {
        common::make_column_schema(name.to_string(), datatype, semantic_type)
    })
    .collect();

    assert_eq!(result.schema, expected_schema);

    // the first two values of the first row carry the dissected parts
    let first_row = &result.rows[0];
    for (idx, want) in ["123", "456"].iter().enumerate() {
        assert_eq!(
            first_row.values[idx].value_data,
            Some(StringValue(want.to_string()))
        );
    }
}

/// Pipeline using the plural `patterns` key of the `dissect` processor with a
/// one-element list: the input string "123 456" is expected to come out as
/// two string columns `a` and `b`, followed by the `greptime_timestamp`
/// column in the schema.
#[test]
fn test_dissect_patterns() {
    let input_json = r#"
[
{
"str": "123 456"
}
]
"#;

    // NOTE(review): YAML nesting is defined by the whitespace inside this raw
    // string — keep its contents exactly as they are when editing this test.
    let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{a} %{b}"
transform:
- fields:
- a
- b
type: string
"#;

    let result = common::parse_and_exec(input_json, pipeline_yaml);

    // schema: the two dissected string fields, then the timestamp column
    let string_field =
        |name: &str| common::make_column_schema(name.to_string(), ColumnDataType::String, SemanticType::Field);
    let timestamp_column = common::make_column_schema(
        "greptime_timestamp".to_string(),
        ColumnDataType::TimestampNanosecond,
        SemanticType::Timestamp,
    );
    let expected_schema = vec![string_field("a"), string_field("b"), timestamp_column];

    assert_eq!(result.schema, expected_schema);

    // the first two values of the first row carry the dissected parts
    let row_values = &result.rows[0].values;
    let first = Some(StringValue("123".to_string()));
    let second = Some(StringValue("456".to_string()));
    assert_eq!(row_values[0].value_data, first);
    assert_eq!(row_values[1].value_data, second);
}
14 changes: 6 additions & 8 deletions src/pipeline/tests/gsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use greptime_proto::v1::{ColumnDataType, SemanticType};

mod common;

Expand Down Expand Up @@ -49,13 +49,11 @@ transform:

let output = common::parse_and_exec(input_value_str, pipeline_yaml);

let expected_schema = vec![ColumnSchema {
column_name: "reqTimeSec".to_string(),
datatype: ColumnDataType::TimestampMillisecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
}];
let expected_schema = vec![common::make_column_schema(
"reqTimeSec".to_string(),
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
)];

assert_eq!(output.schema, expected_schema);
assert_eq!(
Expand Down
24 changes: 10 additions & 14 deletions src/pipeline/tests/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,16 @@ transform:

lazy_static! {
pub static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: "join_test".to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
common::make_column_schema(
"join_test".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}

Expand Down
Loading

0 comments on commit 8ab6136

Please sign in to comment.