Skip to content

Commit

Permalink
Added type filter in the event (#2347)
Browse files Browse the repository at this point in the history
* Added type filter in the event

* Resolved the CI errors

* check1

* check2

* check3

* Solved lint issues

* Added review changes

* check

* added final changes
  • Loading branch information
MG190202 authored Jan 30, 2024
1 parent 2f526c8 commit 58c85e3
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl CommonGrpcService for CommonService {
let schema = cache_endpoint.cache_reader().get_schema().0.clone();
endpoints.insert(
endpoint,
EndpointFilter::new(schema, filter.filter.as_deref())?,
EndpointFilter::new(schema, filter.r#type, filter.filter.as_deref())?,
);
}

Expand Down
38 changes: 33 additions & 5 deletions dozer-api/src/grpc/shared_impl/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,28 @@ use dozer_types::{
types::{Field, Schema},
};

use dozer_types::grpc_types::types::{value, Operation, OperationType, Record, Value};
use dozer_types::grpc_types::types::{value, EventType, Operation, OperationType, Record, Value};

pub fn op_satisfies_filter(
op: &Operation,
event_type: EventType,
filter: Option<&FilterExpression>,
schema: &Schema,
) -> bool {
if let Some(filter) = filter {
if op.typ == OperationType::Insert as i32 || op.typ == OperationType::Delete as i32 {
record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema)
if (op.typ == OperationType::Insert as i32) || (op.typ == OperationType::Delete as i32) {
if check_with_event_type(event_type, op) {
record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema)
} else {
false
}
} else if op.typ == OperationType::Update as i32 {
record_satisfies_filter(op.old.as_ref().unwrap(), filter, schema)
|| record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema)
if check_with_event_type(event_type, op) {
record_satisfies_filter(op.old.as_ref().unwrap(), filter, schema)
|| record_satisfies_filter(op.new.as_ref().unwrap(), filter, schema)
} else {
false
}
} else {
false
}
Expand All @@ -26,6 +35,25 @@ pub fn op_satisfies_filter(
}
}

fn check_with_event_type(event_type: EventType, op: &Operation) -> bool {
if op.typ == OperationType::Insert as i32 {
if (event_type == EventType::All) || (event_type == EventType::InsertOnly) {
return true;
}
} else if op.typ == OperationType::Delete as i32 {
if (event_type == EventType::All) || (event_type == EventType::DeleteOnly) {
return true;
}
} else if op.typ == OperationType::Update as i32 {
if (event_type == EventType::All) || (event_type == EventType::UpdateOnly) {
return true;
}
} else {
return false;
}
false
}

fn record_satisfies_filter(record: &Record, filter: &FilterExpression, schema: &Schema) -> bool {
match filter {
FilterExpression::And(filters) => filters
Expand Down
62 changes: 55 additions & 7 deletions dozer-api/src/grpc/shared_impl/filter/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::*;
use dozer_cache::cache::test_utils::schema_1;
use dozer_types::grpc_types::types::EventType;
use dozer_types::serde_json::json;

use super::*;

fn test_field_satisfies_op_impl(
field: value::Value,
operator: Operator,
Expand Down Expand Up @@ -804,7 +804,7 @@ fn test_op_satisfies_filter() {
let filter2 = FilterExpression::Simple("a".into(), Operator::EQ, json!(2));
let filter3 = FilterExpression::Simple("a".into(), Operator::EQ, json!(3));

let check = |typ, old: Option<&Record>, new: &Record, filter, expected| {
let check = |typ, old: Option<&Record>, new: &Record, filter, expected, event_type| {
assert_eq!(
op_satisfies_filter(
&Operation {
Expand All @@ -813,36 +813,84 @@ fn test_op_satisfies_filter() {
new: Some(new.clone()),
endpoint: "".into()
},
event_type,
filter,
&schema
),
expected
);
};

check(OperationType::Insert, None, &new, Some(&filter1), false);
check(OperationType::Insert, None, &new, Some(&filter2), true);
check(OperationType::Delete, None, &new, Some(&filter1), false);
check(OperationType::Delete, None, &new, Some(&filter2), true);
check(
OperationType::Insert,
None,
&new,
Some(&filter1),
false,
EventType::DeleteOnly,
);
check(
OperationType::Insert,
None,
&new,
Some(&filter2),
true,
EventType::InsertOnly,
);
check(
OperationType::Insert,
None,
&new,
Some(&filter2),
true,
EventType::All,
);
check(
OperationType::Insert,
None,
&new,
Some(&filter1),
false,
EventType::UpdateOnly,
);
check(
OperationType::Delete,
None,
&new,
Some(&filter1),
false,
EventType::UpdateOnly,
);
check(
OperationType::Delete,
None,
&new,
Some(&filter2),
true,
EventType::DeleteOnly,
);
check(
OperationType::Update,
Some(&old),
&new,
Some(&filter1),
true,
EventType::UpdateOnly,
);
check(
OperationType::Update,
Some(&old),
&new,
Some(&filter2),
true,
EventType::UpdateOnly,
);
check(
OperationType::Update,
Some(&old),
&new,
Some(&filter3),
false,
EventType::UpdateOnly,
);
}
25 changes: 23 additions & 2 deletions dozer-api/src/grpc/shared_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio_stream::wrappers::ReceiverStream;

use dozer_types::grpc_types::types::EventType;

use crate::api_helper::{get_records, get_records_count};
use crate::auth::Access;

Expand Down Expand Up @@ -68,10 +70,23 @@ pub fn query(
pub struct EndpointFilter {
schema: Schema,
filter: Option<FilterExpression>,
event_type: EventType,
}

impl EndpointFilter {
pub fn new(schema: Schema, filter: Option<&str>) -> Result<Self, Status> {
fn convert_event_type(event_type: i32) -> EventType {
if event_type == 1 {
EventType::InsertOnly
} else if event_type == 2 {
EventType::UpdateOnly
} else if event_type == 3 {
EventType::DeleteOnly
} else {
EventType::All
}
}

pub fn new(schema: Schema, event_type: i32, filter: Option<&str>) -> Result<Self, Status> {
let filter = filter
.and_then(|filter| {
if filter.is_empty() {
Expand All @@ -82,7 +97,12 @@ impl EndpointFilter {
})
.transpose()
.map_err(from_error)?;
Ok(Self { schema, filter })
let event_type = EndpointFilter::convert_event_type(event_type);
Ok(Self {
schema,
filter,
event_type,
})
}
}

Expand Down Expand Up @@ -115,6 +135,7 @@ pub fn on_event<T: Send + 'static>(
if let Some(filter) = endpoints.get(&op.endpoint) {
if filter::op_satisfies_filter(
&op,
filter.event_type,
filter.filter.as_ref(),
&filter.schema,
) && (tx.send(event_mapper(op)).await).is_err()
Expand Down
14 changes: 13 additions & 1 deletion dozer-api/src/grpc/typed/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
CacheEndpoint,
};
use dozer_cache::CacheReader;
use dozer_types::grpc_types::types::EventType;
use dozer_types::tonic::{
self,
codegen::{
Expand Down Expand Up @@ -374,7 +375,18 @@ fn on_event(
.ok_or_else(|| Status::new(Code::InvalidArgument, "filter must be a string"))
})
.transpose()?;
let filter = EndpointFilter::new(schema, filter)?;

let event_type = query_request.get_field_by_name("type");
let event_type = event_type.as_ref();

let event_type = event_type
.map(|event_type| {
event_type
.as_enum_number()
.ok_or_else(|| Status::new(Code::InvalidArgument, "type must be an EventType"))
})
.transpose()?;
let filter = EndpointFilter::new(schema, event_type.unwrap_or(EventType::All.into()), filter)?;

shared_impl::on_event(
[(table_name, filter)].into_iter().collect(),
Expand Down

0 comments on commit 58c85e3

Please sign in to comment.