Skip to content

Commit

Permalink
15766: item sent event
Browse files Browse the repository at this point in the history
  • Loading branch information
mkalish committed Sep 19, 2024
1 parent 7d4313a commit 4aac7e0
Show file tree
Hide file tree
Showing 21 changed files with 190 additions and 61 deletions.
45 changes: 45 additions & 0 deletions prime-router/src/main/kotlin/azure/ActionHistory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatusType
import com.networknt.org.apache.commons.validator.routines.InetAddressValidator
import fhirengine.engine.CustomFhirPathFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogLevel
import gov.cdc.prime.router.ClientSource
Expand All @@ -25,11 +27,16 @@ import gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.db.tables.pojos.ReportLineage
import gov.cdc.prime.router.azure.db.tables.pojos.Task
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.report.ReportService
import io.ktor.http.HttpStatusCode
import org.apache.logging.log4j.kotlin.Logging
import org.jooq.impl.SQLDataType
Expand Down Expand Up @@ -565,6 +572,7 @@ class ActionHistory(
result: String,
header: WorkflowEngine.Header,
reportEventService: IReportStreamEventService,
reportService: ReportService,
transportType: String,
) {
if (isReportAlreadyTracked(sentReportId)) {
Expand Down Expand Up @@ -616,6 +624,43 @@ class ActionHistory(
)
}

itemLineages.forEach { itemLineage ->
val receiverFilterReportFile = reportService.getReportsForStep(
itemLineage.parentReportId,
itemLineage.parentIndex,
TaskAction.receiver_filter
)
if (receiverFilterReportFile != null) {
val blob = BlobAccess.downloadBlob(
receiverFilterReportFile.bodyUrl,
BlobUtils.digestToString(receiverFilterReportFile.blobDigest)
)
val bundle = FhirTranscoder.decode(blob)
val bundleDigestExtractor = BundleDigestExtractor(
FhirPathBundleDigestLabResultExtractorStrategy(
CustomContext(
bundle,
bundle,
mutableMapOf(),
CustomFhirPathFunctions()
)
)
)
reportEventService.sendItemEvent(ReportStreamEventName.ITEM_SENT, reportFile, TaskAction.send) {
trackingId(bundle)
parentReportId(header.reportFile.reportId)
childItemIndex(itemLineage.childIndex)
params(
mapOf(
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(bundle),
)
)
}
} else {
logger.error("No translate report found for sent item.")
}
}
reportsOut[reportFile.reportId] = reportFile
}

Expand Down
3 changes: 2 additions & 1 deletion prime-router/src/main/kotlin/azure/SendFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class SendFunction(
retryItems,
context,
actionHistory,
reportEventService
reportEventService,
workflowEngine.reportService
)
if (nextRetry != null) {
nextRetryItems += nextRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ enum class ReportStreamEventName {
ITEM_ROUTED,
REPORT_LAST_MILE_FAILURE,
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
}

/**
Expand Down
53 changes: 44 additions & 9 deletions prime-router/src/main/kotlin/history/db/ReportGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.jooq.CommonTableExpression
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.Record1
import org.jooq.Record2
import org.jooq.SelectOnConditionStep
import org.jooq.impl.CustomRecord
import org.jooq.impl.CustomTable
Expand Down Expand Up @@ -213,6 +212,17 @@ class ReportGraph(
return descendantReportRecords(txn, cte, searchedForTaskActions).fetchInto(ReportFile::class.java)
}

fun getAncestorReports(
txn: DataAccessTransaction,
childReportId: UUID,
childIndex: Int,
searchedForTaskActions: Set<TaskAction>? = null,
): ReportFile? {
val cte = itemAncestorGraphCommonTableExpression(childReportId, childIndex)

return ancestorReportRecords(txn, cte, searchedForTaskActions).fetchOneInto(ReportFile::class.java)
}

/**
* Returns all the metadata rows associated with the passed in [ItemGraphRecord]
*
Expand Down Expand Up @@ -421,19 +431,15 @@ class ReportGraph(
*/
fun reportAncestorGraphCommonTableExpression(childReportIds: List<UUID>) =
DSL.name(lineageCteName).fields(
PARENT_REPORT_ID_FIELD,
PATH_FIELD
PARENT_REPORT_ID_FIELD
).`as`(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
REPORT_LINEAGE.CHILD_REPORT_ID.cast(SQLDataType.VARCHAR),
REPORT_LINEAGE.PARENT_REPORT_ID
).from(REPORT_LINEAGE)
.where(REPORT_LINEAGE.CHILD_REPORT_ID.`in`(childReportIds))
.unionAll(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
DSL.field("$lineageCteName.$PATH_FIELD", SQLDataType.VARCHAR)
.concat(REPORT_LINEAGE.PARENT_REPORT_ID)
REPORT_LINEAGE.PARENT_REPORT_ID
)
.from(REPORT_LINEAGE)
.join(DSL.table(DSL.name(lineageCteName)))
Expand All @@ -454,7 +460,7 @@ class ReportGraph(
*/
private fun rootReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<Record2<UUID, String>>,
cte: CommonTableExpression<Record1<UUID>>,
) = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
Expand Down Expand Up @@ -520,4 +526,33 @@ class ReportGraph(

return select
}

/**
* Fetches all descendant report records in a recursive manner.
*
* @param txn the data access transaction
* @param cte the common table expression for report lineage
* @return the descendant report records
*/
private fun ancestorReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<ItemGraphRecord>,
searchedForTaskActions: Set<TaskAction>?,
): SelectOnConditionStep<Record> {
val select = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
.distinctOn(REPORT_FILE.REPORT_ID)
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))

if (searchedForTaskActions != null) {
select.where(ACTION.ACTION_NAME.`in`(searchedForTaskActions))
}

return select
}
}
7 changes: 7 additions & 0 deletions prime-router/src/main/kotlin/report/ReportService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.prime.router.report

import gov.cdc.prime.router.ReportId
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.history.db.ReportGraph
Expand Down Expand Up @@ -50,6 +51,12 @@ class ReportService(
return reportGraph.getRootReports(childReportId)
}

fun getReportsForStep(childReportId: ReportId, childIndex: Int, task: TaskAction): ReportFile? {
return db.transactReturning { txn ->
reportGraph.getAncestorReports(txn, childReportId, childIndex, setOf(task))
}
}

/**
* Gets the root report and concatenates sender fields
*
Expand Down
5 changes: 4 additions & 1 deletion prime-router/src/main/kotlin/transport/AS2Transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.report.ReportService
import org.apache.hc.core5.util.Timeout
import org.apache.http.conn.ConnectTimeoutException
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -50,6 +51,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
// DevNote: This code is similar to the SFTP code in structure
//
Expand All @@ -70,6 +72,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
val msg = "${receiver.fullName}: Successful upload of $reportId"
context.logger.info(msg)
actionHistory.trackActionResult(msg)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
actionHistory.trackSentReport(
receiver,
sentReportId,
Expand All @@ -78,9 +81,9 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
null
} catch (t: Throwable) {
val receiverFullName = header.receiver?.fullName ?: "null"
Expand Down
5 changes: 4 additions & 1 deletion prime-router/src/main/kotlin/transport/BlobStoreTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

class BlobStoreTransport : ITransport {
override fun send(
Expand All @@ -21,6 +22,7 @@ class BlobStoreTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val blobTransportType = transportType as BlobStoreTransportType
val envVar: String = blobTransportType.containerName
Expand All @@ -33,6 +35,7 @@ class BlobStoreTransport : ITransport {
val msg = "Successfully copied $bodyUrl to $newUrl"
context.logger.info(msg)
actionHistory.trackActionResult(msg)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
actionHistory.trackSentReport(
receiver,
sentReportId,
Expand All @@ -41,9 +44,9 @@ class BlobStoreTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
null
} catch (t: Throwable) {
val msg =
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/EmailTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService
import org.thymeleaf.TemplateEngine
import org.thymeleaf.context.Context
import org.thymeleaf.templateresolver.StringTemplateResolver
Expand All @@ -33,6 +34,7 @@ class EmailTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory, // not used by emailer
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val emailTransport = transportType as EmailTransportType
val content = buildContent(header)
Expand Down
13 changes: 10 additions & 3 deletions prime-router/src/main/kotlin/transport/GAENTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.common.HttpClientUtils
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserApiKeyCredential
import gov.cdc.prime.router.report.ReportService
import io.ktor.client.HttpClient
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
Expand Down Expand Up @@ -78,6 +79,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val gaenTransportInfo = transportType as GAENTransportType
val reportId = header.reportFile.reportId
Expand Down Expand Up @@ -106,7 +108,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {

// Record the work in history and logs
when (postResult) {
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService)
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService, reportService)
PostResult.RETRY -> recordFailureWithRetry(params)
PostResult.FAIL -> recordFailure(params)
}
Expand All @@ -123,12 +125,17 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
/**
* Record in [ActionHistory] the full success of this notification. Log an info message as well.
*/
private fun recordFullSuccess(params: SendParams, reportEventService: IReportStreamEventService) {
private fun recordFullSuccess(
params: SendParams,
reportEventService: IReportStreamEventService,
reportService: ReportService,
) {
val msg = "${params.receiver.fullName}: Successful exposure notifications of ${params.comboId}"
val history = params.actionHistory
params.context.logger.info(msg)
history.setActionType(TaskAction.send)
history.trackActionResult(msg)
history.trackItemLineages(Report.createItemLineagesFromDb(params.header, params.sentReportId))
history.trackSentReport(
params.receiver,
params.sentReportId,
Expand All @@ -137,9 +144,9 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
msg,
params.header,
reportEventService,
reportService,
this::class.java.simpleName
)
history.trackItemLineages(Report.createItemLineagesFromDb(params.header, params.sentReportId))
}

/**
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/ITransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

interface ITransport {
/**
Expand All @@ -26,5 +27,6 @@ interface ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems?
}
Loading

0 comments on commit 4aac7e0

Please sign in to comment.