Skip to content

Commit

Permalink
15766: item sent event
Browse files Browse the repository at this point in the history
  • Loading branch information
mkalish committed Sep 18, 2024
1 parent 7d4313a commit 2f64dd9
Show file tree
Hide file tree
Showing 20 changed files with 130 additions and 54 deletions.
34 changes: 34 additions & 0 deletions prime-router/src/main/kotlin/azure/ActionHistory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatusType
import com.networknt.org.apache.commons.validator.routines.InetAddressValidator
import fhirengine.engine.CustomFhirPathFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogLevel
import gov.cdc.prime.router.ClientSource
Expand All @@ -25,11 +27,16 @@ import gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.db.tables.pojos.ReportLineage
import gov.cdc.prime.router.azure.db.tables.pojos.Task
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.report.ReportService
import io.ktor.http.HttpStatusCode
import org.apache.logging.log4j.kotlin.Logging
import org.jooq.impl.SQLDataType
Expand Down Expand Up @@ -565,6 +572,7 @@ class ActionHistory(
result: String,
header: WorkflowEngine.Header,
reportEventService: IReportStreamEventService,
reportService: ReportService,
transportType: String,
) {
if (isReportAlreadyTracked(sentReportId)) {
Expand Down Expand Up @@ -616,6 +624,32 @@ class ActionHistory(
)
}

val reportFiles =
reportService.getReportsForStep(header.reportFile.reportId, TaskAction.receiver_filter)
reportFiles.forEach { file ->
val blob = BlobAccess.downloadBlob(file.bodyUrl, BlobUtils.digestToString(file.blobDigest))
val bundle = FhirTranscoder.decode(blob)
val bundleDigestExtractor = BundleDigestExtractor(
FhirPathBundleDigestLabResultExtractorStrategy(
CustomContext(
bundle,
bundle,
mutableMapOf(),
CustomFhirPathFunctions()
)
)
)
reportEventService.sendItemEvent(ReportStreamEventName.ITEM_SENT, reportFile, TaskAction.send) {
trackingId(bundle)
parentReportId(header.reportFile.reportId)
params(
mapOf(
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(bundle),
)
)
}
}
reportsOut[reportFile.reportId] = reportFile
}

Expand Down
3 changes: 2 additions & 1 deletion prime-router/src/main/kotlin/azure/SendFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class SendFunction(
retryItems,
context,
actionHistory,
reportEventService
reportEventService,
workflowEngine.reportService
)
if (nextRetry != null) {
nextRetryItems += nextRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ enum class ReportStreamEventName {
ITEM_ROUTED,
REPORT_LAST_MILE_FAILURE,
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
}

/**
Expand Down
22 changes: 13 additions & 9 deletions prime-router/src/main/kotlin/history/db/ReportGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.jooq.CommonTableExpression
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.Record1
import org.jooq.Record2
import org.jooq.SelectOnConditionStep
import org.jooq.impl.CustomRecord
import org.jooq.impl.CustomTable
Expand Down Expand Up @@ -213,6 +212,15 @@ class ReportGraph(
return descendantReportRecords(txn, cte, searchedForTaskActions).fetchInto(ReportFile::class.java)
}

fun getAncestorReports(
txn: DataAccessTransaction,
childReportId: UUID,
searchedForTaskActions: Set<TaskAction>? = null,
): List<ReportFile> {
val cte = reportAncestorGraphCommonTableExpression(listOf(childReportId))
return descendantReportRecords(txn, cte, searchedForTaskActions).fetchInto(ReportFile::class.java)
}

/**
* Returns all the metadata rows associated with the passed in [ItemGraphRecord]
*
Expand Down Expand Up @@ -421,19 +429,15 @@ class ReportGraph(
*/
fun reportAncestorGraphCommonTableExpression(childReportIds: List<UUID>) =
DSL.name(lineageCteName).fields(
PARENT_REPORT_ID_FIELD,
PATH_FIELD
PARENT_REPORT_ID_FIELD
).`as`(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
REPORT_LINEAGE.CHILD_REPORT_ID.cast(SQLDataType.VARCHAR),
REPORT_LINEAGE.PARENT_REPORT_ID
).from(REPORT_LINEAGE)
.where(REPORT_LINEAGE.CHILD_REPORT_ID.`in`(childReportIds))
.unionAll(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
DSL.field("$lineageCteName.$PATH_FIELD", SQLDataType.VARCHAR)
.concat(REPORT_LINEAGE.PARENT_REPORT_ID)
REPORT_LINEAGE.PARENT_REPORT_ID
)
.from(REPORT_LINEAGE)
.join(DSL.table(DSL.name(lineageCteName)))
Expand All @@ -454,7 +458,7 @@ class ReportGraph(
*/
private fun rootReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<Record2<UUID, String>>,
cte: CommonTableExpression<Record1<UUID>>,
) = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
Expand Down
7 changes: 7 additions & 0 deletions prime-router/src/main/kotlin/report/ReportService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.prime.router.report

import gov.cdc.prime.router.ReportId
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.history.db.ReportGraph
Expand Down Expand Up @@ -50,6 +51,12 @@ class ReportService(
return reportGraph.getRootReports(childReportId)
}

fun getReportsForStep(childReportId: ReportId, task: TaskAction): List<ReportFile> {
return db.transactReturning { txn ->
reportGraph.getAncestorReports(txn, childReportId, setOf(task))
}
}

/**
* Gets the root report and concatenates sender fields
*
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/AS2Transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.report.ReportService
import org.apache.hc.core5.util.Timeout
import org.apache.http.conn.ConnectTimeoutException
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -50,6 +51,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
// DevNote: This code is similar to the SFTP code in structure
//
Expand Down Expand Up @@ -78,6 +80,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/BlobStoreTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

class BlobStoreTransport : ITransport {
override fun send(
Expand All @@ -21,6 +22,7 @@ class BlobStoreTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val blobTransportType = transportType as BlobStoreTransportType
val envVar: String = blobTransportType.containerName
Expand All @@ -41,6 +43,7 @@ class BlobStoreTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/EmailTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService
import org.thymeleaf.TemplateEngine
import org.thymeleaf.context.Context
import org.thymeleaf.templateresolver.StringTemplateResolver
Expand All @@ -33,6 +34,7 @@ class EmailTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory, // not used by emailer
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val emailTransport = transportType as EmailTransportType
val content = buildContent(header)
Expand Down
11 changes: 9 additions & 2 deletions prime-router/src/main/kotlin/transport/GAENTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.common.HttpClientUtils
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserApiKeyCredential
import gov.cdc.prime.router.report.ReportService
import io.ktor.client.HttpClient
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
Expand Down Expand Up @@ -78,6 +79,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val gaenTransportInfo = transportType as GAENTransportType
val reportId = header.reportFile.reportId
Expand Down Expand Up @@ -106,7 +108,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {

// Record the work in history and logs
when (postResult) {
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService)
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService, reportService)
PostResult.RETRY -> recordFailureWithRetry(params)
PostResult.FAIL -> recordFailure(params)
}
Expand All @@ -123,7 +125,11 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
/**
* Record in [ActionHistory] the full success of this notification. Log an info message as well.
*/
private fun recordFullSuccess(params: SendParams, reportEventService: IReportStreamEventService) {
private fun recordFullSuccess(
params: SendParams,
reportEventService: IReportStreamEventService,
reportService: ReportService,
) {
val msg = "${params.receiver.fullName}: Successful exposure notifications of ${params.comboId}"
val history = params.actionHistory
params.context.logger.info(msg)
Expand All @@ -137,6 +143,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
msg,
params.header,
reportEventService,
reportService,
this::class.java.simpleName
)
history.trackItemLineages(Report.createItemLineagesFromDb(params.header, params.sentReportId))
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/ITransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

interface ITransport {
/**
Expand All @@ -26,5 +27,6 @@ interface ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems?
}
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/NullTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

/**
* The Null transport is intended for testing and benchmarking purposes.
Expand All @@ -21,6 +22,7 @@ class NullTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
if (header.content == null) error("No content for report ${header.reportFile.reportId}")
val receiver = header.receiver ?: error("No receiver defined for report ${header.reportFile.reportId}")
Expand All @@ -34,6 +36,7 @@ class NullTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/RESTTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.credentials.UserApiKeyCredential
import gov.cdc.prime.router.credentials.UserAssertionCredential
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.credentials.UserPassCredential
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.tokens.AuthUtils
import io.ktor.client.HttpClient
import io.ktor.client.call.body
Expand Down Expand Up @@ -93,6 +94,7 @@ class RESTTransport(private val httpClient: HttpClient? = null) : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val logger: Logger = context.logger

Expand Down Expand Up @@ -157,6 +159,7 @@ class RESTTransport(private val httpClient: HttpClient? = null) : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/SftpTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import gov.cdc.prime.router.credentials.SftpCredential
import gov.cdc.prime.router.credentials.UserPassCredential
import gov.cdc.prime.router.credentials.UserPemCredential
import gov.cdc.prime.router.credentials.UserPpkCredential
import gov.cdc.prime.router.report.ReportService
import net.schmizz.sshj.DefaultConfig
import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.RemoteResourceFilter
Expand Down Expand Up @@ -48,6 +49,7 @@ class SftpTransport : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val sftpTransportType = transportType as SFTPTransportType

Expand All @@ -72,6 +74,7 @@ class SftpTransport : ITransport, Logging {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/SoapTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.SoapCredential
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.serializers.SoapEnvelope
import gov.cdc.prime.router.serializers.SoapObjectService
import io.ktor.client.HttpClient
Expand Down Expand Up @@ -154,6 +155,7 @@ class SoapTransport(private val httpClient: HttpClient? = null) : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
// verify that we have a SOAP transport type for our parameters. I think if we ever fell
// into this scenario with different parameters there's something seriously wrong in the system,
Expand Down Expand Up @@ -211,6 +213,7 @@ class SoapTransport(private val httpClient: HttpClient? = null) : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
Loading

0 comments on commit 2f64dd9

Please sign in to comment.