Commit d6b848a

Merge branch 'datahub-project:master' into master

hsheth2 authored Nov 14, 2023
2 parents a99cc49 + ec13847 commit d6b848a
Showing 9 changed files with 68 additions and 20 deletions.
22 changes: 20 additions & 2 deletions docs/deploy/telemetry.md
@@ -4,7 +4,25 @@

To effectively build and maintain the DataHub Project, we must understand how end-users work within DataHub. Beginning in version 0.8.35, DataHub collects anonymous usage statistics and errors to inform our roadmap priorities and to enable us to proactively address errors.

Deployments are assigned a UUID which is sent along with event details, Java version, OS, and timestamp; telemetry collection is enabled by default and can be disabled by setting `DATAHUB_TELEMETRY_ENABLED=false` in your Docker Compose config.
Both the DataHub backend and the ingestion framework collect telemetry.

## DataHub Backend Telemetry

The source code is available [here.](../../metadata-service/factories/src/main/java/com/linkedin/gms/factory/telemetry/TelemetryUtils.java)
Deployments are assigned a UUID which is sent along with event details, Java version, OS, and timestamp.
The source code is available [here](../../metadata-service/factories/src/main/java/com/linkedin/gms/factory/telemetry/TelemetryUtils.java).

## Ingestion Framework Telemetry

The ingestion framework collects telemetry including CLI invocations, source/sink types, error types, versions, and timestamps. If you run with `datahub --debug`, all telemetry calls will be logged.

On first invocation, the CLI will generate a randomized UUID, which will be sent alongside every telemetry event. This config is stored in `~/.datahub/telemetry-config.json`.

The source code is available [here](../../metadata-ingestion/src/datahub/telemetry/telemetry.py).

## Disabling Telemetry

Telemetry is enabled by default. While we are careful to anonymize all telemetry data and encourage users to keep it enabled so that we can improve DataHub, we understand that some users may wish to disable it.

You can disable backend telemetry by setting the `DATAHUB_TELEMETRY_ENABLED` environment variable to `false`. You'll need to set this on both the datahub-gms and datahub-actions containers.

If you're using the DataHub CLI, ingestion framework telemetry will be disabled when the `DATAHUB_TELEMETRY_ENABLED` environment variable is set to `false`. To persist this change for your machine, run `datahub telemetry disable`.
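
The interaction between the environment variable and the persisted CLI setting can be sketched as follows; the config-file key names and the exact precedence below are assumptions for illustration, not the actual implementation in the linked `telemetry.py`:

```python
import json
import os
from pathlib import Path

# Per the docs above, the ingestion CLI stores its telemetry settings here.
CONFIG_PATH = Path.home() / ".datahub" / "telemetry-config.json"


def telemetry_enabled() -> bool:
    # DATAHUB_TELEMETRY_ENABLED=false disables telemetry for this process.
    if os.getenv("DATAHUB_TELEMETRY_ENABLED", "true").strip().lower() == "false":
        return False

    # Otherwise fall back to the per-machine setting written by
    # `datahub telemetry disable` / `datahub telemetry enable`.
    # The "enabled" key is an assumed schema for the config file.
    if CONFIG_PATH.exists():
        config = json.loads(CONFIG_PATH.read_text())
        return bool(config.get("enabled", True))

    return True
```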
1 change: 1 addition & 0 deletions docs/lineage/airflow.md
@@ -193,6 +193,7 @@ In order to use this example, you must first configure the Datahub hook. Like in
If you're not seeing lineage in DataHub, check the following:

- Validate that the plugin is loaded in Airflow. Go to Admin -> Plugins and check that the DataHub plugin is listed.
- With the v2 plugin, it should also print a log line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin v2 using DataHubRestEmitter: configured to talk to <datahub_url>` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled.
- If using the v2 plugin's automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator.
- If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets.
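
For the last point in the list above, a minimal sketch of manual lineage annotation with the plugin's entity classes; the table names are made up, and the exact `Dataset` constructor arguments and DAG parameters may differ by Airflow and plugin version:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_airflow_plugin.entities import Dataset, Urn

with DAG(dag_id="lineage_example", start_date=datetime(2023, 1, 1), schedule=None):
    transform = BashOperator(
        task_id="transform",
        bash_command="echo transforming",
        # Inlets/outlets must use the plugin's entity classes, not plain strings.
        inlets=[
            Dataset("snowflake", "mydb.schema.tableA"),
            Urn("urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,PROD)"),
        ],
        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
    )
```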

2 changes: 1 addition & 1 deletion metadata-events/mxe-schemas/rename-namespace.sh
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash

SCRIPT_ROOT="$( cd "$( dirname "${BASH_SOURCE[0]:-$0}" )" >/dev/null && pwd )"

Additional changed file (path not shown in this view)
@@ -1,6 +1,7 @@
import copy
import functools
import logging
import os
import threading
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypeVar, cast

@@ -55,7 +56,10 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811

_airflow_listener_initialized = False
_airflow_listener: Optional["DataHubListener"] = None
_RUN_IN_THREAD = True
_RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in (
"true",
"1",
)
_RUN_IN_THREAD_TIMEOUT = 30


@@ -133,7 +137,7 @@ def __init__(self, config: DatahubLineageConfig):

self._emitter = config.make_emitter_hook().make_emitter()
self._graph: Optional[DataHubGraph] = None
logger.info(f"DataHub plugin using {repr(self._emitter)}")
logger.info(f"DataHub plugin v2 using {repr(self._emitter)}")

# See discussion here https://github.com/OpenLineage/OpenLineage/pull/508 for
# why we need to keep track of tasks ourselves.
11 changes: 3 additions & 8 deletions metadata-ingestion/scripts/docgen.py
@@ -7,11 +7,10 @@
import sys
import textwrap
from importlib.metadata import metadata, requires
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional

import click
from pydantic import BaseModel, Field
from pydantic.dataclasses import dataclass

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.decorators import (
@@ -94,7 +93,6 @@ class Component(BaseModel):

@staticmethod
def map_field_path_to_components(field_path: str) -> List[Component]:

m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path)
v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path)
components: List[FieldRow.Component] = []
@@ -197,7 +195,7 @@ def get_checkbox(self) -> str:
# Using a non-breaking space to prevent the checkbox from being
# broken into a new line.
if not self.parent: # None and empty string both count
return f'&nbsp;<abbr title="Required">✅</abbr>'
return '&nbsp;<abbr title="Required">✅</abbr>'
else:
return f'&nbsp;<abbr title="Required if {self.parent} is set">❓</abbr>'
else:
@@ -356,7 +354,6 @@ def priority_value(path: str) -> str:


def gen_md_table_from_struct(schema_dict: Dict[str, Any]) -> List[str]:

from datahub.ingestion.extractor.json_schema_util import JsonSchemaTranslator

# we don't want default field values to be injected into the description of the field
@@ -460,7 +457,6 @@ def get_additional_deps_for_extra(extra_name: str) -> List[str]:


def relocate_path(orig_path: str, relative_path: str, relocated_path: str) -> str:

newPath = os.path.join(os.path.dirname(orig_path), relative_path)
assert os.path.exists(newPath)

@@ -515,7 +511,6 @@ def generate(

if extra_docs:
for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True):

m = re.search("/docs/sources/(.*)/(.*).md", path)
if m:
platform_name = m.group(1).lower()
@@ -741,7 +736,7 @@ def generate(
i += 1
f.write(f"---\nsidebar_position: {i}\n---\n\n")
f.write(
f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n"
"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n"
)
f.write(f"# {platform_docs['name']}\n")

2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -766,7 +766,7 @@ def quickstart( # noqa: C901
logger.debug("docker compose up timed out, sending SIGTERM")
up_process.terminate()
try:
up_process.wait(timeout=3)
up_process.wait(timeout=8)
except subprocess.TimeoutExpired:
logger.debug("docker compose up still running, sending SIGKILL")
up_process.kill()
Additional changed file (path not shown in this view)
@@ -828,7 +828,7 @@ def from_api( # noqa: C901
)
else:
logger.warning(
f"Failed to extract explore {explore_name} from model {model}.", e
f"Failed to extract explore {explore_name} from model {model}: {e}"
)

except AssertionError:
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -218,6 +218,8 @@ def construct_dashboard(
if creator is not None:
modified_actor = builder.make_user_urn(creator)
if report_info.get("last_saved_at") is None:
# Sometimes mode returns null for last_saved_at.
# In that case, we use the created_at timestamp instead.
report_info["last_saved_at"] = report_info.get("created_at")

modified_ts = int(
38 changes: 33 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView

import bson.timestamp
@@ -74,6 +75,12 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class HostingEnvironment(Enum):
SELF_HOSTED = "SELF_HOSTED"
ATLAS = "ATLAS"
AWS_DOCUMENTDB = "AWS_DOCUMENTDB"


class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
@@ -108,6 +115,11 @@ class MongoDBConfig(
# errors out with "16793600" as the maximum size supported.
maxDocumentSize: Optional[PositiveInt] = Field(default=16793600, description="")

hostingEnvironment: Optional[HostingEnvironment] = Field(
default=HostingEnvironment.SELF_HOSTED,
description="Hosting environment of MongoDB, default is SELF_HOSTED, currently support `SELF_HOSTED`, `ATLAS`, `AWS_DOCUMENTDB`",
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for databases to filter in ingestion.",
@@ -176,7 +188,7 @@ def construct_schema_pymongo(
delimiter: str,
use_random_sampling: bool,
max_document_size: int,
is_version_gte_4_4: bool,
should_add_document_size_filter: bool,
sample_size: Optional[int] = None,
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
@@ -191,15 +203,19 @@ def construct_schema_pymongo(
the PyMongo collection
delimiter:
string to concatenate field names by
use_random_sampling:
boolean to indicate if random sampling should be added to aggregation
max_document_size:
maximum size of the document that will be considered for generating the schema.
should_add_document_size_filter:
boolean to indicate if document size filter should be added to aggregation
sample_size:
number of items in the collection to sample
(reads entire collection if not provided)
max_document_size:
maximum size of the document that will be considered for generating the schema.
"""

aggregations: List[Dict] = []
if is_version_gte_4_4:
if should_add_document_size_filter:
doc_size_field = "temporary_doc_size_field"
# create a temporary field to store the size of the document. filter on it and then remove it.
aggregations = [
@@ -381,7 +397,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
delimiter=".",
use_random_sampling=self.config.useRandomSampling,
max_document_size=self.config.maxDocumentSize,
is_version_gte_4_4=self.is_server_version_gte_4_4(),
should_add_document_size_filter=self.should_add_document_size_filter(),
sample_size=self.config.schemaSamplingSize,
)

@@ -475,6 +491,18 @@ def is_server_version_gte_4_4(self) -> bool:

return False

def is_hosted_on_aws_documentdb(self) -> bool:
return self.config.hostingEnvironment == HostingEnvironment.AWS_DOCUMENTDB

def should_add_document_size_filter(self) -> bool:
# the operation $bsonsize is only available in server version greater than 4.4
# and is not supported by AWS DocumentDB, we should only add this operation to
# aggregation for mongodb that doesn't run on AWS DocumentDB and version is greater than 4.4
# https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html
return (
self.is_server_version_gte_4_4() and not self.is_hosted_on_aws_documentdb()
)

def get_report(self) -> MongoDBSourceReport:
return self.report
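
The net effect of the new config option and helper methods above: the `$bsonSize`-based document-size filter is only added to the aggregation when the server is at least 4.4 and the deployment is not AWS DocumentDB. A self-contained sketch of that decision (the standalone helper below is illustrative, not the source's actual method signature):

```python
from enum import Enum


class HostingEnvironment(Enum):
    SELF_HOSTED = "SELF_HOSTED"
    ATLAS = "ATLAS"
    AWS_DOCUMENTDB = "AWS_DOCUMENTDB"


def should_add_document_size_filter(
    server_version_gte_4_4: bool, env: HostingEnvironment
) -> bool:
    # $bsonSize requires MongoDB >= 4.4 and is not supported by AWS DocumentDB,
    # so the size-limiting aggregation stage is skipped for DocumentDB deployments.
    return server_version_gte_4_4 and env != HostingEnvironment.AWS_DOCUMENTDB


# For example, a config that sets `hostingEnvironment: AWS_DOCUMENTDB` turns the filter off:
assert should_add_document_size_filter(True, HostingEnvironment.ATLAS) is True
assert should_add_document_size_filter(True, HostingEnvironment.AWS_DOCUMENTDB) is False
```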
