From 3500c8facc107f39012ec4210bff053e2aa7d527 Mon Sep 17 00:00:00 2001 From: Monobikash Das <43202165+MonobikashDas@users.noreply.github.com> Date: Fri, 14 Apr 2023 13:14:53 +0530 Subject: [PATCH] MOSIP-23678 : added kafka support for SecurezoneNotificationStage (#1685) --- .../stage/SecurezoneNotificationStage.java | 103 +++++++++--------- .../SecurezoneNotificationStageTest.java | 100 ++++++++++------- 2 files changed, 110 insertions(+), 93 deletions(-) diff --git a/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/main/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStage.java b/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/main/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStage.java index e7a12e896a4..7059daa8f37 100644 --- a/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/main/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStage.java +++ b/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/main/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStage.java @@ -61,7 +61,7 @@ public class SecurezoneNotificationStage extends MosipVerticleAPIManager { /** * The reg proc logger. */ - private static Logger regProcLogger = RegProcessorLogger.getLogger(SecurezoneNotificationStage.class); + private static final Logger regProcLogger = RegProcessorLogger.getLogger(SecurezoneNotificationStage.class); /** * The cluster url. @@ -157,15 +157,9 @@ public void processURL(RoutingContext ctx) { regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", "SecurezoneNotificationStage::processURL()::entry"); - InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); MessageDTO messageDTO = new MessageDTO(); - TrimExceptionMessage trimMessage = new TrimExceptionMessage(); - LogDescription description = new LogDescription(); - boolean isTransactionSuccessful = false; - try { JsonObject obj = ctx.getBodyAsJson(); - messageDTO.setMessageBusAddress(MessageBusAddress.SECUREZONE_NOTIFICATION_IN); messageDTO.setInternalError(Boolean.FALSE); messageDTO.setRid(obj.getString("rid")); @@ -174,7 +168,54 @@ public void processURL(RoutingContext ctx) { messageDTO.setSource(obj.getString("source")); messageDTO.setIteration(obj.getInteger("iteration")); messageDTO.setWorkflowInstanceId(obj.getString("workflowInstanceId")); + MessageDTO result = process(messageDTO); + if (result.getIsValid()) { + sendMessage(result); + this.setResponse(ctx, + "Packet with registrationId '" + result.getRid() + "' has been forwarded to next stage"); + + regProcLogger.info(obj.getString("rid"), + "Packet with registrationId '" + result.getRid() + "' has been forwarded to next stage", + null, null); + } else { + this.setResponse(ctx, "Packet with registrationId '" + obj.getString("rid") + + "' has not been uploaded to file System"); + + regProcLogger.info(obj.getString("rid"), + "Packet with registrationId '" + result.getRid() + "' has not been uploaded to file System", + null, null); + } + } catch (Exception e) { + ctx.fail(e); + } + } + /** + * This is for failure handler + * + * @param routingContext + */ + private void failure(RoutingContext routingContext) { + this.setResponse(routingContext, routingContext.failure().getMessage()); + } + + /** + * sends messageDTO to camel bridge. + * + * @param messageDTO the message DTO + */ + public void sendMessage(MessageDTO messageDTO) { + if (routingEnabled) + this.send(this.mosipEventBus, MessageBusAddress.SECUREZONE_NOTIFICATION_OUT, messageDTO); + } + + @Override + public MessageDTO process(MessageDTO messageDTO) { + InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); + TrimExceptionMessage trimMessage = new TrimExceptionMessage(); + LogDescription description = new LogDescription(); + boolean isTransactionSuccessful = false; + try { registrationStatusDto = registrationStatusService.getRegistrationStatus(messageDTO.getRid(), messageDTO.getReg_type(), messageDTO.getIteration(), messageDTO.getWorkflowInstanceId()); @@ -227,23 +268,6 @@ public void processURL(RoutingContext ctx) { LoggerFileConstant.REGISTRATIONID.toString(), messageDTO.getRid(), "Transaction failed. RID not found in registration table."); } - - if (messageDTO.getIsValid()) { - sendMessage(messageDTO); - this.setResponse(ctx, - "Packet with registrationId '" + messageDTO.getRid() + "' has been forwarded to next stage"); - - regProcLogger.info(obj.getString("rid"), - "Packet with registrationId '" + messageDTO.getRid() + "' has been forwarded to next stage", - null, null); - } else { - this.setResponse(ctx, "Packet with registrationId '" + obj.getString("rid") - + "' has not been uploaded to file System"); - - regProcLogger.info(obj.getString("rid"), - "Packet with registrationId '" + messageDTO.getRid() + "' has not been uploaded to file System", - null, null); - } } catch (TablenotAccessibleException e) { registrationStatusDto.setStatusCode(RegistrationStatusCode.PROCESSING.toString()); registrationStatusDto.setStatusComment( @@ -259,16 +283,14 @@ public void processURL(RoutingContext ctx) { + ExceptionUtils.getStackTrace(e)); messageDTO.setInternalError(Boolean.TRUE); messageDTO.setRid(registrationStatusDto.getRegistrationId()); - ctx.fail(e); } catch (Exception e) { regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.APPLICATIONID.toString(), - ctx.getBodyAsString(), ExceptionUtils.getStackTrace(e)); + messageDTO.toString(), ExceptionUtils.getStackTrace(e)); messageDTO.setIsValid(Boolean.FALSE); description.setCode(PlatformErrorMessages.RPR_SECUREZONE_FAILURE.getCode()); description.setMessage(PlatformErrorMessages.RPR_SECUREZONE_FAILURE.getMessage()); - ctx.fail(e); } finally { - if (messageDTO.getInternalError()) { + if (messageDTO.getInternalError() != null && messageDTO.getInternalError()) { registrationStatusDto.setUpdatedBy(USER); int retryCount = registrationStatusDto.getRetryCount() != null ? registrationStatusDto.getRetryCount() + 1 @@ -291,30 +313,7 @@ public void processURL(RoutingContext ctx) { auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, moduleId, moduleName, messageDTO.getRid()); } - } - - /** - * This is for failure handler - * - * @param routingContext - */ - private void failure(RoutingContext routingContext) { - this.setResponse(routingContext, routingContext.failure().getMessage()); - } - - /** - * sends messageDTO to camel bridge. - * - * @param messageDTO the message DTO - */ - public void sendMessage(MessageDTO messageDTO) { - if (routingEnabled) - this.send(this.mosipEventBus, MessageBusAddress.SECUREZONE_NOTIFICATION_OUT, messageDTO); - } - - @Override - public MessageDTO process(MessageDTO object) { - return null; + return messageDTO; } private boolean isDuplicatePacketForSameReqId(MessageDTO messageDTO) { diff --git a/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/test/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStageTest.java b/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/test/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStageTest.java index 78b981ec418..86171c89847 100644 --- a/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/test/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStageTest.java +++ b/registration-processor/pre-processor/registration-processor-securezone-notification-stage/src/test/java/io/mosip/registration/processor/securezone/notification/stage/SecurezoneNotificationStageTest.java @@ -1,8 +1,6 @@ package io.mosip.registration.processor.securezone.notification.stage; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -63,25 +61,27 @@ @RunWith(SpringRunner.class) public class SecurezoneNotificationStageTest { - private static final int maxRetryCount = 5; + private static final int maxRetryCount = 5; - private static final InputStream stream = Mockito.mock(InputStream.class); + private static final InputStream stream = Mockito.mock(InputStream.class); - /** The registration status service. */ - @Mock - RegistrationStatusService registrationStatusService; - - @Mock + MessageDTO messageDTO= new MessageDTO(); + + /** The registration status service. */ + @Mock + RegistrationStatusService registrationStatusService; + + @Mock private SyncRegistrationService syncRegistrationService; - @Mock - private RegistrationExceptionMapperUtil registrationStatusMapperUtil; + @Mock + private RegistrationExceptionMapperUtil registrationStatusMapperUtil; - @Mock - private AuditLogRequestBuilder auditLogRequestBuilder; + @Mock + private AuditLogRequestBuilder auditLogRequestBuilder; - private RoutingContext ctx; - private Boolean responseObject; + private RoutingContext ctx; + private Boolean responseObject; @Mock private MosipRouter router; @@ -422,24 +422,30 @@ public void setup() { Mockito.when(router.post(Mockito.any())).thenReturn(null); Mockito.doNothing().when(router).setRoute(Mockito.any()); Mockito.doNothing().when(router).nonSecureHandler(Mockito.any(),Mockito.any()); - MessageDTO messageDTO= new MessageDTO(); - messageDTO.setInternalError(Boolean.FALSE); - messageDTO.setIsValid(Boolean.TRUE); messageDTO.setRid("2018701130000410092018110735"); + messageDTO.setIsValid(true); + messageDTO.setInternalError(false); + messageDTO.setReg_type("NEW"); + messageDTO.setSource("REGISTRATIONCLIENT"); + messageDTO.setIteration(1); + messageDTO.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130003"); + Mockito.doNothing().when(registrationStatusService).updateRegistrationStatus(any(),any(),any()); Mockito.doReturn(responseWrapper).when(auditLogRequestBuilder).createAuditRequestBuilder(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()); + entities.add(entity); Mockito.when(registrationStatusMapperUtil.getStatusCode(any())).thenReturn("Something"); + Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity); + Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities); + Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any())) + .thenReturn(registrationStatusDto); } @Test public void processURLTest() { - entities.add(entity); - Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity); - Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities); - Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any())) - .thenReturn(registrationStatusDto); + notificationStage.processURL(ctx); + assertTrue(responseObject); } @@ -454,17 +460,17 @@ public void ridNotFoundTest() { @Test public void duplicateRidFoundTest() { - SyncRegistrationEntity syncEntity = new SyncRegistrationEntity(); - syncEntity.setAdditionalInfoReqId(null); - syncEntity.setRegistrationType("NEW"); - syncEntity.setPacketId("2018701130000410092018110735"); - syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004"); + SyncRegistrationEntity syncEntity = new SyncRegistrationEntity(); + syncEntity.setAdditionalInfoReqId(null); + syncEntity.setRegistrationType("NEW"); + syncEntity.setPacketId("2018701130000410092018110735"); + syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004"); entities.add(syncEntity); entities.add(entity1); Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any())) - .thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1); - Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(syncEntity); + .thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1); + Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(syncEntity); Mockito.when(syncRegistrationService.findByRegistrationId(any())).thenReturn(entities); notificationStage.processURL(ctx); assertTrue(responseObject); @@ -473,16 +479,16 @@ public void duplicateRidFoundTest() { @Test public void duplicateAdditionalReqIdFoundTest() { - SyncRegistrationEntity syncEntity = new SyncRegistrationEntity(); - syncEntity.setAdditionalInfoReqId("abc"); - syncEntity.setPacketId("2018701130000410092018110735"); - syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004"); + SyncRegistrationEntity syncEntity = new SyncRegistrationEntity(); + syncEntity.setAdditionalInfoReqId("abc"); + syncEntity.setPacketId("2018701130000410092018110735"); + syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004"); entities.add(syncEntity); entities.add(entity); Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any())) - .thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1); - Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity); + .thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1); + Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity); Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities); notificationStage.processURL(ctx); assertTrue(responseObject); @@ -502,19 +508,31 @@ public void processTest() { inputDto.setInternalError(Boolean.FALSE); inputDto.setIsValid(Boolean.TRUE); inputDto.setRid("2018701130000410092018110735"); + inputDto.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130003"); Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity); Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities); MessageDTO messageDTO = notificationStage.process(inputDto); - assertNull(messageDTO); + assertTrue(messageDTO.getIsValid()); } @Test public void dbExceptionTest() { Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any())).thenThrow(new TablenotAccessibleException("exception")); Mockito.when(registrationStatusMapperUtil.getStatusCode(RegistrationExceptionTypeCode.DATA_ACCESS_EXCEPTION)) - .thenReturn("REPROCESS"); - notificationStage.processURL(ctx); - assertNull(responseObject); + .thenReturn("REPROCESS"); + MessageDTO result = notificationStage.process(messageDTO); + assertTrue(result.getInternalError()); + } + + + @Test + public void genericExceptionTest() { + Mockito.when(syncRegistrationService + .findByWorkflowInstanceId(anyString())).thenThrow(new NullPointerException("exception")); + Mockito.when(registrationStatusMapperUtil.getStatusCode(RegistrationExceptionTypeCode.DATA_ACCESS_EXCEPTION)) + .thenReturn("REPROCESS"); + MessageDTO result = notificationStage.process(messageDTO); + assertFalse(result.getIsValid()); } }