Skip to content

Commit

Permalink
Add message filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
Nekit2217 committed Jul 13, 2024
1 parent 21778d6 commit 62dc437
Show file tree
Hide file tree
Showing 17 changed files with 911 additions and 32 deletions.
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ deltalake-azure = { version = "~0.1.3", optional = true }
dynamodb_lock = { version = "0.6.0", optional = true }
# sentry
sentry = { version = "0.23.0", optional = true }
regex = "1.10.2"

[features]
default = []
Expand All @@ -66,6 +67,12 @@ serial_test = "*"
tempfile = "3"
time = "0.3.20"
utime = "0.3"
criterion = "0.5.1"
tar = "0.4"

[[bench]]
name = "filters"
harness = false

[profile.release]
lto = true
57 changes: 57 additions & 0 deletions benches/filters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::fs::File;
use std::io::{self, BufRead, BufReader};

use criterion::{black_box, Criterion, criterion_group, criterion_main};
use serde_json::Value;

use kafka_delta_ingest::{Filter, FilterEngine, FilterError, FilterFactory};

const SOURCE_PATH: &str = "tests/json/web_requests-100.json";

fn read_json_file(file_path: &str) -> io::Result<Vec<Value>> {
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let lines: Vec<_> = reader.lines().take(30000).collect::<io::Result<_>>()?;

let values: Vec<Value> = lines
.iter()
.map(|line| serde_json::from_str::<Value>(&line).unwrap())
.collect();

Ok(values)
}

fn filtering(filter: &Box<dyn Filter>, values: &Vec<Value>) {
for v in values.into_iter() {
match filter.filter(v) {
Ok(_) => {}
Err(e) => {
match e {
FilterError::FilterSkipMessage => {}
_ => panic!("something wrong"),
}
}
};
}
}

fn naive_filter_benchmark(c: &mut Criterion) {
let values = read_json_file(SOURCE_PATH).unwrap();
let filter = FilterFactory::try_build(&FilterEngine::Naive, &vec!("method=='get'".to_string())).expect("wrong");
c.bench_function("naive_filter_benchmark", |b| {
b.iter(|| filtering(&filter, black_box(&values)))
});
}


fn jmespath_filter_benchmark(c: &mut Criterion) {
let values = read_json_file(SOURCE_PATH).unwrap();
let filter = FilterFactory::try_build(&FilterEngine::Jmespath, &vec!("method=='get'".to_string())).expect("wrong");
c.bench_function("jmespath_filter_benchmark", |b| {
b.iter(|| filtering(&filter, black_box(&values)))
});
}


criterion_group!(benches, naive_filter_benchmark, jmespath_filter_benchmark);
criterion_main!(benches);
50 changes: 50 additions & 0 deletions src/filters/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use jmespatch::JmespathError;
use serde_json;

use crate::filters::naive_filter::error::NaiveFilterError;

/// Errors returned by filters
#[derive(thiserror::Error, Debug)]
pub enum FilterError {
/// Failed compile expression
#[error("Failed compile expression: {source}")]
CompileExpressionError {
/// Wrapped [JmespathError]
source: JmespathError,
},

/// Message does not match filter pattern
#[error("Can't filter message: {source}")]
JmespathError {
/// Wrapped [JmespathError]
#[from]
source: JmespathError,
},

/// NaiveFilterError
#[error("NaiveFilter failure: {source}")]
NaiveFilterError {
/// Wrapped [`crate::filters::naive_filter::error::NaiveFilterError`]
#[from]
source: NaiveFilterError,
},

/// Error from [`serde_json`]
#[error("JSON serialization failed: {source}")]
SerdeJson {
/// Wrapped [`serde_json::Error`]
#[from]
source: serde_json::Error,
},

/// Not found filter engine
#[error("Not found filter engine: {reason}")]
NotFound {
///
reason: String
},

/// Error returned for skipping message
#[error("Skipped a message by filter")]
FilterSkipMessage,
}
13 changes: 13 additions & 0 deletions src/filters/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde_json::Value;

use crate::filters::FilterError;

/// Trait for implementing a filter mechanism
pub trait Filter: Send {
/// Constructor
fn from_filters(filters: &Vec<String>) -> Result<Self, FilterError> where Self: Sized;

/// A function that filters a message. If any of the filters fail, it throws an error;
/// if all filters pass, it returns nothing.
fn filter(&self, message: &Value) -> Result<(), FilterError>;
}
34 changes: 34 additions & 0 deletions src/filters/filter_factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::filters::{Filter, FilterError, JmespathFilter, NaiveFilter};

/// Filter options
#[derive(Clone, Debug)]
pub enum FilterEngine {
/// Filter for simple comparisons that works a little faster
Naive,
/// Filter for complex comparisons
Jmespath
}

/// Factory for creating and managing filters
pub struct FilterFactory {}
impl FilterFactory {
/// Factory for creating filter instances
pub fn try_build(
filter_engine: &FilterEngine, filters: &Vec<String>
) -> Result<Box<dyn Filter>, FilterError> {
match filter_engine {
FilterEngine::Naive => {
match NaiveFilter::from_filters(filters) {
Ok(f) => {Ok(Box::new(f))}
Err(e) => {Err(e)}
}
}
FilterEngine::Jmespath => {
match JmespathFilter::from_filters(filters) {
Ok(f) => {Ok(Box::new(f))}
Err(e) => {Err(e)}
}
}
}
}
}
41 changes: 41 additions & 0 deletions src/filters/jmespath_filter/custom_functions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::convert::TryFrom;
use std::sync::Arc;

use jmespatch::{Context, ErrorReason, JmespathError, Rcvar, Variable};
use jmespatch::functions::{ArgumentType, CustomFunction, Signature};

/// Custom function to compare two string values in a case-insensitive manner
fn eq_ignore_case(args: &[Rcvar], context: &mut Context) -> Result<Rcvar, JmespathError> {
let s = match args[0].as_string() {
None => {
return Err(JmespathError::new(
context.expression,
context.offset,
ErrorReason::Parse("first variable must be string".to_string()),
))
}
Some(s) => s,
};

let p = match args[1].as_string() {
None => {
return Err(JmespathError::new(
context.expression,
context.offset,
ErrorReason::Parse("second variable must be string".to_string()),
))
}
Some(p) => p,
};

let var = Variable::try_from(serde_json::Value::Bool(s.eq_ignore_ascii_case(p)))?;

Ok(Arc::new(var))
}

pub fn create_eq_ignore_case_function() -> CustomFunction {
CustomFunction::new(
Signature::new(vec![ArgumentType::String, ArgumentType::String], None),
Box::new(eq_ignore_case),
)
}
171 changes: 171 additions & 0 deletions src/filters/jmespath_filter/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use jmespatch::{Expression, Runtime};
use serde_json::Value;

use crate::filters::filter::Filter;
use crate::filters::FilterError;
use crate::filters::jmespath_filter::custom_functions::create_eq_ignore_case_function;

lazy_static! {
static ref FILTER_RUNTIME: Runtime = {
let mut runtime = Runtime::new();
runtime.register_builtin_functions();
runtime.register_function("eq_ignore_case", Box::new(create_eq_ignore_case_function()));
runtime
};
}


/// Implementation of the [Filter] trait for complex checks, such as checking for
/// the presence of a key in an object or comparing the second value in an array
/// or check array length.
/// More examples: https://jmespath.org/examples.html; https://jmespath.org/tutorial.html
pub struct JmespathFilter {
filters: Vec<Expression<'static>>,
}

impl Filter for JmespathFilter {
fn from_filters(filters: &Vec<String>) -> Result<Self, FilterError> {
let filters = filters
.iter()
.map(|f| {
FILTER_RUNTIME
.compile(f)
.map_err(|source| FilterError::CompileExpressionError { source })
})
.collect::<Result<Vec<Expression<'static>>, FilterError>>();
match filters {
Ok(filters) => Ok(Self { filters }),
Err(e) => Err(e),
}
}

fn filter(&self, message: &Value) -> Result<(), FilterError> {
if self.filters.is_empty() {
return Ok(());
}

for filter in &self.filters {
match filter.search(message) {
Err(e) => return Err(FilterError::JmespathError { source: e }),
Ok(v) => {
if v.as_boolean().unwrap() == false {
return Err(FilterError::FilterSkipMessage);
}
}
};
}

return Ok(());
}
}


#[cfg(test)]
mod tests {
use std::fs::File;
use std::io;
use std::io::{BufRead, BufReader};

use super::*;

const SOURCE_PATH: &str = "tests/json/web_requests-100.json";

fn read_json_file(file_path: &str) -> io::Result<Vec<Value>> {
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let lines: Vec<_> = reader.lines().take(30000).collect::<io::Result<_>>()?;

let values: Vec<Value> = lines
.iter()
.map(|line| serde_json::from_str::<Value>(&line).unwrap())
.collect();

Ok(values)
}

fn run_filter(filter: &JmespathFilter, values: &Vec<Value>) -> (i32, i32) {
let mut passed_messages = 0;
let mut filtered_messages = 0;

for v in values.into_iter() {
match filter.filter(&v) {
Ok(_) => { passed_messages += 1 }
Err(FilterError::FilterSkipMessage) => {
filtered_messages += 1
}
Err(e) => panic!("{}", e)
}
}

return (passed_messages, filtered_messages)
}
#[test]
fn equal() {
let values = read_json_file(SOURCE_PATH).unwrap();
let filter = match JmespathFilter::from_filters(&vec!["session_id=='a8a3d0e3-7b4e-4f17-b264-76cb792bdb96'".to_string()]) {
Ok(f) => f,
Err(e) => panic!("{}", e)
};

let (passed_messages, filtered_messages) = run_filter(&filter, &values);

assert_eq!(1, passed_messages);
assert_eq!(99, filtered_messages);
}
#[test]
fn eq_ignore_case() {
let values = read_json_file(SOURCE_PATH).unwrap();
let filter = match JmespathFilter::from_filters(&vec!["eq_ignore_case(method, 'get')".to_string()]) {
Ok(f) => f,
Err(e) => panic!("{}", e)
};

let (passed_messages, filtered_messages) = run_filter(&filter, &values);

assert_eq!(17, passed_messages);
assert_eq!(83, filtered_messages);
}

#[test]
fn or_condition() {
let values = read_json_file(SOURCE_PATH).unwrap();
let filter = match JmespathFilter::from_filters(&vec!["(status == `404` || method == 'GET')".to_string()]) {
Ok(f) => f,
Err(e) => panic!("{}", e)
};

let (passed_messages, filtered_messages) = run_filter(&filter, &values);

assert_eq!(25, passed_messages);
assert_eq!(75, filtered_messages);
}

#[test]
fn complex_condition() {
let buff = r#"{"name": "John Doe", "age": 30, "status": "1", "department": "Engineering"}
{"name": "Jane Smith", "age": 25, "status": "1", "department": "Marketing"}
{"name": "Emily Johnson", "age": 35, "department": "Sales"}
{"name": "Michael Brown", "age": 40, "status": "3", "department": "Engineering"}
{"name": "Sarah Davis", "age": 28, "department": "Marketing"}
{"name": "David Wilson", "age": 22, "department": "Sales"}
{"name": "Laura Martinez", "age": 33, "status": "2", "department": "Engineering"}
{"name": "James Anderson", "age": 45, "department": "Marketing"}
{"name": "Linda Thomas", "age": 50, "department": "Sales"}
{"name": "Robert Jackson", "age": 37, "department": "Engineering"}"#;

let objects = buff.split("\n").map(|s| s.trim()).collect::<Vec<&str>>();
let values: Vec<Value> = objects
.iter()
.map(|line| serde_json::from_str::<Value>(&line).unwrap())
.collect();
let filter = match JmespathFilter::from_filters(&vec!["!contains(keys(@), 'status') || (status == '1' && age >= `26`)".to_string()]) {
Ok(f) => f,
Err(e) => panic!("{}", e)
};

let (passed_messages, filtered_messages) = run_filter(&filter, &values);

assert_eq!(7, passed_messages);
assert_eq!(3, filtered_messages);
}
}
2 changes: 2 additions & 0 deletions src/filters/jmespath_filter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(super) mod filter;
mod custom_functions;
Loading

0 comments on commit 62dc437

Please sign in to comment.