From c049ce6ab1ae26b61bb7ef0a029f119a6d2d63d6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 29 Nov 2024 19:18:02 +0800 Subject: [PATCH] feat: add decolorize processor (#5065) * feat: add decolorize processor Signed-off-by: Ruihang Xia * Update src/pipeline/src/etl/processor/cmcd.rs * add crate level integration test Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/pipeline/src/etl/processor.rs | 12 +- src/pipeline/src/etl/processor/cmcd.rs | 4 + src/pipeline/src/etl/processor/decolorize.rs | 195 +++++++++++++++++++ src/pipeline/src/etl/processor/dissect.rs | 17 -- src/pipeline/src/etl/processor/gsub.rs | 4 - src/pipeline/tests/pipeline.rs | 33 ++++ 6 files changed, 239 insertions(+), 26 deletions(-) create mode 100644 src/pipeline/src/etl/processor/decolorize.rs diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 110e3e8736fb..6b00b19793ef 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -15,6 +15,7 @@ pub mod cmcd; pub mod csv; pub mod date; +pub mod decolorize; pub mod dissect; pub mod epoch; pub mod gsub; @@ -29,6 +30,7 @@ use ahash::{HashSet, HashSetExt}; use cmcd::{CmcdProcessor, CmcdProcessorBuilder}; use csv::{CsvProcessor, CsvProcessorBuilder}; use date::{DateProcessor, DateProcessorBuilder}; +use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder}; use dissect::{DissectProcessor, DissectProcessorBuilder}; use enum_dispatch::enum_dispatch; use epoch::{EpochProcessor, EpochProcessorBuilder}; @@ -61,11 +63,6 @@ const TARGET_FIELDS_NAME: &str = "target_fields"; const JSON_PATH_NAME: &str = "json_path"; const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index"; -// const IF_NAME: &str = "if"; -// const IGNORE_FAILURE_NAME: &str = "ignore_failure"; -// const ON_FAILURE_NAME: &str = "on_failure"; -// const TAG_NAME: &str = "tag"; - /// Processor trait defines the interface for all processors. /// /// A processor is a transformation that can be applied to a field in a document @@ -99,6 +96,7 @@ pub enum ProcessorKind { Epoch(EpochProcessor), Date(DateProcessor), JsonPath(JsonPathProcessor), + Decolorize(DecolorizeProcessor), } /// ProcessorBuilder trait defines the interface for all processor builders @@ -128,6 +126,7 @@ pub enum ProcessorBuilders { Epoch(EpochProcessorBuilder), Date(DateProcessorBuilder), JsonPath(JsonPathProcessorBuilder), + Decolorize(DecolorizeProcessorBuilder), } #[derive(Debug, Default)] @@ -275,6 +274,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { json_path::PROCESSOR_JSON_PATH => { ProcessorBuilders::JsonPath(json_path::JsonPathProcessorBuilder::try_from(value)?) } + decolorize::PROCESSOR_DECOLORIZE => { + ProcessorBuilders::Decolorize(DecolorizeProcessorBuilder::try_from(value)?) + } _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), }; diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index 06cfeb7c6905..086fe8f3d610 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Pipeline Processor for CMCD (Common Media Client Data) data. +//! +//! Refer to [`CmcdProcessor`] for more information. + use std::collections::BTreeMap; use ahash::HashSet; diff --git a/src/pipeline/src/etl/processor/decolorize.rs b/src/pipeline/src/etl/processor/decolorize.rs new file mode 100644 index 000000000000..e72bc28a1e66 --- /dev/null +++ b/src/pipeline/src/etl/processor/decolorize.rs @@ -0,0 +1,195 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Removes ANSI color control codes from the input text. +//! +//! Similar to [`decolorize`](https://grafana.com/docs/loki/latest/query/log_queries/#removing-color-codes) +//! from Grafana Loki and [`strip_ansi_escape_codes`](https://vector.dev/docs/reference/vrl/functions/#strip_ansi_escape_codes) +//! from Vector VRL. + +use ahash::HashSet; +use once_cell::sync::Lazy; +use regex::Regex; +use snafu::OptionExt; + +use crate::etl::error::{ + Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, +}; +use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::processor::{ + yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME, + FIELD_NAME, IGNORE_MISSING_NAME, +}; +use crate::etl::value::Value; + +pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize"; + +static RE: Lazy = Lazy::new(|| Regex::new(r"\x1b\[[0-9;]*m").unwrap()); + +#[derive(Debug, Default)] +pub struct DecolorizeProcessorBuilder { + fields: Fields, + ignore_missing: bool, +} + +impl ProcessorBuilder for DecolorizeProcessorBuilder { + fn output_keys(&self) -> HashSet<&str> { + self.fields + .iter() + .map(|f| f.target_or_input_field()) + .collect() + } + + fn input_keys(&self) -> HashSet<&str> { + self.fields.iter().map(|f| f.input_field()).collect() + } + + fn build(self, intermediate_keys: &[String]) -> Result { + self.build(intermediate_keys).map(ProcessorKind::Decolorize) + } +} + +impl DecolorizeProcessorBuilder { + fn build(self, intermediate_keys: &[String]) -> Result { + let mut real_fields = vec![]; + for field in self.fields.into_iter() { + let input = OneInputOneOutputField::build( + "decolorize", + intermediate_keys, + field.input_field(), + field.target_or_input_field(), + )?; + real_fields.push(input); + } + Ok(DecolorizeProcessor { + fields: real_fields, + ignore_missing: self.ignore_missing, + }) + } +} + +/// Remove ANSI color control codes from the input text. +#[derive(Debug, Default)] +pub struct DecolorizeProcessor { + fields: Vec, + ignore_missing: bool, +} + +impl DecolorizeProcessor { + fn process_string(&self, val: &str) -> Result { + Ok(Value::String(RE.replace_all(val, "").into_owned())) + } + + fn process(&self, val: &Value) -> Result { + match val { + Value::String(val) => self.process_string(val), + _ => ProcessorExpectStringSnafu { + processor: PROCESSOR_DECOLORIZE, + v: val.clone(), + } + .fail(), + } + } +} + +impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessorBuilder { + type Error = Error; + + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + let mut fields = Fields::default(); + let mut ignore_missing = false; + + for (k, v) in value.iter() { + let key = k + .as_str() + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; + + match key { + FIELD_NAME => { + fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); + } + FIELDS_NAME => { + fields = yaml_new_fields(v, FIELDS_NAME)?; + } + IGNORE_MISSING_NAME => { + ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?; + } + _ => {} + } + } + + Ok(DecolorizeProcessorBuilder { + fields, + ignore_missing, + }) + } +} + +impl crate::etl::processor::Processor for DecolorizeProcessor { + fn kind(&self) -> &str { + PROCESSOR_DECOLORIZE + } + + fn ignore_missing(&self) -> bool { + self.ignore_missing + } + + fn exec_mut(&self, val: &mut Vec) -> Result<()> { + for field in self.fields.iter() { + let index = field.input_index(); + match val.get(index) { + Some(Value::Null) | None => { + if !self.ignore_missing { + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); + } + } + Some(v) => { + let result = self.process(v)?; + let output_index = field.output_index(); + val[output_index] = result; + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_decolorize_processor() { + let processor = DecolorizeProcessor { + fields: vec![], + ignore_missing: false, + }; + + let val = Value::String("\x1b[32mGreen\x1b[0m".to_string()); + let result = processor.process(&val).unwrap(); + assert_eq!(result, Value::String("Green".to_string())); + + let val = Value::String("Plain text".to_string()); + let result = processor.process(&val).unwrap(); + assert_eq!(result, Value::String("Plain text".to_string())); + + let val = Value::String("\x1b[46mfoo\x1b[0m bar".to_string()); + let result = processor.process(&val).unwrap(); + assert_eq!(result, Value::String("foo bar".to_string())); + } +} diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index 09c6fc93d069..a9ccf5e8735e 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -644,7 +644,6 @@ impl DissectProcessor { let mut pos = 0; let mut appends: HashMap> = HashMap::new(); - // let mut maps: HashMap = HashMap::new(); let mut process_name_value = |name: &Name, value: String| { let name_index = name.index; @@ -658,22 +657,6 @@ impl DissectProcessor { .or_default() .push((value, order.unwrap_or_default())); } - // Some(StartModifier::MapKey) => match maps.get(&name_index) { - // Some(map_val) => { - // map.insert(value, Value::String(map_val.to_string())); - // } - // None => { - // maps.insert(name_index, value); - // } - // }, - // Some(StartModifier::MapVal) => match maps.get(&name_index) { - // Some(map_key) => { - // map.insert(map_key, Value::String(value)); - // } - // None => { - // maps.insert(name_index, value); - // } - // }, Some(_) => { // do nothing, ignore MapKey and MapVal // because transform can know the key name diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index b5a328c6fa00..54c8306ec4de 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -132,10 +132,6 @@ impl GsubProcessor { v: val.clone(), } .fail(), - // Err(format!( - // "{} processor: expect string or array string, but got {val:?}", - // self.kind() - // )), } } } diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index d1df83273185..e68c7b9e6a6e 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -674,3 +674,36 @@ transform: assert_eq!(expected, r); } + +#[test] +fn test_decolorize() { + let input_value = serde_json::json!({ + "message": "\u{001b}[32mSuccess\u{001b}[0m and \u{001b}[31mError\u{001b}[0m" + }); + + let pipeline_yaml = r#" +processors: + - decolorize: + fields: + - message +transform: + - fields: + - message + type: string +"#; + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = parse(&yaml_content).unwrap(); + + let mut status = pipeline.init_intermediate_state(); + pipeline.prepare(input_value, &mut status).unwrap(); + let row = pipeline.exec_mut(&mut status).unwrap(); + + let r = row + .values + .into_iter() + .map(|v| v.value_data.unwrap()) + .collect::>(); + + let expected = StringValue("Success and Error".into()); + assert_eq!(expected, r[0]); +}