From 01b3461d990de30e903320c25a2411c0e533a710 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 26 Jul 2024 13:50:34 +0200 Subject: [PATCH 1/4] fix(protobuf/build): Fix protobuf check jar script (#11006) --- .../java/acryl-spark-lineage/README.md | 13 ++++++++----- .../java/datahub/spark/DatahubSparkListener.java | 1 - .../java/datahub-protobuf/scripts/check_jar.sh | 4 +++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md index 81108aa7b914d4..9caa5a6dec65db 100644 --- a/metadata-integration/java/acryl-spark-lineage/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.15 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.16 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.15") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.16") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.13") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.16") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -356,6 +356,9 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b + ## Changelog +### Version 0.2.16 +- Remove logging DataHub config into logs + ### Version 0.2.15 - Add Kafka emitter to emit lineage to kafka - Add File emitter to emit lineage to file diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index d64e159482c1be..52507a682a1f8b 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -257,7 +257,6 @@ private synchronized SparkLineageConf loadDatahubConfig( this.appContext.setDatabricksTags(databricksTags.orElse(null)); } - log.info("Datahub configuration: {}", datahubConf.root().render()); Optional emitterConfig = initializeEmitter(datahubConf); SparkLineageConf sparkLineageConf = SparkLineageConf.toSparkLineageConf(datahubConf, appContext, emitterConfig.orElse(null)); diff --git a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh index e3aa181c588019..fe3dd8d18f6999 100755 --- a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh +++ b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh @@ -39,7 +39,9 @@ jar -tvf $jarFile |\ grep -v "darwin" |\ grep -v "MetadataChangeProposal.avsc" |\ grep -v "aix" |\ - grep -v "com/sun/" + grep -v "com/sun/" |\ + grep -v "VersionInfo.java" |\ + grep -v "mime.types" if [ $? -ne 0 ]; then echo "✅ No unexpected class paths found in ${jarFile}" From 0274c7029209d1d075450c0977e82dfb58767c63 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Fri, 26 Jul 2024 11:10:13 -0700 Subject: [PATCH 2/4] fix(ui/ingest): Support invalid cron jobs (#10998) --- .../src/app/context/UserContextProvider.tsx | 1 + datahub-web-react/src/app/context/userContext.tsx | 2 ++ .../src/app/ingest/ManageIngestionPage.tsx | 14 ++++++++++---- .../ingest/source/IngestionSourceTableColumns.tsx | 8 +++++++- .../cypress/e2e/mutations/managing_secrets.js | 1 + 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/datahub-web-react/src/app/context/UserContextProvider.tsx b/datahub-web-react/src/app/context/UserContextProvider.tsx index 3bcff15cc27485..66593d346f3df4 100644 --- a/datahub-web-react/src/app/context/UserContextProvider.tsx +++ b/datahub-web-react/src/app/context/UserContextProvider.tsx @@ -127,6 +127,7 @@ const UserContextProvider = ({ children }: { children: React.ReactNode }) => { return ( { * Determines which view should be visible: ingestion sources or secrets. */ const me = useUserContext(); - const { config } = useAppConfig(); + const { config, loaded } = useAppConfig(); const isIngestionEnabled = config?.managedIngestionConfig.enabled; const showIngestionTab = isIngestionEnabled && me && me.platformPrivileges?.manageIngestion; const showSecretsTab = isIngestionEnabled && me && me.platformPrivileges?.manageSecrets; - const defaultTab = showIngestionTab ? TabType.Sources : TabType.Secrets; - const [selectedTab, setSelectedTab] = useState(defaultTab); + const [selectedTab, setSelectedTab] = useState(TabType.Sources); + + // defaultTab might not be calculated correctly on mount, if `config` or `me` haven't been loaded yet + useEffect(() => { + if (loaded && me.loaded && !showIngestionTab && selectedTab === TabType.Sources) { + setSelectedTab(TabType.Secrets); + } + }, [loaded, me.loaded, showIngestionTab, selectedTab]); const onClickTab = (newTab: string) => { setSelectedTab(TabType[newTab]); diff --git a/datahub-web-react/src/app/ingest/source/IngestionSourceTableColumns.tsx b/datahub-web-react/src/app/ingest/source/IngestionSourceTableColumns.tsx index 155e75f1895f53..4b7fb472226172 100644 --- a/datahub-web-react/src/app/ingest/source/IngestionSourceTableColumns.tsx +++ b/datahub-web-react/src/app/ingest/source/IngestionSourceTableColumns.tsx @@ -106,7 +106,13 @@ export function LastExecutionColumn(time: any) { } export function ScheduleColumn(schedule: any, record: any) { - const tooltip = schedule && `Runs ${cronstrue.toString(schedule).toLowerCase()} (${record.timezone})`; + let tooltip: string; + try { + tooltip = schedule && `Runs ${cronstrue.toString(schedule).toLowerCase()} (${record.timezone})`; + } catch (e) { + tooltip = 'Invalid cron schedule'; + console.debug('Error parsing cron schedule', e); + } return ( {schedule || 'None'} diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js index 6953fe04940520..dd331fbcbd5ae2 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js @@ -11,6 +11,7 @@ describe("managing secrets for ingestion creation", () => { // Navigate to the manage ingestion page → secrets cy.loginWithCredentials(); cy.goToIngestionPage(); + cy.clickOptionWithText("Secrets"); // Create a new secret cy.clickOptionWithTestId("create-secret-button"); From f816a14a98870a656d7c125b2bfebf919822290e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 26 Jul 2024 11:15:46 -0700 Subject: [PATCH 3/4] fix(ingest): fix graph config loading (#11002) Co-authored-by: Pedro Silva --- docs/how/updating-datahub.md | 3 +- .../src/datahub/cli/cli_utils.py | 27 +---- .../src/datahub/cli/config_utils.py | 114 ++++++++++++++++-- .../src/datahub/cli/lite_cli.py | 12 +- .../src/datahub/emitter/rest_emitter.py | 14 ++- metadata-ingestion/src/datahub/entrypoints.py | 10 +- .../src/datahub/ingestion/graph/client.py | 109 +---------------- .../src/datahub/ingestion/graph/config.py | 19 +++ .../src/datahub/ingestion/run/pipeline.py | 3 + .../datahub/ingestion/run/pipeline_config.py | 34 +----- .../src/datahub/upgrade/upgrade.py | 3 +- .../tests/unit/test_cli_utils.py | 12 +- .../tests/unit/test_pipeline.py | 49 +++++++- smoke-test/smoke.sh | 2 + smoke-test/tests/utils.py | 2 +- 15 files changed, 218 insertions(+), 195 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/graph/config.py diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index a9c24849544a3c..9618aff78075cf 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -66,6 +66,8 @@ New (optional fields `systemMetadata` and `headers`): ### Other Notable Changes - #10498 - Tableau ingestion can now be configured to ingest multiple sites at once and add the sites as containers. The feature is currently only available for Tableau Server. +- #10466 - Extends configuration in `~/.datahubenv` to match `DatahubClientConfig` object definition. See full configuration in https://datahubproject.io/docs/python-sdk/clients/. The CLI should now respect the updated configurations specified in `~/.datahubenv` across its functions and utilities. This means that for systems where ssl certification is disabled, setting `disable_ssl_verification: true` in `~./datahubenv` will apply to all CLI calls. +- #11002 - We will not auto-generate a `~/.datahubenv` file. You must either run `datahub init` to create that file, or set environment variables so that the config is loaded. ## 0.13.3 @@ -80,7 +82,6 @@ New (optional fields `systemMetadata` and `headers`): ### Deprecations ### Other Notable Change -- #10466 - Extends configuration in `~/.datahubenv` to match `DatahubClientConfig` object definition. See full configuration in https://datahubproject.io/docs/python-sdk/clients/. The CLI should now respect the updated configurations specified in `~/.datahubenv` across its functions and utilities. This means that for systems where ssl certification is disabled, setting `disable_ssl_verification: true` in `~./datahubenv` will apply to all CLI calls. ## 0.13.1 diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index b0039b5f87b343..06861065ca6f25 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -1,7 +1,5 @@ import json import logging -import os -import os.path import typing from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Type, Union @@ -25,9 +23,6 @@ log = logging.getLogger(__name__) -ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID" -ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET" - # TODO: Many of the methods in this file duplicate logic that already lives # in the DataHubGraph client. We should refactor this to use the client instead. # For the methods that aren't duplicates, that logic should be moved to the client. @@ -37,14 +32,6 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]: return next((el for el in ls if el is not None and el.strip() != ""), None) -def get_system_auth() -> Optional[str]: - system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID) - system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET) - if system_client_id is not None and system_client_secret is not None: - return f"Basic {system_client_id}:{system_client_secret}" - return None - - def parse_run_restli_response(response: requests.Response) -> dict: response_json = response.json() if response.status_code != 200: @@ -310,20 +297,16 @@ def command(ctx: click.Context) -> None: return command -def get_session_login_as( +def get_frontend_session_login_as( username: str, password: str, frontend_url: str ) -> requests.Session: session = requests.Session() headers = { "Content-Type": "application/json", } - system_auth = get_system_auth() - if system_auth is not None: - session.headers.update({"Authorization": system_auth}) - else: - data = '{"username":"' + username + '", "password":"' + password + '"}' - response = session.post(f"{frontend_url}/logIn", headers=headers, data=data) - response.raise_for_status() + data = '{"username":"' + username + '", "password":"' + password + '"}' + response = session.post(f"{frontend_url}/logIn", headers=headers, data=data) + response.raise_for_status() return session @@ -367,7 +350,7 @@ def generate_access_token( validity: str = "ONE_HOUR", ) -> Tuple[str, str]: frontend_url = guess_frontend_url_from_gms_url(gms_url) - session = get_session_login_as( + session = get_frontend_session_login_as( username=username, password=password, frontend_url=frontend_url, diff --git a/metadata-ingestion/src/datahub/cli/config_utils.py b/metadata-ingestion/src/datahub/cli/config_utils.py index 7a3fee1c760dae..bb85809174ea96 100644 --- a/metadata-ingestion/src/datahub/cli/config_utils.py +++ b/metadata-ingestion/src/datahub/cli/config_utils.py @@ -4,36 +4,134 @@ import logging import os -from typing import Optional +import sys +from typing import Optional, Tuple import click import yaml +from pydantic import BaseModel, ValidationError from datahub.cli.env_utils import get_boolean_env_variable +from datahub.ingestion.graph.config import DatahubClientConfig -log = logging.getLogger(__name__) +logger = logging.getLogger(__name__) -DEFAULT_GMS_HOST = "http://localhost:8080" CONDENSED_DATAHUB_CONFIG_PATH = "~/.datahubenv" DATAHUB_CONFIG_PATH = os.path.expanduser(CONDENSED_DATAHUB_CONFIG_PATH) DATAHUB_ROOT_FOLDER = os.path.expanduser("~/.datahub") ENV_SKIP_CONFIG = "DATAHUB_SKIP_CONFIG" +ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID" +ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET" -def persist_datahub_config(config: dict) -> None: - with open(DATAHUB_CONFIG_PATH, "w+") as outfile: - yaml.dump(config, outfile, default_flow_style=False) +ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL" +ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN" +ENV_METADATA_HOST = "DATAHUB_GMS_HOST" +ENV_METADATA_PORT = "DATAHUB_GMS_PORT" +ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL" + + +class MissingConfigError(Exception): + SHOW_STACK_TRACE = False + + +def get_system_auth() -> Optional[str]: + system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID) + system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET) + if system_client_id is not None and system_client_secret is not None: + return f"Basic {system_client_id}:{system_client_secret}" return None -def should_skip_config() -> bool: +def _should_skip_config() -> bool: return get_boolean_env_variable(ENV_SKIP_CONFIG, False) -def get_client_config() -> Optional[dict]: +def persist_raw_datahub_config(config: dict) -> None: + with open(DATAHUB_CONFIG_PATH, "w+") as outfile: + yaml.dump(config, outfile, default_flow_style=False) + return None + + +def get_raw_client_config() -> Optional[dict]: with open(DATAHUB_CONFIG_PATH) as stream: try: return yaml.safe_load(stream) except yaml.YAMLError as exc: click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True) return None + + +class DatahubConfig(BaseModel): + gms: DatahubClientConfig + + +def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]: + host = os.environ.get(ENV_METADATA_HOST) + port = os.environ.get(ENV_METADATA_PORT) + token = os.environ.get(ENV_METADATA_TOKEN) + protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http") + url = os.environ.get(ENV_METADATA_HOST_URL) + if port is not None: + url = f"{protocol}://{host}:{port}" + return url, token + # The reason for using host as URL is backward compatibility + # If port is not being used we assume someone is using host env var as URL + if url is None and host is not None: + logger.warning( + f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead" + ) + return url or host, token + + +def load_client_config() -> DatahubClientConfig: + gms_host_env, gms_token_env = _get_config_from_env() + if gms_host_env: + # TODO We should also load system auth credentials here. + return DatahubClientConfig(server=gms_host_env, token=gms_token_env) + + if _should_skip_config(): + raise MissingConfigError( + "You have set the skip config flag, but no GMS host or token was provided in env variables." + ) + + try: + _ensure_datahub_config() + client_config_dict = get_raw_client_config() + datahub_config: DatahubClientConfig = DatahubConfig.parse_obj( + client_config_dict + ).gms + + return datahub_config + except ValidationError as e: + click.echo(f"Error loading your {CONDENSED_DATAHUB_CONFIG_PATH}") + click.echo(e, err=True) + sys.exit(1) + + +def _ensure_datahub_config() -> None: + if not os.path.isfile(DATAHUB_CONFIG_PATH): + raise MissingConfigError( + f"No {CONDENSED_DATAHUB_CONFIG_PATH} file found, and no configuration was found in environment variables. " + f"Run `datahub init` to create a {CONDENSED_DATAHUB_CONFIG_PATH} file." + ) + + +def write_gms_config( + host: str, token: Optional[str], merge_with_previous: bool = True +) -> None: + config = DatahubConfig(gms=DatahubClientConfig(server=host, token=token)) + if merge_with_previous: + try: + previous_config = get_raw_client_config() + assert isinstance(previous_config, dict) + except Exception as e: + # ok to fail on this + previous_config = {} + logger.debug( + f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal." + ) + config_dict = {**previous_config, **config.dict()} + else: + config_dict = config.dict() + persist_raw_datahub_config(config_dict) diff --git a/metadata-ingestion/src/datahub/cli/lite_cli.py b/metadata-ingestion/src/datahub/cli/lite_cli.py index 7000cdbd730947..841c2f27528b72 100644 --- a/metadata-ingestion/src/datahub/cli/lite_cli.py +++ b/metadata-ingestion/src/datahub/cli/lite_cli.py @@ -11,12 +11,12 @@ from datahub.cli.config_utils import ( DATAHUB_ROOT_FOLDER, - get_client_config, - persist_datahub_config, + DatahubConfig, + get_raw_client_config, + persist_raw_datahub_config, ) from datahub.ingestion.api.common import PipelineContext, RecordEnvelope from datahub.ingestion.api.sink import NoopWriteCallback -from datahub.ingestion.graph.client import DatahubConfig from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.sink.file import FileSink, FileSinkConfig from datahub.lite.duckdb_lite_config import DuckDBLiteConfig @@ -45,7 +45,7 @@ class LiteCliConfig(DatahubConfig): def get_lite_config() -> LiteLocalConfig: - client_config_dict = get_client_config() + client_config_dict = get_raw_client_config() lite_config = LiteCliConfig.parse_obj(client_config_dict) return lite_config.lite @@ -309,10 +309,10 @@ def search( def write_lite_config(lite_config: LiteLocalConfig) -> None: - cli_config = get_client_config() + cli_config = get_raw_client_config() assert isinstance(cli_config, dict) cli_config["lite"] = lite_config.dict() - persist_datahub_config(cli_config) + persist_raw_datahub_config(cli_config) @lite.command(context_settings=dict(allow_extra_args=True)) diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index bb639339427d91..db4f1749d85446 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -10,11 +10,8 @@ from requests.adapters import HTTPAdapter, Retry from requests.exceptions import HTTPError, RequestException -from datahub.cli.cli_utils import ( - ensure_has_system_metadata, - fixup_gms_url, - get_system_auth, -) +from datahub.cli import config_utils +from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url from datahub.configuration.common import ConfigurationError, OperationalError from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -91,7 +88,12 @@ def __init__( if token: self._session.headers.update({"Authorization": f"Bearer {token}"}) else: - system_auth = get_system_auth() + # HACK: When no token is provided but system auth env variables are set, we use them. + # Ideally this should simply get passed in as config, instead of being sneakily injected + # in as part of this constructor. + # It works because everything goes through here. The DatahubGraph inherits from the + # rest emitter, and the rest sink uses the rest emitter under the hood. + system_auth = config_utils.get_system_auth() if system_auth is not None: self._session.headers.update({"Authorization": system_auth}) diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index d6b888b391bfb5..d088380d5d38c4 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -13,7 +13,11 @@ generate_access_token, make_shim_command, ) -from datahub.cli.config_utils import DATAHUB_CONFIG_PATH, get_boolean_env_variable +from datahub.cli.config_utils import ( + DATAHUB_CONFIG_PATH, + get_boolean_env_variable, + write_gms_config, +) from datahub.cli.delete_cli import delete from datahub.cli.docker_cli import docker from datahub.cli.exists_cli import exists @@ -33,7 +37,7 @@ from datahub.cli.telemetry import telemetry as telemetry_cli from datahub.cli.timeline_cli import timeline from datahub.configuration.common import should_show_stack_trace -from datahub.ingestion.graph.client import get_default_graph, write_gms_config +from datahub.ingestion.graph.client import get_default_graph from datahub.telemetry import telemetry from datahub.utilities._custom_package_loader import model_version_name from datahub.utilities.logging_manager import configure_logging @@ -149,7 +153,7 @@ def init(use_password: bool = False) -> None: type=str, default="", ) - write_gms_config(host, token) + write_gms_config(host, token, merge_with_previous=False) click.echo(f"Written to {DATAHUB_CONFIG_PATH}") diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index b2a768099c481d..4d7b58d185d756 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -3,8 +3,6 @@ import functools import json import logging -import os -import sys import textwrap import time from dataclasses import dataclass @@ -24,10 +22,9 @@ Union, ) -import click from avro.schema import RecordSchema from deprecated import deprecated -from pydantic import BaseModel, ValidationError +from pydantic import BaseModel from requests.models import HTTPError from datahub.cli import config_utils @@ -37,6 +34,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.emitter.serialization_helper import post_json_transform +from datahub.ingestion.graph.config import DatahubClientConfig from datahub.ingestion.graph.connections import ( connections_gql, get_id_from_connection_urn, @@ -90,26 +88,6 @@ _MISSING_SERVER_ID = "missing" _GRAPH_DUMMY_RUN_ID = "__datahub-graph-client" -ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL" -ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN" -ENV_METADATA_HOST = "DATAHUB_GMS_HOST" -ENV_METADATA_PORT = "DATAHUB_GMS_PORT" -ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL" - - -class DatahubClientConfig(ConfigModel): - """Configuration class for holding connectivity to datahub gms""" - - server: str = "http://localhost:8080" - token: Optional[str] = None - timeout_sec: Optional[int] = None - retry_status_codes: Optional[List[int]] = None - retry_max_times: Optional[int] = None - extra_headers: Optional[Dict[str, str]] = None - ca_certificate_path: Optional[str] = None - client_certificate_path: Optional[str] = None - disable_ssl_verification: bool = False - # Alias for backwards compatibility. # DEPRECATION: Remove in v0.10.2. @@ -1775,88 +1753,7 @@ def close(self) -> None: def get_default_graph() -> DataHubGraph: - graph_config = load_client_config() + graph_config = config_utils.load_client_config() graph = DataHubGraph(graph_config) graph.test_connection() return graph - - -class DatahubConfig(BaseModel): - gms: DatahubClientConfig - - -config_override: Dict = {} - - -def get_details_from_env() -> Tuple[Optional[str], Optional[str]]: - host = os.environ.get(ENV_METADATA_HOST) - port = os.environ.get(ENV_METADATA_PORT) - token = os.environ.get(ENV_METADATA_TOKEN) - protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http") - url = os.environ.get(ENV_METADATA_HOST_URL) - if port is not None: - url = f"{protocol}://{host}:{port}" - return url, token - # The reason for using host as URL is backward compatibility - # If port is not being used we assume someone is using host env var as URL - if url is None and host is not None: - logger.warning( - f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead" - ) - return url or host, token - - -def load_client_config() -> DatahubClientConfig: - try: - ensure_datahub_config() - client_config_dict = config_utils.get_client_config() - datahub_config: DatahubClientConfig = DatahubConfig.parse_obj( - client_config_dict - ).gms - except ValidationError as e: - click.echo( - f"Received error, please check your {config_utils.CONDENSED_DATAHUB_CONFIG_PATH}" - ) - click.echo(e, err=True) - sys.exit(1) - - # Override gms & token configs if specified. - if len(config_override.keys()) > 0: - datahub_config.server = str(config_override.get(ENV_METADATA_HOST_URL)) - datahub_config.token = config_override.get(ENV_METADATA_TOKEN) - elif config_utils.should_skip_config(): - gms_host_env, gms_token_env = get_details_from_env() - if gms_host_env: - datahub_config.server = gms_host_env - datahub_config.token = gms_token_env - - return datahub_config - - -def ensure_datahub_config() -> None: - if not os.path.isfile(config_utils.DATAHUB_CONFIG_PATH): - click.secho( - f"No {config_utils.CONDENSED_DATAHUB_CONFIG_PATH} file found, generating one for you...", - bold=True, - ) - write_gms_config(config_utils.DEFAULT_GMS_HOST, None) - - -def write_gms_config( - host: str, token: Optional[str], merge_with_previous: bool = True -) -> None: - config = DatahubConfig(gms=DatahubClientConfig(server=host, token=token)) - if merge_with_previous: - try: - previous_config = config_utils.get_client_config() - assert isinstance(previous_config, dict) - except Exception as e: - # ok to fail on this - previous_config = {} - logger.debug( - f"Failed to retrieve config from file {config_utils.DATAHUB_CONFIG_PATH}: {e}. This isn't fatal." - ) - config_dict = {**previous_config, **config.dict()} - else: - config_dict = config.dict() - config_utils.persist_datahub_config(config_dict) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/config.py b/metadata-ingestion/src/datahub/ingestion/graph/config.py new file mode 100644 index 00000000000000..cf0ec45b71458c --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/graph/config.py @@ -0,0 +1,19 @@ +from typing import Dict, List, Optional + +from datahub.configuration.common import ConfigModel + + +class DatahubClientConfig(ConfigModel): + """Configuration class for holding connectivity to datahub gms""" + + # TODO: Having a default for the server doesn't make a ton of sense. This should be handled + # by callers / the CLI, but the actual client should not have any magic. + server: str = "http://localhost:8080" + token: Optional[str] = None + timeout_sec: Optional[int] = None + retry_status_codes: Optional[List[int]] = None + retry_max_times: Optional[int] = None + extra_headers: Optional[Dict[str, str]] = None + ca_certificate_path: Optional[str] = None + client_certificate_path: Optional[str] = None + disable_ssl_verification: bool = False diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 60930f03763ed9..0a91c4918d8019 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -252,6 +252,9 @@ def __init__( ) if self.config.sink is None: + logger.info( + "No sink configured, attempting to use the default datahub-rest sink." + ) with _add_init_error_context("configure the default rest sink"): self.sink_type = "datahub-rest" self.sink = _make_default_rest_sink(self.ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 51b657ad5bf777..98629ba030695a 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -1,14 +1,12 @@ import datetime import logging -import os import uuid from typing import Any, Dict, List, Optional -from pydantic import Field, root_validator, validator +from pydantic import Field, validator -from datahub.configuration import config_loader from datahub.configuration.common import ConfigModel, DynamicTypedConfig -from datahub.ingestion.graph.client import DatahubClientConfig, load_client_config +from datahub.ingestion.graph.client import DatahubClientConfig from datahub.ingestion.sink.file import FileSinkConfig logger = logging.getLogger(__name__) @@ -103,34 +101,6 @@ def run_id_should_be_semantic( assert v is not None return v - @root_validator(pre=True) - def default_sink_is_datahub_rest(cls, values: Dict[str, Any]) -> Any: - if "sink" not in values: - config = load_client_config() - # update this - default_sink_config = { - "type": "datahub-rest", - "config": config.dict(exclude_defaults=True), - } - # resolve env variables if present - default_sink_config = config_loader.resolve_env_variables( - default_sink_config, environ=os.environ - ) - values["sink"] = default_sink_config - - return values - - @validator("datahub_api", always=True) - def datahub_api_should_use_rest_sink_as_default( - cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any - ) -> Optional[DatahubClientConfig]: - if v is None and "sink" in values and hasattr(values["sink"], "type"): - sink_type = values["sink"].type - if sink_type == "datahub-rest": - sink_config = values["sink"].config - v = DatahubClientConfig.parse_obj_allow_extras(sink_config) - return v - @classmethod def from_dict( cls, resolved_dict: dict, raw_dict: Optional[dict] = None diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index cdb7df57efe798..9e2639abdca415 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -12,7 +12,8 @@ from termcolor import colored from datahub import __version__ -from datahub.ingestion.graph.client import DataHubGraph, load_client_config +from datahub.cli.config_utils import load_client_config +from datahub.ingestion.graph.client import DataHubGraph log = logging.getLogger(__name__) diff --git a/metadata-ingestion/tests/unit/test_cli_utils.py b/metadata-ingestion/tests/unit/test_cli_utils.py index 68cb985af47340..af3a184d97e41c 100644 --- a/metadata-ingestion/tests/unit/test_cli_utils.py +++ b/metadata-ingestion/tests/unit/test_cli_utils.py @@ -2,7 +2,7 @@ from unittest import mock from datahub.cli import cli_utils -from datahub.ingestion.graph.client import get_details_from_env +from datahub.cli.config_utils import _get_config_from_env def test_first_non_null(): @@ -17,14 +17,14 @@ def test_first_non_null(): @mock.patch.dict(os.environ, {"DATAHUB_GMS_HOST": "http://localhost:9092"}) def test_correct_url_when_gms_host_in_old_format(): - assert get_details_from_env() == ("http://localhost:9092", None) + assert _get_config_from_env() == ("http://localhost:9092", None) @mock.patch.dict( os.environ, {"DATAHUB_GMS_HOST": "localhost", "DATAHUB_GMS_PORT": "8080"} ) def test_correct_url_when_gms_host_and_port_set(): - assert get_details_from_env() == ("http://localhost:8080", None) + assert _get_config_from_env() == ("http://localhost:8080", None) @mock.patch.dict( @@ -36,7 +36,7 @@ def test_correct_url_when_gms_host_and_port_set(): }, ) def test_correct_url_when_gms_host_port_url_set(): - assert get_details_from_env() == ("http://localhost:8080", None) + assert _get_config_from_env() == ("http://localhost:8080", None) @mock.patch.dict( @@ -49,7 +49,7 @@ def test_correct_url_when_gms_host_port_url_set(): }, ) def test_correct_url_when_gms_host_port_url_protocol_set(): - assert get_details_from_env() == ("https://localhost:8080", None) + assert _get_config_from_env() == ("https://localhost:8080", None) @mock.patch.dict( @@ -59,7 +59,7 @@ def test_correct_url_when_gms_host_port_url_protocol_set(): }, ) def test_correct_url_when_url_set(): - assert get_details_from_env() == ("https://example.com", None) + assert _get_config_from_env() == ("https://example.com", None) def test_fixup_gms_url(): diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index 01403163bbfbdf..defa117e370f00 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -11,6 +11,7 @@ from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.transform import Transformer from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.graph.config import DatahubClientConfig from datahub.ingestion.run.pipeline import Pipeline, PipelineContext from datahub.ingestion.sink.datahub_rest import DatahubRestSink from datahub.metadata.com.linkedin.pegasus2avro.mxe import SystemMetadata @@ -60,7 +61,13 @@ def test_configure(self, mock_sink, mock_source, mock_consumer): "datahub.ingestion.graph.client.DataHubGraph.get_config", return_value={"noCode": True}, ) - def test_configure_without_sink(self, mock_emitter, mock_graph): + @patch( + "datahub.cli.config_utils.load_client_config", + return_value=DatahubClientConfig(server="http://fake-gms-server:8080"), + ) + def test_configure_without_sink( + self, mock_emitter, mock_graph, mock_load_client_config + ): pipeline = Pipeline.create( { "source": { @@ -71,8 +78,44 @@ def test_configure_without_sink(self, mock_emitter, mock_graph): ) # assert that the default sink is a DatahubRestSink assert isinstance(pipeline.sink, DatahubRestSink) - assert pipeline.sink.config.server == "http://localhost:8080" - # token value is read from ~/.datahubenv which may be None or not + assert pipeline.sink.config.server == "http://fake-gms-server:8080" + assert pipeline.sink.config.token is None + + @freeze_time(FROZEN_TIME) + @patch( + "datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection", + return_value={"noCode": True}, + ) + @patch( + "datahub.ingestion.graph.client.DataHubGraph.get_config", + return_value={"noCode": True}, + ) + @patch( + "datahub.cli.config_utils.load_client_config", + return_value=DatahubClientConfig(server="http://fake-internal-server:8080"), + ) + @patch( + "datahub.cli.config_utils.get_system_auth", + return_value="Basic user:pass", + ) + def test_configure_without_sink_use_system_auth( + self, mock_emitter, mock_graph, mock_load_client_config, mock_get_system_auth + ): + pipeline = Pipeline.create( + { + "source": { + "type": "file", + "config": {"path": "test_file.json"}, + }, + } + ) + # assert that the default sink is a DatahubRestSink + assert isinstance(pipeline.sink, DatahubRestSink) + assert pipeline.sink.config.server == "http://fake-internal-server:8080" + assert pipeline.sink.config.token is None + assert ( + pipeline.sink.emitter._session.headers["Authorization"] == "Basic user:pass" + ) @freeze_time(FROZEN_TIME) @patch( diff --git a/smoke-test/smoke.sh b/smoke-test/smoke.sh index db0389be1f4897..fafb2076fe6990 100755 --- a/smoke-test/smoke.sh +++ b/smoke-test/smoke.sh @@ -24,6 +24,8 @@ source venv/bin/activate source ./set-cypress-creds.sh +export DATAHUB_GMS_URL=http://localhost:8080 + # no_cypress_suite0, no_cypress_suite1, cypress_suite1, cypress_rest if [[ -z "${TEST_STRATEGY}" ]]; then pytest -rP --durations=20 -vv --continue-on-collection-errors --junit-xml=junit.smoke.xml diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index 0895056fe3dddf..7564f1a05e79e0 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -23,7 +23,7 @@ def get_frontend_session(): def login_as(username: str, password: str): - return cli_utils.get_session_login_as( + return cli_utils.get_frontend_session_login_as( username=username, password=password, frontend_url=get_frontend_url() ) From 1f7c92bb994fc8682f82aa65dd712ab62a6c062f Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Fri, 26 Jul 2024 19:20:21 +0100 Subject: [PATCH 4/4] feat(docs): Document __DATAHUB_TO_FILE_ directive (#10968) Co-authored-by: Harshal Sheth --- metadata-ingestion/recipe_overview.md | 29 +++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/metadata-ingestion/recipe_overview.md b/metadata-ingestion/recipe_overview.md index a748edbf3bb449..27d5cb8c85f23b 100644 --- a/metadata-ingestion/recipe_overview.md +++ b/metadata-ingestion/recipe_overview.md @@ -90,6 +90,35 @@ similar to variable substitution in GNU bash or in docker-compose files. For details, see [variable-substitution](https://docs.docker.com/compose/compose-file/compose-file-v2/#variable-substitution). This environment variable substitution should be used to mask sensitive information in recipe files. As long as you can get env variables securely to the ingestion process there would not be any need to store sensitive information in recipes. +### Loading Sensitive Data as Files in Recipes + + +Some sources (e.g. kafka, bigquery, mysql) require paths to files on a local file system. This doesn't work for UI ingestion, where the recipe needs to be totally self-sufficient. To add files to ingestion processes as part of the necessary configuration, DataHub offers a directive `__DATAHUB_TO_FILE_` which allows recipes to set the contents of files. + +The syntax for this directive is: `__DATAHUB_TO_FILE_: ` which will get turned into `: `. Note that value can be specified inline or using an env var/secret. + +I.e: + +```yaml +source: + type: mysql + config: + # Coordinates + host_port: localhost:3306 + database: dbname + + # Credentials + username: root + password: example + # If you need to use SSL with MySQL: + options: + connect_args: + __DATAHUB_TO_FILE_ssl_key: '${secret}' # use this for secrets that you need to mount to a file + # this will get converted into + # ssl_key: /tmp/path/to/file # where file contains the contents of ${secret} + ... +``` + ### Transformations If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.