Skip to content

Commit

Permalink
added provenance to replace-references and added provenance tests for…
Browse files Browse the repository at this point in the history
… merge as well
  • Loading branch information
mrdnctrk committed Jan 10, 2025
1 parent fbdc484 commit 6c1331e
Show file tree
Hide file tree
Showing 16 changed files with 487 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public IBaseParameters replaceReferences(
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequest(
theServletRequest, ReadPartitionIdRequestDetails.forRead(targetId));
ReplaceReferencesRequest replaceReferencesRequest =
new ReplaceReferencesRequest(sourceId, targetId, resourceLimit, partitionId);
new ReplaceReferencesRequest(sourceId, targetId, resourceLimit, partitionId, true);
IBaseParameters retval =
getReplaceReferencesSvc().replaceReferences(replaceReferencesRequest, theServletRequest);
if (ParametersUtil.getNamedParameter(getContext(), retval, OPERATION_REPLACE_REFERENCES_OUTPUT_PARAM_TASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.replacereferences.ReplaceReferencesJobParameters;
import ca.uhn.fhir.batch2.jobs.replacereferences.ReplaceReferencesProvenanceSvc;
import ca.uhn.fhir.batch2.util.Batch2TaskHelper;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
Expand All @@ -42,6 +43,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.jobs.replacereferences.ReplaceReferencesAppCtx.JOB_REPLACE_REFERENCES;
Expand All @@ -58,6 +61,7 @@ public class ReplaceReferencesSvcImpl implements IReplaceReferencesSvc {
private final ReplaceReferencesPatchBundleSvc myReplaceReferencesPatchBundleSvc;
private final Batch2TaskHelper myBatch2TaskHelper;
private final JpaStorageSettings myStorageSettings;
private final ReplaceReferencesProvenanceSvc myReplaceReferencesProvenanceSvc;

public ReplaceReferencesSvcImpl(
DaoRegistry theDaoRegistry,
Expand All @@ -74,6 +78,7 @@ public ReplaceReferencesSvcImpl(
myReplaceReferencesPatchBundleSvc = theReplaceReferencesPatchBundleSvc;
myBatch2TaskHelper = theBatch2TaskHelper;
myStorageSettings = theStorageSettings;
myReplaceReferencesProvenanceSvc = new ReplaceReferencesProvenanceSvc(theDaoRegistry);
}

@Override
Expand Down Expand Up @@ -123,6 +128,7 @@ private IBaseParameters replaceReferencesPreferAsync(
private IBaseParameters replaceReferencesPreferSync(
ReplaceReferencesRequest theReplaceReferencesRequest, RequestDetails theRequestDetails) {

Date startTime = new Date();
// TODO KHS get partition from request
StopLimitAccumulator<IdDt> accumulator = myHapiTransactionService
.withRequest(theRequestDetails)
Expand All @@ -139,6 +145,15 @@ private IBaseParameters replaceReferencesPreferSync(
Bundle result = myReplaceReferencesPatchBundleSvc.patchReferencingResources(
theReplaceReferencesRequest, accumulator.getItemList(), theRequestDetails);

if (theReplaceReferencesRequest.createProvenance) {
myReplaceReferencesProvenanceSvc.createProvenance(
theReplaceReferencesRequest.targetId,
theReplaceReferencesRequest.sourceId,
List.of(result),
startTime,
theRequestDetails);
}

Parameters retval = new Parameters();
retval.addParameter()
.setName(OPERATION_REPLACE_REFERENCES_OUTPUT_PARAM_OUTCOME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.merge.MergeJobParameters;
import ca.uhn.fhir.batch2.jobs.merge.MergeProvenanceSvc;
import ca.uhn.fhir.batch2.jobs.merge.MergeResourceHelper;
import ca.uhn.fhir.batch2.util.Batch2TaskHelper;
import ca.uhn.fhir.context.FhirContext;
Expand All @@ -36,20 +37,24 @@
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.OperationOutcomeUtil;
import ca.uhn.fhir.util.ParametersUtil;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Provenance;
import org.hl7.fhir.r4.model.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;

import static ca.uhn.fhir.batch2.jobs.merge.MergeAppCtx.JOB_MERGE;
import static ca.uhn.fhir.rest.api.Constants.STATUS_HTTP_200_OK;
import static ca.uhn.fhir.rest.api.Constants.STATUS_HTTP_202_ACCEPTED;
import static ca.uhn.fhir.rest.api.Constants.STATUS_HTTP_500_INTERNAL_ERROR;
import static ca.uhn.fhir.rest.server.provider.ProviderConstants.OPERATION_REPLACE_REFERENCES_OUTPUT_PARAM_OUTCOME;

/**
* Service for the FHIR $merge operation. Currently only supports Patient/$merge. The plan is to expand to other resource types.
Expand All @@ -68,6 +73,7 @@ public class ResourceMergeService {
private final MergeResourceHelper myMergeResourceHelper;
private final Batch2TaskHelper myBatch2TaskHelper;
private final MergeValidationService myMergeValidationService;
private final MergeProvenanceSvc myMergeProvenanceSvc;

public ResourceMergeService(
JpaStorageSettings theStorageSettings,
Expand All @@ -87,8 +93,8 @@ public ResourceMergeService(
myBatch2TaskHelper = theBatch2TaskHelper;
myFhirContext = myPatientDao.getContext();
myHapiTransactionService = theHapiTransactionService;
IFhirResourceDao<Provenance> provenanceDao = theDaoRegistry.getResourceDao(Provenance.class);
myMergeResourceHelper = new MergeResourceHelper(myPatientDao, provenanceDao);
myMergeProvenanceSvc = new MergeProvenanceSvc(theDaoRegistry);
myMergeResourceHelper = new MergeResourceHelper(theDaoRegistry, myMergeProvenanceSvc);
myMergeValidationService = new MergeValidationService(myFhirContext, theDaoRegistry);
}

Expand Down Expand Up @@ -224,18 +230,28 @@ private void doMergeSync(
theSourceResource.getIdElement(),
theTargetResource.getIdElement(),
theMergeOperationParameters.getResourceLimit(),
partitionId);
partitionId,
// don't create provenance as part of replace-references,
// we create it after updating source and target for merge
false);

myReplaceReferencesSvc.replaceReferences(replaceReferencesRequest, theRequestDetails);
IBaseParameters outParams =
myReplaceReferencesSvc.replaceReferences(replaceReferencesRequest, theRequestDetails);

Patient updatedTarget = myMergeResourceHelper.updateMergedResourcesAfterReferencesReplaced(
Bundle patchResultBundle = (Bundle) ParametersUtil.getNamedParameterResource(
myFhirContext, outParams, OPERATION_REPLACE_REFERENCES_OUTPUT_PARAM_OUTCOME)
.orElseThrow();

Patient updatedTarget = myMergeResourceHelper.updateMergedResourcesAndCreateProvenance(
myHapiTransactionService,
theSourceResource,
theTargetResource,
List.of(patchResultBundle),
(Patient) theMergeOperationParameters.getResultResource(),
theMergeOperationParameters.getDeleteSource(),
theRequestDetails,
startTime);

theMergeOutcome.setUpdatedTargetResource(updatedTarget);

String detailsText = "Merge operation completed successfully.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.CanonicalIdentifier;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Identifier;
Expand Down Expand Up @@ -1464,9 +1465,14 @@ private void verifyProvenanceCreated(boolean theDeleteSource) {


private void setupReplaceReferencesForSuccessForSync() {
// set the count to less that the page size for sync processing
Parameters parameters = new Parameters();
Parameters.ParametersParameterComponent outcomeParameter = new Parameters.ParametersParameterComponent();
outcomeParameter.setName("outcome");
outcomeParameter.setResource(new Bundle());
parameters.addParameter(outcomeParameter);

when(myReplaceReferencesSvcMock.replaceReferences(isA(ReplaceReferencesRequest.class),
eq(myRequestDetailsMock))).thenReturn(new Parameters());
eq(myRequestDetailsMock))).thenReturn(parameters);
}

private void setupBatch2JobTaskHelperMock(Task theTaskToReturn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import jakarta.annotation.Nonnull;
import jakarta.servlet.http.HttpServletResponse;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Encounter;
Expand All @@ -32,7 +33,9 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static ca.uhn.fhir.jpa.provider.ReplaceReferencesSvcImpl.RESOURCE_TYPES_SYSTEM;
Expand Down Expand Up @@ -143,6 +146,7 @@ public void testMerge(boolean withDelete, boolean withInputResultPatient, boolea
List<Identifier> expectedIdentifiersOnTargetAfterMerge =
myTestHelper.getExpectedIdentifiersForTargetAfterMerge(withInputResultPatient);


// Assert Task inAsync mode, unless it is preview in which case we don't return a task
if (isAsync && !withPreview) {
assertThat(getLastHttpStatusCode()).isEqualTo(HttpServletResponse.SC_ACCEPTED);
Expand Down Expand Up @@ -246,6 +250,55 @@ void testMerge_smallResourceLimit() {
.satisfies(ex -> assertThat(extractFailureMessage((BaseServerResponseException) ex)).isEqualTo("HAPI-2597: Number of resources with references to "+ myTestHelper.getSourcePatientId() + " exceeds the resource-limit 5. Submit the request asynchronsly by adding the HTTP Header 'Prefer: respond-async'."));
}

@ParameterizedTest(name = "{index}: deleteSource={0}, async={1}")
@CsvSource({
"true, false",
"false, false",
"true, true",
"false, true",
})
void testMerge_resourcesWithNoReferences(boolean theDeleteSource, boolean theAsync) {

Patient sourcePatient = new Patient();
sourcePatient = (Patient )myPatientDao.create(sourcePatient, mySrd).getResource();


Patient targetPatient = new Patient();
targetPatient = (Patient) myPatientDao.create(targetPatient, mySrd).getResource();

ReplaceReferencesTestHelper.PatientMergeInputParameters inParams = new ReplaceReferencesTestHelper.PatientMergeInputParameters();
inParams.sourcePatient = new Reference(sourcePatient.getIdElement().toVersionless());
inParams.targetPatient = new Reference(targetPatient.getIdElement().toVersionless());
if (theDeleteSource) {
inParams.deleteSource = true;
}

Parameters outParams = callMergeOperation(inParams.asParametersResource(), theAsync);

if (theAsync) {
assertThat(getLastHttpStatusCode()).isEqualTo(HttpServletResponse.SC_ACCEPTED);
Task task = (Task) outParams.getParameter(OPERATION_MERGE_OUTPUT_PARAM_TASK).getResource();
assertNull(task.getIdElement().getVersionIdPart());
ourLog.info("Got task {}", task.getId());
String jobId = myTestHelper.getJobIdFromTask(task);
myBatch2JobHelper.awaitJobCompletion(jobId);
}

IIdType theExpectedTargetIdWithVersion = targetPatient.getIdElement().withVersion("2");
if (theDeleteSource) {
// when the source resource is being deleted and since there is no identifiers to copy over to the target
// in this test, the target is not actually updated, so its version will remain the same
theExpectedTargetIdWithVersion = targetPatient.getIdElement().withVersion("1");
}

myTestHelper.assertMergeProvenance(theDeleteSource,
sourcePatient.getIdElement().withVersion("2"),
theExpectedTargetIdWithVersion,
0,
Collections.EMPTY_SET);
}


@Test
void testMerge_SourceResourceCannotBeDeletedBecauseAnotherResourceReferencingSourceWasAddedWhileJobIsRunning_JobFails() {
ReplaceReferencesTestHelper.PatientMergeInputParameters inParams = new ReplaceReferencesTestHelper.PatientMergeInputParameters();
Expand Down Expand Up @@ -284,7 +337,7 @@ void testMerge_SourceResourceCannotBeDeletedBecauseAnotherResourceReferencingSou
assertThat(taskAfterJobFailure.getStatus()).isEqualTo(Task.TaskStatus.FAILED);
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: deleteSource={0}, resultPatient={1}, preview={2}")
@CsvSource({
// withDelete, withInputResultPatient, withPreview
"true, true, true",
Expand All @@ -305,7 +358,7 @@ public void testMultipleTargetMatchesFails(boolean withDelete, boolean withInput
}


@ParameterizedTest
@ParameterizedTest(name = "{index}: deleteSource={0}, resultPatient={1}, preview={2}")
@CsvSource({
// withDelete, withInputResultPatient, withPreview
"true, true, true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public void after() throws Exception {
@BeforeEach
public void before() throws Exception {
super.before();
// keep the version on Provenance.target fields to verify that Provenance resources were saved
// with versioned target references
myFhirContext.getParserOptions()
.setDontStripVersionsFromReferencesAtPaths("Provenance.target");

myTestHelper = new ReplaceReferencesTestHelper(myFhirContext, myDaoRegistry);
myTestHelper.beforeEach();
Expand Down Expand Up @@ -81,6 +85,7 @@ void testReplaceReferences(boolean isAsync) {
// Check that the linked resources were updated

myTestHelper.assertAllReferencesUpdated();
myTestHelper.assertReplaceReferencesProvenance();
}

private JobInstance awaitJobCompletion(Task task) {
Expand Down Expand Up @@ -157,6 +162,7 @@ void testReplaceReferencesSmallTransactionEntriesSize() {
// Check that the linked resources were updated

myTestHelper.assertAllReferencesUpdated();
myTestHelper.assertReplaceReferencesProvenance();
}

// TODO ED we should add some tests for the invalid request error cases (and assert 4xx status code)
Expand Down
Loading

0 comments on commit 6c1331e

Please sign in to comment.