Skip to content

Commit

Permalink
feat(openapi-v3): support async and createIfNotExists params on aspect
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Oct 12, 2024
1 parent 4133319 commit d0cafac
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,7 @@ private Stream<IngestResult> ingestProposalSync(
return IngestResult.builder()
.urn(item.getUrn())
.request(item)
.result(result)
.publishedMCL(result.getMclFuture() != null)
.sqlCommitted(true)
.isUpdate(result.getOldValue() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ protected abstract E buildGenericEntity(
@Nonnull UpdateAspectResult updateAspectResult,
boolean withSystemMetadata);

protected abstract E buildGenericEntity(
@Nonnull String aspectName,
@Nonnull IngestResult ingestResult,
boolean withSystemMetadata);

protected abstract AspectsBatch toMCPBatch(
@Nonnull OperationContext opContext, String entityArrayList, Actor actor)
throws JsonProcessingException, InvalidUrnException;
Expand Down Expand Up @@ -560,6 +565,7 @@ public ResponseEntity<E> createAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "async", required = false, defaultValue = "true") Boolean async,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestParam(value = "createIfNotExists", required = false, defaultValue = "true")
Expand Down Expand Up @@ -595,20 +601,24 @@ public ResponseEntity<E> createAspect(
jsonAspect,
authentication.getActor());

List<UpdateAspectResult> results =
entityService.ingestAspects(
Set<IngestResult> results =
entityService.ingestProposal(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext().get())
.items(List.of(upsert))
.build(),
true,
true);
async);

return ResponseEntity.of(
results.stream()
.findFirst()
.map(result -> buildGenericEntity(aspectName, result, withSystemMetadata)));
if (!async) {
return ResponseEntity.of(
results.stream()
.findFirst()
.map(result -> buildGenericEntity(aspectName, result.getResult(), withSystemMetadata)));
} else {
return results.stream().map(result -> ResponseEntity.accepted().body(buildGenericEntity(aspectName, result, withSystemMetadata)))
.findFirst().orElse(ResponseEntity.accepted().build());
}
}

@Tag(name = "Generic Aspects")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,20 +216,36 @@ protected List<GenericEntityV2> buildEntityVersionedAspectList(
.collect(Collectors.toList());
}

@Override
protected GenericEntityV2 buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
boolean withSystemMetadata) {
return GenericEntityV2.builder()
.urn(updateAspectResult.getUrn().toString())
.build(
objectMapper,
Map.of(
aspectName,
Pair.of(
updateAspectResult.getNewValue(),
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
}

@Override
protected GenericEntityV2 buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
@Nonnull IngestResult ingestResult,
boolean withSystemMetadata) {
return GenericEntityV2.builder()
.urn(updateAspectResult.getUrn().toString())
.urn(ingestResult.getUrn().toString())
.build(
objectMapper,
Map.of(
aspectName,
Pair.of(
updateAspectResult.getNewValue(),
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
ingestResult.getRequest().getRecordTemplate(),
withSystemMetadata ? ingestResult.getRequest().getSystemMetadata() : null)));
}

private List<GenericEntityV2> toRecordTemplates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,22 @@ private static PathItem buildSingleEntityAspectPath(
new Operation()
.summary(String.format("Create aspect %s on %s ", aspect, upperFirstEntity))
.tags(tags)
.parameters(List.of(
new Parameter()
.in(NAME_QUERY)
.name("async")
.description("Use async ingestion for high throughput.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(true)),
new Parameter()
.in(NAME_QUERY)
.name(NAME_SYSTEM_METADATA)
.description("Include systemMetadata with response.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
new Parameter()
.in(NAME_QUERY)
.name("createIfNotExists")
.description("Only create the aspect if it doesn't exist.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(true))))
.requestBody(requestBody)
.responses(new ApiResponses().addApiResponse("201", successPostResponse));
// Patch Operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,41 @@ protected List<GenericEntityV3> buildEntityList(
return responseList;
}

@Override
protected GenericEntityV3 buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
boolean withSystemMetadata) {
return GenericEntityV3.builder()
.build(
objectMapper,
updateAspectResult.getUrn(),
Map.of(
aspectName,
AspectItem.builder()
.aspect(updateAspectResult.getNewValue())
.systemMetadata(
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)
.auditStamp(withSystemMetadata ? updateAspectResult.getAuditStamp() : null)
.build()));
}

@Override
protected GenericEntityV3 buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
@Nonnull IngestResult ingestResult,
boolean withSystemMetadata) {
return GenericEntityV3.builder()
.build(
objectMapper,
updateAspectResult.getUrn(),
ingestResult.getUrn(),
Map.of(
aspectName,
AspectItem.builder()
.aspect(updateAspectResult.getNewValue())
.aspect(ingestResult.getRequest().getRecordTemplate())
.systemMetadata(
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)
.auditStamp(withSystemMetadata ? updateAspectResult.getAuditStamp() : null)
withSystemMetadata ? ingestResult.getRequest().getSystemMetadata() : null)
.auditStamp(withSystemMetadata ? ingestResult.getRequest().getAuditStamp() : null)
.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
public class IngestResult {
Urn urn;
BatchItem request;
UpdateAspectResult result;
boolean publishedMCL;
boolean processedMCL;
boolean publishedMCP;
Expand Down

0 comments on commit d0cafac

Please sign in to comment.