From 2c2fbea93d312a81c970141f0ed5ba363cd1a191 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jul 2024 18:33:24 +0300 Subject: [PATCH] clippy, fmt, test typo Added tests for naive filter Fixed the NaiveFilterOperand from_str constructor --- benches/filters.rs | 21 ++-- src/filters/error.rs | 9 +- src/filters/filter.rs | 6 +- src/filters/filter_factory.rs | 27 ++--- .../jmespath_filter/custom_functions.rs | 4 +- src/filters/jmespath_filter/filter.rs | 46 +++---- src/filters/jmespath_filter/mod.rs | 2 +- src/filters/mod.rs | 5 +- src/filters/naive_filter/error.rs | 4 +- src/filters/naive_filter/filter.rs | 112 +++++++++++++----- src/filters/naive_filter/mod.rs | 6 +- src/filters/naive_filter/operand.rs | 32 +++-- src/filters/naive_filter/operator.rs | 18 +-- src/lib.rs | 31 +++-- src/main.rs | 20 ++-- 15 files changed, 212 insertions(+), 131 deletions(-) diff --git a/benches/filters.rs b/benches/filters.rs index fd7b94c..c0ddbc3 100644 --- a/benches/filters.rs +++ b/benches/filters.rs @@ -1,7 +1,7 @@ use std::fs::File; use std::io::{self, BufRead, BufReader}; -use criterion::{black_box, Criterion, criterion_group, criterion_main}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; use serde_json::Value; use kafka_delta_ingest::{Filter, FilterEngine, FilterError, FilterFactory}; @@ -25,33 +25,32 @@ fn filtering(filter: &Box, values: &Vec) { for v in values.into_iter() { match filter.filter(v) { Ok(_) => {} - Err(e) => { - match e { - FilterError::FilterSkipMessage => {} - _ => panic!("something wrong"), - } - } + Err(e) => match e { + FilterError::FilterSkipMessage => {} + _ => panic!("something wrong"), + }, }; } } fn naive_filter_benchmark(c: &mut Criterion) { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = FilterFactory::try_build(&FilterEngine::Naive, &vec!("method=='GET'".to_string())).expect("wrong"); + let filter = FilterFactory::try_build(&FilterEngine::Naive, &vec!["method=='GET'".to_string()]) + .expect("wrong"); c.bench_function("naive_filter_benchmark", |b| { b.iter(|| filtering(&filter, black_box(&values))) }); } - fn jmespath_filter_benchmark(c: &mut Criterion) { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = FilterFactory::try_build(&FilterEngine::Jmespath, &vec!("method=='GET'".to_string())).expect("wrong"); + let filter = + FilterFactory::try_build(&FilterEngine::Jmespath, &vec!["method=='GET'".to_string()]) + .expect("wrong"); c.bench_function("jmespath_filter_benchmark", |b| { b.iter(|| filtering(&filter, black_box(&values))) }); } - criterion_group!(benches, naive_filter_benchmark, jmespath_filter_benchmark); criterion_main!(benches); diff --git a/src/filters/error.rs b/src/filters/error.rs index 17cf273..a3204e0 100644 --- a/src/filters/error.rs +++ b/src/filters/error.rs @@ -1,5 +1,4 @@ use jmespatch::JmespathError; -use serde_json; use crate::filters::naive_filter::error::NaiveFilterError; @@ -37,11 +36,11 @@ pub enum FilterError { source: serde_json::Error, }, - /// Not found filter engine - #[error("Not found filter engine: {reason}")] + /// Filter engine not found + #[error("Filter engine not found: {name}")] NotFound { - /// - reason: String + /// Wrong name + name: String, }, /// Error returned for skipping message diff --git a/src/filters/filter.rs b/src/filters/filter.rs index 547ca8c..d07ce06 100644 --- a/src/filters/filter.rs +++ b/src/filters/filter.rs @@ -5,9 +5,11 @@ use crate::filters::FilterError; /// Trait for implementing a filter mechanism pub trait Filter: Send { /// Constructor - fn from_filters(filters: &Vec) -> Result where Self: Sized; + fn from_filters(filters: &[String]) -> Result + where + Self: Sized; /// A function that filters a message. If any of the filters fail, it throws an error; /// if all filters pass, it returns nothing. fn filter(&self, message: &Value) -> Result<(), FilterError>; -} \ No newline at end of file +} diff --git a/src/filters/filter_factory.rs b/src/filters/filter_factory.rs index 9942201..ce9a387 100644 --- a/src/filters/filter_factory.rs +++ b/src/filters/filter_factory.rs @@ -6,7 +6,7 @@ pub enum FilterEngine { /// Filter for simple comparisons that works a little faster Naive, /// Filter for complex comparisons - Jmespath + Jmespath, } /// Factory for creating and managing filters @@ -14,21 +14,18 @@ pub struct FilterFactory {} impl FilterFactory { /// Factory for creating filter instances pub fn try_build( - filter_engine: &FilterEngine, filters: &Vec + filter_engine: &FilterEngine, + filters: &[String], ) -> Result, FilterError> { match filter_engine { - FilterEngine::Naive => { - match NaiveFilter::from_filters(filters) { - Ok(f) => {Ok(Box::new(f))} - Err(e) => {Err(e)} - } - } - FilterEngine::Jmespath => { - match JmespathFilter::from_filters(filters) { - Ok(f) => {Ok(Box::new(f))} - Err(e) => {Err(e)} - } - } + FilterEngine::Naive => match NaiveFilter::from_filters(filters) { + Ok(f) => Ok(Box::new(f)), + Err(e) => Err(e), + }, + FilterEngine::Jmespath => match JmespathFilter::from_filters(filters) { + Ok(f) => Ok(Box::new(f)), + Err(e) => Err(e), + }, } } -} \ No newline at end of file +} diff --git a/src/filters/jmespath_filter/custom_functions.rs b/src/filters/jmespath_filter/custom_functions.rs index 40b5e30..28fd8fd 100644 --- a/src/filters/jmespath_filter/custom_functions.rs +++ b/src/filters/jmespath_filter/custom_functions.rs @@ -1,8 +1,8 @@ use std::convert::TryFrom; use std::sync::Arc; -use jmespatch::{Context, ErrorReason, JmespathError, Rcvar, Variable}; use jmespatch::functions::{ArgumentType, CustomFunction, Signature}; +use jmespatch::{Context, ErrorReason, JmespathError, Rcvar, Variable}; /// Custom function to compare two string values in a case-insensitive manner fn eq_ignore_case(args: &[Rcvar], context: &mut Context) -> Result { @@ -38,4 +38,4 @@ pub fn create_eq_ignore_case_function() -> CustomFunction { Signature::new(vec![ArgumentType::String, ArgumentType::String], None), Box::new(eq_ignore_case), ) -} \ No newline at end of file +} diff --git a/src/filters/jmespath_filter/filter.rs b/src/filters/jmespath_filter/filter.rs index 3d23c37..1fda756 100644 --- a/src/filters/jmespath_filter/filter.rs +++ b/src/filters/jmespath_filter/filter.rs @@ -2,8 +2,8 @@ use jmespatch::{Expression, Runtime}; use serde_json::Value; use crate::filters::filter::Filter; -use crate::filters::FilterError; use crate::filters::jmespath_filter::custom_functions::create_eq_ignore_case_function; +use crate::filters::FilterError; lazy_static! { static ref FILTER_RUNTIME: Runtime = { @@ -14,7 +14,6 @@ lazy_static! { }; } - /// Implementation of the [Filter] trait for complex checks, such as checking for /// the presence of a key in an object or comparing the second value in an array /// or check array length. @@ -24,7 +23,7 @@ pub struct JmespathFilter { } impl Filter for JmespathFilter { - fn from_filters(filters: &Vec) -> Result { + fn from_filters(filters: &[String]) -> Result { let filters = filters .iter() .map(|f| { @@ -48,18 +47,17 @@ impl Filter for JmespathFilter { match filter.search(message) { Err(e) => return Err(FilterError::JmespathError { source: e }), Ok(v) => { - if v.as_boolean().unwrap() == false { + if !v.as_boolean().unwrap() { return Err(FilterError::FilterSkipMessage); } } }; } - return Ok(()); + Ok(()) } } - #[cfg(test)] mod tests { use std::fs::File; @@ -89,22 +87,22 @@ mod tests { for v in values.into_iter() { match filter.filter(&v) { - Ok(_) => { passed_messages += 1 } - Err(FilterError::FilterSkipMessage) => { - filtered_messages += 1 - } - Err(e) => panic!("{}", e) + Ok(_) => passed_messages += 1, + Err(FilterError::FilterSkipMessage) => filtered_messages += 1, + Err(e) => panic!("{}", e), } } - return (passed_messages, filtered_messages) + return (passed_messages, filtered_messages); } #[test] fn equal() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match JmespathFilter::from_filters(&vec!["session_id=='a8a3d0e3-7b4e-4f17-b264-76cb792bdb96'".to_string()]) { + let filter = match JmespathFilter::from_filters(&vec![ + "session_id=='a8a3d0e3-7b4e-4f17-b264-76cb792bdb96'".to_string(), + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -115,9 +113,11 @@ mod tests { #[test] fn eq_ignore_case() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match JmespathFilter::from_filters(&vec!["eq_ignore_case(method, 'get')".to_string()]) { + let filter = match JmespathFilter::from_filters(&vec![ + "eq_ignore_case(method, 'get')".to_string() + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -129,9 +129,11 @@ mod tests { #[test] fn or_condition() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match JmespathFilter::from_filters(&vec!["(status == `404` || method == 'GET')".to_string()]) { + let filter = match JmespathFilter::from_filters(&vec![ + "(status == `404` || method == 'GET')".to_string(), + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -158,9 +160,11 @@ mod tests { .iter() .map(|line| serde_json::from_str::(&line).unwrap()) .collect(); - let filter = match JmespathFilter::from_filters(&vec!["!contains(keys(@), 'status') || (status == '1' && age >= `26`)".to_string()]) { + let filter = match JmespathFilter::from_filters(&vec![ + "!contains(keys(@), 'status') || (status == '1' && age >= `26`)".to_string(), + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -168,4 +172,4 @@ mod tests { assert_eq!(7, passed_messages); assert_eq!(3, filtered_messages); } -} \ No newline at end of file +} diff --git a/src/filters/jmespath_filter/mod.rs b/src/filters/jmespath_filter/mod.rs index c058857..c79dafe 100644 --- a/src/filters/jmespath_filter/mod.rs +++ b/src/filters/jmespath_filter/mod.rs @@ -1,2 +1,2 @@ -pub(super) mod filter; mod custom_functions; +pub(super) mod filter; diff --git a/src/filters/mod.rs b/src/filters/mod.rs index 2a7f789..f1ea6b6 100644 --- a/src/filters/mod.rs +++ b/src/filters/mod.rs @@ -4,9 +4,8 @@ pub use filter_factory::{FilterEngine, FilterFactory}; pub(crate) use jmespath_filter::filter::JmespathFilter; pub(crate) use naive_filter::filter::NaiveFilter; -mod naive_filter; -mod jmespath_filter; mod error; mod filter; mod filter_factory; - +mod jmespath_filter; +mod naive_filter; diff --git a/src/filters/naive_filter/error.rs b/src/filters/naive_filter/error.rs index 4f66daf..5de1c23 100644 --- a/src/filters/naive_filter/error.rs +++ b/src/filters/naive_filter/error.rs @@ -1,5 +1,3 @@ -use serde_json; - #[derive(thiserror::Error, Debug)] pub enum NaiveFilterError { /// Error from [`serde_json`] @@ -9,7 +7,7 @@ pub enum NaiveFilterError { #[from] source: serde_json::Error, }, - + /// Error occurs when trying to execute a filter #[error("NaiveFilter execution error: {reason}")] RuntimeError { reason: String }, diff --git a/src/filters/naive_filter/filter.rs b/src/filters/naive_filter/filter.rs index 1a5af9d..d9136d8 100644 --- a/src/filters/naive_filter/filter.rs +++ b/src/filters/naive_filter/filter.rs @@ -2,9 +2,9 @@ use regex::Regex; use serde_json::Value; use crate::filters::filter::Filter; -use crate::filters::FilterError; use crate::filters::naive_filter::operand::NaiveFilterOperand; use crate::filters::naive_filter::operator::{get_operator, OperatorRef}; +use crate::filters::FilterError; pub struct NaiveFilterExpression { left: NaiveFilterOperand, @@ -21,28 +21,29 @@ pub(crate) struct NaiveFilter { } impl Filter for NaiveFilter { - fn from_filters(filters: &Vec) -> Result { + fn from_filters(filters: &[String]) -> Result { let mut expressions: Vec = Vec::new(); let re = Regex::new(r"(?.*)(?>=|<=|==|!=|~=|>|<)(?.*)").unwrap(); for filter in filters.iter() { let (_, [left, op, right]) = re.captures(filter.trim()).unwrap().extract(); - expressions.push( - NaiveFilterExpression { - left: NaiveFilterOperand::from_str(left)?, - op: get_operator(op)?, - right: NaiveFilterOperand::from_str(right)?, - } - ); + expressions.push(NaiveFilterExpression { + left: NaiveFilterOperand::from_str(left)?, + op: get_operator(op)?, + right: NaiveFilterOperand::from_str(right)?, + }); } - return Ok(NaiveFilter { + Ok(NaiveFilter { filters: expressions, }) } - fn filter(&self, message: &Value) -> Result<(), FilterError>{ + fn filter(&self, message: &Value) -> Result<(), FilterError> { for filter in self.filters.iter() { - if !filter.op.execute(filter.left.get_value(message), filter.right.get_value(message))? { + if !filter.op.execute( + filter.left.get_value(message), + filter.right.get_value(message), + )? { return Err(FilterError::FilterSkipMessage); } } @@ -50,7 +51,6 @@ impl Filter for NaiveFilter { } } - #[cfg(test)] mod tests { use std::fs::File; @@ -80,22 +80,23 @@ mod tests { for v in values.into_iter() { match filter.filter(&v) { - Ok(_) => { passed_messages += 1 } - Err(FilterError::FilterSkipMessage) => { - filtered_messages += 1 - } - Err(e) => panic!("{}", e) + Ok(_) => passed_messages += 1, + Err(FilterError::FilterSkipMessage) => filtered_messages += 1, + Err(e) => panic!("{}", e), } } - return (passed_messages, filtered_messages) + return (passed_messages, filtered_messages); } #[test] fn greater_than_or_equal() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match NaiveFilter::from_filters(&vec!["status>=`201`".to_string(), "method=='GET'".to_string()]) { + let filter = match NaiveFilter::from_filters(&vec![ + "status>=`201`".to_string(), + "method=='GET'".to_string(), + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -106,9 +107,12 @@ mod tests { #[test] fn less_than_or_equal() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match NaiveFilter::from_filters(&vec!["status<=`403`".to_string(), "method=='POST'".to_string()]) { + let filter = match NaiveFilter::from_filters(&vec![ + "status<=`403`".to_string(), + "method=='POST'".to_string(), + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -119,9 +123,11 @@ mod tests { #[test] fn equal() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match NaiveFilter::from_filters(&vec!["session_id=='a8a3d0e3-7b4e-4f17-b264-76cb792bdb96'".to_string()]) { + let filter = match NaiveFilter::from_filters(&vec![ + "session_id=='a8a3d0e3-7b4e-4f17-b264-76cb792bdb96'".to_string(), + ]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -134,7 +140,7 @@ mod tests { let values = read_json_file(SOURCE_PATH).unwrap(); let filter = match NaiveFilter::from_filters(&vec!["method!='POST'".to_string()]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -145,9 +151,9 @@ mod tests { #[test] fn eq_ignore_case() { let values = read_json_file(SOURCE_PATH).unwrap(); - let filter = match NaiveFilter::from_filters(&vec!["method~='get')".to_string()]) { + let filter = match NaiveFilter::from_filters(&vec!["method~='get'".to_string()]) { Ok(f) => f, - Err(e) => panic!("{}", e) + Err(e) => panic!("{}", e), }; let (passed_messages, filtered_messages) = run_filter(&filter, &values); @@ -155,4 +161,52 @@ mod tests { assert_eq!(17, passed_messages); assert_eq!(83, filtered_messages); } -} \ No newline at end of file + + #[test] + fn invalid_filters() { + assert!( + NaiveFilter::from_filters(&vec!["method~='get]".to_string()]).is_err(), + "The filter should not have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["method~='get']".to_string()]).is_err(), + "The filter should not have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["status~=`404".to_string()]).is_err(), + "The filter should not have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["status~=`404,123`".to_string()]).is_err(), + "The filter should not have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["status~=`abc`".to_string()]).is_err(), + "The filter should not have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["status~=`abc`".to_string()]).is_err(), + "The filter should not have been created" + ); + } + + #[test] + fn valid_filters() { + assert!( + NaiveFilter::from_filters(&vec!["method=='get'".to_string()]).is_ok(), + "The filter should have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["status==`404`".to_string()]).is_ok(), + "The filter should have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["status==internal.status".to_string()]).is_ok(), + "The filter should have been created" + ); + assert!( + NaiveFilter::from_filters(&vec!["internal.value!=`3.1415962`".to_string()]).is_ok(), + "The filter should have been created" + ); + } +} diff --git a/src/filters/naive_filter/mod.rs b/src/filters/naive_filter/mod.rs index 9dc4f19..6554787 100644 --- a/src/filters/naive_filter/mod.rs +++ b/src/filters/naive_filter/mod.rs @@ -1,4 +1,4 @@ -pub(super) mod operator; -pub(super) mod operand; -pub(super) mod filter; pub(super) mod error; +pub(super) mod filter; +pub(super) mod operand; +pub(super) mod operator; diff --git a/src/filters/naive_filter/operand.rs b/src/filters/naive_filter/operand.rs index fd588eb..61bec4d 100644 --- a/src/filters/naive_filter/operand.rs +++ b/src/filters/naive_filter/operand.rs @@ -21,22 +21,38 @@ impl NaiveFilterOperand { } let path: Vec = path.unwrap().split('.').map(str::to_string).collect(); - return Ok(Self { + Ok(Self { value, path: Some(path), - }); + }) } pub(crate) fn from_str(operand_str: &str) -> Result { - if operand_str.starts_with('`') { + let operand_str = operand_str.trim(); + + match operand_str.chars().next() { // number - NaiveFilterOperand::new(serde_json::from_str(operand_str.trim_matches('`'))?, None) - } else if operand_str.starts_with('\'') { + Some('`') => { + if !operand_str.ends_with('`') { + return Err(NaiveFilterError::PrepareError { + reason: "To filter by number, the number must begin and end with `" + .to_string(), + }); + } + NaiveFilterOperand::new(serde_json::from_str(operand_str.trim_matches('`'))?, None) + } // string - NaiveFilterOperand::new(Some(json!(operand_str.trim_matches('\''))), None) - } else { + Some('\'') => { + if !operand_str.ends_with('\'') { + return Err(NaiveFilterError::PrepareError { + reason: "To filter by string, the string must begin and end with '" + .to_string(), + }); + } + NaiveFilterOperand::new(Some(json!(operand_str.trim_matches('\''))), None) + } // path to attribute via dot - NaiveFilterOperand::new(None, Some(operand_str.to_string())) + _ => NaiveFilterOperand::new(None, Some(operand_str.to_string())), } } fn is_path(&self) -> bool { diff --git a/src/filters/naive_filter/operator.rs b/src/filters/naive_filter/operator.rs index 753e34a..b5e183f 100644 --- a/src/filters/naive_filter/operator.rs +++ b/src/filters/naive_filter/operator.rs @@ -20,7 +20,7 @@ impl Operator for GteOperator { fn execute(&self, left: &Value, right: &Value) -> Result { match left { Value::Number(n) => { - return if let Some(integer) = n.as_i64() { + if let Some(integer) = n.as_i64() { Ok(integer >= right.as_i64().unwrap()) } else { Ok(n.as_f64().unwrap() >= right.as_f64().unwrap()) @@ -39,7 +39,7 @@ impl Operator for LteOperator { fn execute(&self, left: &Value, right: &Value) -> Result { match left { Value::Number(n) => { - return if let Some(integer) = n.as_i64() { + if let Some(integer) = n.as_i64() { Ok(integer <= right.as_i64().unwrap()) } else { Ok(n.as_f64().unwrap() <= right.as_f64().unwrap()) @@ -57,7 +57,7 @@ impl Operator for EqOperator { fn execute(&self, left: &Value, right: &Value) -> Result { match left { Value::Number(n) => { - return if let Some(integer) = n.as_i64() { + if let Some(integer) = n.as_i64() { Ok(integer == right.as_i64().unwrap()) } else { Ok(n.as_f64().unwrap() == right.as_f64().unwrap()) @@ -66,7 +66,7 @@ impl Operator for EqOperator { Value::String(s) => { return Ok(s.as_str() == right.as_str().unwrap()) }, - Value::Bool(b) => return Ok(*b == right.as_bool().unwrap()), + Value::Bool(b) => Ok(*b == right.as_bool().unwrap()), _ => Err( NaiveFilterError::RuntimeError { reason: format!("The == operator can only be used for numbers, strings or bools. Passed: {:?}, {:?}", left, right) @@ -79,16 +79,16 @@ impl Operator for NeqOperator { fn execute(&self, left: &Value, right: &Value) -> Result { match left { Value::Number(n) => { - return if let Some(integer) = n.as_i64() { + if let Some(integer) = n.as_i64() { Ok(integer != right.as_i64().unwrap()) } else { Ok(n.as_f64().unwrap() != right.as_f64().unwrap()) } }, Value::String(s) => { - return Ok(s.as_str() != right.as_str().unwrap()) + Ok(s.as_str() != right.as_str().unwrap()) }, - Value::Bool(b) => return Ok(*b != right.as_bool().unwrap()), + Value::Bool(b) => Ok(*b != right.as_bool().unwrap()), _ => Err( NaiveFilterError::RuntimeError { reason: format!("The != operator can only be used for numbers, strings or bools. Passed: {:?}, {:?}", left, right) @@ -109,7 +109,7 @@ impl Operator for GtOperator { fn execute(&self, left: &Value, right: &Value) -> Result { match left { Value::Number(n) => { - return if let Some(integer) = n.as_i64() { + if let Some(integer) = n.as_i64() { Ok(integer > right.as_i64().unwrap()) } else { Ok(n.as_f64().unwrap() > right.as_f64().unwrap()) @@ -127,7 +127,7 @@ impl Operator for LtOperator { fn execute(&self, left: &Value, right: &Value) -> Result { match left { Value::Number(n) => { - return if let Some(integer) = n.as_i64() { + if let Some(integer) = n.as_i64() { Ok(integer < right.as_i64().unwrap()) } else { Ok(n.as_f64().unwrap() < right.as_f64().unwrap()) diff --git a/src/lib.rs b/src/lib.rs index ca9addb..fbf2d83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,25 +14,25 @@ extern crate serde_json; #[macro_use] extern crate strum_macros; -use std::{collections::HashMap, path::PathBuf}; use std::ops::Add; use std::sync::Arc; use std::time::{Duration, Instant}; +use std::{collections::HashMap, path::PathBuf}; -use deltalake_core::{DeltaTable, DeltaTableError}; use deltalake_core::operations::transaction::TableReference; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::OutputMode; +use deltalake_core::{DeltaTable, DeltaTableError}; use futures::stream::StreamExt; use log::{debug, error, info, warn}; +use rdkafka::message::BorrowedMessage; use rdkafka::{ - ClientContext, config::ClientConfig, consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer}, error::KafkaError, - Message, Offset, TopicPartitionList, util::Timeout, + util::Timeout, + ClientContext, Message, Offset, TopicPartitionList, }; -use rdkafka::message::BorrowedMessage; use serde_json::Value; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; @@ -42,22 +42,22 @@ use coercions::CoercionTree; use delta_helpers::*; use serialization::{MessageDeserializer, MessageDeserializerFactory}; +pub use crate::filters::{Filter, FilterEngine, FilterError, FilterFactory}; +use crate::offsets::WriteOffsetsError; +use crate::value_buffers::{ConsumedBuffers, ValueBuffers}; use crate::{ dead_letters::*, metrics::*, transforms::*, writer::{DataWriter, DataWriterError}, }; -pub use crate::filters::{Filter, FilterEngine, FilterError, FilterFactory}; -use crate::offsets::WriteOffsetsError; -use crate::value_buffers::{ConsumedBuffers, ValueBuffers}; mod coercions; -mod filters; /// Doc pub mod cursor; mod dead_letters; mod delta_helpers; +mod filters; mod metrics; mod offsets; mod serialization; @@ -212,11 +212,18 @@ pub enum IngestError { #[error("FilterError: {source}")] Filter { /// Wrapped [`FilterError`] - #[from] - source: FilterError, + source: Box, }, } +impl From for IngestError { + fn from(error: FilterError) -> Self { + IngestError::Filter { + source: Box::new(error), + } + } +} + /// Formats for message parsing #[derive(Clone, Debug)] pub enum MessageFormat { @@ -459,7 +466,7 @@ pub async fn start_ingest( debug!("Skipping message with partition {}, offset {} on topic {} because it was already processed", partition, offset, topic); continue; } - IngestError::Filter { source } => match source { + IngestError::Filter { source } => match *source { FilterError::FilterSkipMessage => { ingest_metrics.message_filtered(); debug!("Skip message by filter"); diff --git a/src/main.rs b/src/main.rs index 95079a6..be3fb9b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,8 +40,8 @@ use clap::{Arg, ArgAction, ArgGroup, ArgMatches, Command}; use log::{error, info, LevelFilter}; use kafka_delta_ingest::{ - AutoOffsetReset, DataTypeOffset, DataTypePartition, FilterEngine, FilterError, IngestOptions, - MessageFormat, SchemaSource, start_ingest + start_ingest, AutoOffsetReset, DataTypeOffset, DataTypePartition, FilterEngine, FilterError, + IngestOptions, MessageFormat, SchemaSource, }; #[tokio::main(flavor = "current_thread")] @@ -123,7 +123,7 @@ async fn main() -> anyhow::Result<()> { let filters: Vec = ingest_matches .get_many::("filter") - .map(|list| list.map(|f| f.clone()).collect()) + .map(|list| list.cloned().collect()) .unwrap_or_else(Vec::new); let filter_engine: FilterEngine = convert_matches_to_filter_engine(ingest_matches)?; @@ -526,12 +526,18 @@ fn convert_matches_to_message_format( fn convert_matches_to_filter_engine( ingest_matches: &ArgMatches, -) -> Result { - return match ingest_matches.get_one::("filter_engine").unwrap().as_str() { +) -> Result> { + return match ingest_matches + .get_one::("filter_engine") + .unwrap() + .as_str() + { "naive" => Ok(FilterEngine::Naive), "jmespath" => Ok(FilterEngine::Jmespath), - f => Err(FilterError::NotFound {reason: f.to_string() }) - } + f => Err(Box::new(FilterError::NotFound { + name: f.to_string(), + })), + }; } #[cfg(test)]