From 627a3262735eaa490cee8dadf413b61a04948d16 Mon Sep 17 00:00:00 2001 From: localhost Date: Thu, 26 Sep 2024 03:32:34 +0800 Subject: [PATCH] refactor: Change the error type in the pipeline crate from String to Error (#4763) * chore: in process * chore: change pipeline crate error type * chore: improve event error * chore: fix by pr comment * chore: use snafu context replace ok_or_else * refactor: update snafu usage --------- Co-authored-by: Ning Sun --- src/pipeline/benches/processor.rs | 8 +- src/pipeline/src/etl.rs | 30 +- src/pipeline/src/etl/error.rs | 552 ++++++++++++++++++ src/pipeline/src/etl/field.rs | 14 +- src/pipeline/src/etl/processor.rs | 71 ++- src/pipeline/src/etl/processor/cmcd.rs | 69 ++- src/pipeline/src/etl/processor/csv.rs | 62 +- src/pipeline/src/etl/processor/date.rs | 63 +- src/pipeline/src/etl/processor/dissect.rs | 224 +++---- src/pipeline/src/etl/processor/epoch.rs | 45 +- src/pipeline/src/etl/processor/gsub.rs | 65 ++- src/pipeline/src/etl/processor/join.rs | 47 +- src/pipeline/src/etl/processor/letter.rs | 44 +- src/pipeline/src/etl/processor/regex.rs | 79 +-- src/pipeline/src/etl/processor/timestamp.rs | 93 +-- src/pipeline/src/etl/processor/urlencoding.rs | 46 +- src/pipeline/src/etl/transform.rs | 45 +- src/pipeline/src/etl/transform/index.rs | 12 +- .../src/etl/transform/transformer/greptime.rs | 35 +- .../transform/transformer/greptime/coerce.rs | 80 +-- src/pipeline/src/etl/value.rs | 75 +-- src/pipeline/src/lib.rs | 1 + src/pipeline/src/manager/error.rs | 16 +- src/pipeline/src/manager/table.rs | 3 +- src/pipeline/tests/dissect.rs | 2 +- src/servers/src/http/event.rs | 37 +- 26 files changed, 1276 insertions(+), 542 deletions(-) create mode 100644 src/pipeline/src/etl/error.rs diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index 281d8ce0efd2..09462753d892 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -13,13 +13,13 @@ // limitations under the License. 
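For orientation, a self-contained sketch of the pattern this patch applies across the crate (the DemoError and parse_int names are illustrative only, not part of the codebase): a snafu-derived error enum plus a crate-level Result<T> alias replace the old Result<T, String> signatures, which is why call sites such as the benchmark below only need to change their return types.

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum DemoError {
    #[snafu(display("failed to parse '{value}' as an integer"))]
    ParseInt {
        value: String,
        source: std::num::ParseIntError,
    },
}

// The alias callers use instead of spelling out the error type.
type Result<T> = std::result::Result<T, DemoError>;

// Before the refactor this would have returned Result<i64, String> and built the
// message with format!; now the typed variant carries the value and the source error.
fn parse_int(value: &str) -> Result<i64> {
    value.parse::<i64>().context(ParseIntSnafu { value })
}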
use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use pipeline::{parse, Content, GreptimeTransformer, Pipeline}; +use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Result}; use serde_json::{Deserializer, Value}; fn processor_mut( pipeline: &Pipeline, input_values: Vec, -) -> Result, String> { +) -> Result> { let mut payload = pipeline.init_intermediate_state(); let mut result = Vec::with_capacity(input_values.len()); @@ -30,7 +30,7 @@ fn processor_mut( pipeline.reset_intermediate_state(&mut payload); } - Ok::, String>(result) + Ok(result) } fn prepare_pipeline() -> Pipeline { @@ -230,7 +230,7 @@ fn criterion_benchmark(c: &mut Criterion) { let input_value_str = include_str!("./data.log"); let input_value = Deserializer::from_str(input_value_str) .into_iter::() - .collect::, _>>() + .collect::, _>>() .unwrap(); let pipeline = prepare_pipeline(); let mut group = c.benchmark_group("pipeline"); diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index f6b3efd6e6f3..f29032e4f8a2 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -14,6 +14,7 @@ #![allow(dead_code)] +pub mod error; pub mod field; pub mod processor; pub mod transform; @@ -21,12 +22,16 @@ pub mod value; use ahash::HashSet; use common_telemetry::debug; +use error::{IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu}; use itertools::Itertools; use processor::{Processor, ProcessorBuilder, Processors}; +use snafu::{OptionExt, ResultExt}; use transform::{TransformBuilders, Transformer, Transforms}; use value::Value; use yaml_rust::YamlLoader; +use crate::etl::error::Result; + const DESCRIPTION: &str = "description"; const PROCESSORS: &str = "processors"; const TRANSFORM: &str = "transform"; @@ -37,13 +42,13 @@ pub enum Content { Yaml(String), } -pub fn parse(input: &Content) -> Result, String> +pub fn parse(input: &Content) -> Result> where T: Transformer, { match input { Content::Yaml(str) => { - let docs = YamlLoader::load_from_str(str).map_err(|e| e.to_string())?; + let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?; let doc = &docs[0]; @@ -124,7 +129,7 @@ where .processor_builders .into_iter() .map(|builder| builder.build(&final_intermediate_keys)) - .collect::, _>>()?; + .collect::>>()?; let processors = Processors { processors: processors_kind_list, required_keys: processors_required_keys.clone(), @@ -136,7 +141,7 @@ where .builders .into_iter() .map(|builder| builder.build(&final_intermediate_keys, &output_keys)) - .collect::, String>>()?; + .collect::>>()?; let transformers = Transforms { transforms: transfor_list, @@ -197,7 +202,7 @@ impl Pipeline where T: Transformer, { - pub fn exec_mut(&self, val: &mut Vec) -> Result { + pub fn exec_mut(&self, val: &mut Vec) -> Result { for processor in self.processors.iter() { processor.exec_mut(val)?; } @@ -205,7 +210,7 @@ where self.transformer.transform_mut(val) } - pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<(), String> { + pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> { match val { serde_json::Value::Object(map) => { let mut search_from = 0; @@ -230,7 +235,7 @@ where result[0] = val.try_into()?; } _ => { - return Err("expect object".to_string()); + return PrepareValueMustBeObjectSnafu.fail(); } } Ok(()) @@ -274,18 +279,11 @@ where } } -pub(crate) fn find_key_index( - intermediate_keys: &[String], - key: &str, - kind: &str, -) -> Result { +pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) 
-> Result { intermediate_keys .iter() .position(|k| k == key) - .ok_or(format!( - "{} processor.{} not found in intermediate keys", - kind, key - )) + .context(IntermediateKeyIndexSnafu { kind, key }) } #[cfg(test)] diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs new file mode 100644 index 000000000000..3680053ba0d7 --- /dev/null +++ b/src/pipeline/src/etl/error.rs @@ -0,0 +1,552 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Empty input field"))] + EmptyInputField { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Missing input field"))] + MissingInputField { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Processor must be a map"))] + ProcessorMustBeMap { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{processor} processor: missing field: {field}"))] + ProcessorMissingField { + processor: String, + field: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{processor} processor: expect string value, but got {v:?}"))] + ProcessorExpectString { + processor: String, + v: crate::etl::Value, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{processor} processor: unsupported value {val}"))] + ProcessorUnsupportedValue { + processor: &'static str, + val: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("processor key must be a string"))] + ProcessorKeyMustBeString { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{kind} processor: failed to parse {value}"))] + ProcessorFailedToParseString { + kind: String, + value: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("processor must have a string key"))] + ProcessorMustHaveStringKey { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("unsupported {processor} processor"))] + UnsupportedProcessor { + processor: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Field {field} must be a {ty}"))] + FieldMustBeType { + field: String, + ty: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Field parse from string failed: {field}"))] + FailedParseFieldFromString { + #[snafu(source)] + error: Box, + field: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to parse {key} as int: {value}"))] + FailedToParseIntKey { + key: String, + value: String, + #[snafu(source)] + error: std::num::ParseIntError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to parse {value} to int"))] + FailedToParseInt { + value: String, + #[snafu(source)] + error: std::num::ParseIntError, + #[snafu(implicit)] + location: Location, + }, 
+ #[snafu(display("failed to parse {key} as float: {value}"))] + FailedToParseFloatKey { + key: String, + value: String, + #[snafu(source)] + error: std::num::ParseFloatError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{kind} processor.{key} not found in intermediate keys"))] + IntermediateKeyIndex { + kind: String, + key: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{k} missing value in {s}"))] + CmcdMissingValue { + k: String, + s: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("{part} missing key in {s}"))] + CmcdMissingKey { + part: String, + s: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("key must be a string, but got {k:?}"))] + KeyMustBeString { + k: yaml_rust::Yaml, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("csv read error"))] + CsvRead { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: csv::Error, + }, + #[snafu(display("expected at least one record from csv format, but got none"))] + CsvNoRecord { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("'{separator}' must be a single character, but got '{value}'"))] + CsvSeparatorName { + separator: &'static str, + value: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("'{quote}' must be a single character, but got '{value}'"))] + CsvQuoteName { + quote: &'static str, + value: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Parse date timezone error {value}"))] + DateParseTimezone { + value: String, + #[snafu(source)] + error: chrono_tz::ParseError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Parse date error {value}"))] + DateParse { + value: String, + #[snafu(source)] + error: chrono::ParseError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to get local timezone"))] + DateFailedToGetLocalTimezone { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to get timestamp"))] + DateFailedToGetTimestamp { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("{processor} processor: invalid format {s}"))] + DateInvalidFormat { + s: String, + processor: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid Pattern: '{s}'. 
{detail}"))] + DissectInvalidPattern { + s: String, + detail: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Empty pattern is not allowed"))] + DissectEmptyPattern { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("'{split}' exceeds the input"))] + DissectSplitExceedsInput { + split: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("'{split}' does not match the input '{input}'"))] + DissectSplitNotMatchInput { + split: String, + input: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("consecutive names are not allowed: '{name1}' '{name2}'"))] + DissectConsecutiveNames { + name1: String, + name2: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("No matching pattern found"))] + DissectNoMatchingPattern { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("'{m}' modifier already set, but found {modifier}"))] + DissectModifierAlreadySet { + m: String, + modifier: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Append Order modifier is already set to '{n}', cannot be set to {order}"))] + DissectAppendOrderAlreadySet { + n: String, + order: u32, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Order can only be set to Append Modifier, current modifier is {m}"))] + DissectOrderOnlyAppend { + m: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Order can only be set to Append Modifier"))] + DissectOrderOnlyAppendModifier { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("End modifier already set: '{m}'"))] + DissectEndModifierAlreadySet { + m: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("invalid resolution: {resolution}"))] + EpochInvalidResolution { + resolution: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("pattern is required"))] + GsubPatternRequired { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("replacement is required"))] + GsubReplacementRequired { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("invalid regex pattern: {pattern}"))] + Regex { + #[snafu(source)] + error: regex::Error, + pattern: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("separator is required"))] + JoinSeparatorRequired { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("invalid method: {method}"))] + LetterInvalidMethod { + method: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("no named group found in regex {origin}"))] + RegexNamedGroupNotFound { + origin: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("no valid field found in {processor} processor"))] + RegexNoValidField { + processor: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("no valid pattern found in {processor} processor"))] + RegexNoValidPattern { + processor: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("invalid method: {s}"))] + UrlEncodingInvalidMethod { + s: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("url decoding error"))] + UrlEncodingDecode { + #[snafu(source)] + error: std::string::FromUtf8Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("invalid transform on_failure value: {value}"))] + TransformOnFailureInvalidValue { + value: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("transform 
element must be a map"))] + TransformElementMustBeMap { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("transform {fields:?} type MUST BE set before default {default}"))] + TransformTypeMustBeSet { + fields: String, + default: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("transform cannot be empty"))] + TransformEmpty { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("column name must be unique, but got duplicated: {duplicates}"))] + TransformColumnNameMustBeUnique { + duplicates: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( + "Illegal to set multiple timestamp Index columns, please set only one: {columns}" + ))] + TransformMultipleTimestampIndex { + columns: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"))] + TransformTimestampIndexCount { + count: usize, + columns: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Null type not supported"))] + CoerceUnsupportedNullType { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Null type not supported when to coerce '{ty}' type"))] + CoerceUnsupportedNullTypeTo { + ty: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("{ty} value not supported for Epoch"))] + CoerceUnsupportedEpochType { + ty: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("failed to coerce string value '{s}' to type '{ty}'"))] + CoerceStringToType { + s: String, + ty: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}" + ))] + ValueInvalidResolution { + resolution: String, + valid_resolution: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to parse type: '{t}'"))] + ValueParseType { + t: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to parse {ty}: {v}"))] + ValueParseInt { + ty: String, + v: String, + #[snafu(source)] + error: std::num::ParseIntError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to parse {ty}: {v}"))] + ValueParseFloat { + ty: String, + v: String, + #[snafu(source)] + error: std::num::ParseFloatError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("failed to parse {ty}: {v}"))] + ValueParseBoolean { + ty: String, + v: String, + #[snafu(source)] + error: std::str::ParseBoolError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("default value not unsupported for type {value}"))] + ValueDefaultValueUnsupported { + value: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("unsupported number type: {value}"))] + ValueUnsupportedNumberType { + value: serde_json::Number, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("unsupported yaml type: {value:?}"))] + ValueUnsupportedYamlType { + value: yaml_rust::Yaml, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("key in Hash must be a string, but got {value:?}"))] + ValueYamlKeyMustBeString { + value: yaml_rust::Yaml, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Yaml load error."))] + YamlLoad { + #[snafu(source)] + error: yaml_rust::ScanError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Prepare value must be an object"))] + PrepareValueMustBeObject { + 
#[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Column options error"))] + ColumnOptions { + #[snafu(source)] + source: api::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("unsupported index type: {value}"))] + UnsupportedIndexType { + value: String, + #[snafu(implicit)] + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + StatusCode::InvalidArguments + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/pipeline/src/etl/field.rs b/src/pipeline/src/etl/field.rs index ff2f1ee7b5af..84d94040b541 100644 --- a/src/pipeline/src/etl/field.rs +++ b/src/pipeline/src/etl/field.rs @@ -15,6 +15,10 @@ use std::ops::Deref; use std::str::FromStr; +use snafu::OptionExt; + +use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu}; +use crate::etl::error::{Error, Result}; use crate::etl::find_key_index; /// Information about the input field including the name and index in intermediate keys. @@ -56,7 +60,7 @@ impl OneInputOneOutputField { intermediate_keys: &[String], input_field: &str, target_field: &str, - ) -> Result { + ) -> Result { let input_index = find_key_index(intermediate_keys, input_field, processor_kind)?; let input_field_info = InputFieldInfo::new(input_field, input_index); @@ -145,19 +149,19 @@ pub struct Field { } impl FromStr for Field { - type Err = String; + type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { let mut parts = s.split(','); let input_field = parts .next() - .ok_or("input field is missing")? + .context(MissingInputFieldSnafu)? .trim() .to_string(); let target_field = parts.next().map(|x| x.trim().to_string()); if input_field.is_empty() { - return Err("input field is empty".to_string()); + return EmptyInputFieldSnafu.fail(); } Ok(Field { diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 257cce4dfc0c..afda8f399e9c 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -36,10 +36,16 @@ use itertools::Itertools; use join::{JoinProcessor, JoinProcessorBuilder}; use letter::{LetterProcessor, LetterProcessorBuilder}; use regex::{RegexProcessor, RegexProcessorBuilder}; +use snafu::{OptionExt, ResultExt}; use timestamp::{TimestampProcessor, TimestampProcessorBuilder}; use urlencoding::{UrlEncodingProcessor, UrlEncodingProcessorBuilder}; +use super::error::{ + FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu, + ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, UnsupportedProcessorSnafu, +}; use super::field::{Field, Fields}; +use crate::etl::error::{Error, Result}; use crate::etl::value::Value; const FIELD_NAME: &str = "field"; @@ -70,7 +76,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static { fn ignore_missing(&self) -> bool; /// Execute the processor on a vector which be preprocessed by the pipeline - fn exec_mut(&self, val: &mut Vec) -> Result<(), String>; + fn exec_mut(&self, val: &mut Vec) -> Result<()>; } #[derive(Debug)] @@ -98,7 +104,7 @@ pub trait ProcessorBuilder: std::fmt::Debug + Send + Sync + 'static { /// Get the processor's input keys fn input_keys(&self) -> HashSet<&str>; /// Build the processor - fn build(self, intermediate_keys: &[String]) -> Result; + fn build(self, intermediate_keys: &[String]) -> Result; } #[derive(Debug)] @@ -171,9 +177,9 @@ impl Processors { } impl TryFrom<&Vec> for ProcessorBuilderList { - type Error = String; + type 
Error = Error; - fn try_from(vec: &Vec) -> Result { + fn try_from(vec: &Vec) -> Result { let mut processors_builders = vec![]; let mut all_output_keys = HashSet::with_capacity(50); let mut all_required_keys = HashSet::with_capacity(50); @@ -226,13 +232,10 @@ impl TryFrom<&Vec> for ProcessorBuilderList { } } -fn parse_processor(doc: &yaml_rust::Yaml) -> Result { - let map = doc.as_hash().ok_or("processor must be a map".to_string())?; +fn parse_processor(doc: &yaml_rust::Yaml) -> Result { + let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?; - let key = map - .keys() - .next() - .ok_or("processor must have a string key".to_string())?; + let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?; let value = map .get(key) @@ -240,9 +243,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { .as_hash() .expect("processor value must be a map"); - let str_key = key - .as_str() - .ok_or("processor key must be a string".to_string())?; + let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?; let processor = match str_key { cmcd::PROCESSOR_CMCD => ProcessorBuilders::Cmcd(CmcdProcessorBuilder::try_from(value)?), @@ -264,58 +265,72 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { urlencoding::PROCESSOR_URL_ENCODING => { ProcessorBuilders::UrlEncoding(UrlEncodingProcessorBuilder::try_from(value)?) } - _ => return Err(format!("unsupported {} processor", str_key)), + _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), }; Ok(processor) } -pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result { +pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result { v.as_str() .map(|s| s.to_string()) - .ok_or(format!("'{field}' must be a string")) + .context(FieldMustBeTypeSnafu { + field, + ty: "string", + }) } -pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result, String> { +pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result> { let vec = v .as_vec() - .ok_or(format!("'{field}' must be a list of strings",))? + .context(FieldMustBeTypeSnafu { + field, + ty: "list of string", + })? .iter() .map(|v| v.as_str().unwrap_or_default().into()) .collect(); Ok(vec) } -pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result { - v.as_bool().ok_or(format!("'{field}' must be a boolean")) +pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result { + v.as_bool().context(FieldMustBeTypeSnafu { + field, + ty: "boolean", + }) } -pub(crate) fn yaml_parse_string(v: &yaml_rust::Yaml, field: &str) -> Result +pub(crate) fn yaml_parse_string(v: &yaml_rust::Yaml, field: &str) -> Result where T: std::str::FromStr, - T::Err: ToString, + T::Err: std::error::Error + Send + Sync + 'static, { yaml_string(v, field)? 
.parse::() - .map_err(|e| e.to_string()) + .map_err(|e| Box::new(e) as Box) + .context(FailedParseFieldFromStringSnafu { field }) } -pub(crate) fn yaml_parse_strings(v: &yaml_rust::Yaml, field: &str) -> Result, String> +pub(crate) fn yaml_parse_strings(v: &yaml_rust::Yaml, field: &str) -> Result> where T: std::str::FromStr, - T::Err: ToString, + T::Err: std::error::Error + Send + Sync + 'static, { yaml_strings(v, field).and_then(|v| { v.into_iter() - .map(|s| s.parse::().map_err(|e| e.to_string())) + .map(|s| { + s.parse::() + .map_err(|e| Box::new(e) as Box) + .context(FailedParseFieldFromStringSnafu { field }) + }) .collect() }) } -pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result { +pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result { yaml_parse_strings(v, field).map(Fields::new) } -pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result { +pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result { yaml_parse_string(v, field) } diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index 1556829d6596..f43186b94aa0 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -15,8 +15,14 @@ use std::collections::BTreeMap; use ahash::HashSet; +use snafu::{OptionExt, ResultExt}; use urlencoding::decode; +use crate::etl::error::{ + CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu, + FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, + ProcessorMissingFieldSnafu, Result, +}; use crate::etl::field::{Field, Fields, InputFieldInfo, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ @@ -82,7 +88,7 @@ impl CmcdProcessorBuilder { pub(super) fn build_cmcd_outputs( field: &Field, intermediate_keys: &[String], - ) -> Result<(BTreeMap, Vec), String> { + ) -> Result<(BTreeMap, Vec)> { let mut output_index = BTreeMap::new(); let mut cmcd_field_outputs = Vec::with_capacity(CMCD_KEYS.len()); for cmcd in CMCD_KEYS { @@ -119,7 +125,7 @@ impl CmcdProcessorBuilder { } /// build CmcdProcessor from CmcdProcessorBuilder - pub fn build(self, intermediate_keys: &[String]) -> Result { + pub fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; let mut cmcd_outputs = Vec::with_capacity(CMCD_KEYS.len()); for field in self.fields.into_iter() { @@ -151,7 +157,7 @@ impl ProcessorBuilder for CmcdProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Cmcd) } } @@ -170,7 +176,7 @@ pub(super) struct CmcdOutputInfo { /// index in intermediate_keys index: usize, /// function to resolve value - f: fn(&str, &str, Option<&str>) -> Result, + f: fn(&str, &str, Option<&str>) -> Result, } impl CmcdOutputInfo { @@ -178,7 +184,7 @@ impl CmcdOutputInfo { final_key: String, key: &'static str, index: usize, - f: fn(&str, &str, Option<&str>) -> Result, + f: fn(&str, &str, Option<&str>) -> Result, ) -> Self { Self { final_key, @@ -201,28 +207,28 @@ impl Default for CmcdOutputInfo { } /// function to resolve CMCD_KEY_BS | CMCD_KEY_SU -fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result { +fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result { Ok(Value::Boolean(true)) } /// function to resolve CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP | CMCD_KEY_RTP | 
CMCD_KEY_TB -fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result { - let v = v.ok_or(format!("{k} missing value in {s}"))?; +fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result { + let v = v.context(CmcdMissingValueSnafu { k, s })?; let val: i64 = v .parse() - .map_err(|_| format!("failed to parse {v} as i64"))?; + .context(FailedToParseIntKeySnafu { key: k, value: v })?; Ok(Value::Int64(val)) } /// function to resolve CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID | CMCD_KEY_V -fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result { - let v = v.ok_or(format!("{k} missing value in {s}"))?; +fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result { + let v = v.context(CmcdMissingValueSnafu { k, s })?; Ok(Value::String(v.to_string())) } /// function to resolve CMCD_KEY_NOR -fn nor(s: &str, k: &str, v: Option<&str>) -> Result { - let v = v.ok_or(format!("{k} missing value in {s}"))?; +fn nor(s: &str, k: &str, v: Option<&str>) -> Result { + let v = v.context(CmcdMissingValueSnafu { k, s })?; let val = match decode(v) { Ok(val) => val.to_string(), Err(_) => v.to_string(), @@ -231,11 +237,11 @@ fn nor(s: &str, k: &str, v: Option<&str>) -> Result { } /// function to resolve CMCD_KEY_PR -fn pr(s: &str, k: &str, v: Option<&str>) -> Result { - let v = v.ok_or(format!("{k} missing value in {s}"))?; +fn pr(s: &str, k: &str, v: Option<&str>) -> Result { + let v = v.context(CmcdMissingValueSnafu { k, s })?; let val: f64 = v .parse() - .map_err(|_| format!("failed to parse {v} as f64"))?; + .context(FailedToParseFloatKeySnafu { key: k, value: v })?; Ok(Value::Float64(val)) } @@ -287,12 +293,12 @@ impl CmcdProcessor { format!("{}_{}", prefix, key) } - fn parse(&self, field_index: usize, s: &str) -> Result, String> { + fn parse(&self, field_index: usize, s: &str) -> Result> { let parts = s.split(','); let mut result = Vec::new(); for part in parts { let mut kv = part.split('='); - let k = kv.next().ok_or(format!("{part} missing key in {s}"))?; + let k = kv.next().context(CmcdMissingKeySnafu { part, s })?; let v = kv.next(); for cmcd_key in self.cmcd_outputs[field_index].iter() { @@ -308,16 +314,16 @@ impl CmcdProcessor { } impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut ignore_missing = false; for (k, v) in value.iter() { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -362,7 +368,7 @@ impl Processor for CmcdProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for (field_index, field) in self.fields.iter().enumerate() { let field_value_index = field.input_index(); match val.get(field_value_index) { @@ -374,18 +380,19 @@ impl Processor for CmcdProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind().to_string(), + field: field.input_name().to_string(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: 
self.kind().to_string(), + v: v.clone(), + } + .fail(); } } } diff --git a/src/pipeline/src/etl/processor/csv.rs b/src/pipeline/src/etl/processor/csv.rs index fb1fca2bfb34..c9cb5f847db1 100644 --- a/src/pipeline/src/etl/processor/csv.rs +++ b/src/pipeline/src/etl/processor/csv.rs @@ -18,7 +18,12 @@ use ahash::HashSet; use csv::{ReaderBuilder, Trim}; use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::Itertools; +use snafu::{OptionExt, ResultExt}; +use crate::etl::error::{ + CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error, + KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, +}; use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ @@ -53,7 +58,7 @@ pub struct CsvProcessorBuilder { } impl CsvProcessorBuilder { - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields { @@ -68,7 +73,7 @@ impl CsvProcessorBuilder { .target_fields .iter() .map(|f| find_key_index(intermediate_keys, f, "csv")) - .collect::, String>>()?; + .collect::>>()?; Ok(CsvProcessor { reader: self.reader, fields: real_fields, @@ -88,7 +93,7 @@ impl ProcessorBuilder for CsvProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Csv) } } @@ -114,11 +119,11 @@ pub struct CsvProcessor { impl CsvProcessor { // process the csv format string to a map with target_fields as keys - fn process(&self, val: &str) -> Result, String> { + fn process(&self, val: &str) -> Result> { let mut reader = self.reader.from_reader(val.as_bytes()); if let Some(result) = reader.records().next() { - let record: csv::StringRecord = result.map_err(|e| e.to_string())?; + let record: csv::StringRecord = result.context(CsvReadSnafu)?; let values: Vec<(usize, Value)> = self .output_index_info @@ -142,15 +147,15 @@ impl CsvProcessor { Ok(values) } else { - Err("expected at least one record from csv format, but got none".into()) + CsvNoRecordSnafu.fail() } } } impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { + fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { let mut reader = ReaderBuilder::new(); reader.has_headers(false); @@ -162,7 +167,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder { for (k, v) in hash { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -180,10 +185,11 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder { SEPARATOR_NAME => { let separator = yaml_string(v, SEPARATOR_NAME)?; if separator.len() != 1 { - return Err(format!( - "'{}' must be a single character, but got '{}'", - SEPARATOR_NAME, separator - )); + return CsvSeparatorNameSnafu { + separator: SEPARATOR_NAME, + value: separator, + } + .fail(); } else { reader.delimiter(separator.as_bytes()[0]); } @@ -191,10 +197,11 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder { QUOTE_NAME => { let quote = yaml_string(v, QUOTE_NAME)?; if quote.len() != 1 { - return Err(format!( - "'{}' must be a single 
character, but got '{}'", - QUOTE_NAME, quote - )); + return CsvQuoteNameSnafu { + quote: QUOTE_NAME, + value: quote, + } + .fail(); } else { reader.quote(quote.as_bytes()[0]); } @@ -240,7 +247,7 @@ impl Processor for CsvProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { @@ -252,18 +259,19 @@ impl Processor for CsvProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind().to_string(), + field: field.input_name().to_string(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind().to_string(), + v: v.clone(), + } + .fail(); } } } diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index b9bfcf3b6c6b..fa202a0edff2 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -18,7 +18,13 @@ use ahash::HashSet; use chrono::{DateTime, NaiveDateTime}; use chrono_tz::Tz; use lazy_static::lazy_static; +use snafu::{OptionExt, ResultExt}; +use crate::etl::error::{ + DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu, + DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, + ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result, +}; use crate::etl::field::{Fields, OneInputOneOutputField}; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, @@ -103,13 +109,13 @@ impl ProcessorBuilder for DateProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Date) } } impl DateProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { + pub fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input = OneInputOneOutputField::build( @@ -131,9 +137,9 @@ impl DateProcessorBuilder { } impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { + fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut formats = Formats::default(); let mut timezone = None; @@ -143,7 +149,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder { for (k, v) in hash { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { @@ -205,10 +211,12 @@ pub struct DateProcessor { } impl DateProcessor { - fn parse(&self, val: &str) -> Result { + fn parse(&self, val: &str) -> Result { let mut tz = Tz::UTC; if let Some(timezone) = &self.timezone { - tz = timezone.parse::().map_err(|e| e.to_string())?; + tz = timezone.parse::().context(DateParseTimezoneSnafu { + value: timezone.as_ref(), + })?; } for fmt in self.formats.iter() { @@ -217,7 +225,11 @@ impl DateProcessor { } } - Err(format!("{} processor: failed to 
parse {val}", self.kind(),)) + ProcessorFailedToParseStringSnafu { + kind: PROCESSOR_DATE.to_string(), + value: val.to_string(), + } + .fail() } } @@ -230,7 +242,7 @@ impl Processor for DateProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { @@ -241,18 +253,19 @@ impl Processor for DateProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind().to_string(), + field: field.input_name().to_string(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind().to_string(), + v: v.clone(), + } + .fail(); } } } @@ -261,16 +274,20 @@ impl Processor for DateProcessor { } /// try to parse val with timezone first, if failed, parse without timezone -fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result { +fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result { if let Ok(dt) = DateTime::parse_from_str(val, fmt) { - Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?) + Ok(dt + .timestamp_nanos_opt() + .context(DateFailedToGetTimestampSnafu)?) } else { let dt = NaiveDateTime::parse_from_str(val, fmt) - .map_err(|e| e.to_string())? + .context(DateParseSnafu { value: val })? .and_local_timezone(tz) .single() - .ok_or("failed to get local timezone")?; - Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?) + .context(DateFailedToGetLocalTimezoneSnafu)?; + Ok(dt + .timestamp_nanos_opt() + .context(DateFailedToGetTimestampSnafu)?) 
} } diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index f9925916fce4..09c6fc93d069 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -16,7 +16,15 @@ use std::ops::Deref; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; - +use snafu::OptionExt; + +use crate::etl::error::{ + DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu, + DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu, + DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu, + DissectOrderOnlyAppendSnafu, DissectSplitExceedsInputSnafu, DissectSplitNotMatchInputSnafu, + Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, +}; use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ @@ -77,9 +85,13 @@ impl NameInfo { self.name.is_empty() && self.start_modifier.is_none() && self.end_modifier.is_none() } - fn try_start_modifier(&mut self, modifier: StartModifier) -> Result<(), String> { + fn try_start_modifier(&mut self, modifier: StartModifier) -> Result<()> { match &self.start_modifier { - Some(m) => Err(format!("'{m}' modifier already set, but found {modifier}",)), + Some(m) => DissectModifierAlreadySetSnafu { + m: m.to_string(), + modifier: modifier.to_string(), + } + .fail(), None => { self.start_modifier = Some(modifier); Ok(()) @@ -87,27 +99,27 @@ impl NameInfo { } } - fn try_append_order(&mut self, order: u32) -> Result<(), String> { + fn try_append_order(&mut self, order: u32) -> Result<()> { match &mut self.start_modifier { Some(StartModifier::Append(o)) => match o { - Some(n) => Err(format!( - "Append Order modifier is already set to '{n}', cannot be set to {order}" - )), + Some(n) => DissectAppendOrderAlreadySetSnafu { + n: n.to_string(), + order, + } + .fail(), None => { *o = Some(order); Ok(()) } }, - Some(m) => Err(format!( - "Order can only be set to Append Modifier, current modifier is {m}" - )), - None => Err("Order can only be set to Append Modifier".to_string()), + Some(m) => DissectOrderOnlyAppendSnafu { m: m.to_string() }.fail(), + None => DissectOrderOnlyAppendModifierSnafu.fail(), } } - fn try_end_modifier(&mut self) -> Result<(), String> { + fn try_end_modifier(&mut self) -> Result<()> { match &self.end_modifier { - Some(m) => Err(format!("End modifier already set: '{m}'")), + Some(m) => DissectEndModifierAlreadySetSnafu { m: m.to_string() }.fail(), None => { self.end_modifier = Some(EndModifier); Ok(()) @@ -290,9 +302,9 @@ impl std::ops::DerefMut for PatternInfo { } impl std::str::FromStr for PatternInfo { - type Err = String; + type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { let mut parts = vec![]; let mut cursor = PartInfo::empty_split(); @@ -338,9 +350,11 @@ impl std::str::FromStr for PatternInfo { } if j == pos + 1 { - return Err(format!( - "Invalid Pattern: '{s}'. Digit order must be set after '/'", - )); + return DissectInvalidPatternSnafu { + s, + detail: "Digit order must be set after '/'", + } + .fail(); } name.try_append_order(order)?; @@ -358,14 +372,20 @@ impl std::str::FromStr for PatternInfo { ('-', PartInfo::Name(name)) if !name.is_end_modifier_set() => { if let Some('>') = chars.get(pos + 1) { } else { - return Err(format!( - "Invalid Pattern: '{s}'. 
expected '->' but only '-'", - )); + return DissectInvalidPatternSnafu { + s, + detail: "Expected '->' but only '-'", + } + .fail(); } if let Some('}') = chars.get(pos + 2) { } else { - return Err(format!("Invalid Pattern: '{s}'. expected '}}' after '->'",)); + return DissectInvalidPatternSnafu { + s, + detail: "Expected '}' after '->'", + } + .fail(); } name.try_end_modifier()?; @@ -377,7 +397,7 @@ impl std::str::FromStr for PatternInfo { } else { format!("Invalid '{ch}' in '{name}'") }; - return Err(format!("Invalid Pattern: '{s}'. {tail}")); + return DissectInvalidPatternSnafu { s, detail: tail }.fail(); } (_, PartInfo::Name(_)) => { cursor.push(ch); @@ -390,7 +410,11 @@ impl std::str::FromStr for PatternInfo { match cursor { PartInfo::Split(ref split) if !split.is_empty() => parts.push(cursor), PartInfo::Name(name) if !name.is_empty() => { - return Err(format!("Invalid Pattern: '{s}'. '{name}' is not closed")) + return DissectInvalidPatternSnafu { + s, + detail: format!("'{name}' is not closed"), + } + .fail(); } _ => {} } @@ -402,9 +426,9 @@ impl std::str::FromStr for PatternInfo { } impl PatternInfo { - fn check(&self) -> Result<(), String> { + fn check(&self) -> Result<()> { if self.len() == 0 { - return Err("Empty pattern is not allowed".to_string()); + return DissectEmptyPatternSnafu.fail(); } let mut map_keys = HashSet::new(); @@ -415,42 +439,47 @@ impl PatternInfo { let next_part = self.get(i + 1); match (this_part, next_part) { (PartInfo::Split(split), _) if split.is_empty() => { - return Err(format!( - "Invalid Pattern: '{}'. Empty split is not allowed", - self.origin - )); + return DissectInvalidPatternSnafu { + s: &self.origin, + detail: "Empty split is not allowed", + } + .fail(); } (PartInfo::Name(name1), Some(PartInfo::Name(name2))) => { - return Err(format!( - "Invalid Pattern: '{}'. consecutive names are not allowed: '{}' '{}'", - self.origin, name1, name2 - )); + return DissectInvalidPatternSnafu { + s: &self.origin, + detail: format!("consecutive names are not allowed: '{name1}' '{name2}'",), + } + .fail(); } (PartInfo::Name(name), _) if name.is_name_empty() => { if let Some(ref m) = name.start_modifier { - return Err(format!( - "Invalid Pattern: '{}'. only '{}' modifier is invalid", - self.origin, m - )); + return DissectInvalidPatternSnafu { + s: &self.origin, + detail: format!("only '{m}' modifier is invalid"), + } + .fail(); } } (PartInfo::Name(name), _) => match name.start_modifier { Some(StartModifier::MapKey) => { if map_keys.contains(&name.name) { - return Err(format!( - "Invalid Pattern: '{}'. Duplicate map key: '{}'", - self.origin, name.name - )); + return DissectInvalidPatternSnafu { + s: &self.origin, + detail: format!("Duplicate map key: '{}'", name.name), + } + .fail(); } else { map_keys.insert(&name.name); } } Some(StartModifier::MapVal) => { if map_vals.contains(&name.name) { - return Err(format!( - "Invalid Pattern: '{}'. Duplicate map val: '{}'", - self.origin, name.name - )); + return DissectInvalidPatternSnafu { + s: &self.origin, + detail: format!("Duplicate map val: '{}'", name.name), + } + .fail(); } else { map_vals.insert(&name.name); } @@ -462,15 +491,18 @@ impl PatternInfo { } if map_keys != map_vals { - return Err(format!( - "Invalid Pattern: '{}'. 
key and value not matched: '{}'", - self.origin, - map_keys - .symmetric_difference(&map_vals) - .map(|s| s.as_str()) - .collect::>() - .join(",") - )); + return DissectInvalidPatternSnafu { + s: &self.origin, + detail: format!( + "key and value not matched: '{}'", + map_keys + .symmetric_difference(&map_vals) + .map(|s| s.as_str()) + .collect::>() + .join(",") + ), + } + .fail(); } Ok(()) @@ -516,10 +548,7 @@ impl DissectProcessorBuilder { .collect() } - fn part_info_to_part( - part_info: PartInfo, - intermediate_keys: &[String], - ) -> Result { + fn part_info_to_part(part_info: PartInfo, intermediate_keys: &[String]) -> Result { match part_info { PartInfo::Split(s) => Ok(Part::Split(s)), PartInfo::Name(n) => match n.start_modifier { @@ -545,13 +574,13 @@ impl DissectProcessorBuilder { fn pattern_info_to_pattern( pattern_info: PatternInfo, intermediate_keys: &[String], - ) -> Result { + ) -> Result { let original = pattern_info.origin; let pattern = pattern_info .parts .into_iter() .map(|part_info| Self::part_info_to_part(part_info, intermediate_keys)) - .collect::, String>>()?; + .collect::>>()?; Ok(Pattern { origin: original, parts: pattern, @@ -561,7 +590,7 @@ impl DissectProcessorBuilder { fn build_patterns_from_pattern_infos( patterns: Vec, intermediate_keys: &[String], - ) -> Result, String> { + ) -> Result> { patterns .into_iter() .map(|pattern_info| Self::pattern_info_to_pattern(pattern_info, intermediate_keys)) @@ -578,7 +607,7 @@ impl ProcessorBuilder for DissectProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input_index = find_key_index(intermediate_keys, field.input_field(), "dissect")?; @@ -610,11 +639,7 @@ pub struct DissectProcessor { } impl DissectProcessor { - fn process_pattern( - &self, - chs: &[char], - pattern: &Pattern, - ) -> Result, String> { + fn process_pattern(&self, chs: &[char], pattern: &Pattern) -> Result> { let mut map = Vec::new(); let mut pos = 0; @@ -668,23 +693,26 @@ impl DissectProcessor { let split_chs = split.chars().collect::>(); let split_len = split_chs.len(); if pos + split_len > chs.len() { - return Err(format!("'{split}' exceeds the input",)); + return DissectSplitExceedsInputSnafu { split }.fail(); } if &chs[pos..pos + split_len] != split_chs.as_slice() { - return Err(format!( - "'{split}' does not match the input '{}'", - chs[pos..pos + split_len].iter().collect::() - )); + return DissectSplitNotMatchInputSnafu { + split, + input: chs[pos..pos + split_len].iter().collect::(), + } + .fail(); } pos += split_len; } (Part::Name(name1), Some(Part::Name(name2))) => { - return Err(format!( - "consecutive names are not allowed: '{name1}' '{name2}'" - )); + return DissectConsecutiveNamesSnafu { + name1: name1.to_string(), + name2: name2.to_string(), + } + .fail(); } // if Name part is the last part, then the rest of the input is the value @@ -695,10 +723,10 @@ impl DissectProcessor { // if Name part, and next part is Split, then find the matched value of the name (Part::Name(name), Some(Part::Split(split))) => { - let stop = split - .chars() - .next() - .ok_or("Empty split is not allowed".to_string())?; // this won't happen + let stop = split.chars().next().context(DissectInvalidPatternSnafu { + s: &pattern.origin, + detail: "Empty split is not allowed", + })?; // this won't happen let mut end = pos; while end < chs.len() && chs[end] != 
stop { end += 1; @@ -737,7 +765,7 @@ impl DissectProcessor { Ok(map) } - fn process(&self, val: &str) -> Result, String> { + fn process(&self, val: &str) -> Result> { let chs = val.chars().collect::>(); for pattern in &self.patterns { @@ -745,15 +773,14 @@ impl DissectProcessor { return Ok(map); } } - - Err("No matching pattern found".to_string()) + DissectNoMatchingPatternSnafu.fail() } } impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut patterns = vec![]; let mut ignore_missing = false; @@ -762,7 +789,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessorBuilder { for (k, v) in value.iter() { let key = k .as_str() - .ok_or(format!("key must be a string, but got '{k:?}'"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { @@ -809,7 +836,7 @@ impl Processor for DissectProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { @@ -821,18 +848,19 @@ impl Processor for DissectProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind(), + v: v.clone(), + } + .fail(); } } } @@ -1123,7 +1151,7 @@ mod tests { ), ( "%{->clientip} ", - "Invalid Pattern: '%{->clientip} '. expected '}' after '->'", + "Invalid Pattern: '%{->clientip} '. Expected '}' after '->'", ), ( "%{/clientip} ", @@ -1185,7 +1213,7 @@ mod tests { for (pattern, expected) in cases.into_iter() { let err = pattern.parse::().unwrap_err(); - assert_eq!(err, expected); + assert_eq!(err.to_string(), expected); } } diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index 32c7d617860d..f2c03fd120de 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -13,7 +13,12 @@ // limitations under the License. 
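The processor conversions below (epoch, gsub, join, and the ones above) lean on three snafu idioms; a minimal self-contained sketch, with a hypothetical DemoError standing in for the variants defined in etl/error.rs: context builds the selector eagerly, with_context takes a closure so owned data such as k.clone() is only produced on the error path, and .fail() turns a selector directly into an Err for early returns.

use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum DemoError {
    #[snafu(display("key must be a string, but got {k}"))]
    KeyMustBeString { k: String },
    #[snafu(display("{processor} processor: missing field: {field}"))]
    MissingField { processor: String, field: String },
}

fn key_as_str(key: &serde_json::Value) -> Result<&str, DemoError> {
    // Lazy context: the owned copy of `key` is only made if as_str() returns None.
    key.as_str()
        .with_context(|| KeyMustBeStringSnafu { k: key.to_string() })
}

fn require_field(processor: &str, field: &str, present: bool) -> Result<(), DemoError> {
    if !present {
        // Early return straight from the context selector.
        return MissingFieldSnafu { processor, field }.fail();
    }
    Ok(())
}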
use ahash::HashSet; +use snafu::{OptionExt, ResultExt}; +use crate::etl::error::{ + EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu, + ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result, +}; use crate::etl::field::{Fields, OneInputOneOutputField}; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, @@ -39,15 +44,15 @@ enum Resolution { } impl TryFrom<&str> for Resolution { - type Error = String; + type Error = Error; - fn try_from(s: &str) -> Result { + fn try_from(s: &str) -> Result { match s { SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second), MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli), MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro), NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano), - _ => Err(format!("invalid resolution: {s}")), + _ => EpochInvalidResolutionSnafu { resolution: s }.fail(), } } } @@ -71,13 +76,13 @@ impl ProcessorBuilder for EpochProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Epoch) } } impl EpochProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { + pub fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input = OneInputOneOutputField::build( @@ -112,11 +117,11 @@ pub struct EpochProcessor { } impl EpochProcessor { - fn parse(&self, val: &Value) -> Result { + fn parse(&self, val: &Value) -> Result { let t: i64 = match val { Value::String(s) => s .parse::() - .map_err(|e| format!("Failed to parse {} to number: {}", s, e))?, + .context(FailedToParseIntSnafu { value: s })?, Value::Int16(i) => *i as i64, Value::Int32(i) => *i as i64, Value::Int64(i) => *i, @@ -135,9 +140,11 @@ impl EpochProcessor { }, _ => { - return Err(format!( - "{PROCESSOR_EPOCH} processor: unsupported value {val}" - )) + return ProcessorUnsupportedValueSnafu { + processor: PROCESSOR_EPOCH, + val: val.to_string(), + } + .fail(); } }; @@ -151,9 +158,9 @@ impl EpochProcessor { } impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { + fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut resolution = Resolution::default(); let mut ignore_missing = false; @@ -161,7 +168,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessorBuilder { for (k, v) in hash { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { @@ -200,17 +207,17 @@ impl Processor for EpochProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { diff --git 
a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs
index 1b8e581e6a3f..b5a328c6fa00 100644
--- a/src/pipeline/src/etl/processor/gsub.rs
+++ b/src/pipeline/src/etl/processor/gsub.rs
@@ -14,11 +14,16 @@
 
 use ahash::HashSet;
 use regex::Regex;
+use snafu::{OptionExt, ResultExt};
 
+use crate::etl::error::{
+    Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
+    ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
+};
 use crate::etl::field::{Fields, OneInputOneOutputField};
 use crate::etl::processor::{
-    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
-    ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
+    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, ProcessorBuilder, ProcessorKind,
+    FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
 };
 use crate::etl::value::Value;
 
@@ -46,25 +51,25 @@ impl ProcessorBuilder for GsubProcessorBuilder {
         self.fields.iter().map(|f| f.input_field()).collect()
     }
 
-    fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
+    fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
         self.build(intermediate_keys).map(ProcessorKind::Gsub)
     }
 }
 
 impl GsubProcessorBuilder {
-    fn check(self) -> Result<Self, String> {
+    fn check(self) -> Result<Self> {
         if self.pattern.is_none() {
-            return Err("pattern is required".to_string());
+            return GsubPatternRequiredSnafu.fail();
         }
 
         if self.replacement.is_none() {
-            return Err("replacement is required".to_string());
+            return GsubReplacementRequiredSnafu.fail();
         }
 
         Ok(self)
     }
 
-    fn build(self, intermediate_keys: &[String]) -> Result<GsubProcessor, String> {
+    fn build(self, intermediate_keys: &[String]) -> Result<GsubProcessor> {
         let mut real_fields = vec![];
         for field in self.fields.into_iter() {
             let input = OneInputOneOutputField::build(
@@ -94,19 +99,19 @@ pub struct GsubProcessor {
 }
 
 impl GsubProcessor {
-    fn check(self) -> Result<Self, String> {
+    fn check(self) -> Result<Self> {
         if self.pattern.is_none() {
-            return Err("pattern is required".to_string());
+            return GsubPatternRequiredSnafu.fail();
         }
 
         if self.replacement.is_none() {
-            return Err("replacement is required".to_string());
+            return GsubReplacementRequiredSnafu.fail();
         }
 
         Ok(self)
     }
 
-    fn process_string(&self, val: &str) -> Result<Value, String> {
+    fn process_string(&self, val: &str) -> Result<Value> {
         let replacement = self.replacement.as_ref().unwrap();
         let new_val = self
             .pattern
@@ -119,21 +124,22 @@ impl GsubProcessor {
         Ok(val)
     }
 
-    fn process(&self, val: &Value) -> Result<Value, String> {
+    fn process(&self, val: &Value) -> Result<Value> {
         match val {
             Value::String(val) => self.process_string(val),
-            _ => Err(format!(
-                "{} processor: expect string or array string, but got {val:?}",
-                self.kind()
-            )),
+            _ => ProcessorExpectStringSnafu {
+                processor: PROCESSOR_GSUB,
+                v: val.clone(),
+            }
+            .fail(),
         }
     }
 }
 
 impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder {
-    type Error = String;
+    type Error = Error;
 
-    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
+    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
         let mut fields = Fields::default();
         let mut ignore_missing = false;
         let mut pattern = None;
@@ -142,7 +148,8 @@ impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder {
         for (k, v) in value.iter() {
             let key = k
                 .as_str()
-                .ok_or(format!("key must be a string, but got {k:?}"))?;
+                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
+
             match key {
                 FIELD_NAME => {
                    fields = 
Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -152,7 +163,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder { } PATTERN_NAME => { let pattern_str = yaml_string(v, PATTERN_NAME)?; - pattern = Some(Regex::new(&pattern_str).map_err(|e| e.to_string())?); + pattern = Some(Regex::new(&pattern_str).context(RegexSnafu { + pattern: pattern_str, + })?); } REPLACEMENT_NAME => { let replacement_str = yaml_string(v, REPLACEMENT_NAME)?; @@ -187,17 +200,17 @@ impl crate::etl::processor::Processor for GsubProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs index d4b309d5c2e2..ddbc086ab8da 100644 --- a/src/pipeline/src/etl/processor/join.rs +++ b/src/pipeline/src/etl/processor/join.rs @@ -13,7 +13,12 @@ // limitations under the License. use ahash::HashSet; +use snafu::OptionExt; +use crate::etl::error::{ + Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, + ProcessorMissingFieldSnafu, Result, +}; use crate::etl::field::{Fields, OneInputOneOutputField}; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, @@ -42,21 +47,21 @@ impl ProcessorBuilder for JoinProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Join) } } impl JoinProcessorBuilder { - fn check(self) -> Result { + fn check(self) -> Result { if self.separator.is_none() { - return Err("separator is required".to_string()); + return JoinSeparatorRequiredSnafu.fail(); } Ok(self) } - pub fn build(self, intermediate_keys: &[String]) -> Result { + pub fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input = OneInputOneOutputField::build( @@ -85,7 +90,7 @@ pub struct JoinProcessor { } impl JoinProcessor { - fn process(&self, arr: &Array) -> Result { + fn process(&self, arr: &Array) -> Result { let sep = self.separator.as_ref().unwrap(); let val = arr .iter() @@ -96,9 +101,9 @@ impl JoinProcessor { Ok(Value::String(val)) } - fn check(self) -> Result { + fn check(self) -> Result { if self.separator.is_none() { - return Err("separator is required".to_string()); + return JoinSeparatorRequiredSnafu.fail(); } Ok(self) @@ -106,9 +111,9 @@ impl JoinProcessor { } impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut separator = None; let mut ignore_missing = false; @@ -116,7 +121,8 @@ impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessorBuilder { for (k, v) in value.iter() { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; + match key { 
FIELD_NAME => { fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -152,7 +158,7 @@ impl Processor for JoinProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { @@ -163,18 +169,19 @@ impl Processor for JoinProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind(), + v: v.clone(), + } + .fail(); } } } diff --git a/src/pipeline/src/etl/processor/letter.rs b/src/pipeline/src/etl/processor/letter.rs index f388b5a2a9de..8eb939918104 100644 --- a/src/pipeline/src/etl/processor/letter.rs +++ b/src/pipeline/src/etl/processor/letter.rs @@ -13,7 +13,12 @@ // limitations under the License. use ahash::HashSet; +use snafu::OptionExt; +use crate::etl::error::{ + Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu, + ProcessorMissingFieldSnafu, Result, +}; use crate::etl::field::{Fields, OneInputOneOutputField}; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, @@ -42,14 +47,14 @@ impl std::fmt::Display for Method { } impl std::str::FromStr for Method { - type Err = String; + type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { "upper" => Ok(Method::Upper), "lower" => Ok(Method::Lower), "capital" => Ok(Method::Capital), - _ => Err(format!("invalid method: {s}")), + _ => LetterInvalidMethodSnafu { method: s }.fail(), } } } @@ -73,13 +78,13 @@ impl ProcessorBuilder for LetterProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Letter) } } impl LetterProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { + pub fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input = OneInputOneOutputField::build( @@ -108,7 +113,7 @@ pub struct LetterProcessor { } impl LetterProcessor { - fn process_field(&self, val: &str) -> Result { + fn process_field(&self, val: &str) -> Result { let processed = match self.method { Method::Upper => val.to_uppercase(), Method::Lower => val.to_lowercase(), @@ -121,9 +126,9 @@ impl LetterProcessor { } impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut method = Method::Lower; let mut ignore_missing = false; @@ -131,7 +136,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessorBuilder { for (k, v) in value.iter() { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -166,7 
+171,7 @@ impl Processor for LetterProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { @@ -177,18 +182,19 @@ impl Processor for LetterProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - &field.input().name - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind(), + v: v.clone(), + } + .fail(); } } } diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index a1de2ea76d34..a74c19140c19 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -21,7 +21,13 @@ pub(crate) const PROCESSOR_REGEX: &str = "regex"; use ahash::{HashSet, HashSetExt}; use lazy_static::lazy_static; use regex::Regex; +use snafu::{OptionExt, ResultExt}; +use crate::etl::error::{ + Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, + RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu, + Result, +}; use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ @@ -60,15 +66,15 @@ impl std::fmt::Display for GroupRegex { } impl std::str::FromStr for GroupRegex { - type Err = String; + type Err = Error; - fn from_str(origin: &str) -> Result { + fn from_str(origin: &str) -> Result { let groups = get_regex_group_names(origin); if groups.is_empty() { - return Err(format!("no named group found in regex {origin}")); + return RegexNamedGroupNotFoundSnafu { origin }.fail(); } - let regex = Regex::new(origin).map_err(|e| e.to_string())?; + let regex = Regex::new(origin).context(RegexSnafu { pattern: origin })?; Ok(GroupRegex { origin: origin.into(), regex, @@ -94,25 +100,25 @@ impl ProcessorBuilder for RegexProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Regex) } } impl RegexProcessorBuilder { - fn check(self) -> Result { + fn check(self) -> Result { if self.fields.is_empty() { - return Err(format!( - "no valid field found in {} processor", - PROCESSOR_REGEX - )); + return RegexNoValidFieldSnafu { + processor: PROCESSOR_REGEX, + } + .fail(); } if self.patterns.is_empty() { - return Err(format!( - "no valid pattern found in {} processor", - PROCESSOR_REGEX - )); + return RegexNoValidPatternSnafu { + processor: PROCESSOR_REGEX, + } + .fail(); } Ok(self) @@ -122,7 +128,7 @@ impl RegexProcessorBuilder { group_regex: &GroupRegex, om_field: &OneInputMultiOutputField, intermediate_keys: &[String], - ) -> Result, String> { + ) -> Result> { group_regex .groups .iter() @@ -135,35 +141,35 @@ impl RegexProcessorBuilder { index, }) }) - .collect::, String>>() + .collect::>>() } fn build_group_output_infos( patterns: &[GroupRegex], om_field: &OneInputMultiOutputField, intermediate_keys: &[String], - ) -> Result>, String> { + ) -> Result>> { patterns .iter() .map(|group_regex| { Self::build_group_output_info(group_regex, 
om_field, intermediate_keys) }) - .collect::, String>>() + .collect::>>() } fn build_output_info( real_fields: &[OneInputMultiOutputField], patterns: &[GroupRegex], intermediate_keys: &[String], - ) -> Result { + ) -> Result { let inner = real_fields .iter() .map(|om_field| Self::build_group_output_infos(patterns, om_field, intermediate_keys)) - .collect::, String>>(); + .collect::>>(); inner.map(|inner| RegexProcessorOutputInfo { inner }) } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input_index = find_key_index(intermediate_keys, field.input_field(), "regex")?; @@ -184,9 +190,9 @@ impl RegexProcessorBuilder { } impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut patterns: Vec = vec![]; let mut ignore_missing = false; @@ -194,7 +200,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessorBuilder { for (k, v) in value.iter() { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -275,7 +281,7 @@ pub struct RegexProcessor { } impl RegexProcessor { - fn try_with_patterns(&mut self, patterns: Vec) -> Result<(), String> { + fn try_with_patterns(&mut self, patterns: Vec) -> Result<()> { let mut rs = vec![]; for pattern in patterns { let gr = pattern.parse()?; @@ -290,7 +296,7 @@ impl RegexProcessor { val: &str, gr: &GroupRegex, index: (usize, usize), - ) -> Result, String> { + ) -> Result> { let mut result = Vec::new(); if let Some(captures) = gr.regex.captures(val) { for (group_index, group) in gr.groups.iter().enumerate() { @@ -316,7 +322,7 @@ impl Processor for RegexProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for (field_index, field) in self.fields.iter().enumerate() { let index = field.input_index(); let mut result_list = None; @@ -346,18 +352,19 @@ impl Processor for RegexProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.input_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind(), + v: v.clone(), + } + .fail(); } } // safety here diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs index 7ab9571101e3..3f9621c19deb 100644 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ b/src/pipeline/src/etl/processor/timestamp.rs @@ -18,7 +18,14 @@ use ahash::HashSet; use chrono::{DateTime, NaiveDateTime}; use chrono_tz::Tz; use lazy_static::lazy_static; +use snafu::{OptionExt, ResultExt}; +use crate::etl::error::{ + DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu, + DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error, + KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, + 
ProcessorUnsupportedValueSnafu, Result, +}; use crate::etl::field::{Fields, OneInputOneOutputField}; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, @@ -69,15 +76,15 @@ enum Resolution { } impl TryFrom<&str> for Resolution { - type Error = String; + type Error = Error; - fn try_from(s: &str) -> Result { + fn try_from(s: &str) -> Result { match s { SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second), MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli), MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro), NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano), - _ => Err(format!("invalid resolution: {s}")), + _ => EpochInvalidResolutionSnafu { resolution: s }.fail(), } } } @@ -127,13 +134,13 @@ impl ProcessorBuilder for TimestampProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys).map(ProcessorKind::Timestamp) } } impl TimestampProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { + pub fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input = OneInputOneOutputField::build( @@ -169,29 +176,37 @@ pub struct TimestampProcessor { impl TimestampProcessor { /// try to parse val with timezone first, if failed, parse without timezone - fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result { + fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result { if let Ok(dt) = DateTime::parse_from_str(val, fmt) { - Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?) + Ok(dt + .timestamp_nanos_opt() + .context(DateFailedToGetTimestampSnafu)?) } else { let dt = NaiveDateTime::parse_from_str(val, fmt) - .map_err(|e| e.to_string())? + .context(DateParseSnafu { value: val })? .and_local_timezone(tz) .single() - .ok_or("failed to get local timezone")?; - Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?) + .context(DateFailedToGetLocalTimezoneSnafu)?; + Ok(dt + .timestamp_nanos_opt() + .context(DateFailedToGetTimestampSnafu)?) 
} } - fn parse_time_str(&self, val: &str) -> Result { + fn parse_time_str(&self, val: &str) -> Result { for (fmt, tz) in self.formats.iter() { if let Ok(ns) = Self::try_parse(val, fmt, *tz) { return Ok(ns); } } - Err(format!("{} processor: failed to parse {val}", self.kind(),)) + ProcessorFailedToParseStringSnafu { + kind: PROCESSOR_TIMESTAMP, + value: val.to_string(), + } + .fail() } - fn parse(&self, val: &Value) -> Result { + fn parse(&self, val: &Value) -> Result { let t: i64 = match val { Value::String(s) => { let t = s.parse::(); @@ -221,9 +236,11 @@ impl TimestampProcessor { }, _ => { - return Err(format!( - "{PROCESSOR_TIMESTAMP} processor: unsupported value {val}" - )) + return ProcessorUnsupportedValueSnafu { + processor: PROCESSOR_TIMESTAMP, + val: val.to_string(), + } + .fail(); } }; @@ -236,7 +253,7 @@ impl TimestampProcessor { } } -fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result, Tz)>, String> { +fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result, Tz)>> { return match yaml.as_vec() { Some(formats_yaml) => { let mut formats = Vec::with_capacity(formats_yaml.len()); @@ -244,32 +261,38 @@ fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result, Tz)>, let s = yaml_strings(v, FORMATS_NAME) .or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?; if s.len() != 1 && s.len() != 2 { - return Err(format!( - "{PROCESSOR_TIMESTAMP} processor: invalid format {s:?}" - )); + return DateInvalidFormatSnafu { + processor: PROCESSOR_TIMESTAMP, + s: format!("{s:?}"), + } + .fail(); } let mut iter = s.into_iter(); // safety: unwrap is safe here let formatter = iter.next().unwrap(); let tz = iter .next() - .map(|tz| tz.parse::()) - .unwrap_or(Ok(Tz::UTC)) - .map_err(|e| e.to_string())?; + .map(|tz| { + tz.parse::() + .context(DateParseTimezoneSnafu { value: tz }) + }) + .unwrap_or(Ok(Tz::UTC))?; formats.push((Arc::new(formatter), tz)); } Ok(formats) } - None => Err(format!( - "{PROCESSOR_TIMESTAMP} processor: invalid format {yaml:?}" - )), + None => DateInvalidFormatSnafu { + processor: PROCESSOR_TIMESTAMP, + s: format!("{yaml:?}"), + } + .fail(), }; } impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { + fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut formats = Formats::default(); let mut resolution = Resolution::default(); @@ -278,7 +301,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder { for (k, v) in hash { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { @@ -321,17 +344,17 @@ impl Processor for TimestampProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input().index; match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - &field.input().name - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { diff --git a/src/pipeline/src/etl/processor/urlencoding.rs b/src/pipeline/src/etl/processor/urlencoding.rs index 7db9d092f254..ca42aae23677 100644 --- a/src/pipeline/src/etl/processor/urlencoding.rs +++ b/src/pipeline/src/etl/processor/urlencoding.rs @@ -13,8 +13,13 @@ 
// limitations under the License. use ahash::HashSet; +use snafu::{OptionExt, ResultExt}; use urlencoding::{decode, encode}; +use crate::etl::error::{ + Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, + UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu, +}; use crate::etl::field::{Fields, OneInputOneOutputField}; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, ProcessorBuilder, ProcessorKind, @@ -41,13 +46,13 @@ impl std::fmt::Display for Method { } impl std::str::FromStr for Method { - type Err = String; + type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { match s { "decode" => Ok(Method::Decode), "encode" => Ok(Method::Encode), - _ => Err(format!("invalid method: {s}")), + _ => UrlEncodingInvalidMethodSnafu { s }.fail(), } } } @@ -71,14 +76,14 @@ impl ProcessorBuilder for UrlEncodingProcessorBuilder { self.fields.iter().map(|f| f.input_field()).collect() } - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { self.build(intermediate_keys) .map(ProcessorKind::UrlEncoding) } } impl UrlEncodingProcessorBuilder { - fn build(self, intermediate_keys: &[String]) -> Result { + fn build(self, intermediate_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input = OneInputOneOutputField::build( @@ -106,19 +111,19 @@ pub struct UrlEncodingProcessor { } impl UrlEncodingProcessor { - fn process_field(&self, val: &str) -> Result { + fn process_field(&self, val: &str) -> Result { let processed = match self.method { Method::Encode => encode(val).to_string(), - Method::Decode => decode(val).map_err(|e| e.to_string())?.into_owned(), + Method::Decode => decode(val).context(UrlEncodingDecodeSnafu)?.into_owned(), }; Ok(Value::String(processed)) } } impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessorBuilder { - type Error = String; + type Error = Error; - fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut method = Method::Decode; let mut ignore_missing = false; @@ -126,7 +131,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessorBuilder { for (k, v) in value.iter() { let key = k .as_str() - .ok_or(format!("key must be a string, but got {k:?}"))?; + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { FIELD_NAME => { fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); @@ -166,7 +171,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<(), String> { + fn exec_mut(&self, val: &mut Vec) -> Result<()> { for field in self.fields.iter() { let index = field.input_index(); match val.get(index) { @@ -177,18 +182,19 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { } Some(Value::Null) | None => { if !self.ignore_missing { - return Err(format!( - "{} processor: missing field: {}", - self.kind(), - field.output_name() - )); + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); } } Some(v) => { - return Err(format!( - "{} processor: expect string value, but got {v:?}", - self.kind() - )); + return ProcessorExpectStringSnafu { + processor: self.kind(), + v: v.clone(), + } + .fail(); } } } diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index 
15d1bf337843..4408523c27b8 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -16,7 +16,9 @@ pub mod index; pub mod transformer; use itertools::Itertools; +use snafu::OptionExt; +use crate::etl::error::{Error, Result}; use crate::etl::find_key_index; use crate::etl::processor::yaml_string; use crate::etl::transform::index::Index; @@ -31,6 +33,10 @@ const TRANSFORM_ON_FAILURE: &str = "on_failure"; pub use transformer::greptime::GreptimeTransformer; +use super::error::{ + KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu, + TransformTypeMustBeSetSnafu, +}; use super::field::{Fields, InputFieldInfo, OneInputOneOutputField}; use super::processor::{yaml_new_field, yaml_new_fields}; @@ -38,11 +44,11 @@ pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static { type Output; type VecOutput; - fn new(transforms: Transforms) -> Result; + fn new(transforms: Transforms) -> Result; fn schemas(&self) -> &Vec; fn transforms(&self) -> &Transforms; fn transforms_mut(&mut self) -> &mut Transforms; - fn transform_mut(&self, val: &mut Vec) -> Result; + fn transform_mut(&self, val: &mut Vec) -> Result; } /// On Failure behavior when transform fails @@ -57,13 +63,13 @@ pub enum OnFailure { } impl std::str::FromStr for OnFailure { - type Err = String; + type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { match s { "ignore" => Ok(OnFailure::Ignore), "default" => Ok(OnFailure::Default), - _ => Err(format!("invalid transform on_failure value: {}", s)), + _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(), } } } @@ -139,16 +145,16 @@ impl std::ops::DerefMut for Transforms { } impl TryFrom<&Vec> for TransformBuilders { - type Error = String; + type Error = Error; - fn try_from(docs: &Vec) -> Result { + fn try_from(docs: &Vec) -> Result { let mut transforms = Vec::with_capacity(100); let mut all_output_keys: Vec = Vec::with_capacity(100); let mut all_required_keys = Vec::with_capacity(100); for doc in docs { let transform_builder: TransformBuilder = doc .as_hash() - .ok_or("transform element must be a map".to_string())? + .context(TransformElementMustBeMapSnafu)? 
.try_into()?; let mut transform_output_keys = transform_builder .fields @@ -187,11 +193,7 @@ pub struct TransformBuilder { } impl TransformBuilder { - pub fn build( - self, - intermediate_keys: &[String], - output_keys: &[String], - ) -> Result { + pub fn build(self, intermediate_keys: &[String], output_keys: &[String]) -> Result { let mut real_fields = vec![]; for field in self.fields { let input_index = find_key_index(intermediate_keys, field.input_field(), "transform")?; @@ -277,9 +279,9 @@ impl Transform { } impl TryFrom<&yaml_rust::yaml::Hash> for TransformBuilder { - type Error = String; + type Error = Error; - fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { + fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { let mut fields = Fields::default(); let mut type_ = Value::Null; let mut default = None; @@ -287,7 +289,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TransformBuilder { let mut on_failure = None; for (k, v) in hash { - let key = k.as_str().ok_or("key must be a string")?; + let key = k + .as_str() + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; match key { TRANSFORM_FIELD => { fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?); @@ -324,10 +328,11 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TransformBuilder { if let Some(default_value) = default { match (&type_, &default_value) { (Value::Null, _) => { - return Err(format!( - "transform {:?} type MUST BE set before default {}", - fields, &default_value, - )); + return TransformTypeMustBeSetSnafu { + fields: format!("{:?}", fields), + default: default_value.to_string(), + } + .fail(); } (_, Value::Null) => {} // if default is not set, then it will be regarded as default null (_, _) => { diff --git a/src/pipeline/src/etl/transform/index.rs b/src/pipeline/src/etl/transform/index.rs index f26ca4828abe..6af41990a072 100644 --- a/src/pipeline/src/etl/transform/index.rs +++ b/src/pipeline/src/etl/transform/index.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::etl::error::{Error, Result, UnsupportedIndexTypeSnafu}; + const INDEX_TIMESTAMP: &str = "timestamp"; const INDEX_TIMEINDEX: &str = "time"; const INDEX_TAG: &str = "tag"; @@ -38,22 +40,22 @@ impl std::fmt::Display for Index { } impl TryFrom for Index { - type Error = String; + type Error = Error; - fn try_from(value: String) -> Result { + fn try_from(value: String) -> Result { Index::try_from(value.as_str()) } } impl TryFrom<&str> for Index { - type Error = String; + type Error = Error; - fn try_from(value: &str) -> Result { + fn try_from(value: &str) -> Result { match value { INDEX_TIMESTAMP | INDEX_TIMEINDEX => Ok(Index::Time), INDEX_TAG => Ok(Index::Tag), INDEX_FULLTEXT => Ok(Index::Fulltext), - _ => Err(format!("unsupported index type: {}", value)), + _ => UnsupportedIndexTypeSnafu { value }.fail(), } } } diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 9753b0100483..000a2ddc26f9 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -20,6 +20,10 @@ use coerce::{coerce_columns, coerce_value}; use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; +use crate::etl::error::{ + Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu, + TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, +}; use crate::etl::field::{InputFieldInfo, OneInputOneOutputField}; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transformer, Transforms}; @@ -71,7 +75,7 @@ impl GreptimeTransformer { } /// Generate the schema for the GreptimeTransformer - fn schemas(transforms: &Transforms) -> Result, String> { + fn schemas(transforms: &Transforms) -> Result> { let mut schema = vec![]; for transform in transforms.iter() { schema.extend(coerce_columns(transform)?); @@ -90,9 +94,9 @@ impl Transformer for GreptimeTransformer { type Output = Rows; type VecOutput = Row; - fn new(mut transforms: Transforms) -> Result { + fn new(mut transforms: Transforms) -> Result { if transforms.is_empty() { - return Err("transform cannot be empty".to_string()); + return TransformEmptySnafu.fail(); } let mut column_names_set = HashSet::new(); @@ -108,9 +112,7 @@ impl Transformer for GreptimeTransformer { let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect(); if !intersections.is_empty() { let duplicates = intersections.iter().join(","); - return Err(format!( - "column name must be unique, but got duplicated: {duplicates}" - )); + return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail(); } column_names_set.extend(target_fields_set); @@ -121,10 +123,14 @@ impl Transformer for GreptimeTransformer { 1 => timestamp_columns .push(transform.real_fields.first().unwrap().input_name()), _ => { - return Err(format!( - "Illegal to set multiple timestamp Index columns, please set only one: {}", - transform.real_fields.iter().map(|x|x.input_name()).join(", ") - )) + return TransformMultipleTimestampIndexSnafu { + columns: transform + .real_fields + .iter() + .map(|x| x.input_name()) + .join(", "), + } + .fail(); } } } @@ -145,14 +151,12 @@ impl Transformer for GreptimeTransformer { _ => { let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", "); let count = timestamp_columns.len(); - Err( - format!("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}") - ) + 
TransformTimestampIndexCountSnafu { count, columns }.fail() } } } - fn transform_mut(&self, val: &mut Vec) -> Result { + fn transform_mut(&self, val: &mut Vec) -> Result { let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; for transform in self.transforms.iter() { for field in transform.real_fields.iter() { @@ -160,8 +164,7 @@ impl Transformer for GreptimeTransformer { let output_index = field.output_index(); match val.get(index) { Some(v) => { - let value_data = coerce_value(v, transform) - .map_err(|e| format!("{} processor: {}", field.input_name(), e))?; + let value_data = coerce_value(v, transform)?; // every transform fields has only one output field values[output_index] = GreptimeValue { value_data }; } diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 8c7efef22fd2..827613b02b60 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -17,17 +17,22 @@ use api::v1::ColumnOptions; use datatypes::schema::FulltextOptions; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use snafu::ResultExt; +use crate::etl::error::{ + CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu, + CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result, +}; use crate::etl::transform::index::Index; use crate::etl::transform::{OnFailure, Transform}; use crate::etl::value::{Timestamp, Value}; impl TryFrom for ValueData { - type Error = String; + type Error = Error; - fn try_from(value: Value) -> Result { + fn try_from(value: Value) -> Result { match value { - Value::Null => Err("Null type not supported".to_string()), + Value::Null => CoerceUnsupportedNullTypeSnafu.fail(), Value::Int8(v) => Ok(ValueData::I32Value(v as i32)), Value::Int16(v) => Ok(ValueData::I32Value(v as i32)), @@ -63,7 +68,7 @@ impl TryFrom for ValueData { } // TODO(yuanbohan): add fulltext support in datatype_extension -pub(crate) fn coerce_columns(transform: &Transform) -> Result, String> { +pub(crate) fn coerce_columns(transform: &Transform) -> Result> { let mut columns = Vec::new(); for field in transform.real_fields.iter() { @@ -94,19 +99,19 @@ fn coerce_semantic_type(transform: &Transform) -> SemanticType { } } -fn coerce_options(transform: &Transform) -> Result, String> { +fn coerce_options(transform: &Transform) -> Result> { if let Some(Index::Fulltext) = transform.index { options_from_fulltext(&FulltextOptions { enable: true, ..Default::default() }) - .map_err(|e| e.to_string()) + .context(ColumnOptionsSnafu) } else { Ok(None) } } -fn coerce_type(transform: &Transform) -> Result { +fn coerce_type(transform: &Transform) -> Result { match transform.type_ { Value::Int8(_) => Ok(ColumnDataType::Int8), Value::Int16(_) => Ok(ColumnDataType::Int16), @@ -132,17 +137,14 @@ fn coerce_type(transform: &Transform) -> Result { Value::Array(_) => unimplemented!("Array"), Value::Map(_) => unimplemented!("Object"), - Value::Null => Err(format!( - "Null type not supported when to coerce '{}' type", - transform.type_.to_str_type() - )), + Value::Null => CoerceUnsupportedNullTypeToSnafu { + ty: transform.type_.to_str_type(), + } + .fail(), } } -pub(crate) fn coerce_value( - val: &Value, - transform: &Transform, -) -> Result, String> { +pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result> { match val { Value::Null => 
match &transform.default { Some(default) => coerce_value(default, transform), @@ -190,7 +192,7 @@ pub(crate) fn coerce_value( } } -fn coerce_bool_value(b: bool, transform: &Transform) -> Result, String> { +fn coerce_bool_value(b: bool, transform: &Transform) -> Result> { let val = match transform.type_ { Value::Int8(_) => ValueData::I8Value(b as i32), Value::Int16(_) => ValueData::I16Value(b as i32), @@ -211,9 +213,11 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result Value::Timestamp(_) => match transform.on_failure { Some(OnFailure::Ignore) => return Ok(None), Some(OnFailure::Default) => { - return Err("default value not supported for Epoch".to_string()) + return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(); + } + None => { + return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail(); } - None => return Err("Boolean type not supported for Epoch".to_string()), }, Value::Array(_) => unimplemented!("Array type not supported"), @@ -225,7 +229,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result Ok(Some(val)) } -fn coerce_i64_value(n: i64, transform: &Transform) -> Result, String> { +fn coerce_i64_value(n: i64, transform: &Transform) -> Result> { let val = match transform.type_ { Value::Int8(_) => ValueData::I8Value(n as i32), Value::Int16(_) => ValueData::I16Value(n as i32), @@ -246,9 +250,11 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result, Value::Timestamp(_) => match transform.on_failure { Some(OnFailure::Ignore) => return Ok(None), Some(OnFailure::Default) => { - return Err("default value not supported for Epoch".to_string()) + return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(); + } + None => { + return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail(); } - None => return Err("Integer type not supported for Epoch".to_string()), }, Value::Array(_) => unimplemented!("Array type not supported"), @@ -260,7 +266,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result, Ok(Some(val)) } -fn coerce_u64_value(n: u64, transform: &Transform) -> Result, String> { +fn coerce_u64_value(n: u64, transform: &Transform) -> Result> { let val = match transform.type_ { Value::Int8(_) => ValueData::I8Value(n as i32), Value::Int16(_) => ValueData::I16Value(n as i32), @@ -281,9 +287,11 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result, Value::Timestamp(_) => match transform.on_failure { Some(OnFailure::Ignore) => return Ok(None), Some(OnFailure::Default) => { - return Err("default value not supported for Epoch".to_string()) + return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(); + } + None => { + return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail(); } - None => return Err("Integer type not supported for Epoch".to_string()), }, Value::Array(_) => unimplemented!("Array type not supported"), @@ -295,7 +303,7 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result, Ok(Some(val)) } -fn coerce_f64_value(n: f64, transform: &Transform) -> Result, String> { +fn coerce_f64_value(n: f64, transform: &Transform) -> Result> { let val = match transform.type_ { Value::Int8(_) => ValueData::I8Value(n as i32), Value::Int16(_) => ValueData::I16Value(n as i32), @@ -316,9 +324,11 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result, Value::Timestamp(_) => match transform.on_failure { Some(OnFailure::Ignore) => return Ok(None), Some(OnFailure::Default) => { - return Err("default value not supported for Epoch".to_string()) + return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(); + 
} + None => { + return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail(); } - None => return Err("Float type not supported for Epoch".to_string()), }, Value::Array(_) => unimplemented!("Array type not supported"), @@ -340,17 +350,17 @@ macro_rules! coerce_string_value { Some(default) => coerce_value(default, $transform), None => coerce_value($transform.get_type_matched_default_val(), $transform), }, - None => Err(format!( - "failed to coerce string value '{}' to type '{}'", - $s, - $transform.type_.to_str_type() - )), + None => CoerceStringToTypeSnafu { + s: $s, + ty: $transform.type_.to_str_type(), + } + .fail(), }, } }; } -fn coerce_string_value(s: &String, transform: &Transform) -> Result, String> { +fn coerce_string_value(s: &String, transform: &Transform) -> Result> { match transform.type_ { Value::Int8(_) => { coerce_string_value!(s, transform, i32, I8Value) @@ -393,8 +403,8 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result match transform.on_failure { Some(OnFailure::Ignore) => Ok(None), - Some(OnFailure::Default) => Err("default value not supported for Epoch".to_string()), - None => Err("String type not supported for Epoch".to_string()), + Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(), + None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(), }, Value::Array(_) => unimplemented!("Array type not supported"), diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 42390a9b73e5..3adde2514b9e 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -19,8 +19,16 @@ pub mod time; use ahash::{HashMap, HashMapExt}; pub use array::Array; pub use map::Map; +use snafu::{OptionExt, ResultExt}; pub use time::Timestamp; +use super::error::{ + ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, ValueParseBooleanSnafu, + ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu, + ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu, +}; +use crate::etl::error::{Error, Result}; + /// Value can be used as type /// acts as value: the enclosed value is the actual value /// acts as type: the enclosed value is the default value @@ -58,7 +66,7 @@ impl Value { matches!(self, Value::Null) } - pub fn parse_str_type(t: &str) -> Result { + pub fn parse_str_type(t: &str) -> Result { let mut parts = t.splitn(2, ','); let head = parts.next().unwrap_or_default(); let tail = parts.next().map(|s| s.trim().to_string()); @@ -93,10 +101,11 @@ impl Value { time::SECOND_RESOLUTION | time::SEC_RESOLUTION | time::S_RESOLUTION => { Ok(Value::Timestamp(Timestamp::Second(0))) } - _ => Err(format!( - "invalid resolution: '{resolution}'. 
Available resolutions: {}", - time::VALID_RESOLUTIONS.join(",") - )), + _ => ValueInvalidResolutionSnafu { + resolution, + valid_resolution: time::VALID_RESOLUTIONS.join(","), + } + .fail(), }, _ => Ok(Value::Timestamp(Timestamp::Nanosecond(0))), }, @@ -104,65 +113,68 @@ impl Value { "array" => Ok(Value::Array(Array::default())), "map" => Ok(Value::Map(Map::default())), - _ => Err(format!("failed to parse type: '{t}'")), + _ => ValueParseTypeSnafu { t }.fail(), } } /// only support string, bool, number, null - pub fn parse_str_value(&self, v: &str) -> Result { + pub fn parse_str_value(&self, v: &str) -> Result { match self { Value::Int8(_) => v .parse::() .map(Value::Int8) - .map_err(|e| format!("failed to parse int8: {}", e)), + .context(ValueParseIntSnafu { ty: "int8", v }), Value::Int16(_) => v .parse::() .map(Value::Int16) - .map_err(|e| format!("failed to parse int16: {}", e)), + .context(ValueParseIntSnafu { ty: "int16", v }), Value::Int32(_) => v .parse::() .map(Value::Int32) - .map_err(|e| format!("failed to parse int32: {}", e)), + .context(ValueParseIntSnafu { ty: "int32", v }), Value::Int64(_) => v .parse::() .map(Value::Int64) - .map_err(|e| format!("failed to parse int64: {}", e)), + .context(ValueParseIntSnafu { ty: "int64", v }), Value::Uint8(_) => v .parse::() .map(Value::Uint8) - .map_err(|e| format!("failed to parse uint8: {}", e)), + .context(ValueParseIntSnafu { ty: "uint8", v }), Value::Uint16(_) => v .parse::() .map(Value::Uint16) - .map_err(|e| format!("failed to parse uint16: {}", e)), + .context(ValueParseIntSnafu { ty: "uint16", v }), Value::Uint32(_) => v .parse::() .map(Value::Uint32) - .map_err(|e| format!("failed to parse uint32: {}", e)), + .context(ValueParseIntSnafu { ty: "uint32", v }), Value::Uint64(_) => v .parse::() .map(Value::Uint64) - .map_err(|e| format!("failed to parse uint64: {}", e)), + .context(ValueParseIntSnafu { ty: "uint64", v }), Value::Float32(_) => v .parse::() .map(Value::Float32) - .map_err(|e| format!("failed to parse float32: {}", e)), + .context(ValueParseFloatSnafu { ty: "float32", v }), Value::Float64(_) => v .parse::() .map(Value::Float64) - .map_err(|e| format!("failed to parse float64: {}", e)), + .context(ValueParseFloatSnafu { ty: "float64", v }), Value::Boolean(_) => v .parse::() .map(Value::Boolean) - .map_err(|e| format!("failed to parse bool: {}", e)), + .context(ValueParseBooleanSnafu { ty: "boolean", v }), Value::String(_) => Ok(Value::String(v.to_string())), Value::Null => Ok(Value::Null), - _ => Err(format!("default value not unsupported for type {}", self)), + _ => ValueDefaultValueUnsupportedSnafu { + value: format!("{:?}", self), + } + .fail(), } } @@ -249,9 +261,9 @@ impl std::fmt::Display for Value { } impl TryFrom for Value { - type Error = String; + type Error = Error; - fn try_from(v: serde_json::Value) -> Result { + fn try_from(v: serde_json::Value) -> Result { match v { serde_json::Value::Null => Ok(Value::Null), serde_json::Value::Bool(v) => Ok(Value::Boolean(v)), @@ -263,7 +275,7 @@ impl TryFrom for Value { } else if let Some(v) = v.as_f64() { Ok(Value::Float64(v)) } else { - Err(format!("unsupported number type: {}", v)) + ValueUnsupportedNumberTypeSnafu { value: v }.fail() } } serde_json::Value::String(v) => Ok(Value::String(v)), @@ -286,20 +298,17 @@ impl TryFrom for Value { } impl TryFrom<&yaml_rust::Yaml> for Value { - type Error = String; + type Error = Error; - fn try_from(v: &yaml_rust::Yaml) -> Result { + fn try_from(v: &yaml_rust::Yaml) -> Result { match v { yaml_rust::Yaml::Null => 
Ok(Value::Null), yaml_rust::Yaml::Boolean(v) => Ok(Value::Boolean(*v)), yaml_rust::Yaml::Integer(v) => Ok(Value::Int64(*v)), - yaml_rust::Yaml::Real(v) => { - if let Ok(v) = v.parse() { - Ok(Value::Float64(v)) - } else { - Err(format!("failed to parse float64: {}", v)) - } - } + yaml_rust::Yaml::Real(v) => match v.parse::() { + Ok(v) => Ok(Value::Float64(v)), + Err(e) => Err(e).context(ValueParseFloatSnafu { ty: "float64", v }), + }, yaml_rust::Yaml::String(v) => Ok(Value::String(v.to_string())), yaml_rust::Yaml::Array(arr) => { let mut values = vec![]; @@ -313,12 +322,12 @@ impl TryFrom<&yaml_rust::Yaml> for Value { for (k, v) in v { let key = k .as_str() - .ok_or(format!("key in Hash must be a string, but got {v:?}"))?; + .with_context(|| ValueYamlKeyMustBeStringSnafu { value: v.clone() })?; values.insert(key.to_string(), Value::try_from(v)?); } Ok(Value::Map(Map { values })) } - _ => Err(format!("unsupported yaml type: {v:?}")), + _ => ValueUnsupportedYamlTypeSnafu { value: v.clone() }.fail(), } } } diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 86367232bd06..8fc72c584484 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -16,6 +16,7 @@ mod etl; mod manager; mod metrics; +pub use etl::error::Result; pub use etl::processor::Processor; pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs index 4467b42b51c3..a51ad61dacb8 100644 --- a/src/pipeline/src/manager/error.rs +++ b/src/pipeline/src/manager/error.rs @@ -32,14 +32,16 @@ pub enum Error { #[snafu(display("Failed to insert pipeline to pipelines table"))] InsertPipeline { + #[snafu(source)] source: operator::error::Error, #[snafu(implicit)] location: Location, }, - #[snafu(display("Failed to parse pipeline: {}", reason))] + #[snafu(display("Failed to parse pipeline"))] CompilePipeline { - reason: String, + #[snafu(source)] + source: crate::etl::error::Error, #[snafu(implicit)] location: Location, }, @@ -56,6 +58,7 @@ pub enum Error { CollectRecords { #[snafu(implicit)] location: Location, + #[snafu(source)] source: common_recordbatch::error::Error, }, @@ -76,6 +79,7 @@ pub enum Error { #[snafu(display("Failed to execute internal statement"))] ExecuteInternalStatement { + #[snafu(source)] source: query::error::Error, #[snafu(implicit)] location: Location, @@ -83,6 +87,7 @@ pub enum Error { #[snafu(display("Failed to create dataframe"))] DataFrame { + #[snafu(source)] source: query::error::Error, #[snafu(implicit)] location: Location, @@ -90,6 +95,7 @@ pub enum Error { #[snafu(display("General catalog error"))] Catalog { + #[snafu(source)] source: catalog::error::Error, #[snafu(implicit)] location: Location, @@ -97,14 +103,16 @@ pub enum Error { #[snafu(display("Failed to create table"))] CreateTable { + #[snafu(source)] source: operator::error::Error, #[snafu(implicit)] location: Location, }, - #[snafu(display("Failed to execute pipeline, reason: {}", reason))] + #[snafu(display("Failed to execute pipeline"))] PipelineTransform { - reason: String, + #[snafu(source)] + source: crate::etl::error::Error, #[snafu(implicit)] location: Location, }, diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 8989412c3eb5..7b3719b66707 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -204,8 +204,7 @@ impl PipelineTable { /// Compile a pipeline from a string. 
pub fn compile_pipeline(pipeline: &str) -> Result> { let yaml_content = Content::Yaml(pipeline.into()); - parse::(&yaml_content) - .map_err(|e| CompilePipelineSnafu { reason: e }.build()) + parse::(&yaml_content).context(CompilePipelineSnafu) } /// Insert a pipeline into the pipeline table. diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index 22cf14c46bb3..7577d58080c7 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -279,5 +279,5 @@ transform: let row = pipeline.exec_mut(&mut result); assert!(row.is_err()); - assert_eq!(row.err().unwrap(), "No matching pattern found"); + assert_eq!(row.err().unwrap().to_string(), "No matching pattern found"); } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index f0a0902837a9..7877d2b84ab5 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -115,19 +115,18 @@ pub async fn add_pipeline( ) -> Result { let start = Instant::now(); let handler = state.log_handler; - if pipeline_name.is_empty() { - return Err(InvalidParameterSnafu { + ensure!( + !pipeline_name.is_empty(), + InvalidParameterSnafu { reason: "pipeline_name is required in path", } - .build()); - } - - if payload.is_empty() { - return Err(InvalidParameterSnafu { + ); + ensure!( + !payload.is_empty(), + InvalidParameterSnafu { reason: "pipeline is required in body", } - .build()); - } + ); query_ctx.set_channel(Channel::Http); let query_ctx = Arc::new(query_ctx); @@ -252,12 +251,12 @@ pub async fn pipeline_dryrun( let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?; - if value.len() > 10 { - return Err(InvalidParameterSnafu { + ensure!( + value.len() <= 10, + InvalidParameterSnafu { reason: "too many rows for dryrun", } - .build()); - } + ); query_ctx.set_channel(Channel::Http); let query_ctx = Arc::new(query_ctx); @@ -272,11 +271,11 @@ pub async fn pipeline_dryrun( for v in value { pipeline .prepare(v, &mut intermediate_state) - .map_err(|reason| PipelineTransformSnafu { reason }.build()) + .context(PipelineTransformSnafu) .context(PipelineSnafu)?; let r = pipeline .exec_mut(&mut intermediate_state) - .map_err(|reason| PipelineTransformSnafu { reason }.build()) + .context(PipelineTransformSnafu) .context(PipelineSnafu)?; results.push(r); pipeline.reset_intermediate_state(&mut intermediate_state); @@ -438,21 +437,21 @@ async fn ingest_logs_inner( for v in pipeline_data { pipeline .prepare(v, &mut intermediate_state) - .map_err(|reason| { + .inspect_err(|_| { METRIC_HTTP_LOGS_TRANSFORM_ELAPSED .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) .observe(transform_timer.elapsed().as_secs_f64()); - PipelineTransformSnafu { reason }.build() }) + .context(PipelineTransformSnafu) .context(PipelineSnafu)?; let r = pipeline .exec_mut(&mut intermediate_state) - .map_err(|reason| { + .inspect_err(|_| { METRIC_HTTP_LOGS_TRANSFORM_ELAPSED .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) .observe(transform_timer.elapsed().as_secs_f64()); - PipelineTransformSnafu { reason }.build() }) + .context(PipelineTransformSnafu) .context(PipelineSnafu)?; results.push(r); pipeline.reset_intermediate_state(&mut intermediate_state);