From 5735d9ec877718071a2cc12ff762e861485a98af Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Tue, 25 Jun 2024 21:46:13 +0530 Subject: [PATCH] feat(data quality): custom assertions models, graphql, sdk (#10761) --- .../datahub/graphql/GmsGraphQLEngine.java | 9 + .../datahub/graphql/GmsGraphQLEngineArgs.java | 2 + .../resolvers/assertion/AssertionUtils.java | 27 ++ .../ReportAssertionResultResolver.java | 113 ++++++ .../UpsertCustomAssertionResolver.java | 108 ++++++ .../types/assertion/AssertionMapper.java | 27 ++ .../src/main/resources/assertions.graphql | 163 +++++++++ .../src/main/resources/entity.graphql | 79 ++++ .../ReportAssertionResultResolverTest.java | 163 +++++++++ .../UpsertCustomAssertionResolverTest.java | 345 ++++++++++++++++++ .../types/assertion/AssertionMapperTest.java | 69 ++++ .../src/datahub/ingestion/graph/client.py | 108 ++++++ .../service/AssertionServiceTest.java | 318 ++++++++++++++++ .../kafka/MaeConsumerApplication.java | 1 + .../com/linkedin/assertion/AssertionInfo.pdl | 18 +- .../assertion/CustomAssertionInfo.pdl | 43 +++ .../assertions/AssertionServiceFactory.java | 28 ++ .../factory/graphql/GraphQLEngineFactory.java | 8 + .../metadata/service/AssertionService.java | 186 ++++++++++ .../assertions/custom_assertions_test.py | 188 ++++++++++ 20 files changed, 2001 insertions(+), 2 deletions(-) create mode 100644 datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/AssertionUtils.java create mode 100644 datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolver.java create mode 100644 datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolver.java create mode 100644 datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolverTest.java create mode 100644 
datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolverTest.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/service/AssertionServiceTest.java create mode 100644 metadata-models/src/main/pegasus/com/linkedin/assertion/CustomAssertionInfo.pdl create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/assertions/AssertionServiceFactory.java create mode 100644 metadata-service/services/src/main/java/com/linkedin/metadata/service/AssertionService.java create mode 100644 smoke-test/tests/assertions/custom_assertions_test.py diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index 98bf85ebd976ac..b17e4bd386bdac 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -121,6 +121,8 @@ import com.linkedin.datahub.graphql.resolvers.assertion.AssertionRunEventResolver; import com.linkedin.datahub.graphql.resolvers.assertion.DeleteAssertionResolver; import com.linkedin.datahub.graphql.resolvers.assertion.EntityAssertionsResolver; +import com.linkedin.datahub.graphql.resolvers.assertion.ReportAssertionResultResolver; +import com.linkedin.datahub.graphql.resolvers.assertion.UpsertCustomAssertionResolver; import com.linkedin.datahub.graphql.resolvers.auth.CreateAccessTokenResolver; import com.linkedin.datahub.graphql.resolvers.auth.DebugAccessResolver; import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenMetadataResolver; @@ -377,6 +379,7 @@ import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.query.filter.SortOrder; import com.linkedin.metadata.recommendation.RecommendationsService; +import com.linkedin.metadata.service.AssertionService; import 
com.linkedin.metadata.service.BusinessAttributeService; import com.linkedin.metadata.service.DataProductService; import com.linkedin.metadata.service.ERModelRelationshipService; @@ -454,6 +457,7 @@ public class GmsGraphQLEngine { private final FormService formService; private final RestrictedService restrictedService; private ConnectionService connectionService; + private AssertionService assertionService; private final BusinessAttributeService businessAttributeService; private final FeatureFlags featureFlags; @@ -575,6 +579,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) { this.formService = args.formService; this.restrictedService = args.restrictedService; this.connectionService = args.connectionService; + this.assertionService = args.assertionService; this.businessAttributeService = args.businessAttributeService; this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration); @@ -1220,6 +1225,10 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) { "createTestConnectionRequest", new CreateTestConnectionRequestResolver( this.entityClient, this.ingestionConfiguration)) + .dataFetcher( + "upsertCustomAssertion", new UpsertCustomAssertionResolver(assertionService)) + .dataFetcher( + "reportAssertionResult", new ReportAssertionResultResolver(assertionService)) .dataFetcher( "deleteAssertion", new DeleteAssertionResolver(this.entityClient, this.entityService)) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngineArgs.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngineArgs.java index d4d4d592d6bca7..f6ab3a603dbb7b 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngineArgs.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngineArgs.java @@ -25,6 +25,7 @@ import com.linkedin.metadata.graph.SiblingGraphService; import 
com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.recommendation.RecommendationsService; +import com.linkedin.metadata.service.AssertionService; import com.linkedin.metadata.service.BusinessAttributeService; import com.linkedin.metadata.service.DataProductService; import com.linkedin.metadata.service.ERModelRelationshipService; @@ -86,6 +87,7 @@ public class GmsGraphQLEngineArgs { boolean graphQLQueryIntrospectionEnabled; BusinessAttributeService businessAttributeService; ConnectionService connectionService; + AssertionService assertionService; // any fork specific args should go below this line } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/AssertionUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/AssertionUtils.java new file mode 100644 index 00000000000000..757ff38de60065 --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/AssertionUtils.java @@ -0,0 +1,27 @@ +package com.linkedin.datahub.graphql.resolvers.assertion; + +import com.datahub.authorization.ConjunctivePrivilegeGroup; +import com.datahub.authorization.DisjunctivePrivilegeGroup; +import com.google.common.collect.ImmutableList; +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.authorization.AuthorizationUtils; +import com.linkedin.metadata.authorization.PoliciesConfig; + +public class AssertionUtils { + public static boolean isAuthorizedToEditAssertionFromAssertee( + final QueryContext context, final Urn asserteeUrn) { + final DisjunctivePrivilegeGroup orPrivilegeGroups = + new DisjunctivePrivilegeGroup( + ImmutableList.of( + AuthorizationUtils.ALL_PRIVILEGES_GROUP, + new ConjunctivePrivilegeGroup( + ImmutableList.of(PoliciesConfig.EDIT_ENTITY_ASSERTIONS_PRIVILEGE.getType())))); + return AuthorizationUtils.isAuthorized( + context.getAuthorizer(), + 
context.getActorUrn(), + asserteeUrn.getEntityType(), + asserteeUrn.toString(), + orPrivilegeGroups); + } +} diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolver.java new file mode 100644 index 00000000000000..b1d2b40933d7e0 --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolver.java @@ -0,0 +1,113 @@ +package com.linkedin.datahub.graphql.resolvers.assertion; + +import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*; + +import com.linkedin.assertion.AssertionResult; +import com.linkedin.assertion.AssertionResultError; +import com.linkedin.assertion.AssertionResultErrorType; +import com.linkedin.assertion.AssertionResultType; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.SetMode; +import com.linkedin.data.template.StringMap; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.exception.AuthorizationException; +import com.linkedin.datahub.graphql.generated.AssertionResultInput; +import com.linkedin.datahub.graphql.generated.StringMapEntryInput; +import com.linkedin.metadata.service.AssertionService; +import graphql.execution.DataFetcherExceptionHandler; +import graphql.execution.DataFetcherResult; +import graphql.schema.DataFetcher; +import graphql.schema.DataFetchingEnvironment; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ReportAssertionResultResolver implements DataFetcher> { + + public static final String ERROR_MESSAGE_KEY = "message"; + private final AssertionService _assertionService; + + public ReportAssertionResultResolver(AssertionService assertionService) { + _assertionService = 
assertionService; + } + + /** + * This is called by the graphql engine to fetch the value. The {@link DataFetchingEnvironment} is + * a composite context object that tells you all you need to know about how to fetch a data value + * in graphql type terms. + * + * @param environment this is the data fetching environment which contains all the context you + * need to fetch a value + * @return a value of type T. May be wrapped in a {@link DataFetcherResult} + * @throws Exception to relieve the implementations from having to wrap checked exceptions. Any + * exception thrown from a {@code DataFetcher} will eventually be handled by the registered + * {@link DataFetcherExceptionHandler} and the related field will have a value of {@code null} + * in the result. + */ + @Override + public CompletableFuture get(DataFetchingEnvironment environment) throws Exception { + final QueryContext context = environment.getContext(); + final Urn assertionUrn = UrnUtils.getUrn(environment.getArgument("urn")); + final AssertionResultInput input = + bindArgument(environment.getArgument("result"), AssertionResultInput.class); + + return CompletableFuture.supplyAsync( + () -> { + final Urn asserteeUrn = + _assertionService.getEntityUrnForAssertion( + context.getOperationContext(), assertionUrn); + if (asserteeUrn == null) { + throw new RuntimeException( + String.format( + "Failed to report Assertion Run Event. Assertion with urn %s does not exist or is not associated with any entity.", + assertionUrn)); + } + + // Check whether the current user is allowed to update the assertion. 
+ if (AssertionUtils.isAuthorizedToEditAssertionFromAssertee(context, asserteeUrn)) { + AssertionResult assertionResult = mapAssertionResult(input); + _assertionService.addAssertionRunEvent( + context.getOperationContext(), + assertionUrn, + asserteeUrn, + input.getTimestampMillis(), + assertionResult, + mapContextParameters(input.getProperties())); + return true; + } + throw new AuthorizationException( + "Unauthorized to perform this action. Please contact your DataHub administrator."); + }); + } + + private static StringMap mapContextParameters(List input) { + + if (input == null || input.isEmpty()) { + return null; + } + StringMap entries = new StringMap(); + input.forEach(entry -> entries.put(entry.getKey(), entry.getValue())); + return entries; + } + + private AssertionResult mapAssertionResult(AssertionResultInput input) { + AssertionResult assertionResult = new AssertionResult(); + assertionResult.setType(AssertionResultType.valueOf(input.getType().toString())); + assertionResult.setExternalUrl(input.getExternalUrl(), SetMode.IGNORE_NULL); + if (assertionResult.getType() == AssertionResultType.ERROR && input.getError() != null) { + assertionResult.setError(mapAssertionResultError(input)); + } + return assertionResult; + } + + private static AssertionResultError mapAssertionResultError(AssertionResultInput input) { + AssertionResultError error = new AssertionResultError(); + error.setType(AssertionResultErrorType.valueOf(input.getError().getType().toString())); + StringMap errorProperties = new StringMap(); + errorProperties.put(ERROR_MESSAGE_KEY, input.getError().getMessage()); + error.setProperties(errorProperties); + return error; + } +} diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolver.java new file mode 100644 index 00000000000000..026f486e32c116 --- /dev/null 
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolver.java @@ -0,0 +1,108 @@ +package com.linkedin.datahub.graphql.resolvers.assertion; + +import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*; +import static com.linkedin.metadata.Constants.*; + +import com.linkedin.assertion.CustomAssertionInfo; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.SetMode; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.exception.AuthorizationException; +import com.linkedin.datahub.graphql.generated.Assertion; +import com.linkedin.datahub.graphql.generated.PlatformInput; +import com.linkedin.datahub.graphql.generated.UpsertCustomAssertionInput; +import com.linkedin.datahub.graphql.types.assertion.AssertionMapper; +import com.linkedin.metadata.key.DataPlatformKey; +import com.linkedin.metadata.service.AssertionService; +import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.metadata.utils.SchemaFieldUtils; +import graphql.schema.DataFetcher; +import graphql.schema.DataFetchingEnvironment; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class UpsertCustomAssertionResolver implements DataFetcher> { + + private final AssertionService _assertionService; + + public UpsertCustomAssertionResolver(@Nonnull final AssertionService assertionService) { + _assertionService = Objects.requireNonNull(assertionService, "assertionService is required"); + } + + @Override + public CompletableFuture get(DataFetchingEnvironment environment) throws Exception { + final QueryContext context = environment.getContext(); + final String maybeAssertionUrn = environment.getArgument("urn"); + final UpsertCustomAssertionInput 
input = + bindArgument(environment.getArgument("input"), UpsertCustomAssertionInput.class); + + final Urn entityUrn = UrnUtils.getUrn(input.getEntityUrn()); + final Urn assertionUrn; + + if (maybeAssertionUrn == null) { + assertionUrn = _assertionService.generateAssertionUrn(); + } else { + assertionUrn = UrnUtils.getUrn(maybeAssertionUrn); + } + + return CompletableFuture.supplyAsync( + () -> { + // Check whether the current user is allowed to update the assertion. + if (AssertionUtils.isAuthorizedToEditAssertionFromAssertee(context, entityUrn)) { + _assertionService.upsertCustomAssertion( + context.getOperationContext(), + assertionUrn, + entityUrn, + input.getDescription(), + input.getExternalUrl(), + mapAssertionPlatform(input.getPlatform()), + createCustomAssertionInfo(input, entityUrn)); + + return AssertionMapper.map( + context, + _assertionService.getAssertionEntityResponse( + context.getOperationContext(), assertionUrn)); + } + throw new AuthorizationException( + "Unauthorized to perform this action. Please contact your DataHub administrator."); + }); + } + + @SneakyThrows + private DataPlatformInstance mapAssertionPlatform(PlatformInput platformInput) { + DataPlatformInstance platform = new DataPlatformInstance(); + if (platformInput.getUrn() != null) { + platform.setPlatform(Urn.createFromString(platformInput.getUrn())); + } else if (platformInput.getName() != null) { + platform.setPlatform( + EntityKeyUtils.convertEntityKeyToUrn( + new DataPlatformKey().setPlatformName(platformInput.getName()), + DATA_PLATFORM_ENTITY_NAME)); + } else { + throw new IllegalArgumentException( + "Failed to upsert Custom Assertion. 
Platform Name or Platform Urn must be specified."); + } + + return platform; + } + + private CustomAssertionInfo createCustomAssertionInfo( + UpsertCustomAssertionInput input, Urn entityUrn) { + CustomAssertionInfo customAssertionInfo = new CustomAssertionInfo(); + customAssertionInfo.setType(input.getType()); + customAssertionInfo.setEntity(entityUrn); + customAssertionInfo.setLogic(input.getLogic(), SetMode.IGNORE_NULL); + + if (input.getFieldPath() != null) { + customAssertionInfo.setField( + SchemaFieldUtils.generateSchemaFieldUrn(entityUrn.toString(), input.getFieldPath())); + } + return customAssertionInfo; + } +} diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapper.java index 1e7fac2edbc9a9..a5f6cadb41566e 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapper.java @@ -22,6 +22,7 @@ import com.linkedin.datahub.graphql.generated.AssertionStdParameters; import com.linkedin.datahub.graphql.generated.AssertionType; import com.linkedin.datahub.graphql.generated.AuditStamp; +import com.linkedin.datahub.graphql.generated.CustomAssertionInfo; import com.linkedin.datahub.graphql.generated.DataPlatform; import com.linkedin.datahub.graphql.generated.DatasetAssertionInfo; import com.linkedin.datahub.graphql.generated.DatasetAssertionScope; @@ -162,10 +163,20 @@ public static com.linkedin.datahub.graphql.generated.AssertionInfo mapAssertionI mapSchemaAssertionInfo(context, gmsAssertionInfo.getSchemaAssertion()); assertionInfo.setSchemaAssertion(schemaAssertionInfo); } + if (gmsAssertionInfo.hasCustomAssertion()) { + CustomAssertionInfo customAssertionInfo = + mapCustomAssertionInfo(context, gmsAssertionInfo.getCustomAssertion()); + 
assertionInfo.setCustomAssertion(customAssertionInfo); + } + // Source Type if (gmsAssertionInfo.hasSource()) { assertionInfo.setSource(mapSource(gmsAssertionInfo.getSource())); } + + if (gmsAssertionInfo.hasExternalUrl()) { + assertionInfo.setExternalUrl(gmsAssertionInfo.getExternalUrl().toString()); + } return assertionInfo; } @@ -320,6 +331,22 @@ private static SchemaAssertionInfo mapSchemaAssertionInfo( return result; } + private static CustomAssertionInfo mapCustomAssertionInfo( + @Nullable final QueryContext context, + final com.linkedin.assertion.CustomAssertionInfo gmsCustomAssertionInfo) { + CustomAssertionInfo result = new CustomAssertionInfo(); + result.setType(gmsCustomAssertionInfo.getType()); + result.setEntityUrn(gmsCustomAssertionInfo.getEntity().toString()); + if (gmsCustomAssertionInfo.hasField()) { + result.setField(AssertionMapper.mapDatasetSchemaField(gmsCustomAssertionInfo.getField())); + } + if (gmsCustomAssertionInfo.hasLogic()) { + result.setLogic(gmsCustomAssertionInfo.getLogic()); + } + + return result; + } + private static SchemaAssertionField mapSchemaField(final SchemaField gmsField) { SchemaAssertionField result = new SchemaAssertionField(); result.setPath(gmsField.getFieldPath()); diff --git a/datahub-graphql-core/src/main/resources/assertions.graphql b/datahub-graphql-core/src/main/resources/assertions.graphql index 3014289e511788..be9cf61069635e 100644 --- a/datahub-graphql-core/src/main/resources/assertions.graphql +++ b/datahub-graphql-core/src/main/resources/assertions.graphql @@ -1,3 +1,136 @@ +extend type Mutation { + """ + Upsert a Custom Assertion + """ + upsertCustomAssertion( + """ + Urn of custom assertion. If not provided, one will be generated. + """ + urn: String + + """ + Input for upserting a custom assertion. + """ + input: UpsertCustomAssertionInput! + ): Assertion! + + """ + Report result for an assertion + """ + reportAssertionResult( + """ + Urn of custom assertion. + """ + urn: String! 
+ + """ + Input for reporting result of the assertion + """ + result: AssertionResultInput! + ): Boolean! +} + +""" +Input for upserting a Custom Assertion. +""" +input UpsertCustomAssertionInput { + """ + The entity targeted by this assertion. + """ + entityUrn: String! + + """ + The type of the custom assertion. + """ + type: String! + + """ + The description of this assertion. + """ + description: String! + + """ + The dataset field targeted by this assertion, if any. + """ + fieldPath: String + + """ + The external Platform associated with the assertion + """ + platform: PlatformInput! + + """ + Native platform URL of the Assertion + """ + externalUrl: String + + """ + Logic comprising a raw, unstructured assertion. for example - custom SQL query for the assertion. + """ + logic: String + +} + +""" +Input for reporting result of the assertion +""" +input AssertionResultInput { + """ + Optional: Provide a timestamp associated with the run event. If not provided, one will be generated for you based + on the current time. + """ + timestampMillis: Long + + """ + The final result of assertion, e.g. either SUCCESS or FAILURE. + """ + type: AssertionResultType! + + """ + Additional key-value pairs representing runtime context + """ + properties: [StringMapEntryInput!] + + """ + Native platform URL of the Assertion Run Event + """ + externalUrl: String + + """ + Error details, if type is ERROR + """ + error: AssertionResultErrorInput +} + +""" +Input for reporting an Error during Assertion Run +""" +input AssertionResultErrorInput { + """ + The type of error encountered + """ + type: AssertionResultErrorType! + + """ + The error message with details of error encountered + """ + message: String! +} +""" +Input representing A Data Platform +""" +input PlatformInput { + """ + Urn of platform + """ + urn: String + + """ + Name of platform + """ + name: String +} + """ Defines a schema field, each with a specified path and type. 
""" @@ -96,6 +229,11 @@ extend type AssertionInfo { """ schemaAssertion: SchemaAssertionInfo + """ + Information about Custom assertion + """ + customAssertion: CustomAssertionInfo + """ The source or origin of the Assertion definition. """ @@ -899,3 +1037,28 @@ type SchemaAssertionInfo { """ compatibility: SchemaAssertionCompatibility! } + +""" +Information about a custom assertion +""" +type CustomAssertionInfo { + """ + The type of custom assertion. + """ + type: String! + + """ + The entity targeted by this custom assertion. + """ + entityUrn: String! + + """ + The field serving as input to the assertion, if any. + """ + field: SchemaFieldRef + + """ + Logic comprising a raw, unstructured assertion. + """ + logic: String +} \ No newline at end of file diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index 211ae97910202b..d48a9976e15d77 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -7328,6 +7328,11 @@ type AssertionInfo { An optional human-readable description of the assertion """ description: String + + """ + URL where assertion details are available + """ + externalUrl: String } """ @@ -7490,6 +7495,75 @@ type AssertionResult { """ nativeResults: [StringMapEntry!] + """ + Error details, if type is ERROR + """ + error: AssertionResultError +} + +""" +An error encountered when evaluating an AssertionResult +""" +type AssertionResultError { + """ + The type of error encountered + """ + type: AssertionResultErrorType! + + """ + Additional metadata depending on the type of error + """ + properties: [StringMapEntry!] 
+} + +""" +The type of error encountered when evaluating an AssertionResult +""" +enum AssertionResultErrorType { + """ + Source is unreachable + """ + SOURCE_CONNECTION_ERROR + + """ + Source query failed to execute + """ + SOURCE_QUERY_FAILED + + """ + Invalid parameters were detected + """ + INVALID_PARAMETERS + + """ + Insufficient data to evaluate assertion + """ + INSUFFICIENT_DATA + + """ + Event type not supported by the specified source + """ + INVALID_SOURCE_TYPE + + """ + Platform not supported + """ + UNSUPPORTED_PLATFORM + + """ + Error while executing a custom SQL assertion + """ + CUSTOM_SQL_ERROR + + """ + Error while executing a field assertion + """ + FIELD_ASSERTION_ERROR + + """ + Unknown error + """ + UNKNOWN_ERROR } type BatchSpec { @@ -7843,6 +7917,11 @@ enum AssertionType { A schema or structural assertion. """ DATA_SCHEMA + + """ + A custom assertion. + """ + CUSTOM } """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolverTest.java new file mode 100644 index 00000000000000..0cf4ed9896f89b --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolverTest.java @@ -0,0 +1,163 @@ +package com.linkedin.datahub.graphql.resolvers.assertion; + +import static com.linkedin.datahub.graphql.TestUtils.*; +import static org.mockito.ArgumentMatchers.*; +import static org.testng.Assert.*; + +import com.google.common.collect.ImmutableList; +import com.linkedin.assertion.AssertionResult; +import com.linkedin.assertion.AssertionResultError; +import com.linkedin.assertion.AssertionRunEvent; +import com.linkedin.assertion.AssertionRunStatus; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.StringMap; +import 
com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.generated.AssertionResultErrorInput; +import com.linkedin.datahub.graphql.generated.AssertionResultErrorType; +import com.linkedin.datahub.graphql.generated.AssertionResultInput; +import com.linkedin.datahub.graphql.generated.AssertionResultType; +import com.linkedin.datahub.graphql.generated.StringMapEntryInput; +import com.linkedin.metadata.service.AssertionService; +import graphql.schema.DataFetchingEnvironment; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Map; +import java.util.concurrent.CompletionException; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +public class ReportAssertionResultResolverTest { + + private static final Urn TEST_DATASET_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD)"); + + private static final Urn TEST_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:test"); + + private static final String customAssertionUrl = "https://dq-platform-native-url"; + + private static final AssertionResultInput TEST_INPUT = + new AssertionResultInput( + 0L, + AssertionResultType.ERROR, + ImmutableList.of(new StringMapEntryInput("prop1", "value1")), + customAssertionUrl, + new AssertionResultErrorInput( + AssertionResultErrorType.UNKNOWN_ERROR, "an unknown error occurred")); + + ; + + private static final AssertionRunEvent TEST_ASSERTION_RUN_EVENT = + new AssertionRunEvent() + .setAssertionUrn(TEST_ASSERTION_URN) + .setAsserteeUrn(TEST_DATASET_URN) + .setTimestampMillis(0L) + .setRunId("0") + .setStatus(AssertionRunStatus.COMPLETE) + .setResult( + new AssertionResult() + .setType(com.linkedin.assertion.AssertionResultType.ERROR) + .setError( + new AssertionResultError() + .setType(com.linkedin.assertion.AssertionResultErrorType.UNKNOWN_ERROR) + .setProperties( + new StringMap(Map.of("message", "an unknown error occurred")))) + .setExternalUrl(customAssertionUrl)) + .setRuntimeContext(new 
StringMap(Map.of("prop1", "value1"))); + + @Test + public void testGetSuccessReportAssertionRunEvent() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + ReportAssertionResultResolver resolver = new ReportAssertionResultResolver(mockedService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("result"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + Mockito.when( + mockedService.getEntityUrnForAssertion( + any(OperationContext.class), Mockito.eq(TEST_ASSERTION_URN))) + .thenReturn(TEST_DATASET_URN); + + resolver.get(mockEnv).get(); + + // Validate that we created the assertion + Mockito.verify(mockedService, Mockito.times(1)) + .addAssertionRunEvent( + any(OperationContext.class), + Mockito.eq(TEST_ASSERTION_URN), + Mockito.eq(TEST_DATASET_URN), + Mockito.eq(TEST_ASSERTION_RUN_EVENT.getTimestampMillis()), + Mockito.eq(TEST_ASSERTION_RUN_EVENT.getResult()), + Mockito.eq(TEST_ASSERTION_RUN_EVENT.getRuntimeContext())); + } + + @Test + public void testGetUpdateAssertionUnauthorized() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + ReportAssertionResultResolver resolver = new ReportAssertionResultResolver(mockedService); + + // Execute resolver + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + QueryContext mockContext = getMockDenyContext(); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("result"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + Mockito.when( + 
mockedService.getEntityUrnForAssertion( + any(OperationContext.class), Mockito.eq(TEST_ASSERTION_URN))) + .thenReturn(TEST_DATASET_URN); + + CompletionException e = + expectThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); + assert e.getMessage() + .contains( + "Unauthorized to perform this action. Please contact your DataHub administrator."); + + // Validate that we created the assertion + Mockito.verify(mockedService, Mockito.times(0)) + .addAssertionRunEvent( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + @Test + public void testGetAssertionServiceException() { + // Update resolver + AssertionService mockService = Mockito.mock(AssertionService.class); + + Mockito.when( + mockService.getEntityUrnForAssertion( + any(OperationContext.class), Mockito.eq(TEST_ASSERTION_URN))) + .thenReturn(TEST_DATASET_URN); + Mockito.doThrow(RuntimeException.class) + .when(mockService) + .addAssertionRunEvent( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + + ReportAssertionResultResolver resolver = new ReportAssertionResultResolver(mockService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("result"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); + } +} diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolverTest.java new file mode 100644 index 
00000000000000..2ac6335ba9fea4 --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolverTest.java @@ -0,0 +1,345 @@ +package com.linkedin.datahub.graphql.resolvers.assertion; + +import static com.linkedin.datahub.graphql.TestUtils.*; +import static org.mockito.ArgumentMatchers.*; +import static org.testng.Assert.*; + +import com.google.common.collect.ImmutableMap; +import com.linkedin.assertion.AssertionInfo; +import com.linkedin.assertion.AssertionSource; +import com.linkedin.assertion.AssertionSourceType; +import com.linkedin.assertion.AssertionType; +import com.linkedin.assertion.CustomAssertionInfo; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.url.Url; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.generated.Assertion; +import com.linkedin.datahub.graphql.generated.PlatformInput; +import com.linkedin.datahub.graphql.generated.UpsertCustomAssertionInput; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.service.AssertionService; +import graphql.schema.DataFetchingEnvironment; +import io.datahubproject.metadata.context.OperationContext; +import java.util.concurrent.CompletionException; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +public class UpsertCustomAssertionResolverTest { + + private static final Urn TEST_DATASET_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD)"); + + private static final String TEST_INVALID_DATASET_URN = "dataset.name"; + + private static final Urn TEST_FIELD_URN = + UrnUtils.getUrn( + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD),field1)"); + private static final Urn TEST_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:test"); + + private static final String TEST_INVALID_ASSERTION_URN = "test"; + private static final Urn TEST_ACTOR_URN = UrnUtils.getUrn("urn:li:actor:test"); + + private static final Urn TEST_PLATFORM_URN = UrnUtils.getUrn("urn:li:dataPlatform:DQplatform"); + + private static final String customAssertionType = "My custom category"; + private static final String customAssertionDescription = "Description of custom assertion"; + private static final String customAssertionUrl = "https://dq-platform-native-url"; + + private static final String customAssertionLogic = "custom script of assertion"; + + private static final UpsertCustomAssertionInput TEST_INPUT = + new UpsertCustomAssertionInput( + TEST_DATASET_URN.toString(), + customAssertionType, + customAssertionDescription, + "field1", + new PlatformInput(null, "DQplatform"), + customAssertionUrl, + customAssertionLogic); + + private static final UpsertCustomAssertionInput TEST_INPUT_MISSING_PLATFORM = + new UpsertCustomAssertionInput( + TEST_DATASET_URN.toString(), + customAssertionType, + customAssertionDescription, + "field1", + new PlatformInput(null, null), + customAssertionUrl, + customAssertionLogic); + + private static final UpsertCustomAssertionInput TEST_INPUT_INVALID_ENTITY_URN = + new UpsertCustomAssertionInput( + TEST_INVALID_DATASET_URN, + customAssertionType, + customAssertionDescription, + "field1", + new PlatformInput(null, "DQplatform"), + customAssertionUrl, + customAssertionLogic); + + private static final AssertionInfo TEST_ASSERTION_INFO = + new AssertionInfo() + .setType(AssertionType.CUSTOM) + .setDescription(customAssertionDescription) + .setExternalUrl(new Url(customAssertionUrl)) + .setSource( + new AssertionSource() + .setType(AssertionSourceType.EXTERNAL) + .setCreated( + new AuditStamp() + .setTime(System.currentTimeMillis()) 
+ .setActor(TEST_ACTOR_URN))) + .setCustomAssertion( + new CustomAssertionInfo() + .setEntity(TEST_DATASET_URN) + .setType(customAssertionType) + .setField(TEST_FIELD_URN) + .setLogic(customAssertionLogic)); + + private static final DataPlatformInstance TEST_DATA_PLATFORM_INSTANCE = + new DataPlatformInstance().setPlatform(TEST_PLATFORM_URN); + + @Test + public void testGetSuccessCreateAssertion() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockedService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(null); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + Mockito.when(mockedService.generateAssertionUrn()).thenReturn(TEST_ASSERTION_URN); + Mockito.when( + mockedService.getAssertionEntityResponse( + any(OperationContext.class), Mockito.eq(TEST_ASSERTION_URN))) + .thenReturn( + new EntityResponse() + .setAspects( + new EnvelopedAspectMap( + ImmutableMap.of( + Constants.ASSERTION_INFO_ASPECT_NAME, + new EnvelopedAspect().setValue(new Aspect(TEST_ASSERTION_INFO.data())), + Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME, + new EnvelopedAspect() + .setValue(new Aspect(TEST_DATA_PLATFORM_INSTANCE.data()))))) + .setEntityName(Constants.ASSERTION_ENTITY_NAME) + .setUrn(TEST_ASSERTION_URN)); + + Assertion assertion = resolver.get(mockEnv).get(); + + // Don't validate each field since we have mapper tests already. 
+ assertNotNull(assertion); + assertEquals(assertion.getUrn(), TEST_ASSERTION_URN.toString()); + + // Validate that we created the assertion + Mockito.verify(mockedService, Mockito.times(1)) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.eq(TEST_ASSERTION_URN), + Mockito.eq(TEST_ASSERTION_INFO.getCustomAssertion().getEntity()), + Mockito.eq(TEST_ASSERTION_INFO.getDescription()), + Mockito.eq(TEST_ASSERTION_INFO.getExternalUrl().toString()), + Mockito.eq(TEST_DATA_PLATFORM_INSTANCE), + Mockito.eq(TEST_ASSERTION_INFO.getCustomAssertion())); + } + + @Test + public void testGetSuccessUpdateAssertion() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockedService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + Mockito.when( + mockedService.getAssertionEntityResponse( + any(OperationContext.class), Mockito.eq(TEST_ASSERTION_URN))) + .thenReturn( + new EntityResponse() + .setAspects( + new EnvelopedAspectMap( + ImmutableMap.of( + Constants.ASSERTION_INFO_ASPECT_NAME, + new EnvelopedAspect().setValue(new Aspect(TEST_ASSERTION_INFO.data())), + Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME, + new EnvelopedAspect() + .setValue(new Aspect(TEST_DATA_PLATFORM_INSTANCE.data()))))) + .setEntityName(Constants.ASSERTION_ENTITY_NAME) + .setUrn(TEST_ASSERTION_URN)); + + Assertion assertion = resolver.get(mockEnv).get(); + + // Don't validate each field since we have mapper tests already. 
+ assertNotNull(assertion); + assertEquals(assertion.getUrn(), TEST_ASSERTION_URN.toString()); + + // Validate that we created the assertion + Mockito.verify(mockedService, Mockito.times(1)) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.eq(TEST_ASSERTION_URN), + Mockito.eq(TEST_ASSERTION_INFO.getCustomAssertion().getEntity()), + Mockito.eq(TEST_ASSERTION_INFO.getDescription()), + Mockito.eq(TEST_ASSERTION_INFO.getExternalUrl().toString()), + Mockito.eq(TEST_DATA_PLATFORM_INSTANCE), + Mockito.eq(TEST_ASSERTION_INFO.getCustomAssertion())); + } + + @Test + public void testGetUpdateAssertionUnauthorized() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockedService); + + // Execute resolver + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + QueryContext mockContext = getMockDenyContext(); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + CompletionException e = + expectThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); + assert e.getMessage() + .contains( + "Unauthorized to perform this action. 
Please contact your DataHub administrator."); + + Mockito.verify(mockedService, Mockito.times(0)) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + @Test + public void testGetUpsertAssertionMissingPlatformFailure() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockedService); + + // Execute resolver + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + QueryContext mockContext = getMockAllowContext(); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT_MISSING_PLATFORM); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + CompletionException e = + expectThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); + assert e.getMessage() + .contains( + "Failed to upsert Custom Assertion. 
Platform Name or Platform Urn must be specified."); + + Mockito.verify(mockedService, Mockito.times(0)) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + @Test + public void testGetUpsertAssertionInvalidAssertionUrn() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockedService); + + // Execute resolver + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + QueryContext mockContext = getMockAllowContext(); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_INVALID_ASSERTION_URN); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + RuntimeException e = expectThrows(RuntimeException.class, () -> resolver.get(mockEnv).join()); + assert e.getMessage().contains("invalid urn"); + + Mockito.verify(mockedService, Mockito.times(0)) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + @Test + public void testGetUpsertAssertionInvalidEntityUrn() throws Exception { + // Update resolver + AssertionService mockedService = Mockito.mock(AssertionService.class); + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockedService); + + // Execute resolver + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + QueryContext mockContext = getMockAllowContext(); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))) + .thenReturn(TEST_INPUT_INVALID_ENTITY_URN); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + 
RuntimeException e = expectThrows(RuntimeException.class, () -> resolver.get(mockEnv).join()); + assert e.getMessage().contains("invalid urn"); + + Mockito.verify(mockedService, Mockito.times(0)) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + @Test + public void testGetAssertionServiceException() { + // Update resolver + AssertionService mockService = Mockito.mock(AssertionService.class); + Mockito.doThrow(RuntimeException.class) + .when(mockService) + .upsertCustomAssertion( + any(OperationContext.class), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + + UpsertCustomAssertionResolver resolver = new UpsertCustomAssertionResolver(mockService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_ASSERTION_URN.toString()); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + + assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); + } +} diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapperTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapperTest.java index 376af14af08f65..82f4fe687bf769 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapperTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/assertion/AssertionMapperTest.java @@ -5,12 +5,14 @@ import com.google.common.collect.ImmutableList; import com.linkedin.assertion.AssertionInfo; import com.linkedin.assertion.AssertionSource; +import 
com.linkedin.assertion.AssertionSourceType;
 import com.linkedin.assertion.AssertionStdAggregation;
 import com.linkedin.assertion.AssertionStdOperator;
 import com.linkedin.assertion.AssertionStdParameter;
 import com.linkedin.assertion.AssertionStdParameterType;
 import com.linkedin.assertion.AssertionStdParameters;
 import com.linkedin.assertion.AssertionType;
+import com.linkedin.assertion.CustomAssertionInfo;
 import com.linkedin.assertion.DatasetAssertionInfo;
 import com.linkedin.assertion.DatasetAssertionScope;
 import com.linkedin.assertion.FreshnessAssertionInfo;
@@ -23,6 +25,7 @@
 import com.linkedin.common.GlobalTags;
 import com.linkedin.common.TagAssociationArray;
 import com.linkedin.common.UrnArray;
+import com.linkedin.common.url.Url;
 import com.linkedin.common.urn.TagUrn;
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.DataMap;
@@ -115,6 +118,22 @@ public void testMapDataSchemaAssertion() {
     verifyAssertionInfo(input, output);
   }
 
+  @Test
+  public void testMapCustomAssertion() {
+    // Case 1: Without nullable fields
+    AssertionInfo input = createCustomAssertionInfoWithoutNullableFields();
+    EntityResponse customAssertionEntityResponse = createAssertionInfoEntityResponse(input);
+    Assertion output = AssertionMapper.map(null, customAssertionEntityResponse);
+    verifyAssertionInfo(input, output);
+
+    // Case 2: With nullable fields
+    input = createCustomAssertionInfoWithNullableFields();
+    EntityResponse customAssertionEntityResponseWithNullables =
+        createAssertionInfoEntityResponse(input);
+    output = AssertionMapper.map(null, customAssertionEntityResponseWithNullables);
+    verifyAssertionInfo(input, output);
+  }
+
   private void verifyAssertionInfo(AssertionInfo input, Assertion output) {
     Assert.assertNotNull(output);
     Assert.assertNotNull(output.getInfo());
@@ -125,6 +144,10 @@ private void verifyAssertionInfo(AssertionInfo input, Assertion output) {
       verifyDatasetAssertion(input.getDatasetAssertion(), output.getInfo().getDatasetAssertion());
     }
 
+    if (input.hasExternalUrl()) {
+      Assert.assertEquals(input.getExternalUrl().toString(), output.getInfo().getExternalUrl());
+    }
+
     if (input.hasFreshnessAssertion()) {
       verifyFreshnessAssertion(
           input.getFreshnessAssertion(), output.getInfo().getFreshnessAssertion());
@@ -137,6 +160,10 @@ private void verifyAssertionInfo(AssertionInfo input, Assertion output) {
     if (input.hasSource()) {
       verifySource(input.getSource(), output.getInfo().getSource());
     }
+
+    if (input.hasCustomAssertion()) {
+      verifyCustomAssertion(input.getCustomAssertion(), output.getInfo().getCustomAssertion());
+    }
   }
 
   private void verifyDatasetAssertion(
@@ -184,6 +211,19 @@ private void verifySchemaAssertion(
         output.getSchema().getFields().size(), input.getSchema().getFields().size());
   }
 
+  private void verifyCustomAssertion(
+      CustomAssertionInfo input,
+      com.linkedin.datahub.graphql.generated.CustomAssertionInfo output) {
+    Assert.assertEquals(output.getEntityUrn(), input.getEntity().toString());
+    Assert.assertEquals(output.getType(), input.getType());
+    if (input.hasLogic()) {
+      Assert.assertEquals(output.getLogic(), input.getLogic());
+    }
+    if (input.hasField()) {
+      Assert.assertEquals(output.getField().getPath(), input.getField().getEntityKey().get(1));
+    }
+  }
+
   private void verifyCronSchedule(
       FreshnessCronSchedule input,
       com.linkedin.datahub.graphql.generated.FreshnessCronSchedule output) {
@@ -315,6 +355,35 @@ private AssertionInfo createSchemaAssertion() {
     return info;
   }
 
+  private AssertionInfo createCustomAssertionInfoWithoutNullableFields() {
+    AssertionInfo info = new AssertionInfo();
+    info.setType(AssertionType.CUSTOM);
+    CustomAssertionInfo customAssertionInfo = new CustomAssertionInfo();
+    customAssertionInfo.setType("Custom Type 1");
+    customAssertionInfo.setEntity(UrnUtils.getUrn("urn:li:dataset:1"));
+    info.setCustomAssertion(customAssertionInfo);
+    return info;
+  }
+
+  private AssertionInfo createCustomAssertionInfoWithNullableFields() {
+    AssertionInfo info = new AssertionInfo();
+    info.setType(AssertionType.CUSTOM);
+    info.setExternalUrl(new Url("https://xyz.com"));
+    info.setDescription("Description of custom assertion");
+    CustomAssertionInfo customAssertionInfo = new CustomAssertionInfo();
+    customAssertionInfo.setType("Custom Type 1");
+    customAssertionInfo.setEntity(
+        UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD)"));
+    customAssertionInfo.setField(
+        UrnUtils.getUrn(
+            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD),field)"));
+    customAssertionInfo.setLogic("custom logic");
+    info.setCustomAssertion(customAssertionInfo);
+    info.setSource(new AssertionSource().setType(AssertionSourceType.EXTERNAL));
+
+    return info;
+  }
+
   private AssertionStdParameters createAssertionStdParameters() {
     AssertionStdParameters parameters = new AssertionStdParameters();
     parameters.setValue(createAssertionStdParameter());
diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 310a18cc0c9c63..795f5c1e4cd9b5 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -15,6 +15,7 @@
     Iterable,
     Iterator,
     List,
+    Literal,
     Optional,
     Tuple,
     Type,
@@ -1483,6 +1484,129 @@ def get_entities_v2(
                     retval[entity_urn][aspect_key] = aspect_value
         return retval
 
+    def upsert_custom_assertion(
+        self,
+        urn: Optional[str],
+        entity_urn: str,
+        type: str,
+        description: str,
+        platform_name: Optional[str] = None,
+        platform_urn: Optional[str] = None,
+        field_path: Optional[str] = None,
+        external_url: Optional[str] = None,
+        logic: Optional[str] = None,
+    ) -> Dict:
+        """Upsert a custom assertion via the ``upsertCustomAssertion`` mutation.
+
+        ``urn`` is sent as the nullable ``$assertionUrn`` variable; the remaining
+        arguments are forwarded unchanged as fields of the mutation ``input``
+        (``platform_urn`` and ``platform_name`` go into the nested ``platform``).
+
+        :return: the ``upsertCustomAssertion`` entry of the GraphQL response; the
+            query selects only its ``urn`` field.
+        """
+        graph_query: str = """
+            mutation upsertCustomAssertion(
+                $assertionUrn: String,
+                $entityUrn: String!,
+                $type: String!,
+                $description: String!,
+                $fieldPath: String,
+                $platformName: String,
+                $platformUrn: String,
+                $externalUrl: String,
+                $logic: String
+            ) {
+                upsertCustomAssertion(urn: $assertionUrn, input: {
+                    entityUrn: $entityUrn
+                    type: $type
+                    description: $description
+                    fieldPath: $fieldPath
+                    platform: {
+                        urn: $platformUrn
+                        name: $platformName
+                    }
+                    externalUrl: $externalUrl
+                    logic: $logic
+                }) {
+                        urn
+                }
+            }
+        """
+
+        variables = {
+            "assertionUrn": urn,
+            "entityUrn": entity_urn,
+            "type": type,
+            "description": description,
+            "fieldPath": field_path,
+            "platformName": platform_name,
+            "platformUrn": platform_urn,
+            "externalUrl": external_url,
+            "logic": logic,
+        }
+
+        res = self.execute_graphql(
+            query=graph_query,
+            variables=variables,
+        )
+
+        return res["upsertCustomAssertion"]
+
+    def report_assertion_result(
+        self,
+        urn: str,
+        timestamp_millis: int,
+        type: Literal["SUCCESS", "FAILURE", "ERROR", "INIT"],
+        properties: Optional[List[Dict[str, str]]] = None,
+        external_url: Optional[str] = None,
+        error_type: Optional[str] = None,
+        error_message: Optional[str] = None,
+    ) -> bool:
+        """Report an assertion run result via the ``reportAssertionResult`` mutation.
+
+        The ``error`` input is only populated when ``error_type`` is truthy (it then
+        carries ``error_type`` and ``error_message``); otherwise it is sent as null.
+
+        :return: the ``reportAssertionResult`` entry of the GraphQL response.
+        """
+        graph_query: str = """
+            mutation reportAssertionResult(
+                $assertionUrn: String!,
+                $timestampMillis: Long!,
+                $type: AssertionResultType!,
+                $properties: [StringMapEntryInput!],
+                $externalUrl: String,
+                $error: AssertionResultErrorInput,
+            ) {
+                reportAssertionResult(urn: $assertionUrn, result: {
+                    timestampMillis: $timestampMillis
+                    type: $type
+                    properties: $properties
+                    externalUrl: $externalUrl
+                    error: $error
+                })
+            }
+        """
+
+        variables = {
+            "assertionUrn": urn,
+            "timestampMillis": timestamp_millis,
+            "type": type,
+            "properties": properties,
+            "externalUrl": external_url,
+            "error": {"type": error_type, "message": error_message}
+            if error_type
+            else None,
+        }
+
+        res = self.execute_graphql(
+            query=graph_query,
+            variables=variables,
+        )
+
+        return res["reportAssertionResult"]
+
     def close(self) -> None:
         self._make_schema_resolver.cache_clear()
         super().close()
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/AssertionServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/AssertionServiceTest.java
new file mode 100644
index 00000000000000..83cec7841ec249 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/AssertionServiceTest.java @@ -0,0 +1,318 @@ +package com.linkedin.metadata.service; + +import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.service.AssertionService.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import com.google.common.collect.ImmutableList; +import com.linkedin.assertion.AssertionInfo; +import com.linkedin.assertion.AssertionResult; +import com.linkedin.assertion.AssertionResultError; +import com.linkedin.assertion.AssertionResultErrorType; +import com.linkedin.assertion.AssertionResultType; +import com.linkedin.assertion.AssertionRunEvent; +import com.linkedin.assertion.AssertionRunStatus; +import com.linkedin.assertion.AssertionSourceType; +import com.linkedin.assertion.AssertionType; +import com.linkedin.assertion.CustomAssertionInfo; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.EntityRelationship; +import com.linkedin.common.EntityRelationshipArray; +import com.linkedin.common.EntityRelationships; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.StringMap; +import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.metadata.graph.GraphClient; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.r2.RemoteInvocationException; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.List; +import java.util.Map; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class AssertionServiceTest { + private static final Urn 
TEST_ACTOR_URN = UrnUtils.getUrn("urn:li:corpuser:test"); + private static final Urn TEST_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:test"); + private static final Urn TEST_DATASET_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD)"); + private static final Urn TEST_FIELD_URN = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,name,PROD),field1)"); + private static final Urn TEST_PLATFORM_URN = UrnUtils.getUrn("urn:li:dataPlatform:hive"); + + private static final Urn TEST_PLATFORM_INSTANCE_URN = + UrnUtils.getUrn("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom,instance1)"); + + private OperationContext opContext; + + @BeforeTest + public void setup() { + opContext = TestOperationContexts.userContextNoSearchAuthorization(TEST_ACTOR_URN); + } + + @Test + public void testUpsertCustomAssertionRequiredFields() + throws Exception, RemoteInvocationException { + SystemEntityClient mockedEntityClient = mock(SystemEntityClient.class); + AssertionService assertionService = + new AssertionService(mockedEntityClient, mock(GraphClient.class)); + + String descriptionOfCustomAssertion = "Description of custom assertion"; + Mockito.doAnswer( + invocation -> { + List aspects = invocation.getArgument(1); + Assert.assertEquals(aspects.size(), 2); + + MetadataChangeProposal proposal1 = aspects.get(0); + Assert.assertEquals(proposal1.getEntityUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(proposal1.getAspectName(), ASSERTION_INFO_ASPECT_NAME); + AssertionInfo info = + GenericRecordUtils.deserializeAspect( + proposal1.getAspect().getValue(), + proposal1.getAspect().getContentType(), + AssertionInfo.class); + + Assert.assertEquals(info.getType(), AssertionType.CUSTOM); + Assert.assertEquals(info.getDescription(), descriptionOfCustomAssertion); + Assert.assertEquals(info.getSource().getType(), AssertionSourceType.EXTERNAL); + CustomAssertionInfo customAssertionInfo = info.getCustomAssertion(); + 
Assert.assertEquals(customAssertionInfo.getEntity(), TEST_DATASET_URN); + + MetadataChangeProposal proposal2 = aspects.get(1); + Assert.assertEquals(proposal2.getEntityUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(proposal2.getAspectName(), DATA_PLATFORM_INSTANCE_ASPECT_NAME); + DataPlatformInstance dataPlatformInstance = + GenericRecordUtils.deserializeAspect( + proposal2.getAspect().getValue(), + proposal2.getAspect().getContentType(), + DataPlatformInstance.class); + Assert.assertEquals(dataPlatformInstance.getPlatform(), TEST_PLATFORM_URN); + return null; + }) + .when(mockedEntityClient) + .batchIngestProposals(any(OperationContext.class), Mockito.anyList(), Mockito.eq(false)); + + Urn urn = + assertionService.upsertCustomAssertion( + opContext, + TEST_ASSERTION_URN, + TEST_DATASET_URN, + descriptionOfCustomAssertion, + null, + new DataPlatformInstance().setPlatform(TEST_PLATFORM_URN), + new CustomAssertionInfo().setEntity(TEST_DATASET_URN)); + Assert.assertEquals(urn.getEntityType(), "assertion"); + + Mockito.verify(mockedEntityClient, Mockito.times(1)) + .batchIngestProposals(any(OperationContext.class), Mockito.anyList(), Mockito.eq(false)); + } + + @Test + public void testUpsertCustomAssertionAllFields() throws Exception, RemoteInvocationException { + SystemEntityClient mockedEntityClient = mock(SystemEntityClient.class); + AssertionService assertionService = + new AssertionService(mockedEntityClient, mock(GraphClient.class)); + String descriptionOfCustomAssertion = "Description of custom assertion"; + String externalUrlOfCustomAssertion = "https://xyz.com/abc"; + String customCategory = "Custom category"; + String customLogic = "select percentile(field1, 0.66) from table"; + Mockito.doAnswer( + invocation -> { + List aspects = invocation.getArgument(1); + Assert.assertEquals(aspects.size(), 2); + + MetadataChangeProposal proposal1 = aspects.get(0); + Assert.assertEquals(proposal1.getEntityUrn(), TEST_ASSERTION_URN); + 
Assert.assertEquals(proposal1.getAspectName(), ASSERTION_INFO_ASPECT_NAME); + AssertionInfo info = + GenericRecordUtils.deserializeAspect( + proposal1.getAspect().getValue(), + proposal1.getAspect().getContentType(), + AssertionInfo.class); + + Assert.assertEquals(info.getType(), AssertionType.CUSTOM); + Assert.assertEquals(info.getDescription(), descriptionOfCustomAssertion); + Assert.assertEquals(info.getExternalUrl().toString(), externalUrlOfCustomAssertion); + Assert.assertEquals(info.getSource().getType(), AssertionSourceType.EXTERNAL); + CustomAssertionInfo customAssertionInfo = info.getCustomAssertion(); + Assert.assertEquals(customAssertionInfo.getEntity(), TEST_DATASET_URN); + Assert.assertEquals(customAssertionInfo.getField(), TEST_FIELD_URN); + Assert.assertEquals(customAssertionInfo.getType(), customCategory); + Assert.assertEquals(customAssertionInfo.getLogic(), customLogic); + + MetadataChangeProposal proposal2 = aspects.get(1); + Assert.assertEquals(proposal2.getEntityUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(proposal2.getAspectName(), DATA_PLATFORM_INSTANCE_ASPECT_NAME); + DataPlatformInstance dataPlatformInstance = + GenericRecordUtils.deserializeAspect( + proposal2.getAspect().getValue(), + proposal2.getAspect().getContentType(), + DataPlatformInstance.class); + Assert.assertEquals(dataPlatformInstance.getPlatform(), TEST_PLATFORM_URN); + Assert.assertEquals(dataPlatformInstance.getInstance(), TEST_PLATFORM_INSTANCE_URN); + return null; + }) + .when(mockedEntityClient) + .batchIngestProposals(any(OperationContext.class), Mockito.anyList(), Mockito.eq(false)); + + Urn urn = + assertionService.upsertCustomAssertion( + opContext, + TEST_ASSERTION_URN, + TEST_DATASET_URN, + descriptionOfCustomAssertion, + externalUrlOfCustomAssertion, + new DataPlatformInstance() + .setPlatform(TEST_PLATFORM_URN) + .setInstance(TEST_PLATFORM_INSTANCE_URN), + new CustomAssertionInfo() + .setEntity(TEST_DATASET_URN) + .setField(TEST_FIELD_URN) + 
.setType(customCategory) + .setLogic(customLogic)); + Assert.assertEquals(urn.getEntityType(), "assertion"); + + Mockito.verify(mockedEntityClient, Mockito.times(1)) + .batchIngestProposals(any(OperationContext.class), Mockito.anyList(), Mockito.eq(false)); + } + + @Test + public void testAddAssertionRunEventRequiredFields() throws Exception, RemoteInvocationException { + SystemEntityClient mockedEntityClient = mock(SystemEntityClient.class); + AssertionService assertionService = + new AssertionService(mockedEntityClient, mock(GraphClient.class)); + Long eventtime = 1718619000000L; + + Mockito.doAnswer( + invocation -> { + MetadataChangeProposal proposal = invocation.getArgument(1); + + Assert.assertEquals(proposal.getEntityUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(proposal.getAspectName(), ASSERTION_RUN_EVENT_ASPECT_NAME); + AssertionRunEvent runEvent = + GenericRecordUtils.deserializeAspect( + proposal.getAspect().getValue(), + proposal.getAspect().getContentType(), + AssertionRunEvent.class); + + Assert.assertEquals(runEvent.getAssertionUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(runEvent.getAsserteeUrn(), TEST_DATASET_URN); + Assert.assertEquals(runEvent.getTimestampMillis(), eventtime); + Assert.assertEquals(runEvent.getStatus(), AssertionRunStatus.COMPLETE); + + AssertionResult result = runEvent.getResult(); + Assert.assertEquals(result.getType(), AssertionResultType.SUCCESS); + + return null; + }) + .when(mockedEntityClient) + .ingestProposal(any(OperationContext.class), Mockito.any(), Mockito.eq(false)); + + assertionService.addAssertionRunEvent( + opContext, + TEST_ASSERTION_URN, + TEST_DATASET_URN, + eventtime, + new AssertionResult().setType(AssertionResultType.SUCCESS), + null); + + Mockito.verify(mockedEntityClient, Mockito.times(1)) + .ingestProposal(any(OperationContext.class), Mockito.any(), Mockito.eq(false)); + } + + @Test + public void testAddAssertionRunEventAllFields() throws Exception, RemoteInvocationException { + 
SystemEntityClient mockedEntityClient = mock(SystemEntityClient.class); + AssertionService assertionService = + new AssertionService(mockedEntityClient, mock(GraphClient.class)); + Long eventtime = 1718619000000L; + StringMap customProps = new StringMap(Map.of("prop-1", "value-1")); + StringMap errorProps = new StringMap(Map.of("message", "errorMessage")); + String externalUrlOfAssertion = "https://abc/xyz"; + + Mockito.doAnswer( + invocation -> { + MetadataChangeProposal proposal = invocation.getArgument(1); + + Assert.assertEquals(proposal.getEntityUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(proposal.getAspectName(), ASSERTION_RUN_EVENT_ASPECT_NAME); + AssertionRunEvent runEvent = + GenericRecordUtils.deserializeAspect( + proposal.getAspect().getValue(), + proposal.getAspect().getContentType(), + AssertionRunEvent.class); + + Assert.assertEquals(runEvent.getAssertionUrn(), TEST_ASSERTION_URN); + Assert.assertEquals(runEvent.getAsserteeUrn(), TEST_DATASET_URN); + Assert.assertEquals(runEvent.getTimestampMillis(), eventtime); + Assert.assertEquals(runEvent.getStatus(), AssertionRunStatus.COMPLETE); + Assert.assertEquals(runEvent.getRuntimeContext(), customProps); + + AssertionResult result = runEvent.getResult(); + Assert.assertEquals(result.getType(), AssertionResultType.ERROR); + Assert.assertEquals(result.getExternalUrl(), externalUrlOfAssertion); + Assert.assertEquals( + result.getError().getType(), AssertionResultErrorType.UNKNOWN_ERROR); + Assert.assertEquals(result.getError().getProperties(), errorProps); + + return null; + }) + .when(mockedEntityClient) + .ingestProposal(any(OperationContext.class), Mockito.any(), Mockito.eq(false)); + + assertionService.addAssertionRunEvent( + opContext, + TEST_ASSERTION_URN, + TEST_DATASET_URN, + eventtime, + new AssertionResult() + .setType(AssertionResultType.ERROR) + .setExternalUrl(externalUrlOfAssertion) + .setError( + new AssertionResultError() + .setType(AssertionResultErrorType.UNKNOWN_ERROR) + 
.setProperties(errorProps)), + customProps); + + Mockito.verify(mockedEntityClient, Mockito.times(1)) + .ingestProposal(any(OperationContext.class), Mockito.any(), Mockito.eq(false)); + } + + @Test + public void testGetEntityUrnForAssertion() throws Exception { + // Test data and mocks + SystemEntityClient mockClient = mock(SystemEntityClient.class); + GraphClient mockGraphClient = mock(GraphClient.class); + + Mockito.when( + mockGraphClient.getRelatedEntities( + Mockito.eq(TEST_ASSERTION_URN.toString()), + Mockito.eq(ImmutableList.of("Asserts")), + Mockito.eq(RelationshipDirection.OUTGOING), + Mockito.eq(0), + Mockito.eq(1), + Mockito.anyString())) + .thenReturn( + new EntityRelationships() + .setTotal(1) + .setRelationships( + new EntityRelationshipArray( + ImmutableList.of(new EntityRelationship().setEntity(TEST_DATASET_URN))))); + + final AssertionService service = new AssertionService(mockClient, mockGraphClient); + + // Test method + final Urn entityUrn = service.getEntityUrnForAssertion(opContext, TEST_ASSERTION_URN); + + // Assert result + Assert.assertEquals(entityUrn, TEST_DATASET_URN); + } +} diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java index f654cdd817e34a..9a4c01dabf9a77 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java @@ -33,6 +33,7 @@ "io.datahubproject.metadata.jobs.common.health.kafka", "com.linkedin.gms.factory.context", "com.linkedin.gms.factory.timeseries", + "com.linkedin.gms.factory.assertions", }, excludeFilters = { @ComponentScan.Filter( diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl index
65196a69ce3660..c9af15a8d019a3 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl @@ -17,7 +17,8 @@ record AssertionInfo includes CustomProperties, ExternalReference { @Searchable = { } type: enum AssertionType { /** - * A single-dataset assertion. When this is the value, the datasetAssertion field will be populated. + * A single-dataset assertion. + * When this is the value, the datasetAssertion field will be populated. */ DATASET @@ -49,6 +50,14 @@ record AssertionInfo includes CustomProperties, ExternalReference { * Would have named this SCHEMA but the codegen for PDL does not allow this (reserved word). */ DATA_SCHEMA + + /** + * A custom assertion. + * When this is the value, the customAssertion field will be populated. + * Use this assertion type when the exact type of assertion is not modeled in DataHub or + * as a starting point when integrating third-party data quality tools. + */ + CUSTOM } /** @@ -67,7 +76,7 @@ record AssertionInfo includes CustomProperties, ExternalReference { volumeAssertion: optional VolumeAssertionInfo /** - * A SQL Assertion definition. This field is populated when the type is SQL. + * A SQL Assertion definition. This field is populated when the type is SQL. */ sqlAssertion: optional SqlAssertionInfo @@ -81,6 +90,11 @@ record AssertionInfo includes CustomProperties, ExternalReference { */ schemaAssertion: optional SchemaAssertionInfo + /** + * A Custom Assertion definition. This field is populated when type is CUSTOM. + */ + customAssertion: optional CustomAssertionInfo + /** * The source or origin of the Assertion definition. 
* diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/CustomAssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/CustomAssertionInfo.pdl new file mode 100644 index 00000000000000..9473b2b3c22bf8 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/CustomAssertionInfo.pdl @@ -0,0 +1,43 @@ +namespace com.linkedin.assertion + +import com.linkedin.common.Urn + +/** +* Attributes that are applicable to Custom Assertions +**/ +record CustomAssertionInfo { + /** + * The type of custom assertion. + * This is how your assertion will appear categorized in DataHub UI. + */ + @Searchable = { + "fieldName": "customType" + } + type: string + + /** + * The entity targeted by this assertion. + * This may support more entityTypes (e.g. dataJob) in the future. + */ + @Relationship = { + "name": "Asserts", + "entityTypes": [ "dataset" ] + } + entity: Urn + + /** + * The dataset schema field targeted by this assertion. + * + * This field is expected to be provided if the assertion is on a dataset field. + */ + @Relationship = { + "name": "Asserts", + "entityTypes": [ "schemaField" ] + } + field: optional Urn + + /** + * Logic for the assertion as expressed in the native assertion language. Code fragments, query strings, etc.
+ */ + logic: optional string +} \ No newline at end of file diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/assertions/AssertionServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/assertions/AssertionServiceFactory.java new file mode 100644 index 00000000000000..96fb2219bf22c9 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/assertions/AssertionServiceFactory.java @@ -0,0 +1,28 @@ +package com.linkedin.gms.factory.assertions; + +import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.metadata.graph.GraphClient; +import com.linkedin.metadata.service.AssertionService; +import javax.annotation.Nonnull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; + +@Configuration +public class AssertionServiceFactory { + + @Autowired + @Qualifier("graphClient") + private GraphClient _graphClient; + + @Bean(name = "assertionService") + @Scope("singleton") + @Nonnull + protected AssertionService getInstance( + @Qualifier("systemEntityClient") final SystemEntityClient systemEntityClient) + throws Exception { + return new AssertionService(systemEntityClient, _graphClient); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java index aa80fc62db09c4..3229f12f9021d0 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/graphql/GraphQLEngineFactory.java @@ -14,6 +14,7 @@ import 
com.linkedin.datahub.graphql.concurrency.GraphQLWorkerPoolThreadFactory; import com.linkedin.entity.client.EntityClient; import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.gms.factory.assertions.AssertionServiceFactory; import com.linkedin.gms.factory.auth.DataHubTokenServiceFactory; import com.linkedin.gms.factory.common.GitVersionFactory; import com.linkedin.gms.factory.common.IndexConventionFactory; @@ -31,6 +32,7 @@ import com.linkedin.metadata.graph.SiblingGraphService; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.recommendation.RecommendationsService; +import com.linkedin.metadata.service.AssertionService; import com.linkedin.metadata.service.BusinessAttributeService; import com.linkedin.metadata.service.DataProductService; import com.linkedin.metadata.service.ERModelRelationshipService; @@ -69,6 +71,7 @@ DataHubTokenServiceFactory.class, GitVersionFactory.class, SiblingGraphServiceFactory.class, + AssertionServiceFactory.class, }) public class GraphQLEngineFactory { @Autowired @@ -194,6 +197,10 @@ public class GraphQLEngineFactory { @Qualifier("connectionService") private ConnectionService _connectionService; + @Autowired + @Qualifier("assertionService") + private AssertionService assertionService; + @Bean(name = "graphQLEngine") @Nonnull protected GraphQLEngine graphQLEngine( @@ -249,6 +256,7 @@ protected GraphQLEngine graphQLEngine( args.setGraphQLQueryDepthLimit(configProvider.getGraphQL().getQuery().getDepthLimit()); args.setBusinessAttributeService(businessAttributeService); args.setConnectionService(_connectionService); + args.setAssertionService(assertionService); return new GmsGraphQLEngine(args).builder().build(); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/service/AssertionService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/service/AssertionService.java new file mode 100644 index 00000000000000..ad0d2c84c90371 --- /dev/null 
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/service/AssertionService.java @@ -0,0 +1,186 @@ +package com.linkedin.metadata.service; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.linkedin.assertion.AssertionInfo; +import com.linkedin.assertion.AssertionResult; +import com.linkedin.assertion.AssertionRunEvent; +import com.linkedin.assertion.AssertionRunStatus; +import com.linkedin.assertion.AssertionSource; +import com.linkedin.assertion.AssertionSourceType; +import com.linkedin.assertion.AssertionType; +import com.linkedin.assertion.CustomAssertionInfo; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.EntityRelationships; +import com.linkedin.common.url.Url; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.StringMap; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.entity.AspectUtils; +import com.linkedin.metadata.graph.GraphClient; +import com.linkedin.metadata.key.AssertionKey; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import io.datahubproject.metadata.context.OperationContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AssertionService extends BaseService { + + private final GraphClient _graphClient; + + private static final String ASSERTS_RELATIONSHIP_NAME = "Asserts"; + + public AssertionService( + @Nonnull SystemEntityClient entityClient, @Nonnull GraphClient graphClient) { + super(entityClient); + _graphClient = graphClient; + } + + @Nonnull + public Urn generateAssertionUrn() { + final 
AssertionKey key = new AssertionKey(); + final String id = UUID.randomUUID().toString(); + key.setAssertionId(id); + return EntityKeyUtils.convertEntityKeyToUrn(key, Constants.ASSERTION_ENTITY_NAME); + } + + /** + * Retrieves the entity associated with the assertion + * + * @param opContext the operation context + * @param assertionUrn the urn of the assertion to retrieve entity for + * @return the urn of the entity associated with the assertion, or null if none is found + */ + public @Nullable Urn getEntityUrnForAssertion( + @Nonnull OperationContext opContext, @Nonnull final Urn assertionUrn) { + try { + // Fetch the entity associated with the assertion from the Graph + final EntityRelationships relationships = + _graphClient.getRelatedEntities( + assertionUrn.toString(), + ImmutableList.of(ASSERTS_RELATIONSHIP_NAME), + RelationshipDirection.OUTGOING, + 0, + 1, + opContext.getActorContext().getActorUrn().toString()); + + if (relationships.hasRelationships() && !relationships.getRelationships().isEmpty()) { + return relationships.getRelationships().get(0).getEntity(); + } + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to retrieve entity for assertion with urn %s", assertionUrn), e); + } + return null; + } + + /** + * Returns an instance of {@link EntityResponse} for the specified Assertion urn, or null if one + * cannot be found. + * + * @param assertionUrn the urn of the Assertion + * @return an instance of {@link EntityResponse} for the Assertion, null if it does not exist.
+ */ + @Nullable + public EntityResponse getAssertionEntityResponse( + @Nonnull OperationContext opContext, @Nonnull final Urn assertionUrn) { + Objects.requireNonNull(assertionUrn, "assertionUrn must not be null"); + Objects.requireNonNull(opContext, "opContext must not be null"); + try { + return this.entityClient.getV2( + opContext, + Constants.ASSERTION_ENTITY_NAME, + assertionUrn, + ImmutableSet.of( + Constants.ASSERTION_INFO_ASPECT_NAME, + Constants.ASSERTION_ACTIONS_ASPECT_NAME, + Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME, + Constants.GLOBAL_TAGS_ASPECT_NAME)); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to retrieve Assertion with urn %s", assertionUrn), e); + } + } + + public Urn upsertCustomAssertion( + @Nonnull OperationContext opContext, + @Nonnull Urn assertionUrn, + @Nonnull Urn entityUrn, + @Nonnull String description, + @Nullable String externalUrl, + @Nonnull DataPlatformInstance dataPlatformInstance, + @Nonnull CustomAssertionInfo customAssertionInfo) { + + Objects.requireNonNull(entityUrn, "entityUrn must not be null"); + Objects.requireNonNull(description, "description must not be null"); + Objects.requireNonNull(customAssertionInfo, "customAssertionInfo must not be null"); + Objects.requireNonNull(dataPlatformInstance, "dataPlatformInstance must not be null"); + + AssertionInfo assertionInfo = new AssertionInfo(); + assertionInfo.setType(AssertionType.CUSTOM); + assertionInfo.setDescription(description); + if (externalUrl != null) { + assertionInfo.setExternalUrl(new Url(externalUrl)); + } + assertionInfo.setSource(new AssertionSource().setType(AssertionSourceType.EXTERNAL)); + assertionInfo.setCustomAssertion(customAssertionInfo); + + final List<MetadataChangeProposal> aspects = new ArrayList<>(); + aspects.add( + AspectUtils.buildMetadataChangeProposal( + assertionUrn, Constants.ASSERTION_INFO_ASPECT_NAME, assertionInfo)); + aspects.add( + (AspectUtils.buildMetadataChangeProposal( + assertionUrn, Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME,
dataPlatformInstance))); + + try { + this.entityClient.batchIngestProposals(opContext, aspects, false); + return assertionUrn; + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to upsert Custom Assertion with urn %s", assertionUrn), e); + } + } + + public void addAssertionRunEvent( + @Nonnull OperationContext opContext, + @Nonnull Urn assertionUrn, + @Nonnull Urn asserteeUrn, + @Nonnull Long timestampMillis, + @Nonnull AssertionResult assertionResult, + @Nullable StringMap contextParameters) { + AssertionRunEvent assertionRunEvent = new AssertionRunEvent(); + assertionRunEvent.setTimestampMillis(timestampMillis); + assertionRunEvent.setRunId(timestampMillis.toString()); + assertionRunEvent.setAssertionUrn(assertionUrn); + assertionRunEvent.setAsserteeUrn(asserteeUrn); + assertionRunEvent.setStatus(AssertionRunStatus.COMPLETE); + assertionRunEvent.setResult(assertionResult); + if (contextParameters != null) { + assertionRunEvent.setRuntimeContext(contextParameters); + } + + try { + this.entityClient.ingestProposal( + opContext, + AspectUtils.buildMetadataChangeProposal( + assertionUrn, Constants.ASSERTION_RUN_EVENT_ASPECT_NAME, assertionRunEvent), + false); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to upsert Assertion Run Event for assertion with urn %s", assertionUrn), + e); + } + } +} diff --git a/smoke-test/tests/assertions/custom_assertions_test.py b/smoke-test/tests/assertions/custom_assertions_test.py new file mode 100644 index 00000000000000..509f1cf0f04e04 --- /dev/null +++ b/smoke-test/tests/assertions/custom_assertions_test.py @@ -0,0 +1,188 @@ +import time +from typing import Any + +import pytest +from datahub.emitter.mce_builder import make_dataset_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import ( + DatahubClientConfig, + DataHubGraph, + DataHubGraphConfig, +) +from datahub.metadata.schema_classes import StatusClass + 
+from tests.consistency_utils import wait_for_writes_to_sync +from tests.utils import delete_urn, get_gms_url, wait_for_healthcheck_util + +restli_default_headers = { + "X-RestLi-Protocol-Version": "2.0.0", +} + +TEST_DATASET_URN = make_dataset_urn(platform="postgres", name="foo_custom") + + +@pytest.fixture(scope="module", autouse=False) +def graph() -> DataHubGraph: + graph: DataHubGraph = DataHubGraph(config=DatahubClientConfig(server=get_gms_url())) + return graph + + +@pytest.fixture(scope="session") +def wait_for_healthchecks(): + wait_for_healthcheck_util() + mcpw = MetadataChangeProposalWrapper( + entityUrn=TEST_DATASET_URN, aspect=StatusClass(removed=False) + ) + with DataHubGraph(DataHubGraphConfig()) as graph: + graph.emit(mcpw) + yield + delete_urn(TEST_DATASET_URN) + + +def test_create_update_delete_dataset_custom_assertion( + wait_for_healthchecks: Any, graph: DataHubGraph +) -> None: + # Create custom assertion + resp = graph.upsert_custom_assertion( + urn=None, + entity_urn=TEST_DATASET_URN, + type="My custom category", + description="Description of custom assertion", + platform_name="customDQPlatform", + ) + + assert resp.get("urn") + assertion_urn = resp["urn"] + + # Update custom assertion + resp = graph.upsert_custom_assertion( + urn=assertion_urn, + entity_urn=TEST_DATASET_URN, + type="My custom category", + description="Updated Description of custom assertion", + platform_name="customDQPlatform", + external_url="http://some_url", + ) + + wait_for_writes_to_sync() + + # Report custom assertion result for success + result_reported = graph.report_assertion_result( + urn=assertion_urn, + timestamp_millis=0, + type="SUCCESS", + external_url="http://some_url/run/1", + ) + assert result_reported + + # Report custom assertion result for error + result_reported = graph.report_assertion_result( + urn=assertion_urn, + timestamp_millis=round(time.time() * 1000), + type="ERROR", + external_url="http://some_url/run/2", + error_type="SOURCE_QUERY_FAILED", + 
error_message="Source query failed with error Permission Denied.", + ) + assert result_reported + + # Report custom assertion result for failure + result_reported = graph.report_assertion_result( + urn=assertion_urn, + timestamp_millis=round(time.time() * 1000), + type="FAILURE", + external_url="http://some_url/run/3", + ) + assert result_reported + + wait_for_writes_to_sync() + + graphql_query_retrive_assertion = """ + query dataset($datasetUrn: String!) { + dataset(urn: $datasetUrn) { + assertions(start: 0, count: 1000) { + start + count + total + assertions { + urn + # Fetch the last run of each associated assertion. + runEvents(status: COMPLETE, limit: 3) { + total + failed + succeeded + runEvents { + timestampMillis + status + result { + type + externalUrl + nativeResults { + key + value + } + } + } + } + info { + type + description + externalUrl + lastUpdated { + time + actor + } + customAssertion { + type + entityUrn + field { + path + } + logic + } + source { + type + created { + time + actor + } + } + } + } + } + } + } + """ + + dataset_assertions = graph.execute_graphql( + query=graphql_query_retrive_assertion, + variables={"datasetUrn": TEST_DATASET_URN}, + ) + + assertions = dataset_assertions["dataset"]["assertions"]["assertions"] + assert assertions + assert assertions[0]["urn"] == assertion_urn + assert assertions[0]["info"] + assert assertions[0]["info"]["type"] == "CUSTOM" + assert assertions[0]["info"]["externalUrl"] == "http://some_url" + assert ( + assertions[0]["info"]["description"] + == "Updated Description of custom assertion" + ) + assert assertions[0]["info"]["customAssertion"] + assert assertions[0]["info"]["customAssertion"]["type"] == "My custom category" + + assert assertions[0]["runEvents"] + assert assertions[0]["runEvents"]["total"] == 3 + assert assertions[0]["runEvents"]["succeeded"] == 1 + assert assertions[0]["runEvents"]["failed"] == 1 + assert assertions[0]["runEvents"]["runEvents"][0]["result"]["externalUrl"] + + 
graph.delete_entity(assertion_urn, True) + + dataset_assertions = graph.execute_graphql( + query=graphql_query_retrive_assertion, + variables={"datasetUrn": TEST_DATASET_URN}, + ) + assertions = dataset_assertions["dataset"]["assertions"]["assertions"] + assert not assertions