
Commit

Merge branch 'datahub-project:master' into master
anshbansal authored Dec 27, 2024
2 parents 199bfcd + 3ca8d09 commit 421bd46
Showing 21 changed files with 376 additions and 269 deletions.
50 changes: 36 additions & 14 deletions .github/workflows/docker-unified.yml
@@ -1010,18 +1010,39 @@ jobs:
needs: setup
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
cypress_batch_count: ${{ steps.set-batch-count.outputs.cypress_batch_count }}
python_batch_count: ${{ steps.set-batch-count.outputs.python_batch_count }}
steps:
- id: set-batch-count
# Tests are split simply to fill the configured number of batches for parallelization. The batch counts may
# need to be increased as newly added tests grow the duration to the point where an additional parallel batch helps.
# python_batch_count is used to split the pytest tests in the smoke test (batches of actual test functions).
# cypress_batch_count is used to split the collection of cypress test specs into batches.
run: |
echo "cypress_batch_count=11" >> "$GITHUB_OUTPUT"
echo "python_batch_count=5" >> "$GITHUB_OUTPUT"
- id: set-matrix
# For m python batches and n cypress batches, we need a test matrix of m python entries + n cypress entries.
# While the GitHub Actions matrix generation can handle these two parts individually, there isn't a way to use
# the two generated matrices for the same job. So, produce that matrix with scripting and use the include
# directive to add it to the test matrix (see the Python sketch after this diff).
run: |
if [ '${{ needs.setup.outputs.frontend_only }}' == 'true' ]; then
echo 'matrix=["cypress_suite1","cypress_rest"]' >> "$GITHUB_OUTPUT"
elif [ '${{ needs.setup.outputs.ingestion_only }}' == 'true' ]; then
echo 'matrix=["no_cypress_suite0","no_cypress_suite1"]' >> "$GITHUB_OUTPUT"
elif [[ '${{ needs.setup.outputs.backend_change }}' == 'true' || '${{ needs.setup.outputs.smoke_test_change }}' == 'true' ]]; then
echo 'matrix=["no_cypress_suite0","no_cypress_suite1","cypress_suite1","cypress_rest"]' >> "$GITHUB_OUTPUT"
else
echo 'matrix=[]' >> "$GITHUB_OUTPUT"
python_batch_count=${{ steps.set-batch-count.outputs.python_batch_count }}
python_matrix=$(printf "{\"test_strategy\":\"pytests\",\"batch\":\"0\",\"batch_count\":\"$python_batch_count\"}"; for ((i=1;i<python_batch_count;i++)); do printf ",{\"test_strategy\":\"pytests\", \"batch_count\":\"$python_batch_count\",\"batch\":\"%d\"}" $i; done)
cypress_batch_count=${{ steps.set-batch-count.outputs.cypress_batch_count }}
cypress_matrix=$(printf "{\"test_strategy\":\"cypress\",\"batch\":\"0\",\"batch_count\":\"$cypress_batch_count\"}"; for ((i=1;i<cypress_batch_count;i++)); do printf ",{\"test_strategy\":\"cypress\", \"batch_count\":\"$cypress_batch_count\",\"batch\":\"%d\"}" $i; done)
includes=''
if [[ "${{ needs.setup.outputs.frontend_only }}" == 'true' ]]; then
includes=$cypress_matrix
elif [ "${{ needs.setup.outputs.ingestion_only }}" == 'true' ]; then
includes=$python_matrix
elif [[ "${{ needs.setup.outputs.backend_change }}" == 'true' || "${{ needs.setup.outputs.smoke_test_change }}" == 'true' ]]; then
includes="$python_matrix,$cypress_matrix"
fi
echo "matrix={\"include\":[$includes] }" >> "$GITHUB_OUTPUT"
smoke_test:
name: Run Smoke Tests
@@ -1042,8 +1063,7 @@ jobs:
]
strategy:
fail-fast: false
matrix:
test_strategy: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
matrix: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
if: ${{ always() && !failure() && !cancelled() && needs.smoke_test_matrix.outputs.matrix != '[]' }}
steps:
- name: Free up disk space
@@ -1219,6 +1239,8 @@ jobs:
CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
CLEANUP_DATA: "false"
TEST_STRATEGY: ${{ matrix.test_strategy }}
BATCH_COUNT: ${{ matrix.batch_count }}
BATCH_NUMBER: ${{ matrix.batch }}
run: |
echo "$DATAHUB_VERSION"
./gradlew --stop
@@ -1229,25 +1251,25 @@
if: failure()
run: |
docker ps -a
TEST_STRATEGY="-${{ matrix.test_strategy }}"
TEST_STRATEGY="-${{ matrix.test_strategy }}-${{ matrix.batch }}"
source .github/scripts/docker_logs.sh
- name: Upload logs
uses: actions/upload-artifact@v3
if: failure()
with:
name: docker-logs-${{ matrix.test_strategy }}
name: docker-logs-${{ matrix.test_strategy }}-${{ matrix.batch }}
path: "docker_logs/*.log"
retention-days: 5
- name: Upload screenshots
uses: actions/upload-artifact@v3
if: failure()
with:
name: cypress-snapshots-${{ matrix.test_strategy }}
name: cypress-snapshots-${{ matrix.test_strategy }}-${{ matrix.batch }}
path: smoke-test/tests/cypress/cypress/screenshots/
- uses: actions/upload-artifact@v3
if: always()
with:
name: Test Results (smoke tests) ${{ matrix.test_strategy }}
name: Test Results (smoke tests) ${{ matrix.test_strategy }} ${{ matrix.batch }}
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
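The set-matrix step above builds the job matrix by hand because, as its comments note, GitHub Actions cannot combine two independently generated matrices for the same job. Below is a minimal Python sketch of the same construction, assuming the batch counts from the set-batch-count step (5 pytest batches, 11 cypress batches); the `batched_entries` helper is illustrative, not part of the workflow:

```python
import json
from typing import Dict, List

def batched_entries(test_strategy: str, batch_count: int) -> List[Dict[str, str]]:
    # One matrix entry per batch, mirroring the printf loops in the set-matrix step.
    return [
        {"test_strategy": test_strategy, "batch": str(i), "batch_count": str(batch_count)}
        for i in range(batch_count)
    ]

python_matrix = batched_entries("pytests", 5)    # python_batch_count
cypress_matrix = batched_entries("cypress", 11)  # cypress_batch_count

# Equivalent of: echo "matrix={\"include\":[$includes]}" >> "$GITHUB_OUTPUT"
print(json.dumps({"include": python_matrix + cypress_matrix}))
```

Each entry then reaches the smoke_test job as `matrix.test_strategy`, `matrix.batch`, and `matrix.batch_count`, which the workflow exports to the tests as `TEST_STRATEGY`, `BATCH_NUMBER`, and `BATCH_COUNT`.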
2 changes: 1 addition & 1 deletion build.gradle
@@ -35,7 +35,7 @@ buildscript {
ext.pegasusVersion = '29.57.0'
ext.mavenVersion = '3.6.3'
ext.versionGradle = '8.11.1'
ext.springVersion = '6.1.13'
ext.springVersion = '6.1.14'
ext.springBootVersion = '3.2.9'
ext.springKafkaVersion = '3.1.6'
ext.openTelemetryVersion = '1.18.0'
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
@@ -42,6 +42,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them
- OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
- OpenAPI Update 2: Previously, an incorrectly marked parameter named `sort` existed on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value, while the documentation indicated it supported a list of strings. The documentation error has been fixed, and the correct field, `sortCriteria`, which does support a list of strings, is now documented.
- #12223: For dbt Cloud ingestion, the "View in dbt" link will point to the "Explore" page in the dbt Cloud UI. You can revert to the old behavior of linking to the dbt Cloud IDE by setting `external_url_mode: ide`.

### Breaking Changes

13 changes: 10 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from json import JSONDecodeError
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Literal, Optional, Tuple
from urllib.parse import urlparse

import dateutil.parser
@@ -62,6 +62,11 @@ class DBTCloudConfig(DBTCommonConfig):
description="The ID of the run to ingest metadata from. If not specified, we'll default to the latest run.",
)

external_url_mode: Literal["explore", "ide"] = Field(
default="explore",
description='Where the "View in dbt" link should point - either the "Explore" UI or the dbt Cloud IDE',
)

@root_validator(pre=True)
def set_metadata_endpoint(cls, values: dict) -> dict:
if values.get("access_url") and not values.get("metadata_endpoint"):
@@ -527,5 +532,7 @@ def _parse_into_dbt_column(
)

def get_external_url(self, node: DBTNode) -> Optional[str]:
# TODO: Once dbt Cloud supports deep linking to specific files, we can use that.
return f"{self.config.access_url}/develop/{self.config.account_id}/projects/{self.config.project_id}"
if self.config.external_url_mode == "explore":
return f"{self.config.access_url}/explore/{self.config.account_id}/projects/{self.config.project_id}/environments/production/details/{node.dbt_name}"
else:
return f"{self.config.access_url}/develop/{self.config.account_id}/projects/{self.config.project_id}"
@@ -186,16 +186,16 @@ def resolve_includes(
f"traversal_path={traversal_path}, included_files = {included_files}, seen_so_far: {seen_so_far}"
)
if "*" not in inc and not included_files:
reporter.report_failure(
reporter.warning(
title="Error Resolving Include",
message=f"Cannot resolve include {inc}",
context=f"Path: {path}",
message="Cannot resolve included file",
context=f"Include: {inc}, path: {path}, traversal_path: {traversal_path}",
)
elif not included_files:
reporter.report_failure(
reporter.warning(
title="Error Resolving Include",
message=f"Did not resolve anything for wildcard include {inc}",
context=f"Path: {path}",
message="Did not find anything matching the wildcard include",
context=f"Include: {inc}, path: {path}, traversal_path: {traversal_path}",
)
# only load files that we haven't seen so far
included_files = [x for x in included_files if x not in seen_so_far]
@@ -231,9 +231,7 @@ def resolve_includes(
source_config,
reporter,
seen_so_far,
traversal_path=traversal_path
+ "."
+ pathlib.Path(included_file).stem,
traversal_path=f"{traversal_path} -> {pathlib.Path(included_file).stem}",
)
)
except Exception as e:
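The change above downgrades unresolved Looker includes from hard failures to warnings and switches the recursion breadcrumb from a dotted path to an arrow-separated chain, so the warning context shows the full include chain. A hypothetical, stripped-down sketch of how that breadcrumb accumulates across recursive calls (the `include_map` structure and `walk_includes` helper are illustrative, not the source's `resolve_includes` signature):

```python
import pathlib
from typing import Dict, List

def walk_includes(
    include_map: Dict[str, List[str]],
    current_file: str,
    traversal_path: str,
    seen_so_far: List[str],
) -> None:
    # Hypothetical stand-in for resolve_includes(): include_map maps a file to the
    # files it includes; traversal_path carries the chain that warnings report.
    for included_file in include_map.get(current_file, []):
        if included_file in seen_so_far:
            continue
        seen_so_far.append(included_file)
        child_path = f"{traversal_path} -> {pathlib.Path(included_file).stem}"
        print(f"resolving {included_file} via {child_path}")
        walk_includes(include_map, included_file, child_path, seen_so_far)

walk_includes(
    {"model.lkml": ["views/users.view.lkml"], "views/users.view.lkml": ["views/base.view.lkml"]},
    "model.lkml",
    "model",
    [],
)
```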
@@ -84,13 +84,14 @@ def __init__(
tenant_id: str,
metadata_api_timeout: int,
):
self.__access_token: Optional[str] = None
self.__access_token_expiry_time: Optional[datetime] = None
self.__tenant_id = tenant_id
self._access_token: Optional[str] = None
self._access_token_expiry_time: Optional[datetime] = None

self._tenant_id = tenant_id
# Test connection by generating access token
logger.info(f"Trying to connect to {self._get_authority_url()}")
# Power-Bi Auth (Service Principal Auth)
self.__msal_client = msal.ConfidentialClientApplication(
self._msal_client = msal.ConfidentialClientApplication(
client_id,
client_credential=client_secret,
authority=DataResolverBase.AUTHORITY + tenant_id,
@@ -168,18 +169,18 @@ def _get_app(
pass

def _get_authority_url(self):
return f"{DataResolverBase.AUTHORITY}{self.__tenant_id}"
return f"{DataResolverBase.AUTHORITY}{self._tenant_id}"

def get_authorization_header(self):
return {Constant.Authorization: self.get_access_token()}

def get_access_token(self):
if self.__access_token is not None and not self._is_access_token_expired():
return self.__access_token
def get_access_token(self) -> str:
if self._access_token is not None and not self._is_access_token_expired():
return self._access_token

logger.info("Generating PowerBi access token")

auth_response = self.__msal_client.acquire_token_for_client(
auth_response = self._msal_client.acquire_token_for_client(
scopes=[DataResolverBase.SCOPE]
)

@@ -193,24 +194,24 @@ def get_access_token(self):

logger.info("Generated PowerBi access token")

self.__access_token = "Bearer {}".format(
self._access_token = "Bearer {}".format(
auth_response.get(Constant.ACCESS_TOKEN)
)
safety_gap = 300
self.__access_token_expiry_time = datetime.now() + timedelta(
self._access_token_expiry_time = datetime.now() + timedelta(
seconds=(
max(auth_response.get(Constant.ACCESS_TOKEN_EXPIRY, 0) - safety_gap, 0)
)
)

logger.debug(f"{Constant.PBIAccessToken}={self.__access_token}")
logger.debug(f"{Constant.PBIAccessToken}={self._access_token}")

return self.__access_token
return self._access_token

def _is_access_token_expired(self) -> bool:
if not self.__access_token_expiry_time:
if not self._access_token_expiry_time:
return True
return self.__access_token_expiry_time < datetime.now()
return self._access_token_expiry_time < datetime.now()

def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
"""
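The attribute renames above (double to single leading underscores) touch a simple token cache: reuse the bearer token until shortly before it expires, refreshing 300 seconds early as a safety gap. Single underscores also avoid Python's name mangling, which likely makes the attributes reachable from DataResolverBase subclasses. A minimal sketch of the caching pattern, with `fetch_token` standing in for the MSAL `acquire_token_for_client` call (the `BearerTokenCache` class itself is hypothetical):

```python
from datetime import datetime, timedelta
from typing import Callable, Optional, Tuple

class BearerTokenCache:
    # Minimal sketch of the pattern in get_access_token(): cache the token and
    # refresh it 300 seconds before the reported expiry as a safety gap.
    SAFETY_GAP_SECONDS = 300

    def __init__(self, fetch_token: Callable[[], Tuple[str, int]]) -> None:
        self._fetch_token = fetch_token  # returns (raw_token, expires_in_seconds)
        self._access_token: Optional[str] = None
        self._access_token_expiry_time: Optional[datetime] = None

    def _is_access_token_expired(self) -> bool:
        if not self._access_token_expiry_time:
            return True
        return self._access_token_expiry_time < datetime.now()

    def get_access_token(self) -> str:
        if self._access_token is not None and not self._is_access_token_expired():
            return self._access_token
        raw_token, expires_in = self._fetch_token()
        self._access_token = f"Bearer {raw_token}"
        self._access_token_expiry_time = datetime.now() + timedelta(
            seconds=max(expires_in - self.SAFETY_GAP_SECONDS, 0)
        )
        return self._access_token
```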
@@ -138,12 +138,20 @@ class SnowflakeIdentifierConfig(
description="Whether to convert dataset urns to lowercase.",
)


class SnowflakeUsageConfig(BaseUsageConfig):
email_domain: Optional[str] = pydantic.Field(
default=None,
description="Email domain of your organization so users can be displayed on UI appropriately.",
)

email_as_user_identifier: bool = Field(
default=True,
description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is "
"provided, generates email addresses for snowflake users with unset emails, based on their "
"username.",
)


class SnowflakeUsageConfig(BaseUsageConfig):
apply_view_usage_to_tables: bool = pydantic.Field(
default=False,
description="Whether to apply view's usage to its base tables. If set to True, usage is applied to base tables only.",
@@ -267,13 +275,6 @@ class SnowflakeV2Config(
" Map of share name -> details of share.",
)

email_as_user_identifier: bool = Field(
default=True,
description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is "
"provided, generates email addresses for snowflake users with unset emails, based on their "
"username.",
)

include_assertion_results: bool = Field(
default=False,
description="Whether to ingest assertion run results for assertions created using Datahub"
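The Snowflake change above appears to move `email_domain` and `email_as_user_identifier` onto `SnowflakeIdentifierConfig` (the former from `SnowflakeUsageConfig`, the latter from `SnowflakeV2Config`). A hypothetical sketch of the behavior the field descriptions spell out; the `resolve_user_identifier` helper and its exact fallback order are illustrative, not the source's implementation:

```python
from typing import Optional

def resolve_user_identifier(
    username: str,
    email: Optional[str],
    email_as_user_identifier: bool = True,
    email_domain: Optional[str] = None,
) -> str:
    # Prefer the Snowflake user's email when allowed; if the email is unset but an
    # email_domain is configured, derive one from the username; else use the username.
    if email_as_user_identifier:
        if email:
            return email
        if email_domain:
            return f"{username}@{email_domain}"
    return username

print(resolve_user_identifier("JDOE", None, email_domain="example.com"))  # -> JDOE@example.com
```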
