diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index 53a4242dfc4..87bff24f1e9 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -114,6 +114,7 @@ class FHIRConverter( companion object { private val clientIdHeader = "client_id" + private val payloadNameHeader = "payloadname" /** * Converts a [FhirConvertQueueMessage] into the input to the convert processing @@ -160,6 +161,7 @@ class FHIRConverter( val blobSubFolderName = message.blobSubFolderName val clientId = message.headers[clientIdHeader] + val payloadName = message.headers[payloadNameHeader] val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) } if (sender == null) { throw SubmissionSenderNotFound(clientId ?: "", reportId, blobUrl) @@ -181,7 +183,8 @@ class FHIRConverter( // is properly recorded in the report file table with the correct sender actionHistory.trackExternalInputReport( report, - BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()) + BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()), + payloadName ) actionHistory.trackActionSenderInfo(sender.fullName) diff --git a/prime-router/src/main/kotlin/history/db/ReportGraph.kt b/prime-router/src/main/kotlin/history/db/ReportGraph.kt index 103b9831eb3..0b44dcfe271 100644 --- a/prime-router/src/main/kotlin/history/db/ReportGraph.kt +++ b/prime-router/src/main/kotlin/history/db/ReportGraph.kt @@ -123,11 +123,10 @@ class ReportGraph( } /** - * Recursively goes up the report_linage table from any report until it reaches - * a report with an action type of "receive" (the root report) + * Recursively goes up the report_lineage table from any report until it reaches + * a report that does not appear in report_lineage as a child report (the root report) * - * This will return null if no report with action type "receive" is present or if - * the root is passed in + * This will return null if the root is passed in */ fun getRootReport(childReportId: UUID): ReportFile? { return db.transactReturning { txn -> @@ -172,19 +171,19 @@ class ReportGraph( .from(cte) .join(REPORT_FILE) .on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID)) - .join(ACTION) - .on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID)) - .where(ACTION.ACTION_NAME.eq(TaskAction.receive)) + .leftJoin(REPORT_LINEAGE) + .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) + .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) + .orderBy(REPORT_FILE.ACTION_ID.asc()) .fetchOneInto(Item::class.java) return rootItem } /** - * Recursively goes up the report_linage table from any report until it reaches - * all reports with an action type of "receive" (the root report) + * Recursively goes up the report_lineage table from any report until it reaches + * all reports that do not appear in report_lineage as a child report (the root report) * - * This will return null if no report with action type "receive" is present or if - * the root is passed in + * This will return null if the root is passed in * * If the passed in report ID has multiple root reports, they will all be returned */ @@ -476,9 +475,10 @@ class ReportGraph( .from(cte) .join(REPORT_FILE) .on(REPORT_FILE.REPORT_ID.eq(cte.field(0, UUID::class.java))) - .join(ACTION) - .on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID)) - .where(ACTION.ACTION_NAME.eq(TaskAction.receive)) + .leftJoin(REPORT_LINEAGE) + .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) + .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) + .orderBy(REPORT_FILE.ACTION_ID.asc()) /** * Accepts a list of ids and walks down the report lineage graph diff --git a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt index a61f5b13d86..96ac1340b5d 100644 --- a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt +++ b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt @@ -441,7 +441,7 @@ object UniversalPipelineTestUtils { fileName: String = "mr_fhir_face.fhir", ): Report { val blobUrl = BlobAccess.uploadBlob( - "${TaskAction.receive.literal}/$fileName", + "${previousAction.literal}/$fileName", reportContents.toByteArray(), getBlobContainerMetadata(azuriteContainer) ) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt index 99eada67991..d8a9adeb1dd 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt @@ -640,7 +640,7 @@ class FHIRTranslatorIntegrationTests : Logging { } @Test - fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true`() { + fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from receive step`() { // set up val receiverSetupData = listOf( UniversalPipelineTestUtils.ReceiverSetupData( @@ -720,7 +720,7 @@ class FHIRTranslatorIntegrationTests : Logging { } @Test - fun `successfully translate for FHIR receiver when isSendOriginal is true`() { + fun `successfully translate for FHIR receiver when isSendOriginal is true from receive step`() { // set up val receiverSetupData = listOf( UniversalPipelineTestUtils.ReceiverSetupData( @@ -798,4 +798,166 @@ class FHIRTranslatorIntegrationTests : Logging { assertThat(translatedValue).isEqualTo(reportContents.toByteArray()) } } + + @Test + fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from convert step`() { + // set up + val receiverSetupData = listOf( + UniversalPipelineTestUtils.ReceiverSetupData( + "x", + jurisdictionalFilter = listOf("true"), + qualityFilter = listOf("true"), + routingFilter = listOf("true"), + conditionFilter = listOf("true"), + format = MimeFormat.FHIR + ) + ) + val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData) + val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers) + val translator = createFHIRTranslator(azureEventService, org) + val reportContents = File(HL7_WITH_BIRTH_TIME).readText() + val convertReport = UniversalPipelineTestUtils.createReport( + reportContents, + TaskAction.convert, + Event.EventAction.CONVERT, + azuriteContainer, + TaskAction.convert, + fileName = "originalhl7.hl7" + ) + val queueMessage = generateQueueMessage( + convertReport, + reportContents, + UniversalPipelineTestUtils.hl7SenderWithSendOriginal, + "phd.x" + ) + val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() + + // execute + fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate)) + + // check that send queue was updated + verify(exactly = 1) { + QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any()) + } + + // check action table + UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate)) + + // verify task and report_file tables were updated correctly in the Translate function (new task and new + // record file created) + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val report = fetchChildReports(convertReport, txn, 1).single() + assertThat(report.nextAction).isEqualTo(TaskAction.send) + assertThat(report.receivingOrg).isEqualTo("phd") + assertThat(report.receivingOrgSvc).isEqualTo("x") + assertThat(report.schemaName).isEqualTo("None") + assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS) + assertThat(report.bodyFormat).isEqualTo("HL7") + + val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch)) + .fetchOneInto(Task.TASK) + // verify batch queue task does not exist + assertThat(batchTask).isNull() + + val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.send)) + .fetchOneInto(Task.TASK) + // verify send queue task exists + assertThat(sendTask).isNotNull() + assertThat(sendTask!!.reportId).isEqualTo(report.reportId) + + // verify message format is HL7 and is for the expected receiver + assertThat(sendTask.receiverName).isEqualTo("phd.x") + assertThat(sendTask.bodyFormat).isEqualTo("HL7") + + // verify message matches the original HL7 input + val translatedValue = BlobAccess.downloadBlobAsByteArray( + report.bodyUrl, + UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer) + ) + assertThat(translatedValue).isEqualTo(reportContents.toByteArray()) + } + } + + @Test + fun `successfully translate for FHIR receiver when isSendOriginal is true from convert step`() { + // set up + val receiverSetupData = listOf( + UniversalPipelineTestUtils.ReceiverSetupData( + "x", + jurisdictionalFilter = listOf("true"), + qualityFilter = listOf("true"), + routingFilter = listOf("true"), + conditionFilter = listOf("true"), + format = MimeFormat.FHIR + ) + ) + val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData) + val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers) + val translator = createFHIRTranslator(azureEventService, org) + val reportContents = File(MULTIPLE_TARGETS_FHIR_PATH).readText() + val convertReport = UniversalPipelineTestUtils.createReport( + reportContents, + TaskAction.convert, + Event.EventAction.CONVERT, + azuriteContainer, + TaskAction.convert + ) + + val queueMessage = generateQueueMessage( + convertReport, + reportContents, + UniversalPipelineTestUtils.fhirSenderWithSendOriginal, + "phd.x" + ) + val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() + + // execute + fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate)) + + // check that send queue was updated + verify(exactly = 1) { + QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any()) + } + + // check action table + UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate)) + + // verify task and report_file tables were updated correctly in the Translate function (new task and new + // record file created) + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val report = fetchChildReports(convertReport, txn, 1).single() + assertThat(report.nextAction).isEqualTo(TaskAction.send) + assertThat(report.receivingOrg).isEqualTo("phd") + assertThat(report.receivingOrgSvc).isEqualTo("x") + assertThat(report.schemaName).isEqualTo("None") + assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS) + assertThat(report.bodyFormat).isEqualTo("FHIR") + + val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch)) + .fetchOneInto(Task.TASK) + // verify batch queue task does not exist + assertThat(batchTask).isNull() + + val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.send)) + .fetchOneInto(Task.TASK) + // verify send queue task exists + assertThat(sendTask).isNotNull() + assertThat(sendTask!!.reportId).isEqualTo(report.reportId) + + // verify message format is FHIR and is for the expected receiver + assertThat(sendTask.receiverName).isEqualTo("phd.x") + assertThat(sendTask.bodyFormat).isEqualTo("FHIR") + + // verify message matches the original FHIR input + val translatedValue = BlobAccess.downloadBlobAsByteArray( + report.bodyUrl, + UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer) + ) + assertThat(translatedValue).isEqualTo(reportContents.toByteArray()) + } + } } \ No newline at end of file