Merge branch 'datahub-project:master' into master
gabe-lyons authored Oct 11, 2023
2 parents decd327 + 1b06c6a commit e7efcc2
Showing 23 changed files with 875 additions and 373 deletions.
@@ -821,6 +821,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("glossaryNode", getResolver(glossaryNodeType))
.dataFetcher("domain", getResolver((domainType)))
.dataFetcher("dataPlatform", getResolver(dataPlatformType))
.dataFetcher("dataPlatformInstance", getResolver(dataPlatformInstanceType))
.dataFetcher("mlFeatureTable", getResolver(mlFeatureTableType))
.dataFetcher("mlFeature", getResolver(mlFeatureType))
.dataFetcher("mlPrimaryKey", getResolver(mlPrimaryKeyType))
@@ -4,16 +4,25 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.types.dataplatforminstance.mappers.DataPlatformInstanceMapper;
import com.linkedin.datahub.graphql.types.mappers.AutoCompleteResultsMapper;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.filter.Filter;
import graphql.execution.DataFetcherResult;
import org.apache.commons.lang3.NotImplementedException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -22,7 +31,10 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public class DataPlatformInstanceType implements com.linkedin.datahub.graphql.types.EntityType<DataPlatformInstance, String> {
import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ENTITY_NAME;

public class DataPlatformInstanceType implements SearchableEntityType<DataPlatformInstance, String>,
com.linkedin.datahub.graphql.types.EntityType<DataPlatformInstance, String> {

static final Set<String> ASPECTS_TO_FETCH = ImmutableSet.of(
Constants.DATA_PLATFORM_INSTANCE_KEY_ASPECT_NAME,
@@ -84,4 +96,24 @@ public List<DataFetcherResult<DataPlatformInstance>> batchLoad(@Nonnull List<Str
}
}

@Override
public SearchResults search(@Nonnull String query,
@Nullable List<FacetFilterInput> filters,
int start,
int count,
@Nonnull final QueryContext context) throws Exception {
throw new NotImplementedException("Searchable type (deprecated) not implemented on DataPlatformInstance entity type");
}

@Override
public AutoCompleteResults autoComplete(@Nonnull String query,
@Nullable String field,
@Nullable Filter filters,
int limit,
@Nonnull final QueryContext context) throws Exception {
final AutoCompleteResult result = _entityClient.autoComplete(DATA_PLATFORM_INSTANCE_ENTITY_NAME, query,
filters, limit, context.getAuthentication());
return AutoCompleteResultsMapper.map(result);
}

}
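
Because DataPlatformInstanceType now implements SearchableEntityType, autocomplete requests can target this entity through the existing top-level autoComplete query. A minimal sketch of such a request, assuming the standard AutoCompleteInput shape from entity.graphql (the query string and limit are illustrative):

    query {
      autoComplete(input: { type: DATA_PLATFORM_INSTANCE, query: "prod", limit: 5 }) {
        query
        suggestions
      }
    }

The deprecated search path is intentionally left throwing NotImplementedException, so only autocomplete is wired up for this type here.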
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
@@ -226,6 +226,11 @@ type Query {
listOwnershipTypes(
"Input required for listing custom ownership types"
input: ListOwnershipTypesInput!): ListOwnershipTypesResult!

"""
Fetch a Data Platform Instance by primary key (urn)
"""
dataPlatformInstance(urn: String!): DataPlatformInstance
}
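
The new top-level field behaves like the other single-entity lookups above it. A minimal sketch of a client query, with an illustrative urn (DataPlatformInstance urns take the form urn:li:dataPlatformInstance:(&lt;platform urn&gt;,&lt;instance id&gt;)):

    query {
      dataPlatformInstance(urn: "urn:li:dataPlatformInstance:(urn:li:dataPlatform:kafka,prod_cluster)") {
        urn
        type
      }
    }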

"""
Expand Down
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@@ -35,4 +36,20 @@ public Set<String> getOwners() {
}
return fieldResolvers.get(ResourceFieldType.OWNER).getFieldValuesFuture().join().getValues();
}

/**
* Fetch the platform instance for a Resolved Resource Spec
* @return a Platform Instance or null if one does not exist.
*/
@Nullable
public String getDataPlatformInstance() {
if (!fieldResolvers.containsKey(ResourceFieldType.DATA_PLATFORM_INSTANCE)) {
return null;
}
Set<String> dataPlatformInstance = fieldResolvers.get(ResourceFieldType.DATA_PLATFORM_INSTANCE).getFieldValuesFuture().join().getValues();
if (dataPlatformInstance.size() > 0) {
return dataPlatformInstance.stream().findFirst().get();
}
return null;
}
}
@@ -19,5 +19,9 @@ public enum ResourceFieldType {
/**
* Domains of resource
*/
DOMAIN
DOMAIN,
/**
* Data platform instance of resource
*/
DATA_PLATFORM_INSTANCE
}
9 changes: 9 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -2,6 +2,7 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Generic, Iterable, Optional, Tuple, TypeVar

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import set_dataset_urn_to_lower
from datahub.ingestion.api.committable import Committable
from datahub.ingestion.graph.client import DataHubGraph
@@ -75,3 +76,11 @@ def register_checkpointer(self, committable: Committable) -> None:

def get_committables(self) -> Iterable[Tuple[str, Committable]]:
yield from self.checkpointers.items()

def require_graph(self, operation: Optional[str] = None) -> DataHubGraph:
if not self.graph:
raise ConfigurationError(
f"{operation or 'This operation'} requires a graph, but none was provided. "
"To provide one, either use the datahub-rest sink or set the top-level datahub_api config in the recipe."
)
return self.graph
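
For illustration, a source that depends on the graph can now delegate the check and get a uniform error message. A minimal sketch of the intended call pattern; the source class and operation string are hypothetical:

    from datahub.ingestion.api.common import PipelineContext

    class MySource:  # hypothetical source
        def __init__(self, ctx: PipelineContext):
            # Raises ConfigurationError with the standard guidance if no graph is configured.
            self.graph = ctx.require_graph(operation="The my-source connector")

The csv-enricher change below shows the same pattern replacing an inline check.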
@@ -129,11 +129,9 @@ def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext):
# Map from entity urn to a list of SubResourceRow.
self.editable_schema_metadata_map: Dict[str, List[SubResourceRow]] = {}
self.should_overwrite: bool = self.config.write_semantics == "OVERRIDE"
if not self.should_overwrite and not self.ctx.graph:
raise ConfigurationError(
"With PATCH semantics, the csv-enricher source requires a datahub_api to connect to. "
"Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe."
)

if not self.should_overwrite:
self.ctx.require_graph(operation="The csv-enricher's PATCH semantics flag")

def get_resource_glossary_terms_work_unit(
self,
@@ -20,9 +20,8 @@
DBTCommonConfig,
DBTNode,
DBTSourceBase,
DBTTest,
DBTTestResult,
)
from datahub.ingestion.source.dbt.dbt_tests import DBTTest, DBTTestResult

logger = logging.getLogger(__name__)
