Skip to content

Commit

Permalink
fix(ingest): fix graph config loading (datahub-project#11002)
Browse files Browse the repository at this point in the history
Co-authored-by: Pedro Silva <[email protected]>
  • Loading branch information
hsheth2 and pedro93 authored Jul 26, 2024
1 parent 0274c70 commit f816a14
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 195 deletions.
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ New (optional fields `systemMetadata` and `headers`):

### Other Notable Changes
- #10498 - Tableau ingestion can now be configured to ingest multiple sites at once and add the sites as containers. The feature is currently only available for Tableau Server.
- #10466 - Extends configuration in `~/.datahubenv` to match `DatahubClientConfig` object definition. See full configuration in https://datahubproject.io/docs/python-sdk/clients/. The CLI should now respect the updated configurations specified in `~/.datahubenv` across its functions and utilities. This means that for systems where ssl certification is disabled, setting `disable_ssl_verification: true` in `~./datahubenv` will apply to all CLI calls.
- #11002 - We will not auto-generate a `~/.datahubenv` file. You must either run `datahub init` to create that file, or set environment variables so that the config is loaded.

## 0.13.3

Expand All @@ -80,7 +82,6 @@ New (optional fields `systemMetadata` and `headers`):
### Deprecations

### Other Notable Change
- #10466 - Extends configuration in `~/.datahubenv` to match `DatahubClientConfig` object definition. See full configuration in https://datahubproject.io/docs/python-sdk/clients/. The CLI should now respect the updated configurations specified in `~/.datahubenv` across its functions and utilities. This means that for systems where ssl certification is disabled, setting `disable_ssl_verification: true` in `~./datahubenv` will apply to all CLI calls.

## 0.13.1

Expand Down
27 changes: 5 additions & 22 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import json
import logging
import os
import os.path
import typing
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Type, Union
Expand All @@ -25,9 +23,6 @@

log = logging.getLogger(__name__)

ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID"
ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET"

# TODO: Many of the methods in this file duplicate logic that already lives
# in the DataHubGraph client. We should refactor this to use the client instead.
# For the methods that aren't duplicates, that logic should be moved to the client.
Expand All @@ -37,14 +32,6 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]:
return next((el for el in ls if el is not None and el.strip() != ""), None)


def get_system_auth() -> Optional[str]:
system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID)
system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET)
if system_client_id is not None and system_client_secret is not None:
return f"Basic {system_client_id}:{system_client_secret}"
return None


def parse_run_restli_response(response: requests.Response) -> dict:
response_json = response.json()
if response.status_code != 200:
Expand Down Expand Up @@ -310,20 +297,16 @@ def command(ctx: click.Context) -> None:
return command


def get_session_login_as(
def get_frontend_session_login_as(
username: str, password: str, frontend_url: str
) -> requests.Session:
session = requests.Session()
headers = {
"Content-Type": "application/json",
}
system_auth = get_system_auth()
if system_auth is not None:
session.headers.update({"Authorization": system_auth})
else:
data = '{"username":"' + username + '", "password":"' + password + '"}'
response = session.post(f"{frontend_url}/logIn", headers=headers, data=data)
response.raise_for_status()
data = '{"username":"' + username + '", "password":"' + password + '"}'
response = session.post(f"{frontend_url}/logIn", headers=headers, data=data)
response.raise_for_status()
return session


Expand Down Expand Up @@ -367,7 +350,7 @@ def generate_access_token(
validity: str = "ONE_HOUR",
) -> Tuple[str, str]:
frontend_url = guess_frontend_url_from_gms_url(gms_url)
session = get_session_login_as(
session = get_frontend_session_login_as(
username=username,
password=password,
frontend_url=frontend_url,
Expand Down
114 changes: 106 additions & 8 deletions metadata-ingestion/src/datahub/cli/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,134 @@

import logging
import os
from typing import Optional
import sys
from typing import Optional, Tuple

import click
import yaml
from pydantic import BaseModel, ValidationError

from datahub.cli.env_utils import get_boolean_env_variable
from datahub.ingestion.graph.config import DatahubClientConfig

log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)

DEFAULT_GMS_HOST = "http://localhost:8080"
CONDENSED_DATAHUB_CONFIG_PATH = "~/.datahubenv"
DATAHUB_CONFIG_PATH = os.path.expanduser(CONDENSED_DATAHUB_CONFIG_PATH)
DATAHUB_ROOT_FOLDER = os.path.expanduser("~/.datahub")
ENV_SKIP_CONFIG = "DATAHUB_SKIP_CONFIG"

ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID"
ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET"

def persist_datahub_config(config: dict) -> None:
with open(DATAHUB_CONFIG_PATH, "w+") as outfile:
yaml.dump(config, outfile, default_flow_style=False)
ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL"
ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN"
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
ENV_METADATA_PORT = "DATAHUB_GMS_PORT"
ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL"


class MissingConfigError(Exception):
SHOW_STACK_TRACE = False


def get_system_auth() -> Optional[str]:
system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID)
system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET)
if system_client_id is not None and system_client_secret is not None:
return f"Basic {system_client_id}:{system_client_secret}"
return None


def should_skip_config() -> bool:
def _should_skip_config() -> bool:
return get_boolean_env_variable(ENV_SKIP_CONFIG, False)


def get_client_config() -> Optional[dict]:
def persist_raw_datahub_config(config: dict) -> None:
with open(DATAHUB_CONFIG_PATH, "w+") as outfile:
yaml.dump(config, outfile, default_flow_style=False)
return None


def get_raw_client_config() -> Optional[dict]:
with open(DATAHUB_CONFIG_PATH) as stream:
try:
return yaml.safe_load(stream)
except yaml.YAMLError as exc:
click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True)
return None


class DatahubConfig(BaseModel):
gms: DatahubClientConfig


def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]:
host = os.environ.get(ENV_METADATA_HOST)
port = os.environ.get(ENV_METADATA_PORT)
token = os.environ.get(ENV_METADATA_TOKEN)
protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http")
url = os.environ.get(ENV_METADATA_HOST_URL)
if port is not None:
url = f"{protocol}://{host}:{port}"
return url, token
# The reason for using host as URL is backward compatibility
# If port is not being used we assume someone is using host env var as URL
if url is None and host is not None:
logger.warning(
f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead"
)
return url or host, token


def load_client_config() -> DatahubClientConfig:
gms_host_env, gms_token_env = _get_config_from_env()
if gms_host_env:
# TODO We should also load system auth credentials here.
return DatahubClientConfig(server=gms_host_env, token=gms_token_env)

if _should_skip_config():
raise MissingConfigError(
"You have set the skip config flag, but no GMS host or token was provided in env variables."
)

try:
_ensure_datahub_config()
client_config_dict = get_raw_client_config()
datahub_config: DatahubClientConfig = DatahubConfig.parse_obj(
client_config_dict
).gms

return datahub_config
except ValidationError as e:
click.echo(f"Error loading your {CONDENSED_DATAHUB_CONFIG_PATH}")
click.echo(e, err=True)
sys.exit(1)


def _ensure_datahub_config() -> None:
if not os.path.isfile(DATAHUB_CONFIG_PATH):
raise MissingConfigError(
f"No {CONDENSED_DATAHUB_CONFIG_PATH} file found, and no configuration was found in environment variables. "
f"Run `datahub init` to create a {CONDENSED_DATAHUB_CONFIG_PATH} file."
)


def write_gms_config(
host: str, token: Optional[str], merge_with_previous: bool = True
) -> None:
config = DatahubConfig(gms=DatahubClientConfig(server=host, token=token))
if merge_with_previous:
try:
previous_config = get_raw_client_config()
assert isinstance(previous_config, dict)
except Exception as e:
# ok to fail on this
previous_config = {}
logger.debug(
f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal."
)
config_dict = {**previous_config, **config.dict()}
else:
config_dict = config.dict()
persist_raw_datahub_config(config_dict)
12 changes: 6 additions & 6 deletions metadata-ingestion/src/datahub/cli/lite_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

from datahub.cli.config_utils import (
DATAHUB_ROOT_FOLDER,
get_client_config,
persist_datahub_config,
DatahubConfig,
get_raw_client_config,
persist_raw_datahub_config,
)
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.graph.client import DatahubConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.lite.duckdb_lite_config import DuckDBLiteConfig
Expand Down Expand Up @@ -45,7 +45,7 @@ class LiteCliConfig(DatahubConfig):


def get_lite_config() -> LiteLocalConfig:
client_config_dict = get_client_config()
client_config_dict = get_raw_client_config()
lite_config = LiteCliConfig.parse_obj(client_config_dict)
return lite_config.lite

Expand Down Expand Up @@ -309,10 +309,10 @@ def search(


def write_lite_config(lite_config: LiteLocalConfig) -> None:
cli_config = get_client_config()
cli_config = get_raw_client_config()
assert isinstance(cli_config, dict)
cli_config["lite"] = lite_config.dict()
persist_datahub_config(cli_config)
persist_raw_datahub_config(cli_config)


@lite.command(context_settings=dict(allow_extra_args=True))
Expand Down
14 changes: 8 additions & 6 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub.cli.cli_utils import (
ensure_has_system_metadata,
fixup_gms_url,
get_system_auth,
)
from datahub.cli import config_utils
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -91,7 +88,12 @@ def __init__(
if token:
self._session.headers.update({"Authorization": f"Bearer {token}"})
else:
system_auth = get_system_auth()
# HACK: When no token is provided but system auth env variables are set, we use them.
# Ideally this should simply get passed in as config, instead of being sneakily injected
# in as part of this constructor.
# It works because everything goes through here. The DatahubGraph inherits from the
# rest emitter, and the rest sink uses the rest emitter under the hood.
system_auth = config_utils.get_system_auth()
if system_auth is not None:
self._session.headers.update({"Authorization": system_auth})

Expand Down
10 changes: 7 additions & 3 deletions metadata-ingestion/src/datahub/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
generate_access_token,
make_shim_command,
)
from datahub.cli.config_utils import DATAHUB_CONFIG_PATH, get_boolean_env_variable
from datahub.cli.config_utils import (
DATAHUB_CONFIG_PATH,
get_boolean_env_variable,
write_gms_config,
)
from datahub.cli.delete_cli import delete
from datahub.cli.docker_cli import docker
from datahub.cli.exists_cli import exists
Expand All @@ -33,7 +37,7 @@
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.configuration.common import should_show_stack_trace
from datahub.ingestion.graph.client import get_default_graph, write_gms_config
from datahub.ingestion.graph.client import get_default_graph
from datahub.telemetry import telemetry
from datahub.utilities._custom_package_loader import model_version_name
from datahub.utilities.logging_manager import configure_logging
Expand Down Expand Up @@ -149,7 +153,7 @@ def init(use_password: bool = False) -> None:
type=str,
default="",
)
write_gms_config(host, token)
write_gms_config(host, token, merge_with_previous=False)

click.echo(f"Written to {DATAHUB_CONFIG_PATH}")

Expand Down
Loading

0 comments on commit f816a14

Please sign in to comment.