From 0fc18b686587eb8488df9c723c89491a10d6bbc4 Mon Sep 17 00:00:00 2001
From: yuanbohan
Date: Mon, 17 Jun 2024 15:57:47 +0800
Subject: [PATCH] feat(pipeline): gsub processor (#4121)

* chore: add log http ingester scaffold
* chore: add some example code
* chore: add log inserter
* chore: add log handler file
* chore: add pipeline lib
* chore: import log handler
* chore: add pipeline http handler
* chore: add pipeline private table
* chore: add pipeline API
* chore: improve error handling
* chore: merge main
* chore: add multi content type support for log handler
* refactor: remove servers dep on pipeline
* refactor: move define_into_tonic_status to common-error
* refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4
* chore: fix typo
* refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c
* chore: fix typo and license header
* refactor: move http event handler to a separate file
* chore: add test for pipeline
* chore: fmt
* refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93
* refactor: move `pipeline_operator` to `pipeline` crate
* chore: minor update
* refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4
* chore: add log
* chore: add log
* chore: remove open hook
* chore: minor update
* chore: fix fmt
* chore: minor update
* chore: rename desc for pipeline table
* refactor: remove updated_at in pipelines
* chore: add more content type support for log inserter api
* chore: introduce pipeline crate
* chore: update upload pipeline api
* chore: fix by pr commit
* chore: add some doc for pub fn/struct
* chore: some minor fix
* chore: add pipeline version support
* chore: impl log pipeline version
* gsub processor
* chore: add test
* chore: update commit

Co-authored-by: dennis zhuang

---------

Co-authored-by: paomian
Co-authored-by: shuiyisong
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Co-authored-by: dennis zhuang
---
 src/pipeline/src/etl/processor/gsub.rs | 229 +++++++++++++++++++++++++
 src/pipeline/src/etl/processor/mod.rs  |   3 +
 src/pipeline/src/etl/value/array.rs    |  12 ++
 src/pipeline/tests/gsub.rs             |  70 ++++++++
 4 files changed, 314 insertions(+)
 create mode 100644 src/pipeline/src/etl/processor/gsub.rs
 create mode 100644 src/pipeline/tests/gsub.rs

diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs
new file mode 100644
index 000000000000..736d55998d1a
--- /dev/null
+++ b/src/pipeline/src/etl/processor/gsub.rs
@@ -0,0 +1,229 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use regex::Regex;
+
+use crate::etl::field::{Field, Fields};
+use crate::etl::processor::{
+    yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
+    IGNORE_MISSING_NAME,
+};
+use crate::etl::value::{Array, Map, Value};
+
+pub(crate) const PROCESSOR_GSUB: &str = "gsub";
+
+const REPLACEMENT_NAME: &str = "replacement";
+const PATTERN_NAME: &str = "pattern";
+
+/// A processor that replaces every match of a pattern in a string with a replacement; supports string values and arrays of strings only
+#[derive(Debug, Default)]
+pub struct GsubProcessor {
+    fields: Fields,
+    pattern: Option<Regex>,
+    replacement: Option<String>,
+    ignore_missing: bool,
+}
+
+impl GsubProcessor {
+    fn with_fields(&mut self, fields: Fields) {
+        self.fields = fields;
+    }
+
+    fn with_ignore_missing(&mut self, ignore_missing: bool) {
+        self.ignore_missing = ignore_missing;
+    }
+
+    fn try_pattern(&mut self, pattern: &str) -> Result<(), String> {
+        self.pattern = Some(Regex::new(pattern).map_err(|e| e.to_string())?);
+        Ok(())
+    }
+
+    fn with_replacement(&mut self, replacement: impl Into<String>) {
+        self.replacement = Some(replacement.into());
+    }
+
+    fn check(self) -> Result<Self, String> {
+        if self.pattern.is_none() {
+            return Err("pattern is required".to_string());
+        }
+
+        if self.replacement.is_none() {
+            return Err("replacement is required".to_string());
+        }
+
+        Ok(self)
+    }
+
+    fn process_string_field(&self, val: &str, field: &Field) -> Result<Map, String> {
+        let replacement = self.replacement.as_ref().unwrap();
+        let new_val = self
+            .pattern
+            .as_ref()
+            .unwrap()
+            .replace_all(val, replacement)
+            .to_string();
+        let val = Value::String(new_val);
+
+        let key = match field.target_field {
+            Some(ref target_field) => target_field,
+            None => field.get_field(),
+        };
+
+        Ok(Map::one(key, val))
+    }
+
+    fn process_array_field(&self, arr: &Array, field: &Field) -> Result<Map, String> {
+        let key = match field.target_field {
+            Some(ref target_field) => target_field,
+            None => field.get_field(),
+        };
+
+        let re = self.pattern.as_ref().unwrap();
+        let replacement = self.replacement.as_ref().unwrap();
+
+        let mut result = Array::default();
+        for val in arr.iter() {
+            match val {
+                Value::String(haystack) => {
+                    let new_val = re.replace_all(haystack, replacement).to_string();
+                    result.push(Value::String(new_val));
+                }
+                _ => {
+                    return Err(format!(
+                        "{} processor: expect string or array string, but got {val:?}",
+                        self.kind()
+                    ))
+                }
+            }
+        }
+
+        Ok(Map::one(key, Value::Array(result)))
+    }
+}
+
+impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
+    type Error = String;
+
+    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
+        let mut processor = GsubProcessor::default();
+
+        for (k, v) in value.iter() {
+            let key = k
+                .as_str()
+                .ok_or(format!("key must be a string, but got {k:?}"))?;
+            match key {
+                FIELD_NAME => {
+                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
+                }
+                FIELDS_NAME => {
+                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
+                }
+                PATTERN_NAME => {
+                    processor.try_pattern(&yaml_string(v, PATTERN_NAME)?)?;
+                }
+                REPLACEMENT_NAME => {
+                    processor.with_replacement(yaml_string(v, REPLACEMENT_NAME)?);
+                }
+
+                IGNORE_MISSING_NAME => {
+                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
+                }
+
+                _ => {}
+            }
+        }
+
+        processor.check()
+    }
+}
+
+impl crate::etl::processor::Processor for GsubProcessor {
+    fn kind(&self) -> &str {
+        PROCESSOR_GSUB
+    }
+
+    fn ignore_missing(&self) -> bool {
+        self.ignore_missing
+    }
+
+    fn fields(&self) -> &Fields {
+        &self.fields
+    }
+
+    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
+        match val {
+            Value::String(val) => self.process_string_field(val, field),
+            Value::Array(arr) => self.process_array_field(arr, field),
+            _ => Err(format!(
+                "{} processor: expect string or array string, but got {val:?}",
+                self.kind()
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::etl::field::Field;
+    use crate::etl::processor::gsub::GsubProcessor;
+    use crate::etl::processor::Processor;
+    use crate::etl::value::{Map, Value};
+
+    #[test]
+    fn test_string_value() {
+        let mut processor = GsubProcessor::default();
+        processor.try_pattern(r"\d+").unwrap();
+        processor.with_replacement("xxx");
+
+        let field = Field::new("message");
+        let val = Value::String("123".to_string());
+        let result = processor.exec_field(&val, &field).unwrap();
+
+        assert_eq!(
+            result,
+            Map::one("message", Value::String("xxx".to_string()))
+        );
+    }
+
+    #[test]
+    fn test_array_string_value() {
+        let mut processor = GsubProcessor::default();
+        processor.try_pattern(r"\d+").unwrap();
+        processor.with_replacement("xxx");
+
+        let field = Field::new("message");
+        let val = Value::Array(
+            vec![
+                Value::String("123".to_string()),
+                Value::String("456".to_string()),
+            ]
+            .into(),
+        );
+        let result = processor.exec_field(&val, &field).unwrap();
+
+        assert_eq!(
+            result,
+            Map::one(
+                "message",
+                Value::Array(
+                    vec![
+                        Value::String("xxx".to_string()),
+                        Value::String("xxx".to_string())
+                    ]
+                    .into()
+                )
+            )
+        );
+    }
+}
diff --git a/src/pipeline/src/etl/processor/mod.rs b/src/pipeline/src/etl/processor/mod.rs
index 96e8a629f252..a5782d2a147d 100644
--- a/src/pipeline/src/etl/processor/mod.rs
+++ b/src/pipeline/src/etl/processor/mod.rs
@@ -17,6 +17,7 @@ pub mod csv;
 pub mod date;
 pub mod dissect;
 pub mod epoch;
+pub mod gsub;
 pub mod letter;
 pub mod regex;
 pub mod urlencoding;
@@ -29,6 +30,7 @@ use csv::CsvProcessor;
 use date::DateProcessor;
 use dissect::DissectProcessor;
 use epoch::EpochProcessor;
+use gsub::GsubProcessor;
 use letter::LetterProcessor;
 use regex::RegexProcessor;
 use urlencoding::UrlEncodingProcessor;
@@ -163,6 +165,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String>
         date::PROCESSOR_DATE => Arc::new(DateProcessor::try_from(value)?),
         dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?),
         epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?),
+        gsub::PROCESSOR_GSUB => Arc::new(GsubProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?), regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?), urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?), diff --git a/src/pipeline/src/etl/value/array.rs b/src/pipeline/src/etl/value/array.rs index a401cf00ab67..4cba167215aa 100644 --- a/src/pipeline/src/etl/value/array.rs +++ b/src/pipeline/src/etl/value/array.rs @@ -45,6 +45,12 @@ impl std::ops::Deref for Array { } } +impl std::ops::DerefMut for Array { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.values + } +} + impl IntoIterator for Array { type Item = Value; @@ -54,3 +60,9 @@ impl IntoIterator for Array { self.values.into_iter() } } + +impl From> for Array { + fn from(values: Vec) -> Self { + Array { values } + } +} diff --git a/src/pipeline/tests/gsub.rs b/src/pipeline/tests/gsub.rs new file mode 100644 index 000000000000..94fa6875331e --- /dev/null +++ b/src/pipeline/tests/gsub.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
+use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
+use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
+
+#[test]
+fn test_gsub() {
+    let input_value_str = r#"
+    [
+      {
+        "reqTimeSec": "1573840000.000"
+      }
+    ]
+"#;
+    let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
+        .expect("failed to parse input value")
+        .try_into()
+        .expect("failed to convert input value");
+
+    let pipeline_yaml = r#"
+---
+description: Pipeline for Akamai DataStream2 Log
+
+processors:
+  - gsub:
+      field: reqTimeSec
+      pattern: "\\."
+      replacement: ""
+  - epoch:
+      field: reqTimeSec
+      resolution: millisecond
+      ignore_missing: true
+
+transform:
+  - field: reqTimeSec
+    type: epoch, millisecond
+    index: timestamp
+"#;
+
+    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let pipeline: Pipeline<GreptimeTransformer> =
+        parse(&yaml_content).expect("failed to parse pipeline");
+    let output = pipeline.exec(input_value).expect("failed to exec pipeline");
+
+    let expected_schema = vec![ColumnSchema {
+        column_name: "reqTimeSec".to_string(),
+        datatype: ColumnDataType::TimestampMillisecond.into(),
+        semantic_type: SemanticType::Timestamp.into(),
+        datatype_extension: None,
+    }];
+
+    assert_eq!(output.schema, expected_schema);
+    assert_eq!(
+        output.rows[0].values[0].value_data,
+        Some(TimestampMillisecondValue(1573840000000))
+    );
+}