Skip to content

Commit

Permalink
fix: redirect events to the right endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 committed Nov 13, 2022
1 parent 90429bc commit 4a5f150
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 20 deletions.
5 changes: 4 additions & 1 deletion dozer-api/src/grpc/services/common/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ use dozer_types::types::{Field, FieldType, Operation as DozerOperation, Record a

use super::common_grpc::{Operation, OperationType, Record};

pub fn map_operation(operation: &DozerOperation) -> Operation {
pub fn map_operation(endpoint_name: String, operation: &DozerOperation) -> Operation {
match operation.to_owned() {
DozerOperation::Delete { old } => Operation {
typ: OperationType::Delete as i32,
old: Some(map_record(old)),
new: None,
endpoint_name,
},
DozerOperation::Insert { new } => Operation {
typ: OperationType::Insert as i32,
old: None,
new: Some(map_record(new)),
endpoint_name,
},
DozerOperation::Update { old, new } => Operation {
typ: OperationType::Insert as i32,
old: Some(map_record(old)),
new: Some(map_record(new)),
endpoint_name,
},
}
}
Expand Down
1 change: 1 addition & 0 deletions dozer-api/src/grpc/services/common/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message Operation {
OperationType typ = 1;
optional Record old = 2;
Record new = 3;
string endpoint_name = 4;
}
message Record { repeated Value values = 1; }

Expand Down
16 changes: 9 additions & 7 deletions dozer-api/src/grpc/services/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CommonGrpcService for ApiService {
#[allow(non_camel_case_types)]
type onEventStream = ResponseStream;
async fn on_event(&self, request: Request<OnEventRequest>) -> EventResult<Self::onEventStream> {
let _request = request.into_inner();
let request = request.into_inner();

let (tx, rx) = tokio::sync::mpsc::channel(1);
// create subscribe
Expand All @@ -128,12 +128,14 @@ impl CommonGrpcService for ApiService {
let receiver_event = broadcast_receiver.recv().await;
match receiver_event {
Ok(event) => {
if let ApiEvent::Operation(operation) = event {
let op = helper::map_operation(&operation);
if (tx.send(Ok(op)).await).is_err() {
warn!("on_insert_grpc_server_stream receiver drop");
// receiver drop
break;
if let ApiEvent::Operation(endpoint_name, operation) = event {
if endpoint_name == request.endpoint {
let op = helper::map_operation(endpoint_name, &operation);
if (tx.send(Ok(op)).await).is_err() {
warn!("on_insert_grpc_server_stream receiver drop");
// receiver drop
break;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/services/on_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn on_delete_grpc_server_stream(
let mut broadcast_receiver = event_notifier.resubscribe();
tokio::spawn(async move {
while let Ok(event) = broadcast_receiver.recv().await {
if let ApiEvent::Operation(Operation::Delete { old: record }) = event {
if let ApiEvent::Operation(_endpoint_name, Operation::Delete { old: record }) = event {
let converted_record = api_helper.convert_record_to_json(record).unwrap();
let value_json = serde_json::to_value(converted_record)
.map_err(GRPCError::SerizalizeError)
Expand Down
4 changes: 3 additions & 1 deletion dozer-api/src/grpc/services/on_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ async fn on_insert_grpc_server_stream(
let receiver_event = broadcast_receiver.recv().await;
match receiver_event {
Ok(event) => {
if let ApiEvent::Operation(Operation::Insert { new: record }) = event {
if let ApiEvent::Operation(_endpoint_name, Operation::Insert { new: record }) =
event
{
let converted_record = api_helper.convert_record_to_json(record).unwrap();
let value_json = serde_json::to_value(converted_record)
.map_err(GRPCError::SerizalizeError)
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/services/on_schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn on_schema_change_grpc_server_stream(
let mut broadcast_receiver = event_notifier.resubscribe();
tokio::spawn(async move {
while let Ok(event) = broadcast_receiver.recv().await {
if let ApiEvent::SchemaChange(schema) = event {
if let ApiEvent::SchemaChange(_endpoint_name, schema) = event {
let value_json = serde_json::to_value(schema)
.map_err(GRPCError::SerizalizeError)
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/services/on_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn on_update_grpc_server_stream(
let mut broadcast_receiver = event_notifier.resubscribe();
tokio::spawn(async move {
while let Ok(event) = broadcast_receiver.recv().await {
if let ApiEvent::Operation(Operation::Delete { old: record }) = event {
if let ApiEvent::Operation(_endpoint_name, Operation::Delete { old: record }) = event {
let converted_record = api_helper.convert_record_to_json(record).unwrap();
let value_json = serde_json::to_value(converted_record)
.map_err(GRPCError::SerizalizeError)
Expand Down
9 changes: 6 additions & 3 deletions dozer-api/src/grpc/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ pub fn mock_event_notifier() -> channel::Receiver<ApiEvent> {
thread::sleep(time::Duration::from_millis(1000));
let record_json = json!({"schema_id":{"id":1811503150,"version":1},"values":[{"Int":1048},{"String":"Test33"},"Null",{"Int":2006}]});

let fake_event = ApiEvent::Operation(dozer_types::types::Operation::Insert {
new: serde_json::from_value(record_json).unwrap(),
});
let fake_event = ApiEvent::Operation(
"users".to_string(),
dozer_types::types::Operation::Insert {
new: serde_json::from_value(record_json).unwrap(),
},
);
sender.try_send(fake_event).unwrap();
});
receiver
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl GRPCServer {
// wait until all schemas are initalized
while schemas.len() < pipeline_map.len() {
let event = self.event_notifier.clone().recv();
if let Ok(ApiEvent::SchemaChange(schema_change)) = event {
if let Ok(ApiEvent::SchemaChange(_endpoint_name, schema_change)) = event {
let id = schema_change.get_id();
schemas.insert(id, schema_change);
}
Expand Down
10 changes: 8 additions & 2 deletions dozer-orchestrator/src/pipeline/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ impl Sink for CacheSink {

if let Some(notifier) = &self.notifier {
notifier
.try_send(ApiEvent::Operation(op.clone()))
.try_send(ApiEvent::Operation(
self.api_endpoint.name.to_owned(),
op.clone(),
))
.map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
}
match op {
Expand Down Expand Up @@ -182,7 +185,10 @@ impl Sink for CacheSink {

if let Some(notifier) = &self.notifier {
let res = notifier
.try_send(ApiEvent::SchemaChange(schema))
.try_send(ApiEvent::SchemaChange(
self.api_endpoint.name.to_owned(),
schema,
))
.map_err(|e| {
ExecutionError::SinkError(SinkError::SchemaNotificationFailed(Box::new(e)))
});
Expand Down
4 changes: 2 additions & 2 deletions dozer-types/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ApiEvent {
SchemaChange(Schema),
Operation(Operation),
SchemaChange(String, Schema),
Operation(String, Operation),
}

0 comments on commit 4a5f150

Please sign in to comment.