Skip to content

Commit

Permalink
MOSIP-23678 : added kafka support for SecurezoneNotificationStage (#1685
Browse files Browse the repository at this point in the history
)
  • Loading branch information
MonobikashDas authored Apr 14, 2023
1 parent 07256e5 commit 3500c8f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class SecurezoneNotificationStage extends MosipVerticleAPIManager {
/**
* The reg proc logger.
*/
private static Logger regProcLogger = RegProcessorLogger.getLogger(SecurezoneNotificationStage.class);
private static final Logger regProcLogger = RegProcessorLogger.getLogger(SecurezoneNotificationStage.class);

/**
* The cluster url.
Expand Down Expand Up @@ -157,15 +157,9 @@ public void processURL(RoutingContext ctx) {
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "",
"SecurezoneNotificationStage::processURL()::entry");

InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto();
MessageDTO messageDTO = new MessageDTO();
TrimExceptionMessage trimMessage = new TrimExceptionMessage();
LogDescription description = new LogDescription();
boolean isTransactionSuccessful = false;

try {
JsonObject obj = ctx.getBodyAsJson();

messageDTO.setMessageBusAddress(MessageBusAddress.SECUREZONE_NOTIFICATION_IN);
messageDTO.setInternalError(Boolean.FALSE);
messageDTO.setRid(obj.getString("rid"));
Expand All @@ -174,7 +168,54 @@ public void processURL(RoutingContext ctx) {
messageDTO.setSource(obj.getString("source"));
messageDTO.setIteration(obj.getInteger("iteration"));
messageDTO.setWorkflowInstanceId(obj.getString("workflowInstanceId"));
MessageDTO result = process(messageDTO);
if (result.getIsValid()) {
sendMessage(result);
this.setResponse(ctx,
"Packet with registrationId '" + result.getRid() + "' has been forwarded to next stage");

regProcLogger.info(obj.getString("rid"),
"Packet with registrationId '" + result.getRid() + "' has been forwarded to next stage",
null, null);
} else {
this.setResponse(ctx, "Packet with registrationId '" + obj.getString("rid")
+ "' has not been uploaded to file System");

regProcLogger.info(obj.getString("rid"),
"Packet with registrationId '" + result.getRid() + "' has not been uploaded to file System",
null, null);
}
} catch (Exception e) {
ctx.fail(e);
}
}

/**
* This is for failure handler
*
* @param routingContext
*/
private void failure(RoutingContext routingContext) {
this.setResponse(routingContext, routingContext.failure().getMessage());
}

/**
* sends messageDTO to camel bridge.
*
* @param messageDTO the message DTO
*/
public void sendMessage(MessageDTO messageDTO) {
if (routingEnabled)
this.send(this.mosipEventBus, MessageBusAddress.SECUREZONE_NOTIFICATION_OUT, messageDTO);
}

@Override
public MessageDTO process(MessageDTO messageDTO) {
InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto();
TrimExceptionMessage trimMessage = new TrimExceptionMessage();
LogDescription description = new LogDescription();
boolean isTransactionSuccessful = false;
try {
registrationStatusDto = registrationStatusService.getRegistrationStatus(messageDTO.getRid(),
messageDTO.getReg_type(), messageDTO.getIteration(), messageDTO.getWorkflowInstanceId());

Expand Down Expand Up @@ -227,23 +268,6 @@ public void processURL(RoutingContext ctx) {
LoggerFileConstant.REGISTRATIONID.toString(), messageDTO.getRid(),
"Transaction failed. RID not found in registration table.");
}

if (messageDTO.getIsValid()) {
sendMessage(messageDTO);
this.setResponse(ctx,
"Packet with registrationId '" + messageDTO.getRid() + "' has been forwarded to next stage");

regProcLogger.info(obj.getString("rid"),
"Packet with registrationId '" + messageDTO.getRid() + "' has been forwarded to next stage",
null, null);
} else {
this.setResponse(ctx, "Packet with registrationId '" + obj.getString("rid")
+ "' has not been uploaded to file System");

regProcLogger.info(obj.getString("rid"),
"Packet with registrationId '" + messageDTO.getRid() + "' has not been uploaded to file System",
null, null);
}
} catch (TablenotAccessibleException e) {
registrationStatusDto.setStatusCode(RegistrationStatusCode.PROCESSING.toString());
registrationStatusDto.setStatusComment(
Expand All @@ -259,16 +283,14 @@ public void processURL(RoutingContext ctx) {
+ ExceptionUtils.getStackTrace(e));
messageDTO.setInternalError(Boolean.TRUE);
messageDTO.setRid(registrationStatusDto.getRegistrationId());
ctx.fail(e);
} catch (Exception e) {
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.APPLICATIONID.toString(),
ctx.getBodyAsString(), ExceptionUtils.getStackTrace(e));
messageDTO.toString(), ExceptionUtils.getStackTrace(e));
messageDTO.setIsValid(Boolean.FALSE);
description.setCode(PlatformErrorMessages.RPR_SECUREZONE_FAILURE.getCode());
description.setMessage(PlatformErrorMessages.RPR_SECUREZONE_FAILURE.getMessage());
ctx.fail(e);
} finally {
if (messageDTO.getInternalError()) {
if (messageDTO.getInternalError() != null && messageDTO.getInternalError()) {
registrationStatusDto.setUpdatedBy(USER);
int retryCount = registrationStatusDto.getRetryCount() != null
? registrationStatusDto.getRetryCount() + 1
Expand All @@ -291,30 +313,7 @@ public void processURL(RoutingContext ctx) {
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType,
moduleId, moduleName, messageDTO.getRid());
}
}

/**
* This is for failure handler
*
* @param routingContext
*/
private void failure(RoutingContext routingContext) {
this.setResponse(routingContext, routingContext.failure().getMessage());
}

/**
* sends messageDTO to camel bridge.
*
* @param messageDTO the message DTO
*/
public void sendMessage(MessageDTO messageDTO) {
if (routingEnabled)
this.send(this.mosipEventBus, MessageBusAddress.SECUREZONE_NOTIFICATION_OUT, messageDTO);
}

@Override
public MessageDTO process(MessageDTO object) {
return null;
return messageDTO;
}

private boolean isDuplicatePacketForSameReqId(MessageDTO messageDTO) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.mosip.registration.processor.securezone.notification.stage;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;

Expand Down Expand Up @@ -63,25 +61,27 @@
@RunWith(SpringRunner.class)
public class SecurezoneNotificationStageTest {

private static final int maxRetryCount = 5;
private static final int maxRetryCount = 5;

private static final InputStream stream = Mockito.mock(InputStream.class);
private static final InputStream stream = Mockito.mock(InputStream.class);

/** The registration status service. */
@Mock
RegistrationStatusService<String, InternalRegistrationStatusDto, RegistrationStatusDto> registrationStatusService;

@Mock
MessageDTO messageDTO= new MessageDTO();

/** The registration status service. */
@Mock
RegistrationStatusService<String, InternalRegistrationStatusDto, RegistrationStatusDto> registrationStatusService;

@Mock
private SyncRegistrationService<SyncResponseDto, SyncRegistrationDto> syncRegistrationService;

@Mock
private RegistrationExceptionMapperUtil registrationStatusMapperUtil;
@Mock
private RegistrationExceptionMapperUtil registrationStatusMapperUtil;

@Mock
private AuditLogRequestBuilder auditLogRequestBuilder;
@Mock
private AuditLogRequestBuilder auditLogRequestBuilder;

private RoutingContext ctx;
private Boolean responseObject;
private RoutingContext ctx;
private Boolean responseObject;

@Mock
private MosipRouter router;
Expand Down Expand Up @@ -422,24 +422,30 @@ public void setup() {
Mockito.when(router.post(Mockito.any())).thenReturn(null);
Mockito.doNothing().when(router).setRoute(Mockito.any());
Mockito.doNothing().when(router).nonSecureHandler(Mockito.any(),Mockito.any());
MessageDTO messageDTO= new MessageDTO();
messageDTO.setInternalError(Boolean.FALSE);
messageDTO.setIsValid(Boolean.TRUE);
messageDTO.setRid("2018701130000410092018110735");
messageDTO.setIsValid(true);
messageDTO.setInternalError(false);
messageDTO.setReg_type("NEW");
messageDTO.setSource("REGISTRATIONCLIENT");
messageDTO.setIteration(1);
messageDTO.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130003");

Mockito.doNothing().when(registrationStatusService).updateRegistrationStatus(any(),any(),any());
Mockito.doReturn(responseWrapper).when(auditLogRequestBuilder).createAuditRequestBuilder(anyString(), anyString(), anyString(),
anyString(), anyString(), anyString(), anyString());
entities.add(entity);
Mockito.when(registrationStatusMapperUtil.getStatusCode(any())).thenReturn("Something");
Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity);
Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities);
Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any()))
.thenReturn(registrationStatusDto);
}

@Test
public void processURLTest() {
entities.add(entity);
Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity);
Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities);
Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any()))
.thenReturn(registrationStatusDto);

notificationStage.processURL(ctx);

assertTrue(responseObject);
}

Expand All @@ -454,17 +460,17 @@ public void ridNotFoundTest() {
@Test
public void duplicateRidFoundTest() {

SyncRegistrationEntity syncEntity = new SyncRegistrationEntity();
syncEntity.setAdditionalInfoReqId(null);
syncEntity.setRegistrationType("NEW");
syncEntity.setPacketId("2018701130000410092018110735");
syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004");
SyncRegistrationEntity syncEntity = new SyncRegistrationEntity();
syncEntity.setAdditionalInfoReqId(null);
syncEntity.setRegistrationType("NEW");
syncEntity.setPacketId("2018701130000410092018110735");
syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004");
entities.add(syncEntity);
entities.add(entity1);

Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any()))
.thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1);
Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(syncEntity);
.thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1);
Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(syncEntity);
Mockito.when(syncRegistrationService.findByRegistrationId(any())).thenReturn(entities);
notificationStage.processURL(ctx);
assertTrue(responseObject);
Expand All @@ -473,16 +479,16 @@ public void duplicateRidFoundTest() {
@Test
public void duplicateAdditionalReqIdFoundTest() {

SyncRegistrationEntity syncEntity = new SyncRegistrationEntity();
syncEntity.setAdditionalInfoReqId("abc");
syncEntity.setPacketId("2018701130000410092018110735");
syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004");
SyncRegistrationEntity syncEntity = new SyncRegistrationEntity();
syncEntity.setAdditionalInfoReqId("abc");
syncEntity.setPacketId("2018701130000410092018110735");
syncEntity.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130004");
entities.add(syncEntity);
entities.add(entity);

Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any()))
.thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1);
Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity);
.thenReturn(registrationStatusDto).thenReturn(registrationStatusDto).thenReturn(registrationStatusDto1);
Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity);
Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities);
notificationStage.processURL(ctx);
assertTrue(responseObject);
Expand All @@ -502,19 +508,31 @@ public void processTest() {
inputDto.setInternalError(Boolean.FALSE);
inputDto.setIsValid(Boolean.TRUE);
inputDto.setRid("2018701130000410092018110735");
inputDto.setWorkflowInstanceId("78fc3d34-03f5-11ec-9a03-0242ac130003");

Mockito.when(syncRegistrationService.findByWorkflowInstanceId(anyString())).thenReturn(entity);
Mockito.when(syncRegistrationService.findByAdditionalInfoReqId(anyString())).thenReturn(entities);
MessageDTO messageDTO = notificationStage.process(inputDto);
assertNull(messageDTO);
assertTrue(messageDTO.getIsValid());
}

@Test
public void dbExceptionTest() {
Mockito.when(registrationStatusService.getRegistrationStatus(anyString(), any(), any(), any())).thenThrow(new TablenotAccessibleException("exception"));
Mockito.when(registrationStatusMapperUtil.getStatusCode(RegistrationExceptionTypeCode.DATA_ACCESS_EXCEPTION))
.thenReturn("REPROCESS");
notificationStage.processURL(ctx);
assertNull(responseObject);
.thenReturn("REPROCESS");
MessageDTO result = notificationStage.process(messageDTO);
assertTrue(result.getInternalError());
}


@Test
public void genericExceptionTest() {
Mockito.when(syncRegistrationService
.findByWorkflowInstanceId(anyString())).thenThrow(new NullPointerException("exception"));
Mockito.when(registrationStatusMapperUtil.getStatusCode(RegistrationExceptionTypeCode.DATA_ACCESS_EXCEPTION))
.thenReturn("REPROCESS");
MessageDTO result = notificationStage.process(messageDTO);
assertFalse(result.getIsValid());
}
}

0 comments on commit 3500c8f

Please sign in to comment.