Skip to content

Commit

Permalink
16409 do not use receive step to determine root report (#16723)
Browse files Browse the repository at this point in the history
* replace check for receive step with check for report_ids that do not appear as child in report_lineage

* remove unnecessary join to action table

* track payloadname for submissions from the submissions api

* add tests for sendoriginal from convert step

* order by action_id for consistent result ordering
  • Loading branch information
jack-h-wang authored Dec 13, 2024
1 parent 986d6fb commit 6cb62ec
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class FHIRConverter(
companion object {

private val clientIdHeader = "client_id"
private val payloadNameHeader = "payloadname"

/**
* Converts a [FhirConvertQueueMessage] into the input to the convert processing
Expand Down Expand Up @@ -157,6 +158,7 @@ class FHIRConverter(
val blobSubFolderName = message.blobSubFolderName

val clientId = message.headers[clientIdHeader]
val payloadName = message.headers[payloadNameHeader]
val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) }
if (sender == null) {
throw SubmissionSenderNotFound(clientId ?: "", reportId, blobUrl)
Expand All @@ -178,7 +180,8 @@ class FHIRConverter(
// is properly recorded in the report file table with the correct sender
actionHistory.trackExternalInputReport(
report,
BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray())
BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()),
payloadName
)
actionHistory.trackActionSenderInfo(sender.fullName)

Expand Down
28 changes: 14 additions & 14 deletions prime-router/src/main/kotlin/history/db/ReportGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,10 @@ class ReportGraph(
}

/**
* Recursively goes up the report_linage table from any report until it reaches
* a report with an action type of "receive" (the root report)
* Recursively goes up the report_lineage table from any report until it reaches
* a report that does not appear in report_lineage as a child report (the root report)
*
* This will return null if no report with action type "receive" is present or if
* the root is passed in
* This will return null if the root is passed in
*/
fun getRootReport(childReportId: UUID): ReportFile? {
return db.transactReturning { txn ->
Expand Down Expand Up @@ -172,19 +171,19 @@ class ReportGraph(
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))
.where(ACTION.ACTION_NAME.eq(TaskAction.receive))
.leftJoin(REPORT_LINEAGE)
.on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID))
.where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull())
.orderBy(REPORT_FILE.ACTION_ID.asc())
.fetchOneInto(Item::class.java)
return rootItem
}

/**
* Recursively goes up the report_linage table from any report until it reaches
* all reports with an action type of "receive" (the root report)
* Recursively goes up the report_lineage table from any report until it reaches
* all reports that do not appear in report_lineage as a child report (the root report)
*
* This will return null if no report with action type "receive" is present or if
* the root is passed in
* This will return null if the root is passed in
*
* If the passed in report ID has multiple root reports, they will all be returned
*/
Expand Down Expand Up @@ -476,9 +475,10 @@ class ReportGraph(
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(cte.field(0, UUID::class.java)))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))
.where(ACTION.ACTION_NAME.eq(TaskAction.receive))
.leftJoin(REPORT_LINEAGE)
.on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID))
.where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull())
.orderBy(REPORT_FILE.ACTION_ID.asc())

/**
* Accepts a list of ids and walks down the report lineage graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ object UniversalPipelineTestUtils {
fileName: String = "mr_fhir_face.fhir",
): Report {
val blobUrl = BlobAccess.uploadBlob(
"${TaskAction.receive.literal}/$fileName",
"${previousAction.literal}/$fileName",
reportContents.toByteArray(),
getBlobContainerMetadata(azuriteContainer)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ class FHIRTranslatorIntegrationTests : Logging {
}

@Test
fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true`() {
fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from receive step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
Expand Down Expand Up @@ -720,7 +720,7 @@ class FHIRTranslatorIntegrationTests : Logging {
}

@Test
fun `successfully translate for FHIR receiver when isSendOriginal is true`() {
fun `successfully translate for FHIR receiver when isSendOriginal is true from receive step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
Expand Down Expand Up @@ -798,4 +798,166 @@ class FHIRTranslatorIntegrationTests : Logging {
assertThat(translatedValue).isEqualTo(reportContents.toByteArray())
}
}

@Test
fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from convert step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
"x",
jurisdictionalFilter = listOf("true"),
qualityFilter = listOf("true"),
routingFilter = listOf("true"),
conditionFilter = listOf("true"),
format = MimeFormat.FHIR
)
)
val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData)
val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers)
val translator = createFHIRTranslator(azureEventService, org)
val reportContents = File(HL7_WITH_BIRTH_TIME).readText()
val convertReport = UniversalPipelineTestUtils.createReport(
reportContents,
TaskAction.convert,
Event.EventAction.CONVERT,
azuriteContainer,
TaskAction.convert,
fileName = "originalhl7.hl7"
)
val queueMessage = generateQueueMessage(
convertReport,
reportContents,
UniversalPipelineTestUtils.hl7SenderWithSendOriginal,
"phd.x"
)
val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance()

// execute
fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate))

// check that send queue was updated
verify(exactly = 1) {
QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any())
}

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate))

// verify task and report_file tables were updated correctly in the Translate function (new task and new
// record file created)
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val report = fetchChildReports(convertReport, txn, 1).single()
assertThat(report.nextAction).isEqualTo(TaskAction.send)
assertThat(report.receivingOrg).isEqualTo("phd")
assertThat(report.receivingOrgSvc).isEqualTo("x")
assertThat(report.schemaName).isEqualTo("None")
assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS)
assertThat(report.bodyFormat).isEqualTo("HL7")

val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch))
.fetchOneInto(Task.TASK)
// verify batch queue task does not exist
assertThat(batchTask).isNull()

val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.send))
.fetchOneInto(Task.TASK)
// verify send queue task exists
assertThat(sendTask).isNotNull()
assertThat(sendTask!!.reportId).isEqualTo(report.reportId)

// verify message format is HL7 and is for the expected receiver
assertThat(sendTask.receiverName).isEqualTo("phd.x")
assertThat(sendTask.bodyFormat).isEqualTo("HL7")

// verify message matches the original HL7 input
val translatedValue = BlobAccess.downloadBlobAsByteArray(
report.bodyUrl,
UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer)
)
assertThat(translatedValue).isEqualTo(reportContents.toByteArray())
}
}

@Test
fun `successfully translate for FHIR receiver when isSendOriginal is true from convert step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
"x",
jurisdictionalFilter = listOf("true"),
qualityFilter = listOf("true"),
routingFilter = listOf("true"),
conditionFilter = listOf("true"),
format = MimeFormat.FHIR
)
)
val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData)
val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers)
val translator = createFHIRTranslator(azureEventService, org)
val reportContents = File(MULTIPLE_TARGETS_FHIR_PATH).readText()
val convertReport = UniversalPipelineTestUtils.createReport(
reportContents,
TaskAction.convert,
Event.EventAction.CONVERT,
azuriteContainer,
TaskAction.convert
)

val queueMessage = generateQueueMessage(
convertReport,
reportContents,
UniversalPipelineTestUtils.fhirSenderWithSendOriginal,
"phd.x"
)
val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance()

// execute
fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate))

// check that send queue was updated
verify(exactly = 1) {
QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any())
}

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate))

// verify task and report_file tables were updated correctly in the Translate function (new task and new
// record file created)
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val report = fetchChildReports(convertReport, txn, 1).single()
assertThat(report.nextAction).isEqualTo(TaskAction.send)
assertThat(report.receivingOrg).isEqualTo("phd")
assertThat(report.receivingOrgSvc).isEqualTo("x")
assertThat(report.schemaName).isEqualTo("None")
assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS)
assertThat(report.bodyFormat).isEqualTo("FHIR")

val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch))
.fetchOneInto(Task.TASK)
// verify batch queue task does not exist
assertThat(batchTask).isNull()

val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.send))
.fetchOneInto(Task.TASK)
// verify send queue task exists
assertThat(sendTask).isNotNull()
assertThat(sendTask!!.reportId).isEqualTo(report.reportId)

// verify message format is FHIR and is for the expected receiver
assertThat(sendTask.receiverName).isEqualTo("phd.x")
assertThat(sendTask.bodyFormat).isEqualTo("FHIR")

// verify message matches the original FHIR input
val translatedValue = BlobAccess.downloadBlobAsByteArray(
report.bodyUrl,
UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer)
)
assertThat(translatedValue).isEqualTo(reportContents.toByteArray())
}
}
}

0 comments on commit 6cb62ec

Please sign in to comment.