diff --git a/dozer-api/src/grpc/common/service.rs b/dozer-api/src/grpc/common/service.rs index 7bc13d2904..72abbf51eb 100644 --- a/dozer-api/src/grpc/common/service.rs +++ b/dozer-api/src/grpc/common/service.rs @@ -124,7 +124,7 @@ impl CommonGrpcService for CommonService { let schema = cache_endpoint.cache_reader().get_schema().0.clone(); endpoints.insert( endpoint, - EndpointFilter::new(schema, filter.filter.as_deref())?, + EndpointFilter::new(schema, filter.r#type, filter.filter.as_deref())?, ); } diff --git a/dozer-api/src/grpc/shared_impl/filter/mod.rs b/dozer-api/src/grpc/shared_impl/filter/mod.rs index 37549f1d18..c6b191b8e6 100644 --- a/dozer-api/src/grpc/shared_impl/filter/mod.rs +++ b/dozer-api/src/grpc/shared_impl/filter/mod.rs @@ -5,19 +5,28 @@ use dozer_types::{ types::{Field, Schema}, }; -use dozer_types::grpc_types::types::{value, Operation, OperationType, Record, Value}; +use dozer_types::grpc_types::types::{value, EventType, Operation, OperationType, Record, Value}; pub fn op_satisfies_filter( op: &Operation, + event_type: EventType, filter: Option<&FilterExpression>, schema: &Schema, ) -> bool { if let Some(filter) = filter { - if op.typ == OperationType::Insert as i32 || op.typ == OperationType::Delete as i32 { - record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema) + if (op.typ == OperationType::Insert as i32) || (op.typ == OperationType::Delete as i32) { + if check_with_event_type(event_type, op) { + record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema) + } else { + false + } } else if op.typ == OperationType::Update as i32 { - record_satisfies_filter(op.old.as_ref().unwrap(), filter, schema) - || record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema) + if check_with_event_type(event_type, op) { + record_satisfies_filter(op.old.as_ref().unwrap(), filter, schema) + || record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema) + } else { + false + } } else { false } @@ -26,6 +35,25 @@ pub fn op_satisfies_filter( } } +fn check_with_event_type(event_type: EventType, op: &Operation) -> bool { + if op.typ == OperationType::Insert as i32 { + if (event_type == EventType::All) || (event_type == EventType::InsertOnly) { + return true; + } + } else if op.typ == OperationType::Delete as i32 { + if (event_type == EventType::All) || (event_type == EventType::DeleteOnly) { + return true; + } + } else if op.typ == OperationType::Update as i32 { + if (event_type == EventType::All) || (event_type == EventType::UpdateOnly) { + return true; + } + } else { + return false; + } + false +} + fn record_satisfies_filter(record: &Record, filter: &FilterExpression, schema: &Schema) -> bool { match filter { FilterExpression::And(filters) => filters diff --git a/dozer-api/src/grpc/shared_impl/filter/tests.rs b/dozer-api/src/grpc/shared_impl/filter/tests.rs index 0e8d4a99eb..9242bbc808 100644 --- a/dozer-api/src/grpc/shared_impl/filter/tests.rs +++ b/dozer-api/src/grpc/shared_impl/filter/tests.rs @@ -1,8 +1,8 @@ +use super::*; use dozer_cache::cache::test_utils::schema_1; +use dozer_types::grpc_types::types::EventType; use dozer_types::serde_json::json; -use super::*; - fn test_field_satisfies_op_impl( field: value::Value, operator: Operator, @@ -804,7 +804,7 @@ fn test_op_satisfies_filter() { let filter2 = FilterExpression::Simple("a".into(), Operator::EQ, json!(2)); let filter3 = FilterExpression::Simple("a".into(), Operator::EQ, json!(3)); - let check = |typ, old: Option<&Record>, new: &Record, filter, expected| { + let check = |typ, old: Option<&Record>, new: &Record, filter, expected, event_type| { assert_eq!( op_satisfies_filter( &Operation { @@ -813,6 +813,7 @@ fn test_op_satisfies_filter() { new: Some(new.clone()), endpoint: "".into() }, + event_type, filter, &schema ), @@ -820,16 +821,61 @@ fn test_op_satisfies_filter() { ); }; - check(OperationType::Insert, None, &new, Some(&filter1), false); - check(OperationType::Insert, None, &new, Some(&filter2), true); - check(OperationType::Delete, None, &new, Some(&filter1), false); - check(OperationType::Delete, None, &new, Some(&filter2), true); + check( + OperationType::Insert, + None, + &new, + Some(&filter1), + false, + EventType::DeleteOnly, + ); + check( + OperationType::Insert, + None, + &new, + Some(&filter2), + true, + EventType::InsertOnly, + ); + check( + OperationType::Insert, + None, + &new, + Some(&filter2), + true, + EventType::All, + ); + check( + OperationType::Insert, + None, + &new, + Some(&filter1), + false, + EventType::UpdateOnly, + ); + check( + OperationType::Delete, + None, + &new, + Some(&filter1), + false, + EventType::UpdateOnly, + ); + check( + OperationType::Delete, + None, + &new, + Some(&filter2), + true, + EventType::DeleteOnly, + ); check( OperationType::Update, Some(&old), &new, Some(&filter1), true, + EventType::UpdateOnly, ); check( OperationType::Update, @@ -837,6 +883,7 @@ fn test_op_satisfies_filter() { &new, Some(&filter2), true, + EventType::UpdateOnly, ); check( OperationType::Update, @@ -844,5 +891,6 @@ fn test_op_satisfies_filter() { &new, Some(&filter3), false, + EventType::UpdateOnly, ); } diff --git a/dozer-api/src/grpc/shared_impl/mod.rs b/dozer-api/src/grpc/shared_impl/mod.rs index 4627396d58..c22170938c 100644 --- a/dozer-api/src/grpc/shared_impl/mod.rs +++ b/dozer-api/src/grpc/shared_impl/mod.rs @@ -12,6 +12,8 @@ use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio_stream::wrappers::ReceiverStream; +use dozer_types::grpc_types::types::EventType; + use crate::api_helper::{get_records, get_records_count}; use crate::auth::Access; @@ -68,10 +70,23 @@ pub fn query( pub struct EndpointFilter { schema: Schema, filter: Option, + event_type: EventType, } impl EndpointFilter { - pub fn new(schema: Schema, filter: Option<&str>) -> Result { + fn convert_event_type(event_type: i32) -> EventType { + if event_type == 1 { + EventType::InsertOnly + } else if event_type == 2 { + EventType::UpdateOnly + } else if event_type == 3 { + EventType::DeleteOnly + } else { + EventType::All + } + } + + pub fn new(schema: Schema, event_type: i32, filter: Option<&str>) -> Result { let filter = filter .and_then(|filter| { if filter.is_empty() { @@ -82,7 +97,12 @@ impl EndpointFilter { }) .transpose() .map_err(from_error)?; - Ok(Self { schema, filter }) + let event_type = EndpointFilter::convert_event_type(event_type); + Ok(Self { + schema, + filter, + event_type, + }) } } @@ -115,6 +135,7 @@ pub fn on_event( if let Some(filter) = endpoints.get(&op.endpoint) { if filter::op_satisfies_filter( &op, + filter.event_type, filter.filter.as_ref(), &filter.schema, ) && (tx.send(event_mapper(op)).await).is_err() diff --git a/dozer-api/src/grpc/typed/service.rs b/dozer-api/src/grpc/typed/service.rs index 75bd5b113d..e730a1accb 100644 --- a/dozer-api/src/grpc/typed/service.rs +++ b/dozer-api/src/grpc/typed/service.rs @@ -17,6 +17,7 @@ use crate::{ CacheEndpoint, }; use dozer_cache::CacheReader; +use dozer_types::grpc_types::types::EventType; use dozer_types::tonic::{ self, codegen::{ @@ -374,7 +375,18 @@ fn on_event( .ok_or_else(|| Status::new(Code::InvalidArgument, "filter must be a string")) }) .transpose()?; - let filter = EndpointFilter::new(schema, filter)?; + + let event_type = query_request.get_field_by_name("type"); + let event_type = event_type.as_ref(); + + let event_type = event_type + .map(|event_type| { + event_type + .as_enum_number() + .ok_or_else(|| Status::new(Code::InvalidArgument, "type must be an EventType")) + }) + .transpose()?; + let filter = EndpointFilter::new(schema, event_type.unwrap_or(EventType::All.into()), filter)?; shared_impl::on_event( [(table_name, filter)].into_iter().collect(),