From 4e2cec86b3d21a2299d34652a335c73c073b2919 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 17 Apr 2024 20:09:30 -0700 Subject: [PATCH 1/5] feat(ingest/sigma): fix stateful ingestion (#10321) --- .../src/datahub/ingestion/source/sigma/config.py | 3 +++ metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py | 2 +- metadata-ingestion/src/datahub/ingestion/source/superset.py | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sigma/config.py b/metadata-ingestion/src/datahub/ingestion/source/sigma/config.py index cacb96b4bcca00..06c381add05b9c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sigma/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sigma/config.py @@ -11,6 +11,7 @@ ) from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalSourceReport, + StatefulStaleMetadataRemovalConfig, ) from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, @@ -84,3 +85,5 @@ class SigmaSourceConfig( default={}, description="A mapping of the sigma workspace/workbook/chart folder path to all chart's data sources platform details present inside that folder path.", ) + + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py b/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py index 263232896e153c..46e92e8533a21e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py @@ -105,7 +105,7 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource): platform: str = "sigma" def __init__(self, config: SigmaSourceConfig, ctx: PipelineContext): - super(SigmaSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config self.reporter = SigmaSourceReport() self.dataset_upstream_urn_mapping: Dict[str, List[str]] = {} diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 1fbce27d0af240..9559b8e84b85da 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -170,7 +170,6 @@ class SupersetSource(StatefulIngestionSourceBase): config: SupersetConfig report: StaleEntityRemovalSourceReport platform = "superset" - stale_entity_removal_handler: StaleEntityRemovalHandler def __hash__(self): return id(self) From 77f1a0c60e8589413c11c780d005b0e60de87232 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 17 Apr 2024 23:40:11 -0700 Subject: [PATCH 2/5] fix(ingest/profiling): compute sample row count correctly (#10319) --- .../ingestion/source/ge_data_profiler.py | 34 +++++++++++++------ .../source/sql/sql_generic_profiler.py | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index d31b65f0034268..c04f2f8e5a9312 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -689,9 +689,28 @@ def generate_dataset_profile( # noqa: C901 (complexity) logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries") self.query_combiner.flush() + assert profile.rowCount is not None + full_row_count = 
profile.rowCount + if self.config.use_sampling and not self.config.limit: self.update_dataset_batch_use_sampling(profile) + # Note that this row count may be different from the full_row_count if we are using sampling. + row_count: int = profile.rowCount + if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition: + # Querying exact row count of sample using `_get_dataset_rows`. + # We are not using `self.config.sample_size` directly as the actual row count + # in the sample may be different than configured `sample_size`. For BigQuery, + # we've even seen 160k rows returned for a sample size of 10k. + logger.debug("Recomputing row count for the sample") + + # Note that we can't just call `self._get_dataset_rows(profile)` here because + # there's some sort of caching happening that will return the full table row count + # instead of the sample row count. + row_count = self.dataset.get_row_count(str(self.dataset._table)) + + profile.partitionSpec.partition += f" (sample rows {row_count})" + columns_profiling_queue: List[_SingleColumnSpec] = [] if columns_to_profile: for column in all_columns: @@ -708,16 +727,6 @@ def generate_dataset_profile( # noqa: C901 (complexity) logger.debug(f"profiling {self.dataset_name}: flushing stage 2 queries") self.query_combiner.flush() - assert profile.rowCount is not None - row_count: int # used for null counts calculation - if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition: - # Querying exact row count of sample using `_get_dataset_rows`. - # We are not using `self.config.sample_size` directly as actual row count - # in sample may be slightly different (more or less) than configured `sample_size`. - self._get_dataset_rows(profile) - - row_count = profile.rowCount - for column_spec in columns_profiling_queue: column = column_spec.column column_profile = column_spec.column_profile @@ -825,6 +834,10 @@ def generate_dataset_profile( # noqa: C901 (complexity) logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries") self.query_combiner.flush() + + # Reset the row count to the original value. 
+ profile.rowCount = full_row_count + return profile def init_profile(self): @@ -1274,6 +1287,7 @@ def create_bigquery_temp_table( try: cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor()) try: + logger.debug(f"Creating temporary table for {table_pretty_name}: {bq_sql}") cursor.execute(bq_sql) except Exception as e: if not instance.config.catch_exceptions: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 4708836d3d259a..365539df7a83be 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -159,7 +159,7 @@ def get_profile_request( rows_count=table.rows_count, ): logger.debug( - f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit" + f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes or count of rows limit" ) # Profile only table level if dataset is filtered from profiling # due to size limits alone From a041a2ee52c3f6d8ce1f5ee6ad674850372e1f17 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Thu, 18 Apr 2024 03:41:18 -0400 Subject: [PATCH 3/5] fix(ingest/transformers): Use set to store tags in AddDatasetTags (#10317) --- .../ingestion/transformer/add_dataset_tags.py | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index 7508b33c6bfc67..ef6ef43fa2d7f3 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -1,7 +1,6 @@ import logging -from typing import Callable, List, Optional, Union, cast +from typing import Callable, Dict, List, Optional, Union, cast -import datahub.emitter.mce_builder as builder from datahub.configuration.common import ( KeyValuePattern, TransformerSemanticsConfigModel, @@ -15,7 +14,6 @@ GlobalTagsClass, MetadataChangeProposalClass, TagAssociationClass, - TagKeyClass, ) from datahub.utilities.urns.tag_urn import TagUrn @@ -33,13 +31,13 @@ class AddDatasetTags(DatasetTagsTransformer): ctx: PipelineContext config: AddDatasetTagsConfig - processed_tags: List[TagAssociationClass] + processed_tags: Dict[str, TagAssociationClass] def __init__(self, config: AddDatasetTagsConfig, ctx: PipelineContext): super().__init__() self.ctx = ctx self.config = config - self.processed_tags = [] + self.processed_tags = {} @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags": @@ -58,9 +56,9 @@ def transform_aspect( tags_to_add = self.config.get_tags_to_add(entity_urn) if tags_to_add is not None: out_global_tags_aspect.tags.extend(tags_to_add) - self.processed_tags.extend( - tags_to_add - ) # Keep track of tags added so that we can create them in handle_end_of_stream + # Keep track of tags added so that we can create them in handle_end_of_stream + for tag in tags_to_add: + self.processed_tags.setdefault(tag.tag, tag) return self.get_result_semantics( self.config, self.ctx.graph, entity_urn, out_global_tags_aspect @@ -76,19 +74,12 @@ def handle_end_of_stream( logger.debug("Generating tags") - for tag_association in self.processed_tags: - ids: List[str] = TagUrn.create_from_string( - 
tag_association.tag - ).get_entity_id() - - assert len(ids) == 1, "Invalid Tag Urn" - - tag_name: str = ids[0] - + for tag_association in self.processed_tags.values(): + tag_urn = TagUrn.create_from_string(tag_association.tag) mcps.append( MetadataChangeProposalWrapper( - entityUrn=builder.make_tag_urn(tag=tag_name), - aspect=TagKeyClass(name=tag_name), + entityUrn=tag_urn.urn(), + aspect=tag_urn.to_key_aspect(), ) ) From 91e3dc829ed64d6ca8dbbcbe691df6390999c766 Mon Sep 17 00:00:00 2001 From: ksrinath Date: Thu, 18 Apr 2024 13:50:37 +0530 Subject: [PATCH 4/5] feat(views): apply views to homepage entity counts & recommendations (#10283) Co-authored-by: gaurav2733 --- .../datahub/graphql/GmsGraphQLEngine.java | 7 ++- .../graphql/resolvers/ResolverUtils.java | 16 +++++ .../resolvers/group/EntityCountsResolver.java | 9 ++- .../ListRecommendationsResolver.java | 3 + .../src/main/resources/entity.graphql | 5 ++ .../src/main/resources/recommendation.graphql | 5 ++ .../src/app/home/HomePageRecommendations.tsx | 6 ++ .../metadata/client/JavaEntityClient.java | 6 +- .../candidatesource/MostPopularSource.java | 6 +- .../candidatesource/RecentlyEditedSource.java | 6 +- .../candidatesource/RecentlyViewedSource.java | 6 +- .../metadata/search/SearchService.java | 9 ++- .../search/cache/EntityDocCountCache.java | 34 +++++++++-- .../elasticsearch/ElasticSearchService.java | 6 +- .../elasticsearch/query/ESSearchDAO.java | 7 ++- .../RecommendationsServiceTest.java | 4 ++ ...ySearchAggregationCandidateSourceTest.java | 59 +++++++++++++------ .../candidatesource/TestSource.java | 6 +- .../linkedin/entity/client/EntityClient.java | 10 +++- .../entity/client/RestliEntityClient.java | 8 ++- .../RecommendationsService.java | 5 +- .../EntitySearchAggregationSource.java | 11 +++- .../RecentlySearchedSource.java | 6 +- .../candidatesource/RecommendationSource.java | 25 +++++++- .../metadata/search/EntitySearchService.java | 8 ++- 25 files changed, 224 insertions(+), 49 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index 5466711d62f65d..e4418eade7a4c5 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -55,6 +55,7 @@ import com.linkedin.datahub.graphql.generated.Dataset; import com.linkedin.datahub.graphql.generated.DatasetStatsSummary; import com.linkedin.datahub.graphql.generated.Domain; +import com.linkedin.datahub.graphql.generated.ERModelRelationship; import com.linkedin.datahub.graphql.generated.ERModelRelationshipProperties; import com.linkedin.datahub.graphql.generated.EntityPath; import com.linkedin.datahub.graphql.generated.EntityRelationship; @@ -987,8 +988,10 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) { .dataFetcher("listUsers", new ListUsersResolver(this.entityClient)) .dataFetcher("listGroups", new ListGroupsResolver(this.entityClient)) .dataFetcher( - "listRecommendations", new ListRecommendationsResolver(recommendationsService)) - .dataFetcher("getEntityCounts", new EntityCountsResolver(this.entityClient)) + "listRecommendations", + new ListRecommendationsResolver(recommendationsService, viewService)) + .dataFetcher( + "getEntityCounts", new EntityCountsResolver(this.entityClient, viewService)) .dataFetcher("getAccessToken", new GetAccessTokenResolver(statefulTokenService)) 
.dataFetcher("listAccessTokens", new ListAccessTokensResolver(this.entityClient)) .dataFetcher( diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java index 244012d320b43f..74d9e7f8a8c572 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java @@ -1,5 +1,6 @@ package com.linkedin.datahub.graphql.resolvers; +import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.*; import static com.linkedin.metadata.Constants.*; import com.datahub.authentication.Authentication; @@ -7,11 +8,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.StringArray; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.exception.ValidationException; import com.linkedin.datahub.graphql.generated.AndFilterInput; import com.linkedin.datahub.graphql.generated.FacetFilterInput; +import com.linkedin.datahub.graphql.resolvers.search.SearchUtils; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterion; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; @@ -20,7 +23,10 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.search.utils.ESUtils; import com.linkedin.metadata.search.utils.QueryUtils; +import com.linkedin.metadata.service.ViewService; +import com.linkedin.view.DataHubViewInfo; import graphql.schema.DataFetchingEnvironment; +import io.datahubproject.metadata.context.OperationContext; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -226,4 +232,14 @@ public static Filter buildFilterWithUrns(@Nonnull Set urns, @Nullable Filte } return QueryUtils.newFilter(urnMatchCriterion); } + + public static Filter viewFilter( + OperationContext opContext, ViewService viewService, String viewUrn) { + if (viewUrn == null) { + return null; + } + DataHubViewInfo viewInfo = resolveView(opContext, viewService, UrnUtils.getUrn(viewUrn)); + Filter result = SearchUtils.combineFilters(null, viewInfo.getDefinition().getFilter()); + return result; + } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java index 8abe2378982930..06a672f464f70a 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java @@ -8,6 +8,7 @@ import com.linkedin.datahub.graphql.generated.EntityCountResults; import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper; import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.service.ViewService; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import io.opentelemetry.extension.annotations.WithSpan; @@ -20,8 +21,11 @@ public class EntityCountsResolver implements DataFetcher get(final DataFetchingEnvironment e context.getOperationContext(), input.getTypes().stream() 
.map(EntityTypeMapper::getName) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + viewFilter(context.getOperationContext(), _viewService, input.getViewUrn())); // bind to a result. List resultList = diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/recommendation/ListRecommendationsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/recommendation/ListRecommendationsResolver.java index c5c75d1e5c2c77..c13a6ce732b91a 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/recommendation/ListRecommendationsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/recommendation/ListRecommendationsResolver.java @@ -22,6 +22,7 @@ import com.linkedin.metadata.recommendation.EntityRequestContext; import com.linkedin.metadata.recommendation.RecommendationsService; import com.linkedin.metadata.recommendation.SearchRequestContext; +import com.linkedin.metadata.service.ViewService; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import io.opentelemetry.extension.annotations.WithSpan; @@ -44,6 +45,7 @@ public class ListRecommendationsResolver new ListRecommendationsResult(Collections.emptyList()); private final RecommendationsService _recommendationsService; + private final ViewService _viewService; @WithSpan @Override @@ -60,6 +62,7 @@ public CompletableFuture get(DataFetchingEnvironment _recommendationsService.listRecommendations( context.getOperationContext(), mapRequestContext(input.getRequestContext()), + viewFilter(context.getOperationContext(), _viewService, input.getViewUrn()), input.getLimit()); return ListRecommendationsResult.builder() .setModules( diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index c4c82aa96a6000..296d62bc534a3a 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -1194,6 +1194,11 @@ Input for the get entity counts endpoint """ input EntityCountInput { types: [EntityType!] 
+ + """ + Optional - A View to apply when generating results + """ + viewUrn: String } """ diff --git a/datahub-graphql-core/src/main/resources/recommendation.graphql b/datahub-graphql-core/src/main/resources/recommendation.graphql index 439b22142b0cb8..d329d71fbef694 100644 --- a/datahub-graphql-core/src/main/resources/recommendation.graphql +++ b/datahub-graphql-core/src/main/resources/recommendation.graphql @@ -23,6 +23,11 @@ input ListRecommendationsInput { Max number of modules to return """ limit: Int + + """ + Optional - A View to apply when generating results + """ + viewUrn: String } """ diff --git a/datahub-web-react/src/app/home/HomePageRecommendations.tsx b/datahub-web-react/src/app/home/HomePageRecommendations.tsx index 6574b70b20de6a..fa8da01e8079bc 100644 --- a/datahub-web-react/src/app/home/HomePageRecommendations.tsx +++ b/datahub-web-react/src/app/home/HomePageRecommendations.tsx @@ -22,6 +22,7 @@ import { } from '../onboarding/config/HomePageOnboardingConfig'; import { useToggleEducationStepIdsAllowList } from '../onboarding/useToggleEducationStepIdsAllowList'; import { useBusinessAttributesFlag } from '../useAppConfig'; +import { useUserContext } from '../context/useUserContext'; const PLATFORMS_MODULE_ID = 'Platforms'; const MOST_POPULAR_MODULE_ID = 'HighUsageEntities'; @@ -105,6 +106,9 @@ export const HomePageRecommendations = ({ user }: Props) => { const browseEntityList = entityRegistry.getBrowseEntityTypes(); const userUrn = user?.urn; + const userContext = useUserContext(); + const viewUrn = userContext.localState?.selectedViewUrn; + const businessAttributesFlag = useBusinessAttributesFlag(); const showSimplifiedHomepage = user?.settings?.appearance?.showSimplifiedHomepage; @@ -113,6 +117,7 @@ export const HomePageRecommendations = ({ user }: Props) => { variables: { input: { types: browseEntityList, + viewUrn }, }, }); @@ -133,6 +138,7 @@ export const HomePageRecommendations = ({ user }: Props) => { scenario, }, limit: 10, + viewUrn }, }, fetchPolicy: 'no-cache', diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 68ce0c538c04b5..5006788fa9d76c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -571,9 +571,11 @@ public void setWritable(@Nonnull OperationContext opContext, boolean canWrite) @Override @Nonnull public Map batchGetTotalEntityCount( - @Nonnull OperationContext opContext, @Nonnull List entityNames) + @Nonnull OperationContext opContext, + @Nonnull List entityNames, + @Nullable Filter filter) throws RemoteInvocationException { - return searchService.docCountPerEntity(opContext, entityNames); + return searchService.docCountPerEntity(opContext, entityNames, filter); } /** List all urns existing for a particular Entity type. 
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/MostPopularSource.java b/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/MostPopularSource.java index 8274ca87d2ec7f..399b0aa6e49a64 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/MostPopularSource.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/MostPopularSource.java @@ -8,6 +8,7 @@ import com.linkedin.metadata.datahubusage.DataHubUsageEventConstants; import com.linkedin.metadata.datahubusage.DataHubUsageEventType; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationRenderType; import com.linkedin.metadata.recommendation.RecommendationRequestContext; @@ -23,6 +24,7 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchRequest; @@ -99,7 +101,9 @@ public boolean isEligible( @Override @WithSpan public List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter) { SearchRequest searchRequest = buildSearchRequest(opContext); try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getMostPopular").time()) { final SearchResponse searchResponse = diff --git a/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyEditedSource.java b/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyEditedSource.java index 298fd2455fc2c3..d75470127ded80 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyEditedSource.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyEditedSource.java @@ -8,6 +8,7 @@ import com.linkedin.metadata.datahubusage.DataHubUsageEventConstants; import com.linkedin.metadata.datahubusage.DataHubUsageEventType; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationRenderType; import com.linkedin.metadata.recommendation.RecommendationRequestContext; @@ -22,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchRequest; @@ -98,7 +100,9 @@ public boolean isEligible( @Override @WithSpan public List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter) { SearchRequest searchRequest = buildSearchRequest(opContext.getSessionActorContext().getActorUrn()); try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getRecentlyEdited").time()) {
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyViewedSource.java b/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyViewedSource.java index f916346af88fe3..e9613495e8d220 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyViewedSource.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlyViewedSource.java @@ -8,6 +8,7 @@ import com.linkedin.metadata.datahubusage.DataHubUsageEventConstants; import com.linkedin.metadata.datahubusage.DataHubUsageEventType; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationRenderType; import com.linkedin.metadata.recommendation.RecommendationRequestContext; @@ -22,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchRequest; @@ -98,7 +100,9 @@ public boolean isEligible( @Override @WithSpan public List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter) { SearchRequest searchRequest = buildSearchRequest(opContext.getSessionActorContext().getActorUrn()); try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getRecentlyViewed").time()) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java index e1a381a8f29d9f..c8525f829d2066 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java @@ -40,13 +40,20 @@ public SearchService( public Map<String, Long> docCountPerEntity( @Nonnull OperationContext opContext, @Nonnull List<String> entityNames) { + return docCountPerEntity(opContext, entityNames, null); + } + + public Map<String, Long> docCountPerEntity( + @Nonnull OperationContext opContext, + @Nonnull List<String> entityNames, + @Nullable Filter filter) { return getEntitiesToSearch(opContext, entityNames, 0).stream() .collect( Collectors.toMap( Function.identity(), entityName -> _entityDocCountCache - .getEntityDocCount(opContext) + .getEntityDocCount(opContext, filter) .getOrDefault(entityName.toLowerCase(), 0L))); }
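The EntityDocCountCache hunk that follows is the heart of this change: the memoized per-entity document counts are now keyed by the pair (searchContextId, filter) rather than by searchContextId alone, so counts computed under a view filter can never be served to an unfiltered caller, and vice versa. A rough Python sketch of that keying scheme; the names, TTL handling, and fetch wiring here are illustrative assumptions, not the Java implementation:

```python
import time
from typing import Callable, Dict, Hashable, Optional, Tuple

class DocCountCache:
    """Memoize per-entity doc counts keyed by (search context id, filter)."""

    def __init__(self, fetch: Callable[[Optional[Hashable]], Dict[str, int]], ttl_seconds: float):
        self._fetch = fetch  # wraps a per-entity doc-count query against the search index
        self._ttl = ttl_seconds
        self._cache: Dict[Tuple[str, Optional[Hashable]], Tuple[float, Dict[str, int]]] = {}

    def get_entity_doc_count(self, context_id: str, filter_key: Optional[Hashable] = None) -> Dict[str, int]:
        key = (context_id, filter_key)  # the filter participates in the cache key
        now = time.monotonic()
        entry = self._cache.get(key)
        if entry is None or now - entry[0] > self._ttl:  # missing or expired: refetch
            entry = (now, self._fetch(filter_key))
            self._cache[key] = entry
        return entry[1]
```

The key must be hashable and comparable, which is why the Java hunk below introduces a dedicated EntityDocCountsKey annotated with @EqualsAndHashCode rather than reusing the bare context-id string.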
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java b/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java index 745ef6686d3204..1efaeb2b12f45f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java @@ -3,6 +3,7 @@ import com.google.common.base.Suppliers; import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration; import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.utils.ConcurrencyUtils; import io.datahubproject.metadata.context.OperationContext; @@ -15,12 +16,22 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; public class EntityDocCountCache { private final EntityRegistry entityRegistry; private final EntitySearchService entitySearchService; private final EntityDocCountCacheConfiguration config; - private final Map<String, Supplier<Map<String, Long>>> entityDocCounts; + private final Map<EntityDocCountsKey, Supplier<Map<String, Long>>> entityDocCounts; + + @AllArgsConstructor + @EqualsAndHashCode + private static final class EntityDocCountsKey { + private final String searchContextId; + private final Filter filter; + } public EntityDocCountCache( EntityRegistry entityRegistry, @@ -32,17 +43,27 @@ public EntityDocCountCache( this.entityDocCounts = new ConcurrentHashMap<>(); } - private Map<String, Long> fetchEntityDocCount(@Nonnull OperationContext opContext) { + private Map<String, Long> fetchEntityDocCount( + @Nonnull OperationContext opContext, @Nullable Filter filter) { return ConcurrencyUtils.transformAndCollectAsync( entityRegistry.getEntitySpecs().keySet(), Function.identity(), - Collectors.toMap(Function.identity(), v -> entitySearchService.docCount(opContext, v))); + Collectors.toMap( + Function.identity(), v -> entitySearchService.docCount(opContext, v, filter))); } @WithSpan public Map<String, Long> getEntityDocCount(@Nonnull OperationContext opContext) { + return getEntityDocCount(opContext, null); + } + + @WithSpan + public Map<String, Long> getEntityDocCount( + @Nonnull OperationContext opContext, @Nullable Filter filter) { return entityDocCounts - .computeIfAbsent(opContext.getSearchContextId(), k -> buildSupplier(opContext)) + .computeIfAbsent( + new EntityDocCountsKey(opContext.getSearchContextId(), filter), + k -> buildSupplier(opContext, filter)) .get(); } @@ -53,8 +74,9 @@ public List<String> getNonEmptyEntities(@Nonnull OperationContext opContext) { .collect(Collectors.toList()); } - private Supplier<Map<String, Long>> buildSupplier(@Nonnull OperationContext opContext) { + private Supplier<Map<String, Long>> buildSupplier( + @Nonnull OperationContext opContext, @Nullable Filter filter) { return Suppliers.memoizeWithExpiration( - () -> fetchEntityDocCount(opContext), config.getTtlSeconds(), TimeUnit.SECONDS); + () -> fetchEntityDocCount(opContext, filter), config.getTtlSeconds(), TimeUnit.SECONDS); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 69ae1c311c5408..19cd1f767f4729 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -81,11 +81,13 @@ public void clear(@Nonnull OperationContext opContext) { } @Override - public long docCount(@Nonnull OperationContext opContext, @Nonnull String entityName) { + public long docCount( + @Nonnull OperationContext opContext, @Nonnull String entityName, @Nullable Filter filter) { return esSearchDAO.docCount( opContext.withSearchFlags( flags -> applyDefaultSearchFlags(flags, null, DEFAULT_SERVICE_SEARCH_FLAGS)), - entityName); + entityName, + filter); } @Override diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java index 67de02614e10f5..d6bb1fb2401e84 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java +++
b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESSearchDAO.java @@ -78,12 +78,17 @@ public class ESSearchDAO { @Nullable private final CustomSearchConfiguration customSearchConfiguration; public long docCount(@Nonnull OperationContext opContext, @Nonnull String entityName) { + return docCount(opContext, entityName, null); + } + + public long docCount( + @Nonnull OperationContext opContext, @Nonnull String entityName, @Nullable Filter filter) { EntitySpec entitySpec = opContext.getEntityRegistry().getEntitySpec(entityName); CountRequest countRequest = new CountRequest(opContext.getSearchContext().getIndexConvention().getIndexName(entitySpec)) .query( SearchRequestHandler.getFilterQuery( - opContext, null, entitySpec.getSearchableFieldTypes())); + opContext, filter, entitySpec.getSearchableFieldTypes())); try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "docCount").time()) { return client.count(countRequest, RequestOptions.DEFAULT).getCount(); } catch (IOException e) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/recommendation/RecommendationsServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/recommendation/RecommendationsServiceTest.java index 976e8a62dff521..ca42f0327c86db 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/recommendation/RecommendationsServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/recommendation/RecommendationsServiceTest.java @@ -83,6 +83,7 @@ public void testService() throws URISyntaxException { TestOperationContexts.userContextNoSearchAuthorization( Urn.createFromString("urn:li:corpuser:me")), new RecommendationRequestContext().setScenario(ScenarioType.HOME), + null, 10); assertTrue(result.isEmpty()); @@ -95,6 +96,7 @@ public void testService() throws URISyntaxException { TestOperationContexts.userContextNoSearchAuthorization( Urn.createFromString("urn:li:corpuser:me")), new RecommendationRequestContext().setScenario(ScenarioType.HOME), + null, 10); assertEquals(result.size(), 1); RecommendationModule module = result.get(0); @@ -112,6 +114,7 @@ public void testService() throws URISyntaxException { TestOperationContexts.userContextNoSearchAuthorization( Urn.createFromString("urn:li:corpuser:me")), new RecommendationRequestContext().setScenario(ScenarioType.HOME), + null, 10); assertEquals(result.size(), 4); module = result.get(0); @@ -141,6 +144,7 @@ public void testService() throws URISyntaxException { TestOperationContexts.userContextNoSearchAuthorization( Urn.createFromString("urn:li:corpuser:me")), new RecommendationRequestContext().setScenario(ScenarioType.HOME), + null, 2); assertEquals(result.size(), 2); module = result.get(0); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationCandidateSourceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationCandidateSourceTest.java index 5c79c000fb256f..99520c189034a9 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationCandidateSourceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationCandidateSourceTest.java @@ -1,9 +1,7 @@ package com.linkedin.metadata.recommendation.candidatesource; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static 
org.mockito.Mockito.mock; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -15,6 +13,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationParams; import com.linkedin.metadata.recommendation.RecommendationRenderType; @@ -38,6 +37,7 @@ public class EntitySearchAggregationCandidateSourceTest { private EntitySearchAggregationSource valueBasedCandidateSource; private EntitySearchAggregationSource urnBasedCandidateSource; private OperationContext opContext; + private Filter filter; private static final Urn USER = new CorpuserUrn("test"); private static final RecommendationRequestContext CONTEXT = @@ -49,6 +49,7 @@ public void setup() { Mockito.reset(entitySearchService); valueBasedCandidateSource = buildCandidateSource("testValue", false); urnBasedCandidateSource = buildCandidateSource("testUrn", true); + filter = new Filter(); } private EntitySearchAggregationSource buildCandidateSource( @@ -97,12 +98,28 @@ public boolean isEligible( public void testWhenSearchServiceReturnsEmpty() { Mockito.when( entitySearchService.aggregateByValue( - any(OperationContext.class), eq(null), eq("testValue"), eq(null), anyInt())) + any(OperationContext.class), + eq(Collections.emptyList()), + eq("testValue"), + same(filter), + anyInt())) .thenReturn(Collections.emptyMap()); + List candidates = - valueBasedCandidateSource.getRecommendations(opContext, CONTEXT); + valueBasedCandidateSource.getRecommendations(opContext, CONTEXT, filter); assertTrue(candidates.isEmpty()); - assertFalse(valueBasedCandidateSource.getRecommendationModule(opContext, CONTEXT).isPresent()); + assertFalse( + valueBasedCandidateSource.getRecommendationModule(opContext, CONTEXT, filter).isPresent()); + + // Mockito's default stub could also return an empty map. + // Adding explicit verification to guard against this. 
+ verify(entitySearchService, times(2)) + .aggregateByValue( + any(OperationContext.class), + eq(Collections.emptyList()), + eq("testValue"), + same(filter), + anyInt()); } @Test @@ -110,10 +127,10 @@ public void testWhenSearchServiceReturnsValueResults() { // One result Mockito.when( entitySearchService.aggregateByValue( - any(OperationContext.class), any(), eq("testValue"), eq(null), anyInt())) + any(OperationContext.class), any(), eq("testValue"), same(filter), anyInt())) .thenReturn(ImmutableMap.of("value1", 1L)); List candidates = - valueBasedCandidateSource.getRecommendations(opContext, CONTEXT); + valueBasedCandidateSource.getRecommendations(opContext, CONTEXT, filter); assertEquals(candidates.size(), 1); RecommendationContent content = candidates.get(0); assertEquals(content.getValue(), "value1"); @@ -128,14 +145,15 @@ public void testWhenSearchServiceReturnsValueResults() { new Criterion().setField("testValue").setValue("value1")); assertNotNull(params.getContentParams()); assertEquals(params.getContentParams().getCount().longValue(), 1L); - assertTrue(valueBasedCandidateSource.getRecommendationModule(opContext, CONTEXT).isPresent()); + assertTrue( + valueBasedCandidateSource.getRecommendationModule(opContext, CONTEXT, filter).isPresent()); // Multiple result Mockito.when( entitySearchService.aggregateByValue( - any(OperationContext.class), any(), eq("testValue"), eq(null), anyInt())) + any(OperationContext.class), any(), eq("testValue"), same(filter), anyInt())) .thenReturn(ImmutableMap.of("value1", 1L, "value2", 2L, "value3", 3L)); - candidates = valueBasedCandidateSource.getRecommendations(opContext, CONTEXT); + candidates = valueBasedCandidateSource.getRecommendations(opContext, CONTEXT, filter); assertEquals(candidates.size(), 2); content = candidates.get(0); assertEquals(content.getValue(), "value3"); @@ -163,7 +181,8 @@ public void testWhenSearchServiceReturnsValueResults() { new Criterion().setField("testValue").setValue("value2")); assertNotNull(params.getContentParams()); assertEquals(params.getContentParams().getCount().longValue(), 2L); - assertTrue(valueBasedCandidateSource.getRecommendationModule(opContext, CONTEXT).isPresent()); + assertTrue( + valueBasedCandidateSource.getRecommendationModule(opContext, CONTEXT, filter).isPresent()); } @Test @@ -174,10 +193,10 @@ public void testWhenSearchServiceReturnsUrnResults() { Urn testUrn3 = new TestEntityUrn("testUrn3", "testUrn3", "testUrn3"); Mockito.when( entitySearchService.aggregateByValue( - any(OperationContext.class), any(), eq("testUrn"), eq(null), anyInt())) + any(OperationContext.class), any(), eq("testUrn"), same(filter), anyInt())) .thenReturn(ImmutableMap.of(testUrn1.toString(), 1L)); List candidates = - urnBasedCandidateSource.getRecommendations(opContext, CONTEXT); + urnBasedCandidateSource.getRecommendations(opContext, CONTEXT, filter); assertEquals(candidates.size(), 1); RecommendationContent content = candidates.get(0); assertEquals(content.getValue(), testUrn1.toString()); @@ -192,16 +211,17 @@ public void testWhenSearchServiceReturnsUrnResults() { new Criterion().setField("testUrn").setValue(testUrn1.toString())); assertNotNull(params.getContentParams()); assertEquals(params.getContentParams().getCount().longValue(), 1L); - assertTrue(urnBasedCandidateSource.getRecommendationModule(opContext, CONTEXT).isPresent()); + assertTrue( + urnBasedCandidateSource.getRecommendationModule(opContext, CONTEXT, filter).isPresent()); // Multiple result Mockito.when( entitySearchService.aggregateByValue( - 
any(OperationContext.class), any(), eq("testUrn"), eq(null), anyInt())) + any(OperationContext.class), any(), eq("testUrn"), same(filter), anyInt())) .thenReturn( ImmutableMap.of( testUrn1.toString(), 1L, testUrn2.toString(), 2L, testUrn3.toString(), 3L)); - candidates = urnBasedCandidateSource.getRecommendations(opContext, CONTEXT); + candidates = urnBasedCandidateSource.getRecommendations(opContext, CONTEXT, filter); assertEquals(candidates.size(), 2); content = candidates.get(0); assertEquals(content.getValue(), testUrn3.toString()); @@ -229,6 +249,7 @@ public void testWhenSearchServiceReturnsUrnResults() { new Criterion().setField("testUrn").setValue(testUrn2.toString())); assertNotNull(params.getContentParams()); assertEquals(params.getContentParams().getCount().longValue(), 2L); - assertTrue(urnBasedCandidateSource.getRecommendationModule(opContext, CONTEXT).isPresent()); + assertTrue( + urnBasedCandidateSource.getRecommendationModule(opContext, CONTEXT, filter).isPresent()); } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/TestSource.java b/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/TestSource.java index 4350cdd2662a89..f94514e08c521e 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/TestSource.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/recommendation/candidatesource/TestSource.java @@ -1,11 +1,13 @@ package com.linkedin.metadata.recommendation.candidatesource; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationRenderType; import com.linkedin.metadata.recommendation.RecommendationRequestContext; import io.datahubproject.metadata.context.OperationContext; import java.util.List; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -42,7 +44,9 @@ public boolean isEligible( @Override public List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter) { return contents; } } diff --git a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java index e664e2141fe213..a7980d0d5c99f0 100644 --- a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -393,9 +393,17 @@ void setWritable(@Nonnull OperationContext opContext, boolean canWrite) @Nonnull Map<String, Long> batchGetTotalEntityCount( - @Nonnull OperationContext opContext, @Nonnull List<String> entityName) + @Nonnull OperationContext opContext, + @Nonnull List<String> entityName, + @Nullable Filter filter) throws RemoteInvocationException; + default Map<String, Long> batchGetTotalEntityCount( + @Nonnull OperationContext opContext, @Nonnull List<String> entityName) + throws RemoteInvocationException { + return batchGetTotalEntityCount(opContext, entityName, null); + } + /** List all urns existing for a particular Entity type. */ ListUrnsResult listUrns( @Nonnull OperationContext opContext,
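The EntityClient change above uses the standard compatibility trick for widening an interface method: the new three-argument batchGetTotalEntityCount is the abstract method, and the old two-argument form survives as a default method that forwards with a null filter. A minimal Python sketch of the same pattern (the names here are illustrative, not DataHub's Python client API):

```python
from typing import Dict, List, Optional

class EntityClient:
    # New, widened entry point; concrete clients implement this one.
    def batch_get_total_entity_count(
        self, entity_names: List[str], entity_filter: Optional[dict]
    ) -> Dict[str, int]:
        raise NotImplementedError

    # Old entry point, kept for callers that predate the filter argument.
    # It must forward with an explicit None; forwarding to itself would
    # recurse without terminating.
    def batch_get_total_entity_count_unfiltered(self, entity_names: List[str]) -> Dict[str, int]:
        return self.batch_get_total_entity_count(entity_names, None)
```

The RecommendationSource interface further down applies the same treatment to getRecommendations and getRecommendationModule.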
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index 2be716048f3d8b..21246407f20293 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -772,8 +772,14 @@ public void setWritable(@Nonnull OperationContext opContext, boolean canWrite) @Override @Nonnull public Map<String, Long> batchGetTotalEntityCount( - @Nonnull OperationContext opContext, @Nonnull List<String> entityName) + @Nonnull OperationContext opContext, + @Nonnull List<String> entityName, + @Nullable Filter filter) throws RemoteInvocationException { + if (filter != null) { + throw new UnsupportedOperationException("Filter not yet supported in restli-client."); + } + EntitiesDoBatchGetTotalEntityCountRequestBuilder requestBuilder = ENTITIES_REQUEST_BUILDERS .actionBatchGetTotalEntityCount() diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/RecommendationsService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/RecommendationsService.java index fcea114446c498..c554f2b919b063 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/RecommendationsService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/RecommendationsService.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.recommendation; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.candidatesource.RecommendationSource; import com.linkedin.metadata.recommendation.ranker.RecommendationModuleRanker; import com.linkedin.metadata.utils.ConcurrencyUtils; @@ -10,6 +11,7 @@ import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -57,6 +59,7 @@ private void validateRecommendationSources(final List<RecommendationSource> cand public List<RecommendationModule> listRecommendations( @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter, int limit) { // Get recommendation candidates from sources which are eligible, in parallel @@ -65,7 +68,7 @@ public List<RecommendationModule> listRecommendations( _candidateSources.stream() .filter(source -> source.isEligible(opContext, requestContext)) .collect(Collectors.toList()), - source -> source.getRecommendationModule(opContext, requestContext), + source -> source.getRecommendationModule(opContext, requestContext, filter), (source, exception) -> { log.error( "Error while fetching candidate modules from source {}", source, exception); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationSource.java b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationSource.java index 33e5510a13c6fa..da4dd9d76d4519 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationSource.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/EntitySearchAggregationSource.java @@ -6,6 +6,7 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import
com.linkedin.metadata.query.filter.Criterion; import com.linkedin.metadata.query.filter.CriterionArray; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.ContentParams; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationParams; @@ -72,10 +73,16 @@ protected boolean isValidCandidate(@Nonnull OperationContext opContext, T ca @Override @WithSpan public List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nullable RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nullable RecommendationRequestContext requestContext, + @Nullable Filter filter) { Map<String, Long> aggregationResult = entitySearchService.aggregateByValue( - opContext, getEntityNames(entityRegistry), getSearchFieldName(), null, getMaxContent()); + opContext, + getEntityNames(entityRegistry), + getSearchFieldName(), + filter, + getMaxContent()); if (aggregationResult.isEmpty()) { return Collections.emptyList(); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlySearchedSource.java b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlySearchedSource.java index e10ef7a3dd665d..ca3d43762e0738 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlySearchedSource.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecentlySearchedSource.java @@ -5,6 +5,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.datahubusage.DataHubUsageEventConstants; import com.linkedin.metadata.datahubusage.DataHubUsageEventType; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent; import com.linkedin.metadata.recommendation.RecommendationParams; import com.linkedin.metadata.recommendation.RecommendationRenderType; @@ -19,6 +20,7 @@ import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchRequest; @@ -78,7 +80,9 @@ public boolean isEligible( @Override public List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter) { SearchRequest searchRequest = buildSearchRequest(opContext.getSessionActorContext().getActorUrn()); try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getRecentlySearched").time()) { diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecommendationSource.java b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecommendationSource.java index 95c5df64ed2f29..ddf203067f455e 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecommendationSource.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/recommendation/candidatesource/RecommendationSource.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.recommendation.candidatesource; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.recommendation.RecommendationContent;
import com.linkedin.metadata.recommendation.RecommendationContentArray; import com.linkedin.metadata.recommendation.RecommendationModule; @@ -10,6 +11,7 @@ import java.util.List; import java.util.Optional; import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** Base interface for defining a candidate source for recommendation module */ public interface RecommendationSource { @@ -42,7 +44,15 @@ boolean isEligible( */ @WithSpan List<RecommendationContent> getRecommendations( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext); + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter); + + // retaining this for backward compatibility + default List<RecommendationContent> getRecommendations( + @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + return getRecommendations(opContext, requestContext, null); + } /** * Get the full recommendations module itself provided the request context. * * @param requestContext Context of the recommendation request * @return list of recommendation candidates */ default Optional<RecommendationModule> getRecommendationModule( - @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + @Nonnull OperationContext opContext, + @Nonnull RecommendationRequestContext requestContext, + @Nullable Filter filter) { if (!isEligible(opContext, requestContext)) { return Optional.empty(); } - List<RecommendationContent> recommendations = getRecommendations(opContext, requestContext); + List<RecommendationContent> recommendations = + getRecommendations(opContext, requestContext, filter); if (recommendations.isEmpty()) { return Optional.empty(); } @@ -69,4 +82,10 @@ default Optional<RecommendationModule> getRecommendationModule( .setRenderType(getRenderType()) .setContent(new RecommendationContentArray(recommendations))); } + + // retaining this for backward compatibility + default Optional<RecommendationModule> getRecommendationModule( + @Nonnull OperationContext opContext, @Nonnull RecommendationRequestContext requestContext) { + return getRecommendationModule(opContext, requestContext, null); + } } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index f99451b923ef04..a253f9ffc25311 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -24,8 +24,14 @@ public interface EntitySearchService { * Get the number of documents corresponding to the entity * * @param entityName name of the entity + * @param filter optional filter */ - long docCount(@Nonnull OperationContext opContext, @Nonnull String entityName); + long docCount( + @Nonnull OperationContext opContext, @Nonnull String entityName, @Nullable Filter filter); + + default long docCount(@Nonnull OperationContext opContext, @Nonnull String entityName) { + return docCount(opContext, entityName, null); + } /** * Updates or inserts the given search document.
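With patch 4 in place, both getEntityCounts and listRecommendations accept the optional viewUrn that the GraphQL schema changes above introduce. A sketch of exercising the new input from Python; the endpoint path and the selection set on EntityCountResults are assumptions about the deployment and schema, so verify them against your instance:

```python
import requests

GRAPHQL_URL = "http://localhost:9002/api/v2/graphql"  # assumed datahub-frontend endpoint

ENTITY_COUNTS_QUERY = """
query getEntityCounts($input: EntityCountInput!) {
  getEntityCounts(input: $input) {
    counts { entityType count }
  }
}
"""

def entity_counts(types, view_urn, token):
    # viewUrn is the new optional field on EntityCountInput; passing None
    # (serialized as null) preserves the old unfiltered behavior.
    payload = {
        "query": ENTITY_COUNTS_QUERY,
        "variables": {"input": {"types": types, "viewUrn": view_urn}},
    }
    resp = requests.post(GRAPHQL_URL, json=payload, headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()
    return resp.json()["data"]["getEntityCounts"]["counts"]
```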
From d3fb698d8d9f50a56c14ed0172975f27f5c6567d Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 18 Apr 2024 14:46:32 +0530 Subject: [PATCH 5/5] fix(ingest): make gms url configuration resilient in rest emitter (#10316) --- docs/lineage/airflow.md | 2 ++ .../airflow-plugin/tests/unit/test_airflow.py | 4 ++-- metadata-ingestion/src/datahub/emitter/rest_emitter.py | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 2e584b8ce0d24c..d501ea407c0728 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -45,6 +45,8 @@ Set up a DataHub connection in Airflow, either via command line or the Airflow U airflow connections add --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '' ``` +If you are using hosted Acryl Datahub then please use `https://YOUR_PREFIX.acryl.io/gms` as the `--conn-host` parameter. + #### Airflow UI On the Airflow UI, go to Admin -> Connections and click the "+" symbol to create a new connection. Select "DataHub REST Server" from the dropdown for "Connection Type" and enter the appropriate values. diff --git a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py index 75b6f85b27bc12..c88f4d77b7aebd 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py @@ -41,13 +41,13 @@ datahub_rest_connection_config = Connection( conn_id="datahub_rest_test", conn_type="datahub_rest", - host="http://test_host:8080/", + host="http://test_host:8080", extra=None, ) datahub_rest_connection_config_with_timeout = Connection( conn_id="datahub_rest_test", conn_type="datahub_rest", - host="http://test_host:8080/", + host="http://test_host:8080", extra=json.dumps({"timeout_sec": 5}), ) diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index d4e974d5855178..8baa8481ea4f73 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -10,7 +10,7 @@ from requests.adapters import HTTPAdapter, Retry from requests.exceptions import HTTPError, RequestException -from datahub.cli.cli_utils import get_system_auth +from datahub.cli.cli_utils import fixup_gms_url, get_system_auth from datahub.configuration.common import ConfigurationError, OperationalError from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -72,7 +72,7 @@ def __init__( ): if not gms_server: raise ConfigurationError("gms server is required") - self._gms_server = gms_server + self._gms_server = fixup_gms_url(gms_server) self._token = token self.server_config: Dict[str, Any] = {}
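A quick way to sanity-check the emitter change in this last patch: because the constructor now routes gms_server through fixup_gms_url, common URL variants are normalized before any request is built. A minimal smoke test, assuming a GMS reachable on localhost:8080; the exact normalization rules live in datahub.cli.cli_utils and may evolve:

```python
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Before this patch a trailing slash on gms_server could produce doubled
# slashes in request paths; it is now normalized inside the constructor.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080/")
emitter.test_connection()  # round-trips against the server's /config endpoint
```

For hosted Acryl instances the same applies to the Airflow connection above: point the host at https://YOUR_PREFIX.acryl.io/gms.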