From 906a5b91a8adbd3165c01607e9301c84b886106f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 13 Nov 2023 21:51:11 -0500 Subject: [PATCH 1/4] chore(ingest): cleanup various methods (#9221) --- metadata-events/mxe-schemas/rename-namespace.sh | 2 +- metadata-ingestion/scripts/docgen.py | 11 +++-------- .../datahub/ingestion/source/looker/looker_common.py | 2 +- .../src/datahub/ingestion/source/mode.py | 2 ++ 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/metadata-events/mxe-schemas/rename-namespace.sh b/metadata-events/mxe-schemas/rename-namespace.sh index 6402e09b65c07..ef04868a6bd15 100755 --- a/metadata-events/mxe-schemas/rename-namespace.sh +++ b/metadata-events/mxe-schemas/rename-namespace.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash SCRIPT_ROOT="$( cd "$( dirname "${BASH_SOURCE[0]:-$0}" )" >/dev/null && pwd )" diff --git a/metadata-ingestion/scripts/docgen.py b/metadata-ingestion/scripts/docgen.py index 1a4db09e961ce..3e4595650d46a 100644 --- a/metadata-ingestion/scripts/docgen.py +++ b/metadata-ingestion/scripts/docgen.py @@ -7,11 +7,10 @@ import sys import textwrap from importlib.metadata import metadata, requires -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional import click from pydantic import BaseModel, Field -from pydantic.dataclasses import dataclass from datahub.configuration.common import ConfigModel from datahub.ingestion.api.decorators import ( @@ -94,7 +93,6 @@ class Component(BaseModel): @staticmethod def map_field_path_to_components(field_path: str) -> List[Component]: - m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) components: List[FieldRow.Component] = [] @@ -197,7 +195,7 @@ def get_checkbox(self) -> str: # Using a non-breaking space to prevent the checkbox from being # broken into a new line. 
if not self.parent: # None and empty string both count - return f' ' + return ' ' else: return f' ' else: @@ -356,7 +354,6 @@ def priority_value(path: str) -> str: def gen_md_table_from_struct(schema_dict: Dict[str, Any]) -> List[str]: - from datahub.ingestion.extractor.json_schema_util import JsonSchemaTranslator # we don't want default field values to be injected into the description of the field @@ -460,7 +457,6 @@ def get_additional_deps_for_extra(extra_name: str) -> List[str]: def relocate_path(orig_path: str, relative_path: str, relocated_path: str) -> str: - newPath = os.path.join(os.path.dirname(orig_path), relative_path) assert os.path.exists(newPath) @@ -515,7 +511,6 @@ def generate( if extra_docs: for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True): - m = re.search("/docs/sources/(.*)/(.*).md", path) if m: platform_name = m.group(1).lower() @@ -741,7 +736,7 @@ def generate( i += 1 f.write(f"---\nsidebar_position: {i}\n---\n\n") f.write( - f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n" + "import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n" ) f.write(f"# {platform_docs['name']}\n") diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 7ca5ce49019ab..e440750cba0d0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -828,7 +828,7 @@ def from_api( # noqa: C901 ) else: logger.warning( - f"Failed to extract explore {explore_name} from model {model}.", e + f"Failed to extract explore {explore_name} from model {model}: {e}" ) except AssertionError: diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index c46b56da422d9..e4ea3b2ed099f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -218,6 +218,8 @@ def construct_dashboard( if creator is not None: modified_actor = builder.make_user_urn(creator) if report_info.get("last_saved_at") is None: + # Sometimes mode returns null for last_saved_at. + # In that case, we use the created_at timestamp instead. report_info["last_saved_at"] = report_info.get("created_at") modified_ts = int( From f1b6aa782277e3611eeb64edf6c123598de109df Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 13 Nov 2023 21:51:27 -0500 Subject: [PATCH 2/4] docs: clarify how to disable telemetry (#9236) --- docs/deploy/telemetry.md | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/docs/deploy/telemetry.md b/docs/deploy/telemetry.md index c5458cc5df05e..a1b25337de767 100644 --- a/docs/deploy/telemetry.md +++ b/docs/deploy/telemetry.md @@ -4,7 +4,25 @@ To effectively build and maintain the DataHub Project, we must understand how end-users work within DataHub. Beginning in version 0.8.35, DataHub collects anonymous usage statistics and errors to inform our roadmap priorities and to enable us to proactively address errors. -Deployments are assigned a UUID which is sent along with event details, Java version, OS, and timestamp; telemetry collection is enabled by default and can be disabled by setting `DATAHUB_TELEMETRY_ENABLED=false` in your Docker Compose config. +Both the DataHub backend and the ingestion framework collect telemetry. 
+## DataHub Backend Telemetry -The source code is available [here.](../../metadata-service/factories/src/main/java/com/linkedin/gms/factory/telemetry/TelemetryUtils.java) \ No newline at end of file +Deployments are assigned a UUID which is sent along with event details, Java version, OS, and timestamp. +The source code is available [here](../../metadata-service/factories/src/main/java/com/linkedin/gms/factory/telemetry/TelemetryUtils.java). + +## Ingestion Framework Telemetry + +The ingestion framework collects telemetry including CLI invocations, source/sink types, error types, versions, and timestamps. If you run with `datahub --debug`, all telemetry calls will be logged. + +On first invocation, the CLI will generate a randomized UUID, which will be sent alongside every telemetry event. This config is stored in `~/.datahub/telemetry-config.json`. + +The source code is available [here](../../metadata-ingestion/src/datahub/telemetry/telemetry.py). + +## Disabling Telemetry + +Telemetry is enabled by default. While we are careful to anonymize all telemetry data and encourage users to keep it enabled so that we can improve DataHub, we understand that some users may wish to disable it. + +You can disable backend telemetry by setting the `DATAHUB_TELEMETRY_ENABLED` environment variable to `false`. You'll need to set this on both the datahub-gms and datahub-actions containers. + +If you're using the DataHub CLI, ingestion framework telemetry will be disabled when the `DATAHUB_TELEMETRY_ENABLED` environment variable is set to `false`. To persist this change for your machine, run `datahub telemetry disable`. From cfeecd799dd58793e12deb941cb51d590f7a6f4f Mon Sep 17 00:00:00 2001 From: Tony Ouyang Date: Tue, 14 Nov 2023 09:12:39 -0800 Subject: [PATCH 3/4] feat(ingest/mongodb): support AWS DocumentDB for MongoDB (#9201) --- .../src/datahub/ingestion/source/mongodb.py | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index ce2b9ce2981e0..2aa8b1d37d477 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -1,5 +1,6 @@ import logging from dataclasses import dataclass, field +from enum import Enum from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView import bson.timestamp @@ -74,6 +75,12 @@ DENY_DATABASE_LIST = set(["admin", "config", "local"]) +class HostingEnvironment(Enum): + SELF_HOSTED = "SELF_HOSTED" + ATLAS = "ATLAS" + AWS_DOCUMENTDB = "AWS_DOCUMENTDB" + + class MongoDBConfig( PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase ): @@ -108,6 +115,11 @@ class MongoDBConfig( # errors out with "16793600" as the maximum size supported. 
maxDocumentSize: Optional[PositiveInt] = Field(default=16793600, description="") + hostingEnvironment: Optional[HostingEnvironment] = Field( + default=HostingEnvironment.SELF_HOSTED, + description="Hosting environment of MongoDB, default is SELF_HOSTED, currently support `SELF_HOSTED`, `ATLAS`, `AWS_DOCUMENTDB`", + ) + database_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description="regex patterns for databases to filter in ingestion.", @@ -176,7 +188,7 @@ def construct_schema_pymongo( delimiter: str, use_random_sampling: bool, max_document_size: int, - is_version_gte_4_4: bool, + should_add_document_size_filter: bool, sample_size: Optional[int] = None, ) -> Dict[Tuple[str, ...], SchemaDescription]: """ @@ -191,15 +203,19 @@ def construct_schema_pymongo( the PyMongo collection delimiter: string to concatenate field names by + use_random_sampling: + boolean to indicate if random sampling should be added to aggregation + max_document_size: + maximum size of the document that will be considered for generating the schema. + should_add_document_size_filter: + boolean to indicate if document size filter should be added to aggregation sample_size: number of items in the collection to sample (reads entire collection if not provided) - max_document_size: - maximum size of the document that will be considered for generating the schema. """ aggregations: List[Dict] = [] - if is_version_gte_4_4: + if should_add_document_size_filter: doc_size_field = "temporary_doc_size_field" # create a temporary field to store the size of the document. filter on it and then remove it. aggregations = [ @@ -381,7 +397,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: delimiter=".", use_random_sampling=self.config.useRandomSampling, max_document_size=self.config.maxDocumentSize, - is_version_gte_4_4=self.is_server_version_gte_4_4(), + should_add_document_size_filter=self.should_add_document_size_filter(), sample_size=self.config.schemaSamplingSize, ) @@ -475,6 +491,18 @@ def is_server_version_gte_4_4(self) -> bool: return False + def is_hosted_on_aws_documentdb(self) -> bool: + return self.config.hostingEnvironment == HostingEnvironment.AWS_DOCUMENTDB + + def should_add_document_size_filter(self) -> bool: + # the operation $bsonsize is only available in server version greater than 4.4 + # and is not supported by AWS DocumentDB, we should only add this operation to + # aggregation for mongodb that doesn't run on AWS DocumentDB and version is greater than 4.4 + # https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html + return ( + self.is_server_version_gte_4_4() and not self.is_hosted_on_aws_documentdb() + ) + def get_report(self) -> MongoDBSourceReport: return self.report From ec13847f54fb167571359bb233489b8b353bad02 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 14 Nov 2023 14:25:26 -0500 Subject: [PATCH 4/4] feat(airflow): make RUN_IN_THREAD configurable (#9226) --- docs/lineage/airflow.md | 1 + .../src/datahub_airflow_plugin/datahub_listener.py | 8 ++++++-- metadata-ingestion/src/datahub/cli/docker_cli.py | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 19ed1598d4c5a..3a13aefa834a4 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -193,6 +193,7 @@ In order to use this example, you must first configure the Datahub hook. Like in If you're not seeing lineage in DataHub, check the following: - Validate that the plugin is loaded in Airflow. 
Go to Admin -> Plugins and check that the DataHub plugin is listed. +- With the v2 plugin, it should also print a log line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin v2 using DataHubRestEmitter: configured to talk to ` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled. - If using the v2 plugin's automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator. - If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index d00b10bbe1756..c39eef2635658 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -1,6 +1,7 @@ import copy import functools import logging +import os import threading from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypeVar, cast @@ -55,7 +56,10 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811 _airflow_listener_initialized = False _airflow_listener: Optional["DataHubListener"] = None -_RUN_IN_THREAD = True +_RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in ( + "true", + "1", +) _RUN_IN_THREAD_TIMEOUT = 30 @@ -133,7 +137,7 @@ def __init__(self, config: DatahubLineageConfig): self._emitter = config.make_emitter_hook().make_emitter() self._graph: Optional[DataHubGraph] = None - logger.info(f"DataHub plugin using {repr(self._emitter)}") + logger.info(f"DataHub plugin v2 using {repr(self._emitter)}") # See discussion here https://github.com/OpenLineage/OpenLineage/pull/508 for # why we need to keep track of tasks ourselves. diff --git a/metadata-ingestion/src/datahub/cli/docker_cli.py b/metadata-ingestion/src/datahub/cli/docker_cli.py index 77e3285d359ef..08f3faae8abb2 100644 --- a/metadata-ingestion/src/datahub/cli/docker_cli.py +++ b/metadata-ingestion/src/datahub/cli/docker_cli.py @@ -766,7 +766,7 @@ def quickstart( # noqa: C901 logger.debug("docker compose up timed out, sending SIGTERM") up_process.terminate() try: - up_process.wait(timeout=3) + up_process.wait(timeout=8) except subprocess.TimeoutExpired: logger.debug("docker compose up still running, sending SIGKILL") up_process.kill()
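
A note on the telemetry controls documented in patch 2: the docs describe two independent switches, the `DATAHUB_TELEMETRY_ENABLED` environment variable and the persisted `~/.datahub/telemetry-config.json` written by `datahub telemetry disable`. The sketch below is a hypothetical illustration of how a client could combine the two checks; it is not DataHub's actual telemetry implementation, and the `enabled` key name inside the JSON file is an assumption.

```python
# Hypothetical sketch only -- not DataHub's actual telemetry code.
# Combines the env var and the persisted config file described in patch 2.
import json
import os
from pathlib import Path

CONFIG_PATH = Path.home() / ".datahub" / "telemetry-config.json"  # path from the docs


def telemetry_enabled() -> bool:
    # The environment variable takes effect for the current process only.
    if os.environ.get("DATAHUB_TELEMETRY_ENABLED", "true").lower() == "false":
        return False
    # `datahub telemetry disable` persists the choice on disk; the "enabled"
    # key name here is an assumption made for illustration.
    if CONFIG_PATH.exists():
        try:
            return bool(json.loads(CONFIG_PATH.read_text()).get("enabled", True))
        except (ValueError, OSError):
            return True  # fall back to enabled if the file is unreadable
    return True
```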
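
Similarly, the `should_add_document_size_filter` comments in patch 3 describe the technique of attaching a temporary BSON-size field, filtering on it, and then dropping it. Below is a minimal standalone sketch of that pipeline shape, assuming a MongoDB 4.4+ server (the `$bsonSize` operator is not supported on AWS DocumentDB); the connection string and database/collection names are placeholders, and the pipeline is an illustration of the described approach rather than the exact aggregation DataHub builds.

```python
# Minimal sketch of the document-size filter described in the patch 3 comments;
# assumes MongoDB 4.4+ ($bsonSize is unavailable on AWS DocumentDB).
from pymongo import MongoClient

MAX_DOCUMENT_SIZE = 16793600  # default taken from MongoDBConfig.maxDocumentSize
doc_size_field = "temporary_doc_size_field"

pipeline = [
    # Attach the BSON size of each document under a temporary field.
    {"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
    # Keep only documents below the configured size limit.
    {"$match": {doc_size_field: {"$lt": MAX_DOCUMENT_SIZE}}},
    # Drop the temporary field before sampling / schema inference.
    {"$project": {doc_size_field: 0}},
]

if __name__ == "__main__":
    client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
    collection = client["example_db"]["example_collection"]  # hypothetical names
    for doc in collection.aggregate(pipeline, allowDiskUse=True):
        print(doc)
```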