From 4a5f150e6e81356386c8c7c054d355fb59ed7c28 Mon Sep 17 00:00:00 2001 From: VG Date: Sun, 13 Nov 2022 22:42:13 +0800 Subject: [PATCH] fix: redirect events to the right endpoint --- dozer-api/src/grpc/services/common/helper.rs | 5 ++++- .../src/grpc/services/common/proto/api.proto | 1 + dozer-api/src/grpc/services/common/service.rs | 16 +++++++++------- dozer-api/src/grpc/services/on_delete.rs | 2 +- dozer-api/src/grpc/services/on_insert.rs | 4 +++- dozer-api/src/grpc/services/on_schema_change.rs | 2 +- dozer-api/src/grpc/services/on_update.rs | 2 +- dozer-api/src/grpc/tests/utils.rs | 9 ++++++--- dozer-api/src/grpc_server.rs | 2 +- dozer-orchestrator/src/pipeline/sinks.rs | 10 ++++++++-- dozer-types/src/events.rs | 4 ++-- 11 files changed, 37 insertions(+), 20 deletions(-) diff --git a/dozer-api/src/grpc/services/common/helper.rs b/dozer-api/src/grpc/services/common/helper.rs index eacd0a12dc..e1d9ccc4fa 100644 --- a/dozer-api/src/grpc/services/common/helper.rs +++ b/dozer-api/src/grpc/services/common/helper.rs @@ -4,22 +4,25 @@ use dozer_types::types::{Field, FieldType, Operation as DozerOperation, Record a use super::common_grpc::{Operation, OperationType, Record}; -pub fn map_operation(operation: &DozerOperation) -> Operation { +pub fn map_operation(endpoint_name: String, operation: &DozerOperation) -> Operation { match operation.to_owned() { DozerOperation::Delete { old } => Operation { typ: OperationType::Delete as i32, old: Some(map_record(old)), new: None, + endpoint_name, }, DozerOperation::Insert { new } => Operation { typ: OperationType::Insert as i32, old: None, new: Some(map_record(new)), + endpoint_name, }, DozerOperation::Update { old, new } => Operation { typ: OperationType::Insert as i32, old: Some(map_record(old)), new: Some(map_record(new)), + endpoint_name, }, } } diff --git a/dozer-api/src/grpc/services/common/proto/api.proto b/dozer-api/src/grpc/services/common/proto/api.proto index a3132664ec..b5aa31db36 100644 --- a/dozer-api/src/grpc/services/common/proto/api.proto +++ b/dozer-api/src/grpc/services/common/proto/api.proto @@ -49,6 +49,7 @@ message Operation { OperationType typ = 1; optional Record old = 2; Record new = 3; + string endpoint_name = 4; } message Record { repeated Value values = 1; } diff --git a/dozer-api/src/grpc/services/common/service.rs b/dozer-api/src/grpc/services/common/service.rs index 948a5d5669..2befe5e8f2 100644 --- a/dozer-api/src/grpc/services/common/service.rs +++ b/dozer-api/src/grpc/services/common/service.rs @@ -117,7 +117,7 @@ impl CommonGrpcService for ApiService { #[allow(non_camel_case_types)] type onEventStream = ResponseStream; async fn on_event(&self, request: Request) -> EventResult { - let _request = request.into_inner(); + let request = request.into_inner(); let (tx, rx) = tokio::sync::mpsc::channel(1); // create subscribe @@ -128,12 +128,14 @@ impl CommonGrpcService for ApiService { let receiver_event = broadcast_receiver.recv().await; match receiver_event { Ok(event) => { - if let ApiEvent::Operation(operation) = event { - let op = helper::map_operation(&operation); - if (tx.send(Ok(op)).await).is_err() { - warn!("on_insert_grpc_server_stream receiver drop"); - // receiver drop - break; + if let ApiEvent::Operation(endpoint_name, operation) = event { + if endpoint_name == request.endpoint { + let op = helper::map_operation(endpoint_name, &operation); + if (tx.send(Ok(op)).await).is_err() { + warn!("on_insert_grpc_server_stream receiver drop"); + // receiver drop + break; + } } } } diff --git a/dozer-api/src/grpc/services/on_delete.rs b/dozer-api/src/grpc/services/on_delete.rs index f72d5489e4..043f7309f3 100644 --- a/dozer-api/src/grpc/services/on_delete.rs +++ b/dozer-api/src/grpc/services/on_delete.rs @@ -41,7 +41,7 @@ async fn on_delete_grpc_server_stream( let mut broadcast_receiver = event_notifier.resubscribe(); tokio::spawn(async move { while let Ok(event) = broadcast_receiver.recv().await { - if let ApiEvent::Operation(Operation::Delete { old: record }) = event { + if let ApiEvent::Operation(_endpoint_name, Operation::Delete { old: record }) = event { let converted_record = api_helper.convert_record_to_json(record).unwrap(); let value_json = serde_json::to_value(converted_record) .map_err(GRPCError::SerizalizeError) diff --git a/dozer-api/src/grpc/services/on_insert.rs b/dozer-api/src/grpc/services/on_insert.rs index f28228cee0..10bb874244 100644 --- a/dozer-api/src/grpc/services/on_insert.rs +++ b/dozer-api/src/grpc/services/on_insert.rs @@ -45,7 +45,9 @@ async fn on_insert_grpc_server_stream( let receiver_event = broadcast_receiver.recv().await; match receiver_event { Ok(event) => { - if let ApiEvent::Operation(Operation::Insert { new: record }) = event { + if let ApiEvent::Operation(_endpoint_name, Operation::Insert { new: record }) = + event + { let converted_record = api_helper.convert_record_to_json(record).unwrap(); let value_json = serde_json::to_value(converted_record) .map_err(GRPCError::SerizalizeError) diff --git a/dozer-api/src/grpc/services/on_schema_change.rs b/dozer-api/src/grpc/services/on_schema_change.rs index ee38e1781b..cb7c2ac140 100644 --- a/dozer-api/src/grpc/services/on_schema_change.rs +++ b/dozer-api/src/grpc/services/on_schema_change.rs @@ -30,7 +30,7 @@ async fn on_schema_change_grpc_server_stream( let mut broadcast_receiver = event_notifier.resubscribe(); tokio::spawn(async move { while let Ok(event) = broadcast_receiver.recv().await { - if let ApiEvent::SchemaChange(schema) = event { + if let ApiEvent::SchemaChange(_endpoint_name, schema) = event { let value_json = serde_json::to_value(schema) .map_err(GRPCError::SerizalizeError) .unwrap(); diff --git a/dozer-api/src/grpc/services/on_update.rs b/dozer-api/src/grpc/services/on_update.rs index c4137706ec..531277f5ed 100644 --- a/dozer-api/src/grpc/services/on_update.rs +++ b/dozer-api/src/grpc/services/on_update.rs @@ -39,7 +39,7 @@ async fn on_update_grpc_server_stream( let mut broadcast_receiver = event_notifier.resubscribe(); tokio::spawn(async move { while let Ok(event) = broadcast_receiver.recv().await { - if let ApiEvent::Operation(Operation::Delete { old: record }) = event { + if let ApiEvent::Operation(_endpoint_name, Operation::Delete { old: record }) = event { let converted_record = api_helper.convert_record_to_json(record).unwrap(); let value_json = serde_json::to_value(converted_record) .map_err(GRPCError::SerizalizeError) diff --git a/dozer-api/src/grpc/tests/utils.rs b/dozer-api/src/grpc/tests/utils.rs index f60e7361af..98766cc5e5 100644 --- a/dozer-api/src/grpc/tests/utils.rs +++ b/dozer-api/src/grpc/tests/utils.rs @@ -47,9 +47,12 @@ pub fn mock_event_notifier() -> channel::Receiver { thread::sleep(time::Duration::from_millis(1000)); let record_json = json!({"schema_id":{"id":1811503150,"version":1},"values":[{"Int":1048},{"String":"Test33"},"Null",{"Int":2006}]}); - let fake_event = ApiEvent::Operation(dozer_types::types::Operation::Insert { - new: serde_json::from_value(record_json).unwrap(), - }); + let fake_event = ApiEvent::Operation( + "users".to_string(), + dozer_types::types::Operation::Insert { + new: serde_json::from_value(record_json).unwrap(), + }, + ); sender.try_send(fake_event).unwrap(); }); receiver diff --git a/dozer-api/src/grpc_server.rs b/dozer-api/src/grpc_server.rs index 2ebc46391d..3823a76a94 100644 --- a/dozer-api/src/grpc_server.rs +++ b/dozer-api/src/grpc_server.rs @@ -62,7 +62,7 @@ impl GRPCServer { // wait until all schemas are initalized while schemas.len() < pipeline_map.len() { let event = self.event_notifier.clone().recv(); - if let Ok(ApiEvent::SchemaChange(schema_change)) = event { + if let Ok(ApiEvent::SchemaChange(_endpoint_name, schema_change)) = event { let id = schema_change.get_id(); schemas.insert(id, schema_change); } diff --git a/dozer-orchestrator/src/pipeline/sinks.rs b/dozer-orchestrator/src/pipeline/sinks.rs index b9703f8db8..e530ec0762 100644 --- a/dozer-orchestrator/src/pipeline/sinks.rs +++ b/dozer-orchestrator/src/pipeline/sinks.rs @@ -120,7 +120,10 @@ impl Sink for CacheSink { if let Some(notifier) = &self.notifier { notifier - .try_send(ApiEvent::Operation(op.clone())) + .try_send(ApiEvent::Operation( + self.api_endpoint.name.to_owned(), + op.clone(), + )) .map_err(|e| ExecutionError::InternalError(Box::new(e)))?; } match op { @@ -182,7 +185,10 @@ impl Sink for CacheSink { if let Some(notifier) = &self.notifier { let res = notifier - .try_send(ApiEvent::SchemaChange(schema)) + .try_send(ApiEvent::SchemaChange( + self.api_endpoint.name.to_owned(), + schema, + )) .map_err(|e| { ExecutionError::SinkError(SinkError::SchemaNotificationFailed(Box::new(e))) }); diff --git a/dozer-types/src/events.rs b/dozer-types/src/events.rs index bd8d5f4d7b..5240406ceb 100644 --- a/dozer-types/src/events.rs +++ b/dozer-types/src/events.rs @@ -3,6 +3,6 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] pub enum ApiEvent { - SchemaChange(Schema), - Operation(Operation), + SchemaChange(String, Schema), + Operation(String, Operation), }