Merge branch 'master' into CUS-903-dbt-meta-mapping
ethan-cartwright authored Oct 2, 2023
2 parents 45ce14c + b61c38a commit c27b5d6
Showing 46 changed files with 623 additions and 378 deletions.
16 changes: 16 additions & 0 deletions .github/scripts/docker_helpers.sh
@@ -15,10 +15,26 @@ function get_tag {
   echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},${MAIN_BRANCH_TAG}\,${SHORT_SHA},g" -e 's,refs/tags/,,g' -e 's,refs/pull/\([0-9]*\).*,pr\1,g')
 }
 
+function get_tag_slim {
+  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},${MAIN_BRANCH_TAG}-slim\,${SHORT_SHA}-slim,g" -e 's,refs/tags/,,g' -e 's,refs/pull/\([0-9]*\).*,pr\1-slim,g')
+}
+
+function get_tag_full {
+  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},${MAIN_BRANCH_TAG}-full\,${SHORT_SHA}-full,g" -e 's,refs/tags/,,g' -e 's,refs/pull/\([0-9]*\).*,pr\1-full,g')
+}
+
 function get_python_docker_release_v {
   echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},0.0.0+docker.${SHORT_SHA},g" -e 's,refs/tags/v\(.*\),\1+docker,g' -e 's,refs/pull/\([0-9]*\).*,0.0.0+docker.pr\1,g')
 }
 
 function get_unique_tag {
   echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},${SHORT_SHA},g" -e 's,refs/tags/,,g' -e 's,refs/pull/\([0-9]*\).*,pr\1,g')
 }
+
+function get_unique_tag_slim {
+  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},${SHORT_SHA}-slim,g" -e 's,refs/tags/,,g' -e 's,refs/pull/\([0-9]*\).*,pr\1-slim,g')
+}
+
+function get_unique_tag_full {
+  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},${SHORT_SHA}-full,g" -e 's,refs/tags/,,g' -e 's,refs/pull/\([0-9]*\).*,pr\1-full,g')
+}
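
For reference, a minimal Python sketch (not part of this commit) of what one of the new helpers resolves to for the three GITHUB_REF shapes the sed expressions handle. The MAIN_BRANCH, MAIN_BRANCH_TAG, and SHORT_SHA values are assumptions for illustration:

    import re

    # Assumed values, mirroring the CI environment variables used above.
    MAIN_BRANCH, MAIN_BRANCH_TAG, SHORT_SHA = "master", "head", "c27b5d6"

    def get_tag_slim(github_ref: str) -> str:
        # Same three substitutions as the sed pipeline in get_tag_slim.
        ref = github_ref.replace(
            f"refs/heads/{MAIN_BRANCH}", f"{MAIN_BRANCH_TAG}-slim,{SHORT_SHA}-slim"
        )
        ref = ref.replace("refs/tags/", "")
        return re.sub(r"refs/pull/([0-9]*).*", r"pr\1-slim", ref)

    assert get_tag_slim("refs/heads/master") == "head-slim,c27b5d6-slim"
    assert get_tag_slim("refs/tags/v0.11.0") == "v0.11.0"        # tag refs pass through unchanged
    assert get_tag_slim("refs/pull/1234/merge") == "pr1234-slim" # hypothetical PR number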
22 changes: 11 additions & 11 deletions .github/workflows/docker-unified.yml
@@ -54,11 +54,11 @@ jobs:
         run: |
           source .github/scripts/docker_helpers.sh
           echo "tag=$(get_tag)" >> $GITHUB_OUTPUT
-          echo "slim_tag=$(get_tag)-slim" >> $GITHUB_OUTPUT
-          echo "full_tag=$(get_tag)-full" >> $GITHUB_OUTPUT
+          echo "slim_tag=$(get_tag_slim)" >> $GITHUB_OUTPUT
+          echo "full_tag=$(get_tag_full)" >> $GITHUB_OUTPUT
           echo "unique_tag=$(get_unique_tag)" >> $GITHUB_OUTPUT
-          echo "unique_slim_tag=$(get_unique_tag)-slim" >> $GITHUB_OUTPUT
-          echo "unique_full_tag=$(get_unique_tag)" >> $GITHUB_OUTPUT
+          echo "unique_slim_tag=$(get_unique_tag_slim)" >> $GITHUB_OUTPUT
+          echo "unique_full_tag=$(get_unique_tag_full)" >> $GITHUB_OUTPUT
           echo "python_release_version=$(get_python_docker_release_v)" >> $GITHUB_OUTPUT
       - name: Check whether publishing enabled
         id: publish
@@ -459,7 +459,7 @@ jobs:
           platforms: linux/amd64,linux/arm64/v8
       - name: Compute DataHub Ingestion (Base) Tag
         id: tag
-        run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}" >> $GITHUB_OUTPUT
+        run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}" >> $GITHUB_OUTPUT
   datahub_ingestion_base_slim_build:
     name: Build and Push DataHub Ingestion (Base-Slim) Docker Image
     runs-on: ubuntu-latest
@@ -524,14 +524,14 @@ jobs:
         if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
         with:
           image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
-      - name: Build and push Base-Full Image
+      - name: Build and push (Base-Full) Image
         if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
         uses: ./.github/actions/docker-custom-build-and-push
         with:
           target: full-install
           images: |
             ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
-          tags: ${{ needs.setup.outputs.unique_full_tag }}
+          tags: ${{ needs.setup.outputs.full_tag }}
           username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
           password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
           build-args: |
@@ -656,7 +656,7 @@ jobs:
         uses: ishworkh/docker-image-artifact-download@v1
         if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
         with:
-          image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}
+          image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
       - name: Build and push Full Image
         if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
         uses: ./.github/actions/docker-custom-build-and-push
@@ -666,9 +666,9 @@ jobs:
             ${{ env.DATAHUB_INGESTION_IMAGE }}
           build-args: |
             BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
-            DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}
+            DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
             RELEASE_VERSION=${{ needs.setup.outputs.python_release_version }}
-          tags: ${{ needs.setup.outputs.unique_full_tag }}
+          tags: ${{ needs.setup.outputs.tag }}
           username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
           password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
           publish: ${{ needs.setup.outputs.publish }}
@@ -677,7 +677,7 @@ jobs:
           platforms: linux/amd64,linux/arm64/v8
       - name: Compute Tag (Full)
         id: tag
-        run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT
+        run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_tag || 'head' }}" >> $GITHUB_OUTPUT
   datahub_ingestion_full_scan:
     permissions:
       contents: read # for actions/checkout to fetch code
2 changes: 1 addition & 1 deletion build.gradle
@@ -291,7 +291,7 @@ subprojects {
     maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
 
     if (project.configurations.getByName("testImplementation").getDependencies()
-        .any{ it.getName() == "testng" }) {
+        .any{ it.getName().contains("testng") }) {
       useTestNG()
     }
   }
@@ -31,7 +31,7 @@ public static List<ReindexConfig> getAllReindexConfigs(List<ElasticSearchIndexed
     List<ReindexConfig> reindexConfigs = new ArrayList<>(_reindexConfigs);
     if (reindexConfigs.isEmpty()) {
       for (ElasticSearchIndexed elasticSearchIndexed : elasticSearchIndexedList) {
-        reindexConfigs.addAll(elasticSearchIndexed.getReindexConfigs());
+        reindexConfigs.addAll(elasticSearchIndexed.buildReindexConfigs());
       }
       _reindexConfigs = new ArrayList<>(reindexConfigs);
     }
@@ -6,6 +6,7 @@
 import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.SearchService;
+import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
 import io.ebean.Database;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.mock.mockito.MockBean;
@@ -35,4 +36,7 @@ public class UpgradeCliApplicationTestConfiguration {
 
   @MockBean
   ConfigEntityRegistry configEntityRegistry;
+
+  @MockBean
+  public EntityIndexBuilders entityIndexBuilders;
 }
2 changes: 2 additions & 0 deletions docker/datahub-ingestion-base/build.gradle
@@ -9,6 +9,8 @@ ext {
     docker_registry = rootProject.ext.docker_registry == 'linkedin' ? 'acryldata' : docker_registry
     docker_repo = 'datahub-ingestion-base'
     docker_dir = 'datahub-ingestion-base'
+
+    revision = 2 // increment to trigger rebuild
 }
 
 docker {
2 changes: 2 additions & 0 deletions docker/datahub-ingestion/build.gradle
@@ -9,6 +9,8 @@ ext {
     docker_registry = rootProject.ext.docker_registry == 'linkedin' ? 'acryldata' : docker_registry
     docker_repo = 'datahub-ingestion'
     docker_dir = 'datahub-ingestion'
+
+    revision = 2 // increment to trigger rebuild
 }
 
 dependencies {
7 changes: 2 additions & 5 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -13,11 +13,8 @@
 from datahub.cli import cli_utils
 from datahub.configuration.datetimes import ClickDatetime
 from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
-from datahub.ingestion.graph.client import (
-    DataHubGraph,
-    RemovedStatusFilter,
-    get_default_graph,
-)
+from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
+from datahub.ingestion.graph.filters import RemovedStatusFilter
 from datahub.telemetry import telemetry
 from datahub.upgrade import upgrade
 from datahub.utilities.perf_timer import PerfTimer
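
With RemovedStatusFilter relocated to the filters module, call sites import it from there. A minimal usage sketch, assuming get_urns_by_filter keeps the keyword-only entity_types/status parameters shown later in this diff (the dataset/soft-deleted values are illustrative):

    from datahub.ingestion.graph.client import get_default_graph
    from datahub.ingestion.graph.filters import RemovedStatusFilter

    graph = get_default_graph()  # reads the connection configured via `datahub init`

    # Hypothetical example: enumerate soft-deleted dataset urns.
    soft_deleted = list(
        graph.get_urns_by_filter(
            entity_types=["dataset"],
            status=RemovedStatusFilter.ONLY_SOFT_DELETED,
        )
    )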
171 changes: 8 additions & 163 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -16,15 +16,15 @@
 from datahub.cli.cli_utils import get_url_and_token
 from datahub.configuration.common import ConfigModel, GraphError, OperationalError
 from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
-from datahub.emitter.mce_builder import (
-    DEFAULT_ENV,
-    Aspect,
-    make_data_platform_urn,
-    make_dataplatform_instance_urn,
-)
+from datahub.emitter.mce_builder import DEFAULT_ENV, Aspect
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.rest_emitter import DatahubRestEmitter
 from datahub.emitter.serialization_helper import post_json_transform
+from datahub.ingestion.graph.filters import (
+    RemovedStatusFilter,
+    SearchFilterRule,
+    generate_filter,
+)
 from datahub.ingestion.source.state.checkpoint import Checkpoint
 from datahub.metadata.schema_classes import (
     ASPECT_NAME_MAP,
@@ -59,8 +59,6 @@
 
 logger = logging.getLogger(__name__)
 
-SearchFilterRule = Dict[str, Any]
-
 
 class DatahubClientConfig(ConfigModel):
     """Configuration class for holding connectivity to datahub gms"""
@@ -81,19 +79,6 @@ class DatahubClientConfig(ConfigModel):
 DataHubGraphConfig = DatahubClientConfig
 
 
-class RemovedStatusFilter(enum.Enum):
-    """Filter for the status of entities during search."""
-
-    NOT_SOFT_DELETED = "NOT_SOFT_DELETED"
-    """Search only entities that have not been marked as deleted."""
-
-    ALL = "ALL"
-    """Search all entities, including deleted entities."""
-
-    ONLY_SOFT_DELETED = "ONLY_SOFT_DELETED"
-    """Search only soft-deleted entities."""
-
 
 @dataclass
 class RelatedEntity:
     urn: str
@@ -567,7 +552,7 @@ def _bulk_fetch_schema_info_by_filter(
         # Add the query default of * if no query is specified.
         query = query or "*"
 
-        orFilters = self.generate_filter(
+        orFilters = generate_filter(
             platform, platform_instance, env, container, status, extraFilters
         )
 
@@ -621,54 +606,6 @@ def _bulk_fetch_schema_info_by_filter(
             if entity.get("schemaMetadata"):
                 yield entity["urn"], entity["schemaMetadata"]
 
-    def generate_filter(
-        self,
-        platform: Optional[str],
-        platform_instance: Optional[str],
-        env: Optional[str],
-        container: Optional[str],
-        status: RemovedStatusFilter,
-        extraFilters: Optional[List[SearchFilterRule]],
-    ) -> List[Dict[str, List[SearchFilterRule]]]:
-        andFilters: List[SearchFilterRule] = []
-
-        # Platform filter.
-        if platform:
-            andFilters.append(self._get_platform_filter(platform))
-
-        # Platform instance filter.
-        if platform_instance:
-            andFilters.append(
-                self._get_platform_instance_filter(platform, platform_instance)
-            )
-
-        # Browse path v2 filter.
-        if container:
-            andFilters.append(self._get_container_filter(container))
-
-        # Status filter.
-        status_filter = self._get_status_filer(status)
-        if status_filter:
-            andFilters.append(status_filter)
-
-        # Extra filters.
-        if extraFilters:
-            andFilters += extraFilters
-
-        orFilters: List[Dict[str, List[SearchFilterRule]]] = [{"and": andFilters}]
-
-        # Env filter
-        if env:
-            envOrConditions = self._get_env_or_conditions(env)
-            # This matches ALL of the andFilters and at least one of the envOrConditions.
-            orFilters = [
-                {"and": andFilters["and"] + [extraCondition]}
-                for extraCondition in envOrConditions
-                for andFilters in orFilters
-            ]
-
-        return orFilters
-
     def get_urns_by_filter(
         self,
         *,
@@ -709,7 +646,7 @@ def get_urns_by_filter(
         query = query or "*"
 
         # Env filter.
-        orFilters = self.generate_filter(
+        orFilters = generate_filter(
             platform, platform_instance, env, container, status, extraFilters
         )
 
@@ -778,98 +715,6 @@ def _scroll_across_entities(
                 f"Scrolling to next scrollAcrossEntities page: {scroll_id}"
             )
 
-    def _get_env_or_conditions(self, env: str) -> List[SearchFilterRule]:
-        # The env filter is a bit more tricky since it's not always stored
-        # in the same place in ElasticSearch.
-        return [
-            # For most entity types, we look at the origin field.
-            {
-                "field": "origin",
-                "value": env,
-                "condition": "EQUAL",
-            },
-            # For containers, we look at the customProperties field.
-            # For any containers created after https://github.com/datahub-project/datahub/pull/8027,
-            # we look for the "env" property. Otherwise, we use the "instance" property.
-            {
-                "field": "customProperties",
-                "value": f"env={env}",
-            },
-            {
-                "field": "customProperties",
-                "value": f"instance={env}",
-            },
-            # Note that not all entity types have an env (e.g. dashboards / charts).
-            # If the env filter is specified, these will be excluded.
-        ]
-
-    def _get_status_filer(
-        self, status: RemovedStatusFilter
-    ) -> Optional[SearchFilterRule]:
-        if status == RemovedStatusFilter.NOT_SOFT_DELETED:
-            # Subtle: in some cases (e.g. when the dataset doesn't have a status aspect), the
-            # removed field is simply not present in the ElasticSearch document. Ideally this
-            # would be a "removed" : "false" filter, but that doesn't work. Instead, we need to
-            # use a negated filter.
-            return {
-                "field": "removed",
-                "values": ["true"],
-                "condition": "EQUAL",
-                "negated": True,
-            }
-
-        elif status == RemovedStatusFilter.ONLY_SOFT_DELETED:
-            return {
-                "field": "removed",
-                "values": ["true"],
-                "condition": "EQUAL",
-            }
-
-        elif status == RemovedStatusFilter.ALL:
-            # We don't need to add a filter for this case.
-            return None
-        else:
-            raise ValueError(f"Invalid status filter: {status}")
-
-    def _get_container_filter(self, container: str) -> SearchFilterRule:
-        # Warn if container is not a fully qualified urn.
-        # TODO: Change this once we have a first-class container urn type.
-        if guess_entity_type(container) != "container":
-            raise ValueError(f"Invalid container urn: {container}")
-
-        return {
-            "field": "browsePathV2",
-            "values": [container],
-            "condition": "CONTAIN",
-        }
-
-    def _get_platform_instance_filter(
-        self, platform: Optional[str], platform_instance: str
-    ) -> SearchFilterRule:
-        if platform:
-            # Massage the platform instance into a fully qualified urn, if necessary.
-            platform_instance = make_dataplatform_instance_urn(
-                platform, platform_instance
-            )
-
-        # Warn if platform_instance is not a fully qualified urn.
-        # TODO: Change this once we have a first-class data platform instance urn type.
-        if guess_entity_type(platform_instance) != "dataPlatformInstance":
-            raise ValueError(f"Invalid data platform instance urn: {platform_instance}")
-
-        return {
-            "field": "platformInstance",
-            "values": [platform_instance],
-            "condition": "EQUAL",
-        }
-
-    def _get_platform_filter(self, platform: str) -> SearchFilterRule:
-        return {
-            "field": "platform.keyword",
-            "values": [make_data_platform_urn(platform)],
-            "condition": "EQUAL",
-        }
-
     def _get_types(self, entity_types: Optional[List[str]]) -> Optional[List[str]]:
         types: Optional[List[str]] = None
         if entity_types is not None:
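
Taken together, the removed generate_filter and _get_* helpers now live in datahub.ingestion.graph.filters. A sketch of the structure the moved function returns, assuming it keeps the parameter names shown above (the platform/env values are illustrative):

    from datahub.ingestion.graph.filters import RemovedStatusFilter, generate_filter

    or_filters = generate_filter(
        platform="snowflake",  # hypothetical inputs
        platform_instance=None,
        env="PROD",
        container=None,
        status=RemovedStatusFilter.NOT_SOFT_DELETED,
        extraFilters=None,
    )

    # Per the env-expansion logic above, each "and" clause repeats the shared rules
    # (platform, not-soft-deleted) and adds exactly one of the env conditions:
    # [
    #   {"and": [<platform rule>, <status rule>, {"field": "origin", "value": "PROD", "condition": "EQUAL"}]},
    #   {"and": [<platform rule>, <status rule>, {"field": "customProperties", "value": "env=PROD"}]},
    #   {"and": [<platform rule>, <status rule>, {"field": "customProperties", "value": "instance=PROD"}]},
    # ]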