From b4b91421e817ccbb4ed0012b55b0793bdbed7b2c Mon Sep 17 00:00:00 2001 From: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Date: Tue, 15 Oct 2024 20:46:56 +0530 Subject: [PATCH 1/9] fix(search): make graphql query autoCompleteForMultiple to show exact matches first (#11586) --- .../elasticsearch/query/ESSearchDAO.java | 2 +- .../request/AutocompleteRequestHandler.java | 95 ++++++++++------- .../fixtures/SampleDataFixtureTestBase.java | 22 +++- .../AutocompleteRequestHandlerTest.java | 98 +++++++++++++----- .../sample_data/containerindex_v2.json.gz | Bin 295 -> 335 bytes 5 files changed, 154 insertions(+), 63 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java index f09a81c0c8b891..2d7db075e676ff 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java @@ -370,7 +370,7 @@ public AutoCompleteResult autoComplete( IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention(); AutocompleteRequestHandler builder = AutocompleteRequestHandler.getBuilder( - entitySpec, customSearchConfiguration, queryFilterRewriteChain); + entitySpec, customSearchConfiguration, queryFilterRewriteChain, searchConfiguration); SearchRequest req = builder.getSearchRequest( opContext, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AutocompleteRequestHandler.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AutocompleteRequestHandler.java index 294efb069a9046..45359285b4a046 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AutocompleteRequestHandler.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AutocompleteRequestHandler.java @@ -1,6 +1,5 @@ package com.linkedin.metadata.search.elasticsearch.query.request; -import static com.linkedin.metadata.models.SearchableFieldSpecExtractor.PRIMARY_URN_SEARCH_PROPERTIES; import static com.linkedin.metadata.search.utils.ESAccessControlUtil.restrictUrn; import static com.linkedin.metadata.search.utils.ESUtils.applyDefaultSearchFilters; @@ -8,6 +7,7 @@ import com.google.common.collect.ImmutableList; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.config.search.SearchConfiguration; import com.linkedin.metadata.config.search.custom.AutocompleteConfiguration; import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration; import com.linkedin.metadata.config.search.custom.QueryConfiguration; @@ -35,6 +35,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.index.query.*; @@ -46,7 +47,7 @@ @Slf4j public class AutocompleteRequestHandler { - private final List _defaultAutocompleteFields; + private final List _defaultAutocompleteFields; private final Map> searchableFieldTypes; private static final Map @@ -56,11 +57,13 @@ public class AutocompleteRequestHandler { private final EntitySpec entitySpec; private final QueryFilterRewriteChain queryFilterRewriteChain; + private final 
SearchConfiguration searchConfiguration; public AutocompleteRequestHandler( @Nonnull EntitySpec entitySpec, @Nullable CustomSearchConfiguration customSearchConfiguration, - @Nonnull QueryFilterRewriteChain queryFilterRewriteChain) { + @Nonnull QueryFilterRewriteChain queryFilterRewriteChain, + @Nonnull SearchConfiguration searchConfiguration) { this.entitySpec = entitySpec; List fieldSpecs = entitySpec.getSearchableFieldSpecs(); this.customizedQueryHandler = CustomizedQueryHandler.builder(customSearchConfiguration).build(); @@ -69,8 +72,12 @@ public AutocompleteRequestHandler( fieldSpecs.stream() .map(SearchableFieldSpec::getSearchableAnnotation) .filter(SearchableAnnotation::isEnableAutocomplete) - .map(SearchableAnnotation::getFieldName), - Stream.of("urn")) + .map( + searchableAnnotation -> + Pair.of( + searchableAnnotation.getFieldName(), + Double.toString(searchableAnnotation.getBoostScore()))), + Stream.of(Pair.of("urn", "1.0"))) .collect(Collectors.toList()); searchableFieldTypes = fieldSpecs.stream() @@ -87,17 +94,22 @@ public AutocompleteRequestHandler( return set1; })); this.queryFilterRewriteChain = queryFilterRewriteChain; + this.searchConfiguration = searchConfiguration; } public static AutocompleteRequestHandler getBuilder( @Nonnull EntitySpec entitySpec, @Nullable CustomSearchConfiguration customSearchConfiguration, - @Nonnull QueryFilterRewriteChain queryFilterRewriteChain) { + @Nonnull QueryFilterRewriteChain queryFilterRewriteChain, + @Nonnull SearchConfiguration searchConfiguration) { return AUTOCOMPLETE_QUERY_BUILDER_BY_ENTITY_NAME.computeIfAbsent( entitySpec, k -> new AutocompleteRequestHandler( - entitySpec, customSearchConfiguration, queryFilterRewriteChain)); + entitySpec, + customSearchConfiguration, + queryFilterRewriteChain, + searchConfiguration)); } public SearchRequest getSearchRequest( @@ -169,7 +181,7 @@ private BoolQueryBuilder getQuery( public BoolQueryBuilder getQuery( @Nonnull ObjectMapper objectMapper, @Nullable AutocompleteConfiguration customAutocompleteConfig, - List autocompleteFields, + List autocompleteFields, @Nonnull String query) { BoolQueryBuilder finalQuery = @@ -189,7 +201,7 @@ public BoolQueryBuilder getQuery( private Optional getAutocompleteQuery( @Nullable AutocompleteConfiguration customConfig, - List autocompleteFields, + List autocompleteFields, @Nonnull String query) { Optional result = Optional.empty(); @@ -200,33 +212,39 @@ private Optional getAutocompleteQuery( return result; } - private static BoolQueryBuilder defaultQuery( - List autocompleteFields, @Nonnull String query) { + private BoolQueryBuilder defaultQuery(List autocompleteFields, @Nonnull String query) { BoolQueryBuilder finalQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); // Search for exact matches with higher boost and ngram matches - MultiMatchQueryBuilder autocompleteQueryBuilder = + MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(query).type(MultiMatchQueryBuilder.Type.BOOL_PREFIX); - final float urnBoost = - Float.parseFloat((String) PRIMARY_URN_SEARCH_PROPERTIES.get("boostScore")); autocompleteFields.forEach( - fieldName -> { - if ("urn".equals(fieldName)) { - autocompleteQueryBuilder.field(fieldName + ".ngram", urnBoost); - autocompleteQueryBuilder.field(fieldName + ".ngram._2gram", urnBoost); - autocompleteQueryBuilder.field(fieldName + ".ngram._3gram", urnBoost); - autocompleteQueryBuilder.field(fieldName + ".ngram._4gram", urnBoost); - } else { - autocompleteQueryBuilder.field(fieldName + ".ngram"); - 
autocompleteQueryBuilder.field(fieldName + ".ngram._2gram"); - autocompleteQueryBuilder.field(fieldName + ".ngram._3gram"); - autocompleteQueryBuilder.field(fieldName + ".ngram._4gram"); + pair -> { + final String fieldName = (String) pair.getLeft(); + final float boostScore = Float.parseFloat((String) pair.getRight()); + multiMatchQueryBuilder.field(fieldName + ".ngram"); + multiMatchQueryBuilder.field(fieldName + ".ngram._2gram"); + multiMatchQueryBuilder.field(fieldName + ".ngram._3gram"); + multiMatchQueryBuilder.field(fieldName + ".ngram._4gram"); + multiMatchQueryBuilder.field(fieldName + ".delimited"); + if (!fieldName.equalsIgnoreCase("urn")) { + multiMatchQueryBuilder.field(fieldName + ".ngram", boostScore); + multiMatchQueryBuilder.field( + fieldName + ".ngram._2gram", + boostScore * (searchConfiguration.getWordGram().getTwoGramFactor())); + multiMatchQueryBuilder.field( + fieldName + ".ngram._3gram", + boostScore * (searchConfiguration.getWordGram().getThreeGramFactor())); + multiMatchQueryBuilder.field( + fieldName + ".ngram._4gram", + boostScore * (searchConfiguration.getWordGram().getFourGramFactor())); + finalQuery.should( + QueryBuilders.matchQuery(fieldName + ".keyword", query).boost(boostScore)); } - autocompleteQueryBuilder.field(fieldName + ".delimited"); finalQuery.should(QueryBuilders.matchPhrasePrefixQuery(fieldName + ".delimited", query)); }); - finalQuery.should(autocompleteQueryBuilder); + finalQuery.should(multiMatchQueryBuilder); return finalQuery; } @@ -241,12 +259,17 @@ private HighlightBuilder getHighlights(@Nullable String field) { // Check for each field name and any subfields getAutocompleteFields(field) .forEach( - fieldName -> - highlightBuilder - .field(fieldName) - .field(fieldName + ".*") - .field(fieldName + ".ngram") - .field(fieldName + ".delimited")); + pair -> { + final String fieldName = (String) pair.getLeft(); + highlightBuilder + .field(fieldName) + .field(fieldName + ".*") + .field(fieldName + ".ngram") + .field(fieldName + ".delimited"); + if (!fieldName.equalsIgnoreCase("urn")) { + highlightBuilder.field(fieldName + ".keyword"); + } + }); // set field match req false for ngram highlightBuilder.fields().stream() @@ -256,9 +279,9 @@ private HighlightBuilder getHighlights(@Nullable String field) { return highlightBuilder; } - private List getAutocompleteFields(@Nullable String field) { - if (field != null && !field.isEmpty()) { - return ImmutableList.of(field); + private List getAutocompleteFields(@Nullable String field) { + if (field != null && !field.isEmpty() && !field.equalsIgnoreCase("urn")) { + return ImmutableList.of(Pair.of(field, "10.0")); } return _defaultAutocompleteFields; } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java index bc3c892e07b1bb..504eb5f5fc13db 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java @@ -283,7 +283,7 @@ public void testFixtureInitialization() { Map.of( "dataset", 13, "chart", 0, - "container", 1, + "container", 2, "dashboard", 0, "tag", 0, "mlmodel", 0); @@ -903,6 +903,26 @@ public void testContainerAutoComplete() { }); } + @Test + public void testContainerAutoComplete_with_exactMatch_onTop() { + List.of("container") + .forEach( + query -> { + try { + AutoCompleteResults result = + autocomplete( + 
getOperationContext(), new ContainerType(getEntityClient()), query); + assertTrue( + result.getSuggestions().get(0).equals("container"), + String.format( + "Expected query:`%s` on top of suggestions, found %s", + query, result.getSuggestions().get(0))); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + @Test public void testGroupAutoComplete() { List.of("T", "Te", "Tes", "Test ", "Test G", "Test Gro", "Test Group ") diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AutocompleteRequestHandlerTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AutocompleteRequestHandlerTest.java index 572d79ebf2f0ce..c5205906e9d373 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AutocompleteRequestHandlerTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AutocompleteRequestHandlerTest.java @@ -5,6 +5,10 @@ import static org.testng.Assert.assertTrue; import com.linkedin.metadata.TestEntitySpecBuilder; +import com.linkedin.metadata.config.search.ExactMatchConfiguration; +import com.linkedin.metadata.config.search.PartialConfiguration; +import com.linkedin.metadata.config.search.SearchConfiguration; +import com.linkedin.metadata.config.search.WordGramConfiguration; import com.linkedin.metadata.config.search.custom.AutocompleteConfiguration; import com.linkedin.metadata.config.search.custom.BoolQueryConfiguration; import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration; @@ -32,14 +36,44 @@ import org.testng.annotations.Test; public class AutocompleteRequestHandlerTest { - private AutocompleteRequestHandler handler = - AutocompleteRequestHandler.getBuilder( - TestEntitySpecBuilder.getSpec(), - CustomSearchConfiguration.builder().build(), - QueryFilterRewriteChain.EMPTY); + private static SearchConfiguration testQueryConfig; + private static AutocompleteRequestHandler handler; private OperationContext mockOpContext = TestOperationContexts.systemContextNoSearchAuthorization(mock(EntityRegistry.class)); + static { + testQueryConfig = new SearchConfiguration(); + testQueryConfig.setMaxTermBucketSize(20); + + ExactMatchConfiguration exactMatchConfiguration = new ExactMatchConfiguration(); + exactMatchConfiguration.setExclusive(false); + exactMatchConfiguration.setExactFactor(10.0f); + exactMatchConfiguration.setWithPrefix(true); + exactMatchConfiguration.setPrefixFactor(6.0f); + exactMatchConfiguration.setCaseSensitivityFactor(0.7f); + exactMatchConfiguration.setEnableStructured(true); + + WordGramConfiguration wordGramConfiguration = new WordGramConfiguration(); + wordGramConfiguration.setTwoGramFactor(1.2f); + wordGramConfiguration.setThreeGramFactor(1.5f); + wordGramConfiguration.setFourGramFactor(1.8f); + + PartialConfiguration partialConfiguration = new PartialConfiguration(); + partialConfiguration.setFactor(0.4f); + partialConfiguration.setUrnFactor(0.7f); + + testQueryConfig.setExactMatch(exactMatchConfiguration); + testQueryConfig.setWordGram(wordGramConfiguration); + testQueryConfig.setPartial(partialConfiguration); + + handler = + AutocompleteRequestHandler.getBuilder( + TestEntitySpecBuilder.getSpec(), + CustomSearchConfiguration.builder().build(), + QueryFilterRewriteChain.EMPTY, + testQueryConfig); + } + private static final QueryConfiguration TEST_QUERY_CONFIG = QueryConfiguration.builder() .queryRegex(".*") @@ -88,9 +122,12 @@ public void testDefaultAutocompleteRequest() { BoolQueryBuilder wrapper = (BoolQueryBuilder) 
((FunctionScoreQueryBuilder) sourceBuilder.query()).query(); BoolQueryBuilder query = (BoolQueryBuilder) extractNestedQuery(wrapper); - assertEquals(query.should().size(), 3); + assertEquals(query.should().size(), 4); - MultiMatchQueryBuilder autocompleteQuery = (MultiMatchQueryBuilder) query.should().get(2); + MatchQueryBuilder matchQueryBuilder = (MatchQueryBuilder) query.should().get(0); + assertEquals("keyPart1.keyword", matchQueryBuilder.fieldName()); + + MultiMatchQueryBuilder autocompleteQuery = (MultiMatchQueryBuilder) query.should().get(3); Map queryFields = autocompleteQuery.fields(); assertTrue(queryFields.containsKey("keyPart1.ngram")); assertTrue(queryFields.containsKey("keyPart1.ngram._2gram")); @@ -99,7 +136,7 @@ public void testDefaultAutocompleteRequest() { assertEquals(autocompleteQuery.type(), MultiMatchQueryBuilder.Type.BOOL_PREFIX); MatchPhrasePrefixQueryBuilder prefixQuery = - (MatchPhrasePrefixQueryBuilder) query.should().get(0); + (MatchPhrasePrefixQueryBuilder) query.should().get(1); assertEquals("keyPart1.delimited", prefixQuery.fieldName()); assertEquals(wrapper.mustNot().size(), 1); @@ -108,15 +145,16 @@ public void testDefaultAutocompleteRequest() { assertEquals(removedFilter.value(), true); HighlightBuilder highlightBuilder = sourceBuilder.highlighter(); List highlightedFields = highlightBuilder.fields(); - assertEquals(highlightedFields.size(), 8); + assertEquals(highlightedFields.size(), 9); assertEquals(highlightedFields.get(0).name(), "keyPart1"); assertEquals(highlightedFields.get(1).name(), "keyPart1.*"); assertEquals(highlightedFields.get(2).name(), "keyPart1.ngram"); assertEquals(highlightedFields.get(3).name(), "keyPart1.delimited"); - assertEquals(highlightedFields.get(4).name(), "urn"); - assertEquals(highlightedFields.get(5).name(), "urn.*"); - assertEquals(highlightedFields.get(6).name(), "urn.ngram"); - assertEquals(highlightedFields.get(7).name(), "urn.delimited"); + assertEquals(highlightedFields.get(4).name(), "keyPart1.keyword"); + assertEquals(highlightedFields.get(5).name(), "urn"); + assertEquals(highlightedFields.get(6).name(), "urn.*"); + assertEquals(highlightedFields.get(7).name(), "urn.ngram"); + assertEquals(highlightedFields.get(8).name(), "urn.delimited"); } @Test @@ -130,9 +168,12 @@ public void testAutocompleteRequestWithField() { (BoolQueryBuilder) ((FunctionScoreQueryBuilder) sourceBuilder.query()).query(); assertEquals(wrapper.should().size(), 1); BoolQueryBuilder query = (BoolQueryBuilder) extractNestedQuery(wrapper); - assertEquals(query.should().size(), 2); + assertEquals(query.should().size(), 3); - MultiMatchQueryBuilder autocompleteQuery = (MultiMatchQueryBuilder) query.should().get(1); + MatchQueryBuilder matchQueryBuilder = (MatchQueryBuilder) query.should().get(0); + assertEquals("field.keyword", matchQueryBuilder.fieldName()); + + MultiMatchQueryBuilder autocompleteQuery = (MultiMatchQueryBuilder) query.should().get(2); Map queryFields = autocompleteQuery.fields(); assertTrue(queryFields.containsKey("field.ngram")); assertTrue(queryFields.containsKey("field.ngram._2gram")); @@ -141,7 +182,7 @@ public void testAutocompleteRequestWithField() { assertEquals(autocompleteQuery.type(), MultiMatchQueryBuilder.Type.BOOL_PREFIX); MatchPhrasePrefixQueryBuilder prefixQuery = - (MatchPhrasePrefixQueryBuilder) query.should().get(0); + (MatchPhrasePrefixQueryBuilder) query.should().get(1); assertEquals("field.delimited", prefixQuery.fieldName()); MatchQueryBuilder removedFilter = (MatchQueryBuilder) wrapper.mustNot().get(0); @@ 
-149,11 +190,12 @@ public void testAutocompleteRequestWithField() { assertEquals(removedFilter.value(), true); HighlightBuilder highlightBuilder = sourceBuilder.highlighter(); List highlightedFields = highlightBuilder.fields(); - assertEquals(highlightedFields.size(), 4); + assertEquals(highlightedFields.size(), 5); assertEquals(highlightedFields.get(0).name(), "field"); assertEquals(highlightedFields.get(1).name(), "field.*"); assertEquals(highlightedFields.get(2).name(), "field.ngram"); assertEquals(highlightedFields.get(3).name(), "field.delimited"); + assertEquals(highlightedFields.get(4).name(), "field.keyword"); } @Test @@ -174,7 +216,8 @@ public void testCustomConfigWithDefault() { .build()) .build())) .build(), - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + testQueryConfig); SearchRequest autocompleteRequest = withoutDefaultQuery.getSearchRequest(mockOpContext, "input", null, null, 10); @@ -200,7 +243,8 @@ public void testCustomConfigWithDefault() { .build()) .build())) .build(), - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + testQueryConfig); autocompleteRequest = withDefaultQuery.getSearchRequest(mockOpContext, "input", null, null, 10); sourceBuilder = autocompleteRequest.source(); @@ -215,7 +259,7 @@ public void testCustomConfigWithDefault() { BoolQueryBuilder defaultQuery = (BoolQueryBuilder) shouldQueries.stream().filter(qb -> qb instanceof BoolQueryBuilder).findFirst().get(); - assertEquals(defaultQuery.should().size(), 3); + assertEquals(defaultQuery.should().size(), 4); // Custom customQuery = @@ -243,7 +287,8 @@ public void testCustomConfigWithInheritedQueryFunctionScores() { .build()) .build())) .build(), - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + testQueryConfig); SearchRequest autocompleteRequest = withInherit.getSearchRequest(mockOpContext, "input", null, null, 10); @@ -282,7 +327,8 @@ public void testCustomConfigWithInheritedQueryFunctionScores() { .build()) .build())) .build(), - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + testQueryConfig); autocompleteRequest = noQueryCustomization.getSearchRequest(mockOpContext, "input", null, null, 10); @@ -345,7 +391,8 @@ public void testCustomConfigWithFunctionScores() { "deprecated", Map.of("value", false))))))) .build())) .build(), - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + testQueryConfig); SearchRequest autocompleteRequest = explicitNoInherit.getSearchRequest(mockOpContext, "input", null, null, 10); @@ -398,7 +445,8 @@ public void testCustomConfigWithFunctionScores() { "deprecated", Map.of("value", false))))))) .build())) .build(), - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + testQueryConfig); autocompleteRequest = explicit.getSearchRequest(mockOpContext, "input", null, null, 10); sourceBuilder = autocompleteRequest.source(); @@ -411,7 +459,7 @@ public void testCustomConfigWithFunctionScores() { assertEquals(customQuery, QueryBuilders.matchAllQuery()); // standard query still present - assertEquals(((BoolQueryBuilder) query.should().get(1)).should().size(), 3); + assertEquals(((BoolQueryBuilder) query.should().get(1)).should().size(), 4); // custom functions included assertEquals(wrapper.filterFunctionBuilders(), expectedCustomScoreFunctions); diff --git a/metadata-io/src/test/resources/elasticsearch/sample_data/containerindex_v2.json.gz b/metadata-io/src/test/resources/elasticsearch/sample_data/containerindex_v2.json.gz index 
2fa49c810abfa1af8cef95ac79136377d4b1ce4c..bd36747255f8604c37bc50fceb13293218a46330 100644 GIT binary patch literal 335 zcmV-V0kHlbiwFqPZ4PGu17mM)bYW?3WpZh5WMz0?b}}w%b8l_{?T|lf!!QuV_kIk` zHpGe(Qb9H^U0PGRl@QL-nHa1;mCh|8^t)Gf6K64$ln#Xigoekx_wIfN)_KE3|BJQ} z(i(!spcgz-IiKa2&DEm1U*&SK0tqT#u28B>0!;xL)=8Ry$9i!bH#ZG8EX zZ9eY~zo14K=nQSNq|lL-R=e~#@c$u>yT5#I{4>na(ck#jP_rqjLlL2NH=>sl17mM)bYW?3WpZh5WMz0?b}}w%b8l_{&5$u`!!Qtq_x=pc zF~o`!T!A+)U0PGRl@QL-1q&?6mCh|8^uJef9eXN;PKD!!$Gz|F-j$8sGLgFIwUEvd zXp3I(Tors Date: Wed, 16 Oct 2024 01:12:58 +0900 Subject: [PATCH 2/9] feat: add contributor pr open comment action (#11487) Co-authored-by: Harshal Sheth --- .../workflows/contributor-open-pr-comment.yml | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 .github/workflows/contributor-open-pr-comment.yml diff --git a/.github/workflows/contributor-open-pr-comment.yml b/.github/workflows/contributor-open-pr-comment.yml new file mode 100644 index 00000000000000..b3da16ca994baa --- /dev/null +++ b/.github/workflows/contributor-open-pr-comment.yml @@ -0,0 +1,39 @@ +name: PR Comment + +on: + pull_request: + types: [opened] + +jobs: + post-pr-opened-comment: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Get and Format Username (PR only) + if: github.event_name == 'pull_request' + run: | + formatted_username=$(echo "${{ github.event.pull_request.user.login }}" | tr '[:upper:]' '[:lower:]' | sed 's/ /-/g') + echo "FORMATTED_USERNAME=$formatted_username" >> $GITHUB_ENV + + - name: Create Comment (PR only) + if: github.event_name == 'pull_request' + uses: actions/github-script@v6 + with: + script: | + if (context.payload.pull_request) { + const prUser = process.env.FORMATTED_USERNAME; + const url = `https://contributors.datahubproject.io/${prUser}`; + const body = `Hello @${prUser} :smile: \n\n Thank you so much for opening a pull request!\n\n![Image](https://contributors.datahubproject.io/api/og?userId=${{ github.event.pull_request.user.login }})\nYou can check out your contributor card and see all your past stats [here](${url})!`; + + // Create a comment on the PR + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.payload.pull_request.number, + body: body + }); + } else { + console.log('Not a pull request event.'); + } From fdae71d25f68842609bd7e2ec837744b56531029 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:44:55 -0500 Subject: [PATCH 3/9] docs(ingestion): add architecture diagrams (#11628) --- docs/advanced/mcp-mcl.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/advanced/mcp-mcl.md b/docs/advanced/mcp-mcl.md index 9efb9b794954da..333891ba1a95d3 100644 --- a/docs/advanced/mcp-mcl.md +++ b/docs/advanced/mcp-mcl.md @@ -14,6 +14,18 @@ To mitigate these downsides, we are committed to providing cross-language client Ultimately, we intend to realize a state in which the Entities and Aspect schemas can be altered without requiring generated code and without maintaining a single mega-model schema (looking at you, Snapshot.pdl). The intention is that changes to the metadata model become even easier than they are today. +### Synchronous Ingestion Architecture + +
+<p align="center">
+  <!-- image: synchronous ingestion architecture diagram -->
+</p>
+
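To make the synchronous path concrete, here is a minimal sketch using the Python REST emitter; the GMS address and dataset URN are illustrative placeholders, and `StatusClass` stands in for any aspect:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import StatusClass

# Placeholder GMS address; point this at your own instance.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)",
    aspect=StatusClass(removed=False),
)

# On the synchronous REST path the proposal is validated and written
# before the call returns, so failures surface immediately to the caller.
emitter.emit_mcp(mcp)
```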
+### Asynchronous Ingestion Architecture
+
+<p align="center">
+  <!-- image: asynchronous ingestion architecture diagram -->
+</p>
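For comparison, a minimal sketch of the asynchronous path, assuming the Python Kafka emitter with placeholder broker and schema-registry addresses; the proposal is published to Kafka and applied to the metadata store later by the consumer job:

```python
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

# Placeholder broker and schema-registry addresses.
emitter = DatahubKafkaEmitter(
    KafkaEmitterConfig.parse_obj(
        {
            "connection": {
                "bootstrap": "localhost:9092",
                "schema_registry_url": "http://localhost:8081",
            }
        }
    )
)

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)",
    aspect=StatusClass(removed=False),
)

# The proposal is only handed to Kafka here; the MCE consumer job
# picks it up and writes it to the metadata store out of band.
emitter.emit_mcp(mcp, callback=lambda err, msg: print(err or "delivered"))
emitter.flush()
```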
+ ## Modeling A Metadata Change Proposal is defined (in PDL) as follows From 5d3e464c21f384d5eb25a0291a77067a9605a9fc Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 15 Oct 2024 12:35:41 -0500 Subject: [PATCH 4/9] feat(validations): Ingest and metadata schema validators (#11619) Co-authored-by: Pedro Silva --- docs/how/updating-datahub.md | 2 + .../aspect/validation/FieldPathValidator.java | 116 +++++++++ .../validators/FieldPathValidatorTest.java | 233 ++++++++++++++++++ .../java/com/linkedin/metadata/Constants.java | 7 + .../datahub/ingestion/run/pipeline_config.py | 17 +- .../datahub/testing/compare_metadata_json.py | 4 + .../ExecutionRequestResultValidator.java | 70 ++++++ .../ExecutionRequestResultValidatorTest.java | 166 +++++++++++++ .../src/main/resources/entity-registry.yml | 6 + .../SpringStandardPluginConfiguration.java | 46 ++++ 10 files changed, 663 insertions(+), 4 deletions(-) create mode 100644 entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java create mode 100644 entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 00e020bd2a3875..dbcc7da8467035 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11484 - Metadata service authentication enabled by default - #11484 - Rest API authorization enabled by default - #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases. 
+- #11619 - schema field/column paths can no longer be empty strings +- #11619 - schema field/column paths can no longer be duplicated within the schema ### Potential Downtime diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java new file mode 100644 index 00000000000000..7c279254e1bc33 --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java @@ -0,0 +1,116 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.*; + +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; +import com.linkedin.schema.EditableSchemaFieldInfo; +import com.linkedin.schema.EditableSchemaMetadata; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaMetadata; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +/** + * 1. Validates the Schema Field Path specification, specifically that all field IDs must be unique + * across all fields within a schema. 2. Validates that the field path id is not empty. + * + * @see Field + * Path V2 docs + */ +@Setter +@Getter +@Accessors(chain = true) +public class FieldPathValidator extends AspectPayloadValidator { + @Nonnull private AspectPluginConfig config; + + /** Prevent any MCP for SchemaMetadata where field ids are duplicated. */ + @Override + protected Stream validateProposedAspects( + @Nonnull Collection mcpItems, + @Nonnull RetrieverContext retrieverContext) { + + ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection(); + + mcpItems.forEach( + i -> { + if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { + processSchemaMetadataAspect(i, exceptions); + } else { + processEditableSchemaMetadataAspect(i, exceptions); + } + }); + + return exceptions.streamAllExceptions(); + } + + @Override + protected Stream validatePreCommitAspects( + @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + private static void processEditableSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { + final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class); + final long uniquePaths = + validateAndCount( + i, + schemaMetadata.getEditableSchemaFieldInfo().stream() + .map(EditableSchemaFieldInfo::getFieldPath), + exceptions); + + if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) { + exceptions.addException( + i, + String.format( + "Cannot perform %s action on proposal. 
EditableSchemaMetadata aspect has duplicated field paths", + i.getChangeType())); + } + } + + private static void processSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { + final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class); + final long uniquePaths = + validateAndCount( + i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions); + + if (uniquePaths != schemaMetadata.getFields().size()) { + exceptions.addException( + i, + String.format( + "Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths", + i.getChangeType())); + } + } + + private static long validateAndCount( + BatchItem i, Stream fieldPaths, ValidationExceptionCollection exceptions) { + return fieldPaths + .distinct() + // inspect the stream of fieldPath validation errors since we're already iterating + .peek( + fieldPath -> + validateFieldPath(fieldPath) + .ifPresent(message -> exceptions.addException(i, message))) + .count(); + } + + private static Optional validateFieldPath(String fieldPath) { + if (fieldPath == null || fieldPath.isEmpty()) { + return Optional.of("SchemaMetadata aspect has empty field path."); + } + return Optional.empty(); + } +} diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java new file mode 100644 index 00000000000000..bd5912764edce3 --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java @@ -0,0 +1,233 @@ +package com.linkedin.metadata.aspect.validators; + +import static com.linkedin.metadata.Constants.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator; +import com.linkedin.metadata.aspect.validation.FieldPathValidator; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.schema.EditableSchemaFieldInfo; +import com.linkedin.schema.EditableSchemaFieldInfoArray; +import com.linkedin.schema.EditableSchemaMetadata; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import com.linkedin.schema.SchemaFieldDataType; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.schema.StringType; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import com.linkedin.test.metadata.aspect.batch.TestMCP; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class FieldPathValidatorTest { + + private static final AspectPluginConfig validatorConfig = + AspectPluginConfig.builder() + .supportedOperations( + Arrays.stream(ChangeType.values()) + .map(Objects::toString) + .collect(Collectors.toList())) + 
.className(CreateIfNotExistsValidator.class.getName()) + .supportedEntityAspectNames(List.of(AspectPluginConfig.EntityAspectName.ALL)) + .enabled(true) + .build(); + private EntityRegistry entityRegistry; + private RetrieverContext mockRetrieverContext; + private static final DatasetUrn TEST_DATASET_URN; + private final FieldPathValidator test = new FieldPathValidator().setConfig(validatorConfig); + + static { + try { + TEST_DATASET_URN = + DatasetUrn.createFromUrn( + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @BeforeTest + public void init() { + entityRegistry = new TestEntityRegistry(); + AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); + GraphRetriever mockGraphRetriever = mock(GraphRetriever.class); + mockRetrieverContext = mock(RetrieverContext.class); + when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever); + when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); + } + + @Test + public void testValidateNonDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(false); + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 0); + } + + @Test + public void testValidateDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(true); + + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 1); + } + + @Test + public void testValidateNonDuplicatedEditableSchemaFieldPath() { + final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false); + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 0); + } + + @Test + public void testValidateDuplicatedEditableSchemaFieldPath() { + final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true); + + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 1); + } + + @Test + public void testEmptySchemaFieldPath() { + final SchemaMetadata schema = 
getMockSchemaMetadataAspect(false, ""); + TestMCP testItem = + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build(); + + Set exceptions = + test.validateProposed(Set.of(testItem), mockRetrieverContext).collect(Collectors.toSet()); + + assertEquals( + exceptions, + Set.of( + AspectValidationException.forItem( + testItem, "SchemaMetadata aspect has empty field path."))); + } + + private static SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + return getMockSchemaMetadataAspect(duplicateFields, null); + } + + private static SchemaMetadata getMockSchemaMetadataAspect( + boolean duplicateFields, @Nullable String fieldPath) { + List fields = new ArrayList<>(); + fields.add( + new SchemaField() + .setType( + new SchemaFieldDataType() + .setType(SchemaFieldDataType.Type.create(new StringType()))) + .setNullable(false) + .setNativeDataType("string") + .setFieldPath(fieldPath == null ? "test" : fieldPath)); + + if (duplicateFields) { + fields.add( + new SchemaField() + .setType( + new SchemaFieldDataType() + .setType(SchemaFieldDataType.Type.create(new StringType()))) + .setNullable(false) + .setNativeDataType("string") + .setFieldPath(fieldPath == null ? "test" : fieldPath)); + } + + return new SchemaMetadata() + .setPlatform(TEST_DATASET_URN.getPlatformEntity()) + .setFields(new SchemaFieldArray(fields)); + } + + private static EditableSchemaMetadata getMockEditableSchemaMetadataAspect( + boolean duplicateFields) { + + List fields = new ArrayList<>(); + fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); + + if (duplicateFields) { + fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); + } + + return new EditableSchemaMetadata() + .setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields)); + } +} diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index e085a5876a42b0..8961677b568788 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -319,6 +319,13 @@ public class Constants { public static final String EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"; public static final String EXECUTION_REQUEST_SIGNAL_ASPECT_NAME = "dataHubExecutionRequestSignal"; public static final String EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"; + public static final String EXECUTION_REQUEST_STATUS_RUNNING = "RUNNING"; + public static final String EXECUTION_REQUEST_STATUS_FAILURE = "FAILURE"; + public static final String EXECUTION_REQUEST_STATUS_SUCCESS = "SUCCESS"; + public static final String EXECUTION_REQUEST_STATUS_TIMEOUT = "TIMEOUT"; + public static final String EXECUTION_REQUEST_STATUS_CANCELLED = "CANCELLED"; + public static final String EXECUTION_REQUEST_STATUS_ABORTED = "ABORTED"; + public static final String EXECUTION_REQUEST_STATUS_DUPLICATE = "DUPLICATE"; // DataHub Access Token public static final String ACCESS_TOKEN_KEY_ASPECT_NAME = "dataHubAccessTokenKey"; diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 98629ba030695a..2b2f992249f1e8 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -1,6 +1,7 @@ import datetime import logging -import uuid +import random +import string from typing import Any, Dict, List, Optional from pydantic import Field, validator @@ -71,6 +72,15 @@ class FlagsConfig(ConfigModel): ) +def _generate_run_id(source_type: Optional[str] = None) -> str: + current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") + random_suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) + + if source_type is None: + source_type = "ingestion" + return f"{source_type}-{current_time}-{random_suffix}" + + class PipelineConfig(ConfigModel): source: SourceConfig sink: Optional[DynamicTypedConfig] = None @@ -91,12 +101,11 @@ def run_id_should_be_semantic( cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any ) -> str: if v == DEFAULT_RUN_ID: + source_type = None if "source" in values and hasattr(values["source"], "type"): source_type = values["source"].type - current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") - return f"{source_type}-{current_time}" - return str(uuid.uuid1()) # default run_id if we cannot infer a source type + return _generate_run_id(source_type) else: assert v is not None return v diff --git a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py index 61b222f8d2dd50..155773f9898b47 100644 --- a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py +++ b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py @@ -27,6 +27,8 @@ r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]", r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]", r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]", + r"root\[\d+\].*?\['systemMetadata'\]\['runId'\]", + r"root\[\d+\].*?\['systemMetadata'\]\['lastRunId'\]", ] @@ -82,6 +84,8 @@ def assert_metadata_files_equal( json_path = f"root[{i}]['aspect']['json'][{j}]['value']" ignore_paths = (*ignore_paths, re.escape(json_path)) + ignore_paths = (*ignore_paths, *default_exclude_paths) + diff = diff_metadata_json(output, golden, ignore_paths, ignore_order=ignore_order) if diff and update_golden: if isinstance(diff, MCPDiff) and diff.is_delta_valid: diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java new file mode 100644 index 00000000000000..b77d3b48d5bd58 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java @@ -0,0 +1,70 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; + +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import 
com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** A Validator for StructuredProperties Aspect that is attached to entities like Datasets, etc. */ +@Setter +@Getter +@Slf4j +@Accessors(chain = true) +public class ExecutionRequestResultValidator extends AspectPayloadValidator { + private static final Set IMMUTABLE_STATUS = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream validateProposedAspects( + @Nonnull Collection mcpItems, + @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + @Override + protected Stream validatePreCommitAspects( + @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + return changeMCPs.stream() + .filter(item -> item.getPreviousRecordTemplate() != null) + .map( + item -> { + ExecutionRequestResult existingResult = + item.getPreviousAspect(ExecutionRequestResult.class); + + if (IMMUTABLE_STATUS.contains(existingResult.getStatus())) { + ExecutionRequestResult currentResult = item.getAspect(ExecutionRequestResult.class); + return AspectValidationException.forItem( + item, + String.format( + "Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s previous status: %s. Denied status update: %s", + item.getUrn(), existingResult.getStatus(), currentResult.getStatus())); + } + + return null; + }) + .filter(Objects::nonNull); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java new file mode 100644 index 00000000000000..f46772ca7b350d --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java @@ -0,0 +1,166 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_FAILURE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_RUNNING; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_TIMEOUT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.SystemAspect; +import 
com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.utils.AuditStampUtils; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.testng.annotations.Test; + +public class ExecutionRequestResultValidatorTest { + private static final OperationContext TEST_CONTEXT = + TestOperationContexts.systemContextNoSearchAuthorization(); + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build(); + private static final Urn TEST_URN = UrnUtils.getUrn("urn:li:dataHubExecutionRequest:xyz"); + + @Test + public void testAllowed() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set allowedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT); + Set destinationStates = new HashSet<>(allowedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE)); + + List testItems = + new ArrayList<>( + // Tests with previous state + allowedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + // Tests with no previous + testItems.addAll( + destinationStates.stream() + .map( + destState -> + ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate(new ExecutionRequestResult().setStatus(destState)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever())) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertTrue(result.isEmpty(), "Did not expect any validation errors."); + } + + @Test + public void testDenied() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set deniedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + Set destinationStates = new HashSet<>(deniedUpdateStates); + 
destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT)); + + List testItems = + new ArrayList<>( + // Tests with previous state + deniedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertEquals( + result.size(), + deniedUpdateStates.size() * destinationStates.size(), + "Expected ALL items to be denied."); + } +} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index 9b692b51dc2b5f..ec9c3fee1c404c 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -673,6 +673,12 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: '*' + - className: 'com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator' + enabled: true + spring: + enabled: true + packageScan: + - com.linkedin.gms.factory.plugins mcpSideEffects: - className: 'com.linkedin.metadata.structuredproperties.hooks.PropertyDefinitionDeleteSideEffect' packageScan: diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index 4a2095685abe1f..943b1c7184a60d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -1,5 +1,8 @@ package com.linkedin.gms.factory.plugins; +import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; import com.linkedin.metadata.Constants; @@ -7,6 +10,9 @@ import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.validation.ExecutionRequestResultValidator; +import com.linkedin.metadata.aspect.validation.FieldPathValidator; import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect; import com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect; import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry; @@ -21,6 +27,7 @@ @Configuration @Slf4j public class SpringStandardPluginConfiguration { + private static final String ALL = "*"; 
 
   @Value("${metadataChangeProposal.validation.ignoreUnknown}")
   private boolean ignoreUnknownEnabled;
 
@@ -104,4 +111,43 @@ public MCPSideEffect dataProductUnsetSideEffect() {
     log.info("Initialized {}", SchemaFieldSideEffect.class.getName());
     return new DataProductUnsetSideEffect().setConfig(config);
   }
+
+  @Bean
+  public AspectPayloadValidator fieldPathValidator() {
+    return new FieldPathValidator()
+        .setConfig(
+            AspectPluginConfig.builder()
+                .className(FieldPathValidator.class.getName())
+                .enabled(true)
+                .supportedOperations(
+                    List.of("CREATE", "CREATE_ENTITY", "UPSERT", "UPDATE", "RESTATE"))
+                .supportedEntityAspectNames(
+                    List.of(
+                        AspectPluginConfig.EntityAspectName.builder()
+                            .entityName(ALL)
+                            .aspectName(SCHEMA_METADATA_ASPECT_NAME)
+                            .build(),
+                        AspectPluginConfig.EntityAspectName.builder()
+                            .entityName(ALL)
+                            .aspectName(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)
+                            .build()))
+                .build());
+  }
+
+  @Bean
+  public AspectPayloadValidator dataHubExecutionRequestResultValidator() {
+    return new ExecutionRequestResultValidator()
+        .setConfig(
+            AspectPluginConfig.builder()
+                .className(ExecutionRequestResultValidator.class.getName())
+                .enabled(true)
+                .supportedOperations(List.of("UPSERT", "UPDATE"))
+                .supportedEntityAspectNames(
+                    List.of(
+                        AspectPluginConfig.EntityAspectName.builder()
+                            .entityName(EXECUTION_REQUEST_ENTITY_NAME)
+                            .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME)
+                            .build()))
+                .build());
+  }
 }

From 555f391c24fe245c96fdbdb7462ab80e9a87acaa Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 15 Oct 2024 13:14:21 -0500
Subject: [PATCH 5/9] fix(ci): Update contributor-open-pr-comment.yml (#11631)

---
 .github/workflows/contributor-open-pr-comment.yml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.github/workflows/contributor-open-pr-comment.yml b/.github/workflows/contributor-open-pr-comment.yml
index b3da16ca994baa..2f700290ee0f28 100644
--- a/.github/workflows/contributor-open-pr-comment.yml
+++ b/.github/workflows/contributor-open-pr-comment.yml
@@ -4,6 +4,9 @@ on:
   pull_request:
     types: [opened]
 
+permissions:
+  pull-requests: write
+
 jobs:
   post-pr-opened-comment:
     runs-on: ubuntu-latest

From 14a22bfeaf8bedcb873da6dbcb0a40833fde96a3 Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 15 Oct 2024 13:14:46 -0500
Subject: [PATCH 6/9] fix(ci): add runtime limit (#11630)

---
 .github/workflows/metadata-ingestion.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml
index c718febca398a9..92dcb3d8ac2890 100644
--- a/.github/workflows/metadata-ingestion.yml
+++ b/.github/workflows/metadata-ingestion.yml
@@ -26,6 +26,7 @@ concurrency:
 jobs:
   metadata-ingestion:
     runs-on: ubuntu-latest
+    timeout-minutes: 40
     env:
       SPARK_VERSION: 3.3.2
       DATAHUB_TELEMETRY_ENABLED: false

From 2bc96e9e69f4523cd5eb27bf0b60e309694b7b17 Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 15 Oct 2024 14:43:36 -0500
Subject: [PATCH 7/9] fix(ci): metadata-io req python (#11632)

---
 .github/workflows/metadata-io.yml | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/metadata-io.yml b/.github/workflows/metadata-io.yml
index 7018b42949e892..5ee2223d71b039 100644
--- a/.github/workflows/metadata-io.yml
+++ b/.github/workflows/metadata-io.yml
@@ -57,17 +57,16 @@ jobs:
       - name: Disk Check
         run: df -h . && docker images
       - uses: acryldata/sane-checkout-action@v3
+      - uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
+          cache: "pip"
       - name: Set up JDK 17
         uses: actions/setup-java@v4
         with:
           distribution: "zulu"
           java-version: 17
       - uses: gradle/actions/setup-gradle@v3
-      - uses: actions/setup-python@v5
-        if: ${{ needs.setup.outputs.ingestion_change == 'true' }}
-        with:
-          python-version: "3.10"
-          cache: "pip"
       - name: Gradle build (and test)
         run: |
           ./gradlew :metadata-io:test

From 2f2a7af58428563a3acb2963c8278445513f8e85 Mon Sep 17 00:00:00 2001
From: Hyejin Yoon <0327jane@gmail.com>
Date: Wed, 16 Oct 2024 15:49:59 +0900
Subject: [PATCH 8/9] feat: add quickstart post (#11623)

---
 .../examples/mce_files/bootstrap_mce.json | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/metadata-ingestion/examples/mce_files/bootstrap_mce.json b/metadata-ingestion/examples/mce_files/bootstrap_mce.json
index bc218e5e8c2d53..f0c4e7ff996ed3 100644
--- a/metadata-ingestion/examples/mce_files/bootstrap_mce.json
+++ b/metadata-ingestion/examples/mce_files/bootstrap_mce.json
@@ -3613,6 +3613,33 @@
         },
         "systemMetadata": null
     },
+    {
+        "entityType": "post",
+        "entityUrn": "urn:li:post:f3a68539-f7e4-4c41-a4fd-9e57c085d8de",
+        "changeType": "UPSERT",
+        "aspectName": "postInfo",
+        "aspect": {
+            "json": {
+                "type": "HOME_PAGE_ANNOUNCEMENT",
+                "content": {
+                    "title": "Join Metadata & AI Summit 2024",
+                    "type": "LINK",
+                    "link": "http://www.acryldata.io/conference?utm_source=datahub_quickstart&utm_medium=metadata_ai_2024&utm_campaign=pinned_announcement",
+                    "media": {
+                        "type": "IMAGE",
+                        "location": "https://formulatedby.com/wp-content/uploads/2024/07/0193320a6d93e7508d1598f7b24662f75a87e92f-352x456-1.svg"
+                    }
+                },
+                "created": 1712547125049,
+                "lastModified": 1712547125049
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1712548844816,
+            "runId": "datahub-2024_04_08-13_00_44",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
     {
         "entityType": "post",
         "entityUrn": "urn:li:post:f3a68539-f7e4-4c41-a4fd-9e57c085d8dd",

From e76647dd7a094907bdfa22682b79b35965e5537c Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Wed, 16 Oct 2024 10:04:01 +0200
Subject: [PATCH 9/9] feat(ingest/bigquery): Generate platform resource entities for BigQuery labels (#11602)

Co-authored-by: Shirshanka Das
---
 .../ingestion/source/bigquery_v2/bigquery.py | 1 +
 .../bigquery_platform_resource_helper.py | 144 +++++++++++++
 .../source/bigquery_v2/bigquery_schema_gen.py | 114 +++++++++--
 .../bigquery_v2/bigquery_mcp_golden.json | 192 ++++++++++++++++++
 .../integration/bigquery_v2/test_bigquery.py | 41 +++-
 .../tests/unit/test_bigquery_source.py | 74 ++++---
 6 files changed, 517 insertions(+), 49 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_platform_resource_helper.py

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 4cc3ec50bacd46..c30dade921d257 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -188,6 +188,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
             self.sql_parser_schema_resolver,
             self.profiler,
             self.identifiers,
+            self.ctx.graph,
         )
 
         self.add_config_to_report()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_platform_resource_helper.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_platform_resource_helper.py
new file mode 100644
index 00000000000000..d2da895be985dc
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_platform_resource_helper.py
@@ -0,0 +1,144 @@
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+import cachetools
+from pydantic import BaseModel, ValidationError
+
+from datahub.api.entities.platformresource.platform_resource import (
+    PlatformResource,
+    PlatformResourceKey,
+)
+from datahub.ingestion.graph.client import DataHubGraph
+from datahub.metadata.urns import TagUrn
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+
+@dataclass
+class BigQueryLabel:
+    key: str
+    value: Optional[str]
+
+    def primary_key(self) -> str:
+        return f"{self.key}/{self.value}" if self.value else f"{self.key}"
+
+
+class BigQueryLabelInfo(BaseModel):
+    datahub_urn: str
+    managed_by_datahub: bool
+    key: str
+    value: str
+
+
+@dataclass()
+class BigQueryLabelPlatformResource:
+    datahub_urn: str
+    project: Optional[str]
+    managed_by_datahub: bool
+    label: BigQueryLabel
+
+    def platform_resource_key(self) -> PlatformResourceKey:
+        return PlatformResourceKey(
+            platform="bigquery",
+            resource_type="BigQueryLabelInfo",
+            platform_instance=self.project,
+            primary_key=self.label.primary_key(),
+        )
+
+    def platform_resource_info(self) -> BigQueryLabelInfo:
+        bq_label_info = BigQueryLabelInfo(
+            datahub_urn=self.datahub_urn,
+            managed_by_datahub=self.managed_by_datahub,
+            key=self.label.key,
+            value=self.label.value,
+        )
+        return bq_label_info
+
+    def platform_resource(self) -> PlatformResource:
+        return PlatformResource.create(
+            key=self.platform_resource_key(),
+            secondary_keys=[self.datahub_urn],
+            value=self.platform_resource_info(),
+        )
+
+
+class BigQueryPlatformResourceHelper:
+    def __init__(
+        self,
+        bq_project: Optional[str],
+        graph: Optional[DataHubGraph],
+    ):
+        self.bq_project = bq_project
+        self.graph = graph
+
+    platform_resource_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=500)
+
+    def get_platform_resource(
+        self, platform_resource_key: PlatformResourceKey
+    ) -> Optional[PlatformResource]:
+        # if graph is not available we always create a new PlatformResource
+        if not self.graph:
+            return None
+        if self.platform_resource_cache.get(platform_resource_key.primary_key):
+            return self.platform_resource_cache.get(platform_resource_key.primary_key)
+
+        platform_resource = PlatformResource.from_datahub(
+            key=platform_resource_key, graph_client=self.graph
+        )
+        if platform_resource:
+            self.platform_resource_cache[
+                platform_resource_key.primary_key
+            ] = platform_resource
+            return platform_resource
+        return None
+
+    def generate_label_platform_resource(
+        self,
+        bigquery_label: BigQueryLabel,
+        tag_urn: TagUrn,
+        managed_by_datahub: bool = True,
+    ) -> PlatformResource:
+        new_platform_resource = BigQueryLabelPlatformResource(
+            datahub_urn=tag_urn.urn(),
+            project=self.bq_project,
+            managed_by_datahub=managed_by_datahub,
+            label=bigquery_label,
+        )
+
+        platform_resource = self.get_platform_resource(
+            new_platform_resource.platform_resource_key()
+        )
+        if platform_resource:
+            if (
+                platform_resource.resource_info
+                and platform_resource.resource_info.value
+            ):
+                try:
+                    existing_info: Optional[BigQueryLabelInfo] = platform_resource.resource_info.value.as_pydantic_object(BigQueryLabelInfo)  # type: ignore
+                except ValidationError as e:
+                    logger.error(
+                        f"Error converting existing value to BigQueryLabelInfo: {e}. Creating new one. Maybe this is because of a non backward compatible schema change."
+                    )
+                    existing_info = None
+
+                if existing_info:
+                    if (
+                        new_platform_resource.platform_resource_info() == existing_info
+                        or existing_info.managed_by_datahub
+                    ):
+                        return platform_resource
+                    else:
+                        raise ValueError(
+                            f"Datahub URN mismatch for platform resources. Old (existing) platform resource: {platform_resource} and new platform resource: {new_platform_resource}"
+                        )
+
+        logger.info(f"Created platform resource {new_platform_resource}")
+
+        self.platform_resource_cache.update(
+            {
+                new_platform_resource.platform_resource_key().primary_key: new_platform_resource.platform_resource()
+            }
+        )
+
+        return new_platform_resource.platform_resource()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
index 11d06771d4e4f4..1235f638f68ff7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
@@ -6,6 +6,7 @@
 
 from google.cloud.bigquery.table import TableListItem
 
+from datahub.api.entities.platformresource.platform_resource import PlatformResource
 from datahub.configuration.pattern_utils import is_schema_allowed, is_tag_allowed
 from datahub.emitter.mce_builder import make_tag_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -16,6 +17,7 @@
     ClassificationHandler,
     classification_workunit_processor,
 )
+from datahub.ingestion.graph.client import DataHubGraph
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigqueryTableIdentifier,
     BigQueryTableRef,
@@ -25,6 +27,11 @@
 from datahub.ingestion.source.bigquery_v2.bigquery_helper import (
     unquote_and_decode_unicode_escape_seq,
 )
+from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import (
+    BigQueryLabel,
+    BigQueryLabelInfo,
+    BigQueryPlatformResourceHelper,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryColumn,
@@ -84,6 +91,7 @@
     GlobalTagsClass,
     TagAssociationClass,
 )
+from datahub.metadata.urns import TagUrn
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.utilities.file_backed_collections import FileBackedDict
 from datahub.utilities.hive_schema_to_avro import (
@@ -160,6 +168,7 @@ def __init__(
         sql_parser_schema_resolver: SchemaResolver,
         profiler: BigqueryProfiler,
         identifiers: BigQueryIdentifierBuilder,
+        graph: Optional[DataHubGraph] = None,
     ):
         self.config = config
         self.report = report
@@ -168,6 +177,7 @@ def __init__(
         self.sql_parser_schema_resolver = sql_parser_schema_resolver
         self.profiler = profiler
         self.identifiers = identifiers
+        self.graph = graph
         self.classification_handler = ClassificationHandler(self.config, self.report)
         self.data_reader: Optional[BigQueryDataReader] = None
@@ -188,6 +198,21 @@ def __init__(
         # Maps snapshot ref -> Snapshot
         self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()
 
+        bq_project = (
+            self.config.project_on_behalf
+            if self.config.project_on_behalf
+            else self.config.credential.project_id
+            if self.config.credential
+            else None
+        )
+
+        self.platform_resource_helper: BigQueryPlatformResourceHelper = (
+            BigQueryPlatformResourceHelper(
+                bq_project,
+                self.graph,
+            )
+        )
+
     @property
     def store_table_refs(self):
         return (
@@ -264,13 +289,28 @@ def gen_dataset_containers(
     ) -> Iterable[MetadataWorkUnit]:
         schema_container_key = self.gen_dataset_key(project_id, dataset)
 
-        tags_joined: Optional[List[str]] = None
+        tags_joined: List[str] = []
         if tags and self.config.capture_dataset_label_as_tag:
-            tags_joined = [
-                self.make_tag_from_label(k, v)
-                for k, v in tags.items()
-                if is_tag_allowed(self.config.capture_dataset_label_as_tag, k)
-            ]
+            for k, v in tags.items():
+                if is_tag_allowed(self.config.capture_dataset_label_as_tag, k):
+                    tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v))
+                    label = BigQueryLabel(key=k, value=v)
+                    try:
+                        platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource(
+                            label, tag_urn, managed_by_datahub=False
+                        )
+                        label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object(  # type: ignore
+                            BigQueryLabelInfo
+                        )
+                        tag_urn = TagUrn.from_string(label_info.datahub_urn)
+
+                        for mcpw in platform_resource.to_mcps():
+                            yield mcpw.as_workunit()
+                    except ValueError as e:
+                        logger.warning(
+                            f"Failed to generate platform resource for label {k}:{v}: {e}"
+                        )
+                    tags_joined.append(tag_urn.urn())
 
         database_container_key = self.gen_project_id_key(database=project_id)
@@ -676,10 +716,11 @@ def _process_snapshot(
             dataset_name=dataset_name,
         )
 
-    def make_tag_from_label(self, key: str, value: str) -> str:
-        if not value.startswith(ENCODED_TAG_PREFIX):
+    def make_tag_urn_from_label(self, key: str, value: str) -> str:
+        if value:
             return make_tag_urn(f"""{key}:{value}""")
-        return self.modified_base32decode(value)
+        else:
+            return make_tag_urn(key)
 
     def gen_table_dataset_workunits(
         self,
@@ -724,13 +765,26 @@ def gen_table_dataset_workunits(
         tags_to_add = None
         if table.labels and self.config.capture_table_label_as_tag:
             tags_to_add = []
-            tags_to_add.extend(
-                [
-                    self.make_tag_from_label(k, v)
-                    for k, v in table.labels.items()
-                    if is_tag_allowed(self.config.capture_table_label_as_tag, k)
-                ]
-            )
+            for k, v in table.labels.items():
+                if is_tag_allowed(self.config.capture_table_label_as_tag, k):
+                    tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v))
+                    try:
+                        label = BigQueryLabel(key=k, value=v)
+                        platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource(
+                            label, tag_urn, managed_by_datahub=False
+                        )
+                        label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object(  # type: ignore
+                            BigQueryLabelInfo
+                        )
+                        tag_urn = TagUrn.from_string(label_info.datahub_urn)
+
+                        for mcpw in platform_resource.to_mcps():
+                            yield mcpw.as_workunit()
+                    except ValueError as e:
+                        logger.warning(
+                            f"Failed to generate platform resource for label {k}:{v}: {e}"
+                        )
+                    tags_to_add.append(tag_urn.urn())
 
         yield from self.gen_dataset_workunits(
             table=table,
@@ -749,13 +803,29 @@ def gen_view_dataset_workunits(
         project_id: str,
         dataset_name: str,
     ) -> Iterable[MetadataWorkUnit]:
-        tags_to_add = None
+        tags_to_add = []
         if table.labels and self.config.capture_view_label_as_tag:
-            tags_to_add = [
-                self.make_tag_from_label(k, v)
-                for k, v in table.labels.items()
-                if is_tag_allowed(self.config.capture_view_label_as_tag, k)
-            ]
+            for k, v in table.labels.items():
+                if is_tag_allowed(self.config.capture_view_label_as_tag, k):
+                    tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v))
+                    try:
+                        label = BigQueryLabel(key=k, value=v)
+                        platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource(
+                            label, tag_urn, managed_by_datahub=False
+                        )
+                        label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object(  # type: ignore
+                            BigQueryLabelInfo
+                        )
+                        tag_urn = TagUrn.from_string(label_info.datahub_urn)
+
+                        for mcpw in platform_resource.to_mcps():
+                            yield mcpw.as_workunit()
+                    except ValueError as e:
+                        logger.warning(
+                            f"Failed to generate platform resource for label {k}:{v}: {e}"
+                        )
+
+                    tags_to_add.append(tag_urn.urn())
         yield from self.gen_dataset_workunits(
             table=table,
             columns=columns,
diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json
index 02660f0fae08ed..b268926f155b74 100644
--- a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json
+++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json
@@ -199,6 +199,49 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:79d443a7956814fdab2168e11392bbf2",
+    "changeType": "UPSERT",
+    "aspectName": "platformResourceInfo",
+    "aspect": {
+        "json": {
+            "resourceType": "BigQueryLabelInfo",
+            "primaryKey": "priority/high",
+            "secondaryKeys": [
+                "urn:li:tag:priority:high"
+            ],
+            "value": {
+                "blob": "{\"datahub_urn\": \"urn:li:tag:priority:high\", \"managed_by_datahub\": false, \"key\": \"priority\", \"value\": \"high\"}",
+                "contentType": "JSON",
+                "schemaType": "JSON",
+                "schemaRef": "BigQueryLabelInfo"
+            }
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:79d443a7956814fdab2168e11392bbf2",
+    "changeType": "UPSERT",
+    "aspectName": "dataPlatformInstance",
+    "aspect": {
+        "json": {
+            "platform": "urn:li:dataPlatform:bigquery",
+            "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
@@ -215,6 +258,49 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:0a8c87e84bd90486c4fd57bbae6557e3",
+    "changeType": "UPSERT",
+    "aspectName": "platformResourceInfo",
+    "aspect": {
+        "json": {
+            "resourceType": "BigQueryLabelInfo",
+            "primaryKey": "purchase",
+            "secondaryKeys": [
+                "urn:li:tag:purchase"
+            ],
+            "value": {
+                "blob": "{\"datahub_urn\": \"urn:li:tag:purchase\", \"managed_by_datahub\": false, \"key\": \"purchase\", \"value\": \"\"}",
+                "contentType": "JSON",
+                "schemaType": "JSON",
+                "schemaRef": "BigQueryLabelInfo"
+            }
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:0a8c87e84bd90486c4fd57bbae6557e3",
+    "changeType": "UPSERT",
+    "aspectName": "dataPlatformInstance",
+    "aspect": {
+        "json": {
+            "platform": "urn:li:dataPlatform:bigquery",
+            "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
@@ -309,6 +395,38 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:79d443a7956814fdab2168e11392bbf2",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+        "json": {
+            "removed": false
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:0a8c87e84bd90486c4fd57bbae6557e3",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+        "json": {
+            "removed": false
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
@@ -330,6 +448,45 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:7da6409504c5c6444b4ce60b0239b759",
+    "changeType": "UPSERT",
+    "aspectName": "platformResourceInfo",
+    "aspect": {
+        "json": {
+            "resourceType": "BigQueryLabelInfo",
+            "primaryKey": "mixedcasetag",
+            "value": {
+                "blob": "{\"datahub_urn\": \"urn:li:tag:MixedCaseTag\", \"managed_by_datahub\": true, \"key\": \"mixedcasetag\", \"value\": \"\"}",
+                "contentType": "JSON",
+                "schemaType": "JSON",
+                "schemaRef": "BigQueryLabelInfo"
+            }
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:7da6409504c5c6444b4ce60b0239b759",
+    "changeType": "UPSERT",
+    "aspectName": "dataPlatformInstance",
+    "aspect": {
+        "json": {
+            "platform": "urn:li:dataPlatform:bigquery"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
@@ -343,6 +500,9 @@
                 },
                 {
                     "tag": "urn:li:tag:purchase"
+                },
+                {
+                    "tag": "urn:li:tag:MixedCaseTag"
                 }
             ]
         }
@@ -353,6 +513,22 @@
        "lastRunId": "no-run-id-provided"
    }
 },
+{
+    "entityType": "platformResource",
+    "entityUrn": "urn:li:platformResource:7da6409504c5c6444b4ce60b0239b759",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+        "json": {
+            "removed": false
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
@@ -1082,5 +1258,21 @@
         "runId": "bigquery-2022_02_03-07_00_00",
         "lastRunId": "no-run-id-provided"
     }
+},
+{
+    "entityType": "tag",
+    "entityUrn": "urn:li:tag:MixedCaseTag",
+    "changeType": "UPSERT",
+    "aspectName": "tagKey",
+    "aspect": {
+        "json": {
+            "name": "MixedCaseTag"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1643871600000,
+        "runId": "bigquery-2022_02_03-07_00_00",
+        "lastRunId": "no-run-id-provided"
+    }
 }
 ]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
index 0ac4e94a5a24f3..39cefcb42f360b 100644
--- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
+++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
@@ -1,12 +1,16 @@
 import random
 import string
 from datetime import datetime, timezone
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 from unittest.mock import MagicMock, patch
 
 from freezegun import freeze_time
 from google.cloud.bigquery.table import TableListItem
 
+from datahub.api.entities.platformresource.platform_resource import (
+    PlatformResource,
+    PlatformResourceKey,
+)
 from datahub.ingestion.glossary.classifier import (
     ClassificationConfig,
     DynamicTypedClassifierConfig,
@@ -14,6 +18,10 @@
 from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
 from datahub.ingestion.source.bigquery_v2.bigquery_data_reader import BigQueryDataReader
+from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import (
+    BigQueryLabelInfo,
+    BigQueryPlatformResourceHelper,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryColumn,
     BigqueryDataset,
@@ -51,6 +59,13 @@ def recipe(mcp_output_path: str, source_config_override: dict = {}) -> dict:
             "type": "bigquery",
             "config": {
                 "project_ids": ["project-id-1"],
+                "credential": {
+                    "project_id": "project-id-1",
+                    "private_key_id": "private_key_id",
+                    "private_key": "private_key",
+                    "client_email": "client_email",
+                    "client_id": "client_id",
+                },
                 "include_usage_statistics": False,
                 "include_table_lineage": True,
                 "include_data_platform_instance": True,
@@ -82,6 +97,7 @@ def recipe(mcp_output_path: str, source_config_override: dict = {}) -> dict:
 @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id")
 @patch.object(BigQuerySchemaApi, "get_columns_for_dataset")
 @patch.object(BigQueryDataReader, "get_sample_data_for_table")
+@patch.object(BigQueryPlatformResourceHelper, "get_platform_resource")
 @patch("google.cloud.bigquery.Client")
 @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient")
 @patch("google.cloud.resourcemanager_v3.ProjectsClient")
@@ -89,6 +105,7 @@ def test_bigquery_v2_ingest(
     client,
     policy_tag_manager_client,
     projects_client,
+    get_platform_resource,
     get_sample_data_for_table,
     get_columns_for_dataset,
     get_datasets_for_project_id,
@@ -104,6 +121,25 @@ def test_bigquery_v2_ingest(
     mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json")
     dataset_name = "bigquery-dataset-1"
 
+    def side_effect(*args: Any) -> Optional[PlatformResource]:
+        if args[0].primary_key == "mixedcasetag":
+            return PlatformResource.create(
+                key=PlatformResourceKey(
+                    primary_key="mixedcasetag",
+                    resource_type="BigQueryLabelInfo",
+                    platform="bigquery",
+                ),
+                value=BigQueryLabelInfo(
+                    datahub_urn="urn:li:tag:MixedCaseTag",
+                    managed_by_datahub=True,
+                    key="mixedcasetag",
+                    value="",
+                ),
+            )
+        return None
+
+    get_platform_resource.side_effect = side_effect
     get_datasets_for_project_id.return_value = [
         BigqueryDataset(name=dataset_name, location="US")
     ]
@@ -158,7 +194,8 @@ def test_bigquery_v2_ingest(
         rows_count=None,
         labels={
             "priority": "high",
-            "purchase": "urn_li_encoded_tag_ovzg4otmne5hiylhhjyhk4tdnbqxgzi_",
+            "purchase": "",
+            "mixedcasetag": "",
         },
     )
     get_tables_for_dataset.return_value = iter([bigquery_table])
diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py
index 38239d150dd6b5..b605e9b3f8a3e6 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -489,30 +489,45 @@ def test_gen_table_dataset_workunits(
     gen = schema_gen.gen_table_dataset_workunits(
         bigquery_table, [], project_id, dataset_name
     )
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert mcp.aspect == StatusClass(removed=False)
+    mcps = list(gen)
+
+    # Helper function to find MCP by aspect type
+    def find_mcp_by_aspect(aspect_type):
+        return next(
+            mcp  # type: ignore
+            for mcp in mcps
+            if isinstance(mcp.metadata.aspect, aspect_type)  # type: ignore
+        )
 
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert isinstance(mcp.aspect, SchemaMetadataClass)
-    assert mcp.aspect.schemaName == f"{project_id}.{dataset_name}.{bigquery_table.name}"
-    assert mcp.aspect.fields == []
+    # Assert StatusClass
+    status_mcp = find_mcp_by_aspect(StatusClass)
+    assert status_mcp.metadata.aspect.removed is False
 
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert isinstance(mcp.aspect, DatasetPropertiesClass)
-    assert mcp.aspect.name == bigquery_table.name
+    # Assert SchemaMetadataClass
+    schema_mcp = find_mcp_by_aspect(SchemaMetadataClass)
+    assert (
+        schema_mcp.metadata.aspect.schemaName
+        == f"{project_id}.{dataset_name}.{bigquery_table.name}"
+    )
+    assert schema_mcp.metadata.aspect.fields == []
+
+    # Assert DatasetPropertiesClass
+    dataset_props_mcp = find_mcp_by_aspect(DatasetPropertiesClass)
+    assert dataset_props_mcp.metadata.aspect.name == bigquery_table.name
     assert (
-        mcp.aspect.qualifiedName == f"{project_id}.{dataset_name}.{bigquery_table.name}"
+        dataset_props_mcp.metadata.aspect.qualifiedName
+        == f"{project_id}.{dataset_name}.{bigquery_table.name}"
     )
-    assert mcp.aspect.description == bigquery_table.comment
-    assert mcp.aspect.created == TimeStampClass(
+    assert dataset_props_mcp.metadata.aspect.description == bigquery_table.comment
+    assert dataset_props_mcp.metadata.aspect.created == TimeStampClass(
         time=int(bigquery_table.created.timestamp() * 1000)
    )
-    assert mcp.aspect.lastModified == TimeStampClass(
+    assert dataset_props_mcp.metadata.aspect.lastModified == TimeStampClass(
         time=int(bigquery_table.last_altered.timestamp() * 1000)
     )
-    assert mcp.aspect.tags == []
+    assert dataset_props_mcp.metadata.aspect.tags == []
 
-    assert mcp.aspect.customProperties == {
+    expected_custom_properties = {
         "expiration_date": str(bigquery_table.expires),
         "size_in_bytes": str(bigquery_table.size_in_bytes),
         "billable_bytes_active": str(bigquery_table.active_billable_bytes),
@@ -523,24 +538,33 @@ def test_gen_table_dataset_workunits(
         "max_shard_id": str(bigquery_table.max_shard_id),
         "is_sharded": "True",
     }
+    assert (
+        dataset_props_mcp.metadata.aspect.customProperties == expected_custom_properties
+    )
 
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert isinstance(mcp.aspect, GlobalTagsClass)
-    assert mcp.aspect.tags == [
+    # Assert GlobalTagsClass
+    global_tags_mcp = find_mcp_by_aspect(GlobalTagsClass)
+    assert global_tags_mcp.metadata.aspect.tags == [
         TagAssociationClass(
             "urn:li:tag:data_producer_owner_email:games_team-nytimes_com"
         )
     ]
 
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert isinstance(mcp.aspect, ContainerClass)
+    # Assert ContainerClass
+    container_mcp = find_mcp_by_aspect(ContainerClass)
+    assert container_mcp is not None
 
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert isinstance(mcp.aspect, DataPlatformInstanceClass)
+    # Assert DataPlatformInstanceClass
+    data_platform_instance_mcp = find_mcp_by_aspect(DataPlatformInstanceClass)
+    assert data_platform_instance_mcp is not None
 
-    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
-    assert isinstance(mcp.aspect, SubTypesClass)
-    assert mcp.aspect.typeNames[1] == DatasetSubTypes.TABLE
+    # Assert SubTypesClass
+    sub_types_mcp = find_mcp_by_aspect(SubTypesClass)
+    assert sub_types_mcp.metadata.aspect.typeNames[1] == DatasetSubTypes.TABLE
+
+    # Ensure all MCPs were checked
+    # TODO: Test for PlatformResource MCPs as well
+    assert len(mcps) >= 7
 
 
 @patch.object(BigQueryV2Config, "get_bigquery_client")