Skip to content

Commit

Permalink
create merge provenance resource
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdnctrk committed Jan 8, 2025
1 parent b51ccdc commit 2d92405
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Provenance;
import org.hl7.fhir.r4.model.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

import static ca.uhn.fhir.batch2.jobs.merge.MergeAppCtx.JOB_MERGE;
import static ca.uhn.fhir.rest.api.Constants.STATUS_HTTP_200_OK;
import static ca.uhn.fhir.rest.api.Constants.STATUS_HTTP_202_ACCEPTED;
Expand Down Expand Up @@ -84,13 +87,18 @@ public ResourceMergeService(
myBatch2TaskHelper = theBatch2TaskHelper;
myFhirContext = myPatientDao.getContext();
myHapiTransactionService = theHapiTransactionService;
myMergeResourceHelper = new MergeResourceHelper(myPatientDao);
IFhirResourceDao<Provenance> provenanceDao = theDaoRegistry.getResourceDao(Provenance.class);
myMergeResourceHelper = new MergeResourceHelper(myPatientDao, provenanceDao);
myMergeValidationService = new MergeValidationService(myFhirContext, theDaoRegistry);
}

/**
* Perform the $merge operation. If the number of resources to be changed exceeds the provided batch size,
* then switch to async mode. See the <a href="https://build.fhir.org/patient-operation-merge.html">Patient $merge spec</a>
* Perform the $merge operation. Operation can be performed synchronously or asynchronously depending on
* the prefer-async request header.
* If the operation is requested to be performed synchronously and the number of
* resources to be changed exceeds the provided batch size,
* and error is returned indicating that operation needs to be performed asynchronously. See the
* <a href="https://build.fhir.org/patient-operation-merge.html">Patient $merge spec</a>
* for details on what the difference is between synchronous and asynchronous mode.
*
* @param theMergeOperationParameters the merge operation parameters
Expand Down Expand Up @@ -211,6 +219,7 @@ private void doMergeSync(
MergeOperationOutcome theMergeOutcome,
RequestPartitionId partitionId) {

Date startTime = new Date();
ReplaceReferencesRequest replaceReferencesRequest = new ReplaceReferencesRequest(
theSourceResource.getIdElement(),
theTargetResource.getIdElement(),
Expand All @@ -225,7 +234,8 @@ private void doMergeSync(
theTargetResource,
(Patient) theMergeOperationParameters.getResultResource(),
theMergeOperationParameters.getDeleteSource(),
theRequestDetails);
theRequestDetails,
startTime);
theMergeOutcome.setUpdatedTargetResource(updatedTarget);

String detailsText = "Merge operation completed successfully.";
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public void before() throws Exception {

myTestHelper = new ReplaceReferencesTestHelper(myFhirContext, myDaoRegistry);
myTestHelper.beforeEach();
// keep the version on Provenance.target fields to verify that Provenance resources were saved
// with versioned target references
myFhirContext.getParserOptions()
.setDontStripVersionsFromReferencesAtPaths("Provenance.target");


mySrd.setRequestPartitionId(RequestPartitionId.allPartitions());
}
Expand Down Expand Up @@ -84,6 +89,8 @@ public void testHappyPath(boolean theDeleteSource, boolean theWithResultResource
myTestHelper.assertSourcePatientUpdatedOrDeleted(theDeleteSource);
myTestHelper.assertTargetPatientUpdated(theDeleteSource,
myTestHelper.getExpectedIdentifiersForTargetAfterMerge(theWithResultResource));

myTestHelper.assertMergeProvenance(theDeleteSource);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,16 @@ public void before() throws Exception {
myStorageSettings.setReuseCachedSearchResultsForMillis(null);
myStorageSettings.setAllowMultipleDelete(true);
myFhirContext.setParserErrorHandler(new StrictErrorHandler());
// keep the version on Provenance.target fields to verify that Provenance resources were saved
// with versioned target references
myFhirContext.getParserOptions()
.setDontStripVersionsFromReferencesAtPaths("Provenance.target");

myTestHelper = new ReplaceReferencesTestHelper(myFhirContext, myDaoRegistry);
myTestHelper.beforeEach();
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: deleteSource={0}, resultPatient={1}, preview={2}, async={3}")
@CsvSource({
// withDelete, withInputResultPatient, withPreview, isAsync
"true, true, true, false",
Expand All @@ -106,7 +110,6 @@ public void before() throws Exception {
})
public void testMerge(boolean withDelete, boolean withInputResultPatient, boolean withPreview, boolean isAsync) {
// setup

ReplaceReferencesTestHelper.PatientMergeInputParameters inParams = new ReplaceReferencesTestHelper.PatientMergeInputParameters();
myTestHelper.setSourceAndTarget(inParams);
inParams.deleteSource = withDelete;
Expand Down Expand Up @@ -225,6 +228,7 @@ public void testMerge(boolean withDelete, boolean withInputResultPatient, boolea
myTestHelper.assertAllReferencesUpdated(withDelete);
myTestHelper.assertSourcePatientUpdatedOrDeleted(withDelete);
myTestHelper.assertTargetPatientUpdated(withDelete, expectedIdentifiersOnTargetAfterMerge);
myTestHelper.assertMergeProvenance(withDelete);
}
}

Expand Down Expand Up @@ -361,8 +365,7 @@ private Parameters callMergeOperation(Parameters inParameters, boolean isAsync)
class MyExceptionHandler implements TestExecutionExceptionHandler {
@Override
public void handleTestExecutionException(ExtensionContext theExtensionContext, Throwable theThrowable) throws Throwable {
if (theThrowable instanceof BaseServerResponseException) {
BaseServerResponseException ex = (BaseServerResponseException) theThrowable;
if (theThrowable instanceof BaseServerResponseException ex) {
String message = extractFailureMessage(ex);
throw ex.getClass().getDeclaredConstructor(String.class, Throwable.class).newInstance(message, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
import ca.uhn.fhir.jpa.api.dao.PatientEverythingParameters;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.gclient.IOperationUntypedWithInputAndPartialOutput;
import ca.uhn.fhir.rest.param.ReferenceParam;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.util.JsonUtil;
Expand All @@ -49,6 +51,8 @@
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Period;
import org.hl7.fhir.r4.model.Provenance;
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.Resource;
import org.hl7.fhir.r4.model.StringType;
Expand All @@ -57,6 +61,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -96,6 +102,7 @@ public class ReplaceReferencesTestHelper {
private final IFhirResourceDao<Encounter> myEncounterDao;
private final IFhirResourceDao<CarePlan> myCarePlanDao;
private final IFhirResourceDao<Observation> myObservationDao;
private final IFhirResourceDao<Provenance> myProvenanceDao;

private IIdType myOrgId;
private IIdType mySourcePatientId;
Expand All @@ -117,6 +124,7 @@ public ReplaceReferencesTestHelper(FhirContext theFhirContext, DaoRegistry theDa
myEncounterDao = theDaoRegistry.getResourceDao(Encounter.class);
myCarePlanDao = theDaoRegistry.getResourceDao(CarePlan.class);
myObservationDao = theDaoRegistry.getResourceDao(Observation.class);
myProvenanceDao = theDaoRegistry.getResourceDao(Provenance.class);
}

public void beforeEach() throws Exception {
Expand Down Expand Up @@ -203,6 +211,52 @@ public IIdType getTargetPatientId() {
return myTargetPatientId;
}

public List<IBaseResource> searchProvenance(String targetId) {
SearchParameterMap map = new SearchParameterMap();
map.add("target", new ReferenceParam(targetId));
IBundleProvider searchBundle = myProvenanceDao.search(map, mySrd);
return searchBundle.getAllResources();
}

public void assertMergeProvenance(boolean theDeleteSource) {
List<IBaseResource> provenances = searchProvenance(myTargetPatientId.getValue());
assertThat(provenances).hasSize(1);
Provenance provenance = (Provenance) provenances.get(0);

// assert targets
assertThat(provenance.getTarget()).hasSize(theDeleteSource ? 1 : 2);
// the first target reference should be the target patient
String targetPatientReference = provenance.getTarget().get(0).getReference();
assertThat(targetPatientReference).isEqualTo(myTargetPatientId.getValue() + "/_history/2");
if (!theDeleteSource) {
// the second target reference should be the source patient, if it wasn't deleted
String sourcePatientReference = provenance.getTarget().get(1).getReference();
assertThat(sourcePatientReference).isEqualTo(mySourcePatientId.getValue() + "/_history/2");
}

Instant now = Instant.now();
Instant oneMinuteAgo = now.minus(1, ChronoUnit.MINUTES);
assertThat(provenance.getRecorded()).isBetween(oneMinuteAgo, now);

Period period = provenance.getOccurredPeriod();
assertThat(period.getStart()).isBefore(period.getEnd());
assertThat(period.getStart()).isBetween(oneMinuteAgo, now);
assertThat(period.getEnd()).isEqualTo(provenance.getRecorded());

// validate provenance.reason
assertThat(provenance.getReason()).hasSize(1);
Coding reasonCoding = provenance.getReason().get(0).getCodingFirstRep();
assertThat(reasonCoding).isNotNull();
assertThat(reasonCoding.getSystem()).isEqualTo("http://terminology.hl7.org/CodeSystem/v3-ActReason");
assertThat(reasonCoding.getCode()).isEqualTo("PATADMIN");

// validate provenance.activity
Coding activityCoding = provenance.getActivity().getCodingFirstRep();
assertThat(activityCoding).isNotNull();
assertThat(activityCoding.getSystem()).isEqualTo("http://terminology.hl7.org/CodeSystem/iso-21089-lifecycle");
assertThat(activityCoding.getCode()).isEqualTo("merge");
}

private Set<IIdType> getTargetEverythingResourceIds() {
PatientEverythingParameters everythingParams = new PatientEverythingParameters();
everythingParams.setCount(new IntegerType(100));
Expand Down Expand Up @@ -432,7 +486,7 @@ private void validateJobReport(JobInstance theJobInstance, IIdType theTaskId) {

public List<Identifier> getExpectedIdentifiersForTargetAfterMerge(boolean theWithInputResultPatient) {

List<Identifier> expectedIdentifiersOnTargetAfterMerge = null;
List<Identifier> expectedIdentifiersOnTargetAfterMerge;
if (theWithInputResultPatient) {
expectedIdentifiersOnTargetAfterMerge =
List.of(new Identifier().setSystem("SYS1A").setValue("VAL1A"));
Expand All @@ -450,7 +504,7 @@ public List<Identifier> getExpectedIdentifiersForTargetAfterMerge(boolean theWit

public void assertSourcePatientUpdatedOrDeleted(boolean withDelete) {
if (withDelete) {
assertThrows(ResourceGoneException.class, () -> readSourcePatient());
assertThrows(ResourceGoneException.class, this::readSourcePatient);
} else {
Patient source = readSourcePatient();
assertThat(source.getLink()).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import jakarta.annotation.Nullable;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Identifier;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Period;
import org.hl7.fhir.r4.model.Provenance;
import org.hl7.fhir.r4.model.Reference;

import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -43,10 +48,17 @@
*/
public class MergeResourceHelper {

private static final String ACTIVITY_CODE_SYSTEM = "http://terminology.hl7.org/CodeSystem/iso-21089-lifecycle";
private static final String ACTIVITY_CODE_MERGE = "merge";
private static final String ACT_REASON_CODE_SYSTEM = "http://terminology.hl7.org/CodeSystem/v3-ActReason";
private static final String ACT_REASON_PATIENT_ADMINISTRATION_CODE = "PATADMIN";

private final IFhirResourceDao<Patient> myPatientDao;
private final IFhirResourceDao<Provenance> myProvenceDao;

public MergeResourceHelper(IFhirResourceDao<Patient> theDao) {
myPatientDao = theDao;
public MergeResourceHelper(IFhirResourceDao<Patient> thePatientDao, IFhirResourceDao<Provenance> theProvenanceDao) {
myPatientDao = thePatientDao;
myProvenceDao = theProvenanceDao;
}

public static int setResourceLimitFromParameter(
Expand All @@ -66,7 +78,8 @@ public void updateMergedResourcesAfterReferencesReplaced(
IIdType theTargetResourceId,
@Nullable Patient theResultResource,
boolean theDeleteSource,
RequestDetails theRequestDetails) {
RequestDetails theRequestDetails,
Date theStartTime) {
Patient sourceResource = myPatientDao.read(theSourceResourceId, theRequestDetails);
Patient targetResource = myPatientDao.read(theTargetResourceId, theRequestDetails);

Expand All @@ -76,7 +89,8 @@ public void updateMergedResourcesAfterReferencesReplaced(
targetResource,
theResultResource,
theDeleteSource,
theRequestDetails);
theRequestDetails,
theStartTime);
}

public Patient updateMergedResourcesAfterReferencesReplaced(
Expand All @@ -85,7 +99,8 @@ public Patient updateMergedResourcesAfterReferencesReplaced(
Patient theTargetResource,
@Nullable Patient theResultResource,
boolean theDeleteSource,
RequestDetails theRequestDetails) {
RequestDetails theRequestDetails,
Date theStartTime) {

AtomicReference<Patient> targetPatientAfterUpdate = new AtomicReference<>();
myHapiTransactionService.withRequest(theRequestDetails).execute(() -> {
Expand All @@ -100,6 +115,13 @@ public Patient updateMergedResourcesAfterReferencesReplaced(
prepareSourcePatientForUpdate(theSourceResource, theTargetResource);
updateResource(theSourceResource, theRequestDetails);
}

createProvenance(
theSourceResource,
targetPatientAfterUpdate.get(),
theDeleteSource,
theRequestDetails,
theStartTime);
});

return targetPatientAfterUpdate.get();
Expand Down Expand Up @@ -184,4 +206,36 @@ private Patient updateResource(Patient theResource, RequestDetails theRequestDet
private void deleteResource(Patient theResource, RequestDetails theRequestDetails) {
myPatientDao.delete(theResource.getIdElement(), theRequestDetails);
}

private void createProvenance(
Patient theSourcePatient,
Patient theTargetPatient,
boolean theDeleteSource,
RequestDetails theRequestDetails,
Date theStartTime) {

Provenance provenance = new Provenance();
provenance.addTarget().setReference(theTargetPatient.getIdElement().getValue());
if (!theDeleteSource) {
provenance.addTarget().setReference(theSourcePatient.getIdElement().getValue());
}
Date now = new Date();
provenance.setOccurred(new Period()
.setStart(theStartTime, TemporalPrecisionEnum.MILLI)
.setEnd(now, TemporalPrecisionEnum.MILLI));
provenance.setRecorded(now);
CodeableConcept activityCodeableConcept = new CodeableConcept();
activityCodeableConcept.addCoding().setSystem(ACTIVITY_CODE_SYSTEM).setCode(ACTIVITY_CODE_MERGE);
provenance.setActivity(activityCodeableConcept);

CodeableConcept activityReasonCodeableConcept = new CodeableConcept();
activityReasonCodeableConcept
.addCoding()
.setSystem(ACT_REASON_CODE_SYSTEM)
.setCode(ACT_REASON_PATIENT_ADMINISTRATION_CODE);
provenance.addReason(activityReasonCodeableConcept);

// TODO Emre: should we add agent
myProvenceDao.create(provenance, theRequestDetails);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import jakarta.annotation.Nonnull;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Provenance;

import java.util.Date;

public class MergeUpdateTaskReducerStep extends ReplaceReferenceUpdateTaskReducerStep<MergeJobParameters> {
private final IHapiTransactionService myHapiTransactionService;
Expand All @@ -48,6 +51,8 @@ public RunOutcome run(
@Nonnull IJobDataSink<ReplaceReferenceResultsJson> theDataSink)
throws JobExecutionFailedException {

Date startTime = theStepExecutionDetails.getInstance().getStartTime();

MergeJobParameters mergeJobParameters = theStepExecutionDetails.getParameters();
SystemRequestDetails requestDetails =
SystemRequestDetails.forRequestPartitionId(mergeJobParameters.getPartitionId());
Expand All @@ -59,16 +64,17 @@ public RunOutcome run(
}

IFhirResourceDao<Patient> patientDao = myDaoRegistry.getResourceDao(Patient.class);

MergeResourceHelper helper = new MergeResourceHelper(patientDao);
IFhirResourceDao<Provenance> provenanceDao = myDaoRegistry.getResourceDao(Provenance.class);
MergeResourceHelper helper = new MergeResourceHelper(patientDao, provenanceDao);

helper.updateMergedResourcesAfterReferencesReplaced(
myHapiTransactionService,
mergeJobParameters.getSourceId().asIdDt(),
mergeJobParameters.getTargetId().asIdDt(),
resultResource,
mergeJobParameters.getDeleteSource(),
requestDetails);
requestDetails,
startTime);

return super.run(theStepExecutionDetails, theDataSink);
}
Expand Down

0 comments on commit 2d92405

Please sign in to comment.