Skip to content

Commit

Permalink
16140: manually populate poison queue when a step encounters an error (
Browse files Browse the repository at this point in the history
…#16258)

* 16140: manually populate poison queue when a step encounters an error

* fixup! 16140: manually populate poison queue when a step encounters an error
  • Loading branch information
mkalish authored Oct 23, 2024
1 parent d3c6ac4 commit b0451fc
Show file tree
Hide file tree
Showing 17 changed files with 183 additions and 23 deletions.
5 changes: 3 additions & 2 deletions prime-router/src/main/kotlin/azure/QueueAccess.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object QueueAccess {
queueName: String,
message: String,
invisibleDuration: Duration = Duration.ZERO,
) {
): String {
// Bug: event.at is calculated before the call to workflowengine.recordHistory
// In cases of very large datasets, that db write can take a very long time, pushing
// the current time past event.at. This causes negative durations. Hence this:
Expand All @@ -66,13 +66,14 @@ object QueueAccess {
invisibleDuration
}
val timeToLive = invisibleDuration.plusDays(timeToLiveDays)
createQueueClient(queueName).sendMessageWithResponse(
val response = createQueueClient(queueName).sendMessageWithResponse(
message,
duration,
timeToLive,
null,
null
)
return response.value.messageId
}

fun receiveMessage(queueName: String): Event {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum class ReportStreamEventProperties {
SENDER_NAME,
BUNDLE_DIGEST,
INGESTION_TYPE,
POISON_QUEUE_MESSAGE_ID,
;

@JsonKey
Expand All @@ -90,6 +91,7 @@ enum class ReportStreamEventName {
REPORT_LAST_MILE_FAILURE,
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
PIPELINE_EXCEPTION,
}

/**
Expand Down
38 changes: 32 additions & 6 deletions prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.QueueAccess
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter
import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter
Expand All @@ -24,6 +26,7 @@ import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage
import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage
import org.apache.commons.lang3.StringUtils
import org.apache.logging.log4j.kotlin.Logging
import org.jooq.exception.DataAccessException

class FHIRFunctions(
private val workflowEngine: WorkflowEngine = WorkflowEngine(),
Expand Down Expand Up @@ -140,13 +143,36 @@ class FHIRFunctions(
): List<QueueMessage> {
val messageContent = readMessage(fhirEngine.engineType, message, dequeueCount)

val newMessages = databaseAccess.transactReturning { txn ->
val results = fhirEngine.run(messageContent, actionLogger, actionHistory, txn)
recordResults(message, actionHistory, txn)
results
}
try {
val newMessages = databaseAccess.transactReturning { txn ->
val results = fhirEngine.run(messageContent, actionLogger, actionHistory, txn)
recordResults(message, actionHistory, txn)
results
}

return newMessages
return newMessages
} catch (ex: DataAccessException) {
// This is the one exception type that we currently will allow for retrying as there are occasional
// DB connectivity issues that are resolved without intervention
logger.error(ex)
throw ex
} catch (ex: Exception) {
// We're catching anything else that occurs because the most likely cause is a code or configuration error
// that will not be resolved if the message is automatically retried
// Instead, the error is recorded as an event and message is manually inserted into the poison queue
val report = databaseAccess.fetchReportFile(messageContent.reportId)
val poisonQueueMessageId = queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message)
fhirEngine.reportEventService.sendReportProcessingError(
ReportStreamEventName.PIPELINE_EXCEPTION,
report,
fhirEngine.taskAction,
ex.message ?: ""
) {
params(mapOf(ReportStreamEventProperties.POISON_QUEUE_MESSAGE_ID to poisonQueueMessageId))
}

return emptyList()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class FHIRConverter(

override val engineType: String = "Convert"

override val taskAction: TaskAction = TaskAction.convert

/**
* Accepts a [message] in either HL7 or FHIR format
* HL7 messages will be converted into FHIR.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class FHIRDestinationFilter(
override val finishedField: Field<OffsetDateTime> = Tables.TASK.DESTINATION_FILTERED_AT

override val engineType: String = "DestinationFilter"
override val taskAction: TaskAction = TaskAction.destination_filter

internal fun findTopicReceivers(topic: Topic): List<Receiver> =
settings.receivers.filter { it.customerStatus != CustomerStatus.INACTIVE && it.topic == topic }
Expand Down
5 changes: 5 additions & 0 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ abstract class FHIREngine(
*/
abstract val engineType: String

/**
* The task action associated with the engine
*/
abstract val taskAction: TaskAction

/**
* Result class that is returned as part of completing the work on a message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class FHIRReceiver(
override val finishedField: Field<OffsetDateTime> = Tables.TASK.PROCESSED_AT

override val engineType: String = "Receive"
override val taskAction: TaskAction = TaskAction.receive

private val clientIdHeader = "client_id"
private val contentTypeHeader = "content-type"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class FHIRReceiverFilter(
override val finishedField: Field<OffsetDateTime> = Tables.TASK.RECEIVER_FILTERED_AT

override val engineType: String = "ReceiverFilter"
override val taskAction: TaskAction = TaskAction.receiver_filter

/**
* Accepts a [message] in internal FHIR format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.Event
import gov.cdc.prime.router.azure.db.Tables
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.context.MDCUtils
import gov.cdc.prime.router.azure.observability.context.withLoggingContext
import gov.cdc.prime.router.azure.observability.event.AzureEventService
Expand Down Expand Up @@ -170,6 +171,7 @@ class FHIRTranslator(

override val finishedField: Field<OffsetDateTime> = Tables.TASK.TRANSLATED_AT
override val engineType: String = "Translate"
override val taskAction: TaskAction = TaskAction.translate

/**
* Returns a byteArray representation of the [bundle] in a format [receiver] expects, or throws an exception if the
Expand Down
12 changes: 6 additions & 6 deletions prime-router/src/test/kotlin/SubmissionReceiverTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ class SubmissionReceiverTests {
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""

// act
receiver.validateAndMoveToProcessing(
Expand Down Expand Up @@ -870,7 +870,7 @@ class SubmissionReceiverTests {
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""

// act
receiver.validateAndMoveToProcessing(
Expand Down Expand Up @@ -941,7 +941,7 @@ class SubmissionReceiverTests {
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""

// act
receiver.validateAndMoveToProcessing(
Expand Down Expand Up @@ -1011,7 +1011,7 @@ class SubmissionReceiverTests {
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""

// act
receiver.validateAndMoveToProcessing(
Expand Down Expand Up @@ -1080,7 +1080,7 @@ class SubmissionReceiverTests {
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""

// act
var exceptionThrown = false
Expand Down Expand Up @@ -1153,7 +1153,7 @@ class SubmissionReceiverTests {
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""

// act / assert
assertFailure {
Expand Down
119 changes: 119 additions & 0 deletions prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package gov.cdc.prime.router.azure

import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.router.FileSettings
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamReportProcessingErrorEventBuilder
import gov.cdc.prime.router.common.UniversalPipelineTestUtils
import gov.cdc.prime.router.common.UniversalPipelineTestUtils.createFHIRFunctionsInstance
import gov.cdc.prime.router.fhirengine.azure.FHIRFunctions
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter
import gov.cdc.prime.router.metadata.LookupTable
import gov.cdc.prime.router.unittest.UnitTestUtils
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkConstructor
import io.mockk.mockkObject
import io.mockk.slot
import io.mockk.spyk
import io.mockk.verify
import org.jooq.exception.DataAccessException
import org.jooq.tools.jdbc.MockConnection
import org.jooq.tools.jdbc.MockDataProvider
import org.jooq.tools.jdbc.MockResult
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.util.UUID

class FHIRFunctionsTests {

val queueMessage = """
{
"type": "convert",
"reportId": "${UUID.randomUUID()}",
"blobURL": "",
"digest": "",
"blobSubFolderName": "ignore.full-elr",
"topic": "full-elr",
"schemaName": ""
}
"""

@BeforeEach
fun beforeEach() {
mockkObject(QueueAccess)
every { QueueAccess.sendMessage(any(), any()) } returns "poison-123"
mockkObject(BlobAccess)
mockkConstructor(DatabaseLookupTableAccess::class)
}

private fun createFHIRFunctionsInstance(): FHIRFunctions {
val settings = FileSettings().loadOrganizations(UniversalPipelineTestUtils.universalPipelineOrganization)
val metadata = UnitTestUtils.simpleMetadata
metadata.lookupTableStore += mapOf(
"observation-mapping" to LookupTable("observation-mapping", emptyList())
)
val dataProvider = MockDataProvider { emptyArray<MockResult>() }
val connection = MockConnection(dataProvider)
val accessSpy = spyk(DatabaseAccess(connection))
val workflowEngine = WorkflowEngine.Builder()
.metadata(metadata)
.settingsProvider(settings)
.databaseAccess(accessSpy)
.build()
every { accessSpy.fetchReportFile(any()) } returns mockk<ReportFile>(relaxed = true)
return FHIRFunctions(workflowEngine, databaseAccess = accessSpy)
}

@Test
fun `test should add to the poison queue and catch an unexpected exception`() {
val fhirFunctions = createFHIRFunctionsInstance()

val mockReportEventService = mockk<IReportStreamEventService>(relaxed = true)
val init = slot<ReportStreamReportProcessingErrorEventBuilder.() -> Unit>()
every {
mockReportEventService.sendReportProcessingError(any(), any<ReportFile>(), any(), any(), capture(init))
} returns Unit
val mockFHIRConverter = mockk<FHIRConverter>(relaxed = true)
every { mockFHIRConverter.run(any(), any(), any(), any()) } throws RuntimeException("Error")
every { mockFHIRConverter.reportEventService } returns mockReportEventService
every { mockFHIRConverter.taskAction } returns TaskAction.convert
fhirFunctions.process(queueMessage, 1, mockFHIRConverter, ActionHistory(TaskAction.convert))

verify(exactly = 1) {
QueueAccess.sendMessage(
"${QueueMessage.elrConvertQueueName}-poison",
queueMessage
)
mockReportEventService.sendReportProcessingError(
ReportStreamEventName.PIPELINE_EXCEPTION,
any<ReportFile>(),
TaskAction.convert,
"Error",
init.captured
)
}
}

@Test
fun `test should not add to the poison queue and throw a data access exception`() {
val fhirFunctions = createFHIRFunctionsInstance()

val mockFHIRConverter = mockk<FHIRConverter>(relaxed = true)
every { mockFHIRConverter.run(any(), any(), any(), any()) } throws DataAccessException("Error")
assertThrows<DataAccessException> {
fhirFunctions.process(queueMessage, 1, mockFHIRConverter, ActionHistory(TaskAction.convert))
}

verify(exactly = 0) {
QueueAccess.sendMessage(
"${QueueMessage.elrConvertQueueName}-poison",
queueMessage
)
}
}
}
8 changes: 4 additions & 4 deletions prime-router/src/test/kotlin/azure/ReportFunctionTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ class ReportFunctionTests {
} returns report1

every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
val bodyBytes = "".toByteArray()
mockkObject(ReportWriter)
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
Expand Down Expand Up @@ -660,7 +660,7 @@ class ReportFunctionTests {
every { actionHistory.action.sendingOrg } returns "Test Sender"
every { actionHistory.action.actionName } returns TaskAction.receive
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
val bodyBytes = "".toByteArray()
mockkObject(ReportWriter)
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
Expand Down Expand Up @@ -714,7 +714,7 @@ class ReportFunctionTests {
every { actionHistory.action.sendingOrg } returns "Test Sender"
every { actionHistory.action.actionName } returns TaskAction.receive
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
val bodyBytes = "".toByteArray()
mockkObject(ReportWriter)
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
Expand Down Expand Up @@ -784,7 +784,7 @@ class ReportFunctionTests {
every { actionHistory.action.sendingOrg } returns "Test Sender"
every { actionHistory.action.actionName } returns TaskAction.receive
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
val bodyBytes = "".toByteArray()
mockkObject(ReportWriter)
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class FHIRConverterIntegrationTests {
@BeforeEach
fun beforeEach() {
mockkObject(QueueAccess)
every { QueueAccess.sendMessage(any(), any()) } returns Unit
every { QueueAccess.sendMessage(any(), any()) } returns ""
mockkObject(BlobAccess)
every { BlobAccess getProperty "defaultBlobMetadata" } returns getBlobContainerMetadata()
mockkObject(BlobAccess.BlobContainerMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {
mockkObject(BlobAccess)
mockkObject(BlobAccess.BlobContainerMetadata)

every { QueueAccess.sendMessage(any(), any()) } returns Unit
every { QueueAccess.sendMessage(any(), any()) } returns ""
every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils
.getBlobContainerMetadata(azuriteContainer)
every { BlobAccess.BlobContainerMetadata.build(any(), any()) } returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class FHIRReceiverFilterIntegrationTests : Logging {
@BeforeEach
fun beforeEach() {
mockkObject(QueueAccess)
every { QueueAccess.sendMessage(any(), any()) } returns Unit
every { QueueAccess.sendMessage(any(), any()) } returns ""
mockkObject(BlobAccess)
every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils
.getBlobContainerMetadata(azuriteContainer)
Expand Down
Loading

0 comments on commit b0451fc

Please sign in to comment.