From b0451fc333a9a27adbaff40b8deb6e4d0f810090 Mon Sep 17 00:00:00 2001 From: Michael Kalish Date: Wed, 23 Oct 2024 13:45:24 -0400 Subject: [PATCH] 16140: manually populate poison queue when a step encounters an error (#16258) * 16140: manually populate poison queue when a step encounters an error * fixup! 16140: manually populate poison queue when a step encounters an error --- .../src/main/kotlin/azure/QueueAccess.kt | 5 +- .../event/ReportStreamEventData.kt | 2 + .../kotlin/fhirengine/azure/FHIRFunctions.kt | 38 +++++- .../kotlin/fhirengine/engine/FHIRConverter.kt | 2 + .../engine/FHIRDestinationFilter.kt | 1 + .../kotlin/fhirengine/engine/FHIREngine.kt | 5 + .../kotlin/fhirengine/engine/FHIRReceiver.kt | 1 + .../fhirengine/engine/FHIRReceiverFilter.kt | 1 + .../fhirengine/engine/FHIRTranslator.kt | 2 + .../test/kotlin/SubmissionReceiverTests.kt | 12 +- .../test/kotlin/azure/FHIRFunctionsTests.kt | 119 ++++++++++++++++++ .../test/kotlin/azure/ReportFunctionTests.kt | 8 +- .../azure/FHIRConverterIntegrationTests.kt | 2 +- .../FHIRDestinationFilterIntegrationTests.kt | 2 +- .../FHIRReceiverFilterIntegrationTests.kt | 2 +- .../azure/FHIRReceiverIntegrationTests.kt | 2 +- .../azure/FHIRTranslatorIntegrationTests.kt | 2 +- 17 files changed, 183 insertions(+), 23 deletions(-) create mode 100644 prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt diff --git a/prime-router/src/main/kotlin/azure/QueueAccess.kt b/prime-router/src/main/kotlin/azure/QueueAccess.kt index 7591d7e0952..58795c46e9e 100644 --- a/prime-router/src/main/kotlin/azure/QueueAccess.kt +++ b/prime-router/src/main/kotlin/azure/QueueAccess.kt @@ -56,7 +56,7 @@ object QueueAccess { queueName: String, message: String, invisibleDuration: Duration = Duration.ZERO, - ) { + ): String { // Bug: event.at is calculated before the call to workflowengine.recordHistory // In cases of very large datasets, that db write can take a very long time, pushing // the current time past event.at. This causes negative durations. Hence this: @@ -66,13 +66,14 @@ object QueueAccess { invisibleDuration } val timeToLive = invisibleDuration.plusDays(timeToLiveDays) - createQueueClient(queueName).sendMessageWithResponse( + val response = createQueueClient(queueName).sendMessageWithResponse( message, duration, timeToLive, null, null ) + return response.value.messageId } fun receiveMessage(queueName: String): Event { diff --git a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventData.kt b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventData.kt index d2ffffa53fe..7596e10913f 100644 --- a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventData.kt +++ b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventData.kt @@ -66,6 +66,7 @@ enum class ReportStreamEventProperties { SENDER_NAME, BUNDLE_DIGEST, INGESTION_TYPE, + POISON_QUEUE_MESSAGE_ID, ; @JsonKey @@ -90,6 +91,7 @@ enum class ReportStreamEventName { REPORT_LAST_MILE_FAILURE, REPORT_NOT_PROCESSABLE, ITEM_SENT, + PIPELINE_EXCEPTION, } /** diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index d8cb1732d3c..0f7a31b560e 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -12,6 +12,8 @@ import gov.cdc.prime.router.azure.DatabaseAccess import gov.cdc.prime.router.azure.QueueAccess import gov.cdc.prime.router.azure.WorkflowEngine import gov.cdc.prime.router.azure.db.enums.TaskAction +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties import gov.cdc.prime.router.common.BaseEngine import gov.cdc.prime.router.fhirengine.engine.FHIRConverter import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter @@ -24,6 +26,7 @@ import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.kotlin.Logging +import org.jooq.exception.DataAccessException class FHIRFunctions( private val workflowEngine: WorkflowEngine = WorkflowEngine(), @@ -140,13 +143,36 @@ class FHIRFunctions( ): List { val messageContent = readMessage(fhirEngine.engineType, message, dequeueCount) - val newMessages = databaseAccess.transactReturning { txn -> - val results = fhirEngine.run(messageContent, actionLogger, actionHistory, txn) - recordResults(message, actionHistory, txn) - results - } + try { + val newMessages = databaseAccess.transactReturning { txn -> + val results = fhirEngine.run(messageContent, actionLogger, actionHistory, txn) + recordResults(message, actionHistory, txn) + results + } - return newMessages + return newMessages + } catch (ex: DataAccessException) { + // This is the one exception type that we currently will allow for retrying as there are occasional + // DB connectivity issues that are resolved without intervention + logger.error(ex) + throw ex + } catch (ex: Exception) { + // We're catching anything else that occurs because the most likely cause is a code or configuration error + // that will not be resolved if the message is automatically retried + // Instead, the error is recorded as an event and message is manually inserted into the poison queue + val report = databaseAccess.fetchReportFile(messageContent.reportId) + val poisonQueueMessageId = queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message) + fhirEngine.reportEventService.sendReportProcessingError( + ReportStreamEventName.PIPELINE_EXCEPTION, + report, + fhirEngine.taskAction, + ex.message ?: "" + ) { + params(mapOf(ReportStreamEventProperties.POISON_QUEUE_MESSAGE_ID to poisonQueueMessageId)) + } + + return emptyList() + } } /** diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index 46565172885..9527ef4dc19 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -81,6 +81,8 @@ class FHIRConverter( override val engineType: String = "Convert" + override val taskAction: TaskAction = TaskAction.convert + /** * Accepts a [message] in either HL7 or FHIR format * HL7 messages will be converted into FHIR. diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt index e834b6feea4..9f92a2b85b4 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt @@ -54,6 +54,7 @@ class FHIRDestinationFilter( override val finishedField: Field = Tables.TASK.DESTINATION_FILTERED_AT override val engineType: String = "DestinationFilter" + override val taskAction: TaskAction = TaskAction.destination_filter internal fun findTopicReceivers(topic: Topic): List = settings.receivers.filter { it.customerStatus != CustomerStatus.INACTIVE && it.topic == topic } diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt index 6ec0e6dabdc..dd0d051c889 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt @@ -188,6 +188,11 @@ abstract class FHIREngine( */ abstract val engineType: String + /** + * The task action associated with the engine + */ + abstract val taskAction: TaskAction + /** * Result class that is returned as part of completing the work on a message * diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt index 2a12444e552..9bea5e891ae 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt @@ -61,6 +61,7 @@ class FHIRReceiver( override val finishedField: Field = Tables.TASK.PROCESSED_AT override val engineType: String = "Receive" + override val taskAction: TaskAction = TaskAction.receive private val clientIdHeader = "client_id" private val contentTypeHeader = "content-type" diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt index a2d9def71e9..aa11ba34f13 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt @@ -68,6 +68,7 @@ class FHIRReceiverFilter( override val finishedField: Field = Tables.TASK.RECEIVER_FILTERED_AT override val engineType: String = "ReceiverFilter" + override val taskAction: TaskAction = TaskAction.receiver_filter /** * Accepts a [message] in internal FHIR format diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt index 506c9b2cfd7..5d319720ddd 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt @@ -20,6 +20,7 @@ import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseAccess import gov.cdc.prime.router.azure.Event import gov.cdc.prime.router.azure.db.Tables +import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.observability.context.MDCUtils import gov.cdc.prime.router.azure.observability.context.withLoggingContext import gov.cdc.prime.router.azure.observability.event.AzureEventService @@ -170,6 +171,7 @@ class FHIRTranslator( override val finishedField: Field = Tables.TASK.TRANSLATED_AT override val engineType: String = "Translate" + override val taskAction: TaskAction = TaskAction.translate /** * Returns a byteArray representation of the [bundle] in a format [receiver] expects, or throws an exception if the diff --git a/prime-router/src/test/kotlin/SubmissionReceiverTests.kt b/prime-router/src/test/kotlin/SubmissionReceiverTests.kt index 5f3b70619fa..60e68bc44ba 100644 --- a/prime-router/src/test/kotlin/SubmissionReceiverTests.kt +++ b/prime-router/src/test/kotlin/SubmissionReceiverTests.kt @@ -787,7 +787,7 @@ class SubmissionReceiverTests { every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit - every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit + every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns "" // act receiver.validateAndMoveToProcessing( @@ -870,7 +870,7 @@ class SubmissionReceiverTests { every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit - every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit + every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns "" // act receiver.validateAndMoveToProcessing( @@ -941,7 +941,7 @@ class SubmissionReceiverTests { every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit - every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit + every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns "" // act receiver.validateAndMoveToProcessing( @@ -1011,7 +1011,7 @@ class SubmissionReceiverTests { every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit - every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit + every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns "" // act receiver.validateAndMoveToProcessing( @@ -1080,7 +1080,7 @@ class SubmissionReceiverTests { every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit - every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit + every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns "" // act var exceptionThrown = false @@ -1153,7 +1153,7 @@ class SubmissionReceiverTests { every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit - every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit + every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns "" // act / assert assertFailure { diff --git a/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt b/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt new file mode 100644 index 00000000000..1d3454c1824 --- /dev/null +++ b/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt @@ -0,0 +1,119 @@ +package gov.cdc.prime.router.azure + +import gov.cdc.prime.reportstream.shared.QueueMessage +import gov.cdc.prime.router.FileSettings +import gov.cdc.prime.router.azure.db.enums.TaskAction +import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile +import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName +import gov.cdc.prime.router.azure.observability.event.ReportStreamReportProcessingErrorEventBuilder +import gov.cdc.prime.router.common.UniversalPipelineTestUtils +import gov.cdc.prime.router.common.UniversalPipelineTestUtils.createFHIRFunctionsInstance +import gov.cdc.prime.router.fhirengine.azure.FHIRFunctions +import gov.cdc.prime.router.fhirengine.engine.FHIRConverter +import gov.cdc.prime.router.metadata.LookupTable +import gov.cdc.prime.router.unittest.UnitTestUtils +import io.mockk.every +import io.mockk.mockk +import io.mockk.mockkConstructor +import io.mockk.mockkObject +import io.mockk.slot +import io.mockk.spyk +import io.mockk.verify +import org.jooq.exception.DataAccessException +import org.jooq.tools.jdbc.MockConnection +import org.jooq.tools.jdbc.MockDataProvider +import org.jooq.tools.jdbc.MockResult +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.util.UUID + +class FHIRFunctionsTests { + + val queueMessage = """ + { + "type": "convert", + "reportId": "${UUID.randomUUID()}", + "blobURL": "", + "digest": "", + "blobSubFolderName": "ignore.full-elr", + "topic": "full-elr", + "schemaName": "" + } + """ + + @BeforeEach + fun beforeEach() { + mockkObject(QueueAccess) + every { QueueAccess.sendMessage(any(), any()) } returns "poison-123" + mockkObject(BlobAccess) + mockkConstructor(DatabaseLookupTableAccess::class) + } + + private fun createFHIRFunctionsInstance(): FHIRFunctions { + val settings = FileSettings().loadOrganizations(UniversalPipelineTestUtils.universalPipelineOrganization) + val metadata = UnitTestUtils.simpleMetadata + metadata.lookupTableStore += mapOf( + "observation-mapping" to LookupTable("observation-mapping", emptyList()) + ) + val dataProvider = MockDataProvider { emptyArray() } + val connection = MockConnection(dataProvider) + val accessSpy = spyk(DatabaseAccess(connection)) + val workflowEngine = WorkflowEngine.Builder() + .metadata(metadata) + .settingsProvider(settings) + .databaseAccess(accessSpy) + .build() + every { accessSpy.fetchReportFile(any()) } returns mockk(relaxed = true) + return FHIRFunctions(workflowEngine, databaseAccess = accessSpy) + } + + @Test + fun `test should add to the poison queue and catch an unexpected exception`() { + val fhirFunctions = createFHIRFunctionsInstance() + + val mockReportEventService = mockk(relaxed = true) + val init = slot Unit>() + every { + mockReportEventService.sendReportProcessingError(any(), any(), any(), any(), capture(init)) + } returns Unit + val mockFHIRConverter = mockk(relaxed = true) + every { mockFHIRConverter.run(any(), any(), any(), any()) } throws RuntimeException("Error") + every { mockFHIRConverter.reportEventService } returns mockReportEventService + every { mockFHIRConverter.taskAction } returns TaskAction.convert + fhirFunctions.process(queueMessage, 1, mockFHIRConverter, ActionHistory(TaskAction.convert)) + + verify(exactly = 1) { + QueueAccess.sendMessage( + "${QueueMessage.elrConvertQueueName}-poison", + queueMessage + ) + mockReportEventService.sendReportProcessingError( + ReportStreamEventName.PIPELINE_EXCEPTION, + any(), + TaskAction.convert, + "Error", + init.captured + ) + } + } + + @Test + fun `test should not add to the poison queue and throw a data access exception`() { + val fhirFunctions = createFHIRFunctionsInstance() + + val mockFHIRConverter = mockk(relaxed = true) + every { mockFHIRConverter.run(any(), any(), any(), any()) } throws DataAccessException("Error") + assertThrows { + fhirFunctions.process(queueMessage, 1, mockFHIRConverter, ActionHistory(TaskAction.convert)) + } + + verify(exactly = 0) { + QueueAccess.sendMessage( + "${QueueMessage.elrConvertQueueName}-poison", + queueMessage + ) + } + } +} \ No newline at end of file diff --git a/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt b/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt index 7465ba0f3f9..33ac1482980 100644 --- a/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt +++ b/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt @@ -293,7 +293,7 @@ class ReportFunctionTests { } returns report1 every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo - every { engine.queue.sendMessage(any(), any(), any()) } returns Unit + every { engine.queue.sendMessage(any(), any(), any()) } returns "" val bodyBytes = "".toByteArray() mockkObject(ReportWriter) every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes) @@ -660,7 +660,7 @@ class ReportFunctionTests { every { actionHistory.action.sendingOrg } returns "Test Sender" every { actionHistory.action.actionName } returns TaskAction.receive every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo - every { engine.queue.sendMessage(any(), any(), any()) } returns Unit + every { engine.queue.sendMessage(any(), any(), any()) } returns "" val bodyBytes = "".toByteArray() mockkObject(ReportWriter) every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes) @@ -714,7 +714,7 @@ class ReportFunctionTests { every { actionHistory.action.sendingOrg } returns "Test Sender" every { actionHistory.action.actionName } returns TaskAction.receive every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo - every { engine.queue.sendMessage(any(), any(), any()) } returns Unit + every { engine.queue.sendMessage(any(), any(), any()) } returns "" val bodyBytes = "".toByteArray() mockkObject(ReportWriter) every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes) @@ -784,7 +784,7 @@ class ReportFunctionTests { every { actionHistory.action.sendingOrg } returns "Test Sender" every { actionHistory.action.actionName } returns TaskAction.receive every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo - every { engine.queue.sendMessage(any(), any(), any()) } returns Unit + every { engine.queue.sendMessage(any(), any(), any()) } returns "" val bodyBytes = "".toByteArray() mockkObject(ReportWriter) every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index d4c0a390c46..bd483b02079 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -158,7 +158,7 @@ class FHIRConverterIntegrationTests { @BeforeEach fun beforeEach() { mockkObject(QueueAccess) - every { QueueAccess.sendMessage(any(), any()) } returns Unit + every { QueueAccess.sendMessage(any(), any()) } returns "" mockkObject(BlobAccess) every { BlobAccess getProperty "defaultBlobMetadata" } returns getBlobContainerMetadata() mockkObject(BlobAccess.BlobContainerMetadata) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt index f587d95106d..8e2f420d888 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt @@ -92,7 +92,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { mockkObject(BlobAccess) mockkObject(BlobAccess.BlobContainerMetadata) - every { QueueAccess.sendMessage(any(), any()) } returns Unit + every { QueueAccess.sendMessage(any(), any()) } returns "" every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils .getBlobContainerMetadata(azuriteContainer) every { BlobAccess.BlobContainerMetadata.build(any(), any()) } returns diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt index 5e921089b91..eae5b63fe2c 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt @@ -179,7 +179,7 @@ class FHIRReceiverFilterIntegrationTests : Logging { @BeforeEach fun beforeEach() { mockkObject(QueueAccess) - every { QueueAccess.sendMessage(any(), any()) } returns Unit + every { QueueAccess.sendMessage(any(), any()) } returns "" mockkObject(BlobAccess) every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils .getBlobContainerMetadata(azuriteContainer) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt index a3f4e2be2db..525e53905d2 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt @@ -120,7 +120,7 @@ class FHIRReceiverIntegrationTests { fun beforeEach() { clearAllMocks() mockkObject(QueueAccess) - every { QueueAccess.sendMessage(any(), any()) } returns Unit + every { QueueAccess.sendMessage(any(), any()) } returns "" mockkObject(BlobAccess) every { BlobAccess getProperty "defaultBlobMetadata" } returns getBlobContainerMetadata() mockkObject(BlobAccess.BlobContainerMetadata) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt index e74ad4939b9..c45b9fe3e28 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt @@ -70,7 +70,7 @@ class FHIRTranslatorIntegrationTests : Logging { @BeforeEach fun beforeEach() { mockkObject(QueueAccess) - every { QueueAccess.sendMessage(any(), any()) } returns Unit + every { QueueAccess.sendMessage(any(), any()) } returns "" mockkObject(BlobAccess) every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils .getBlobContainerMetadata(azuriteContainer)