Merge branch 'datahub-project:master' into master
hsheth2 authored Jul 26, 2024
2 parents 68cad74 + 1f7c92b commit 25af9f0
Showing 24 changed files with 279 additions and 207 deletions.
1 change: 1 addition & 0 deletions datahub-web-react/src/app/context/UserContextProvider.tsx
@@ -127,6 +127,7 @@ const UserContextProvider = ({ children }: { children: React.ReactNode }) => {
    return (
        <UserContext.Provider
            value={{
                loaded: !!meData,
                urn: meData?.me?.corpUser?.urn,
                user: meData?.me?.corpUser as CorpUser,
                platformPrivileges: meData?.me?.platformPrivileges as PlatformPrivileges,
2 changes: 2 additions & 0 deletions datahub-web-react/src/app/context/userContext.tsx
@@ -28,6 +28,7 @@ export type State = {
 * Context about the currently-authenticated user.
 */
export type UserContextType = {
    loaded: boolean;
    urn?: string | null;
    user?: CorpUser | null;
    platformPrivileges?: PlatformPrivileges | null;
@@ -53,6 +54,7 @@ export const DEFAULT_STATE: State = {
};

export const DEFAULT_CONTEXT = {
    loaded: false,
    urn: undefined,
    user: undefined,
    state: DEFAULT_STATE,
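
The new `loaded` flag lets consumers distinguish "the `me` query hasn't resolved yet" from "the user genuinely lacks a privilege". A minimal consumer sketch (hypothetical component and import path, not part of this commit; `managePolicies` stands in for any platform privilege):

```typescript
import React from 'react';
import { useUserContext } from './userContext'; // import path is illustrative

// Hypothetical guard: wait for the me query before privilege checks,
// so an undefined platformPrivileges isn't mistaken for "no access".
export const RequireManagePolicies = ({ children }: { children: React.ReactNode }) => {
    const me = useUserContext();
    if (!me.loaded) return <>Loading…</>;
    if (!me.platformPrivileges?.managePolicies) return <>Not authorized</>;
    return <>{children}</>;
};
```
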
14 changes: 10 additions & 4 deletions datahub-web-react/src/app/ingest/ManageIngestionPage.tsx
@@ -1,5 +1,5 @@
import { Tabs, Typography } from 'antd';
import React, { useState } from 'react';
import React, { useEffect, useState } from 'react';
import styled from 'styled-components';
import { IngestionSourceList } from './source/IngestionSourceList';
import { useAppConfig } from '../useAppConfig';
@@ -51,12 +51,18 @@ export const ManageIngestionPage = () => {
     * Determines which view should be visible: ingestion sources or secrets.
     */
    const me = useUserContext();
    const { config } = useAppConfig();
    const { config, loaded } = useAppConfig();
    const isIngestionEnabled = config?.managedIngestionConfig.enabled;
    const showIngestionTab = isIngestionEnabled && me && me.platformPrivileges?.manageIngestion;
    const showSecretsTab = isIngestionEnabled && me && me.platformPrivileges?.manageSecrets;
    const defaultTab = showIngestionTab ? TabType.Sources : TabType.Secrets;
    const [selectedTab, setSelectedTab] = useState<TabType>(defaultTab);
    const [selectedTab, setSelectedTab] = useState<TabType>(TabType.Sources);

    // defaultTab might not be calculated correctly on mount, if `config` or `me` haven't been loaded yet
    useEffect(() => {
        if (loaded && me.loaded && !showIngestionTab && selectedTab === TabType.Sources) {
            setSelectedTab(TabType.Secrets);
        }
    }, [loaded, me.loaded, showIngestionTab, selectedTab]);

    const onClickTab = (newTab: string) => {
        setSelectedTab(TabType[newTab]);
@@ -106,7 +106,13 @@ export function LastExecutionColumn(time: any) {
}

export function ScheduleColumn(schedule: any, record: any) {
    const tooltip = schedule && `Runs ${cronstrue.toString(schedule).toLowerCase()} (${record.timezone})`;
    let tooltip: string;
    try {
        tooltip = schedule && `Runs ${cronstrue.toString(schedule).toLowerCase()} (${record.timezone})`;
    } catch (e) {
        tooltip = 'Invalid cron schedule';
        console.debug('Error parsing cron schedule', e);
    }
    return (
        <Tooltip title={tooltip || 'Not scheduled'}>
            <Typography.Text code>{schedule || 'None'}</Typography.Text>
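
The try/catch matters because `cronstrue.toString` throws on expressions it cannot parse; without it, an unparseable schedule string would throw during render. A standalone sketch of the behavior (sample expressions are illustrative):

```typescript
import cronstrue from 'cronstrue';

console.log(cronstrue.toString('0 9 * * 1')); // e.g. "At 09:00 AM, only on Monday"

try {
    cronstrue.toString('not-a-cron'); // throws for unparseable expressions
} catch (e) {
    console.debug('Error parsing cron schedule', e); // same fallback path as above
}
```
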
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
@@ -66,6 +66,8 @@ New (optional fields `systemMetadata` and `headers`):

### Other Notable Changes
- #10498 - Tableau ingestion can now be configured to ingest multiple sites at once and add the sites as containers. The feature is currently only available for Tableau Server.
- #10466 - Extends configuration in `~/.datahubenv` to match the `DatahubClientConfig` object definition. See the full configuration at https://datahubproject.io/docs/python-sdk/clients/. The CLI should now respect the updated configurations specified in `~/.datahubenv` across its functions and utilities. This means that for systems where SSL certificate verification is disabled, setting `disable_ssl_verification: true` in `~/.datahubenv` will apply to all CLI calls.
- #11002 - We will not auto-generate a `~/.datahubenv` file. You must either run `datahub init` to create that file, or set environment variables so that the config is loaded.
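
For reference, a minimal `~/.datahubenv` under this layout might look like the following sketch (the top-level `gms` key and field names come from `DatahubClientConfig`; the token value is a placeholder):

```yaml
gms:
  server: http://localhost:8080
  token: <your-access-token> # optional
  disable_ssl_verification: true # only where SSL verification is intentionally disabled
```
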

## 0.13.3

@@ -80,7 +82,6 @@ New (optional fields `systemMetadata` and `headers`):
### Deprecations

### Other Notable Changes
- #10466 - Extends configuration in `~/.datahubenv` to match the `DatahubClientConfig` object definition. See the full configuration at https://datahubproject.io/docs/python-sdk/clients/. The CLI should now respect the updated configurations specified in `~/.datahubenv` across its functions and utilities. This means that for systems where SSL certificate verification is disabled, setting `disable_ssl_verification: true` in `~/.datahubenv` will apply to all CLI calls.

## 0.13.1

29 changes: 29 additions & 0 deletions metadata-ingestion/recipe_overview.md
@@ -90,6 +90,35 @@ similar to variable substitution in GNU bash or in docker-compose files.
For details, see [variable-substitution](https://docs.docker.com/compose/compose-file/compose-file-v2/#variable-substitution).
Environment variable substitution should be used to mask sensitive information in recipe files. As long as you can pass environment variables securely to the ingestion process, there is no need to store sensitive information in recipes.
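
For example, a recipe can reference a secret through an environment variable instead of embedding it inline (a minimal sketch; the variable name `MYSQL_PASSWORD` is arbitrary):

```yaml
source:
  type: mysql
  config:
    host_port: localhost:3306
    username: root
    password: ${MYSQL_PASSWORD} # resolved from the environment at ingestion time
```
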

### Loading Sensitive Data as Files in Recipes

Some sources (e.g. kafka, bigquery, mysql) require paths to files on a local file system. This doesn't work for UI ingestion, where the recipe needs to be fully self-contained. To supply files to ingestion processes as part of the necessary configuration, DataHub offers the `__DATAHUB_TO_FILE_` directive, which allows recipes to set the contents of files.

The syntax for this directive is `__DATAHUB_TO_FILE_<property>: <value>`, which gets turned into `<property>: <path to file containing value>`. Note that the value can be specified inline or via an environment variable/secret.

For example:

```yaml
source:
  type: mysql
  config:
    # Coordinates
    host_port: localhost:3306
    database: dbname

    # Credentials
    username: root
    password: example
    # If you need to use SSL with MySQL:
    options:
      connect_args:
        __DATAHUB_TO_FILE_ssl_key: '${secret}' # use this for secrets that you need to mount to a file
        # this will get converted into
        # ssl_key: /tmp/path/to/file # where file contains the contents of ${secret}
...
```

### Transformations

If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer: either a built-in one or your own module integrated with DataHub. Transformers require extending the recipe with a new section that describes the transformers you want to run, as sketched below.
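
A sketch of such a `transformers` section, assuming the built-in `simple_add_dataset_tags` transformer and a placeholder tag URN:

```yaml
transformers:
  - type: simple_add_dataset_tags
    config:
      tag_urns:
        - "urn:li:tag:NeedsDocumentation" # placeholder tag
```
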
27 changes: 5 additions & 22 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
@@ -1,7 +1,5 @@
import json
import logging
import os
import os.path
import typing
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Type, Union
@@ -25,9 +23,6 @@

log = logging.getLogger(__name__)

ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID"
ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET"

# TODO: Many of the methods in this file duplicate logic that already lives
# in the DataHubGraph client. We should refactor this to use the client instead.
# For the methods that aren't duplicates, that logic should be moved to the client.
@@ -37,14 +32,6 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]:
    return next((el for el in ls if el is not None and el.strip() != ""), None)


def get_system_auth() -> Optional[str]:
    system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID)
    system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET)
    if system_client_id is not None and system_client_secret is not None:
        return f"Basic {system_client_id}:{system_client_secret}"
    return None


def parse_run_restli_response(response: requests.Response) -> dict:
    response_json = response.json()
    if response.status_code != 200:
@@ -310,20 +297,16 @@ def command(ctx: click.Context) -> None:
    return command


def get_session_login_as(
def get_frontend_session_login_as(
    username: str, password: str, frontend_url: str
) -> requests.Session:
    session = requests.Session()
    headers = {
        "Content-Type": "application/json",
    }
    system_auth = get_system_auth()
    if system_auth is not None:
        session.headers.update({"Authorization": system_auth})
    else:
        data = '{"username":"' + username + '", "password":"' + password + '"}'
        response = session.post(f"{frontend_url}/logIn", headers=headers, data=data)
        response.raise_for_status()
    data = '{"username":"' + username + '", "password":"' + password + '"}'
    response = session.post(f"{frontend_url}/logIn", headers=headers, data=data)
    response.raise_for_status()
    return session


@@ -367,7 +350,7 @@ def generate_access_token(
    validity: str = "ONE_HOUR",
) -> Tuple[str, str]:
    frontend_url = guess_frontend_url_from_gms_url(gms_url)
    session = get_session_login_as(
    session = get_frontend_session_login_as(
        username=username,
        password=password,
        frontend_url=frontend_url,
114 changes: 106 additions & 8 deletions metadata-ingestion/src/datahub/cli/config_utils.py
@@ -4,36 +4,134 @@

import logging
import os
from typing import Optional
import sys
from typing import Optional, Tuple

import click
import yaml
from pydantic import BaseModel, ValidationError

from datahub.cli.env_utils import get_boolean_env_variable
from datahub.ingestion.graph.config import DatahubClientConfig

log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)

DEFAULT_GMS_HOST = "http://localhost:8080"
CONDENSED_DATAHUB_CONFIG_PATH = "~/.datahubenv"
DATAHUB_CONFIG_PATH = os.path.expanduser(CONDENSED_DATAHUB_CONFIG_PATH)
DATAHUB_ROOT_FOLDER = os.path.expanduser("~/.datahub")
ENV_SKIP_CONFIG = "DATAHUB_SKIP_CONFIG"

ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID"
ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET"

def persist_datahub_config(config: dict) -> None:
    with open(DATAHUB_CONFIG_PATH, "w+") as outfile:
        yaml.dump(config, outfile, default_flow_style=False)
ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL"
ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN"
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
ENV_METADATA_PORT = "DATAHUB_GMS_PORT"
ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL"


class MissingConfigError(Exception):
    SHOW_STACK_TRACE = False


def get_system_auth() -> Optional[str]:
    system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID)
    system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET)
    if system_client_id is not None and system_client_secret is not None:
        return f"Basic {system_client_id}:{system_client_secret}"
    return None


def should_skip_config() -> bool:
def _should_skip_config() -> bool:
    return get_boolean_env_variable(ENV_SKIP_CONFIG, False)


def get_client_config() -> Optional[dict]:
def persist_raw_datahub_config(config: dict) -> None:
    with open(DATAHUB_CONFIG_PATH, "w+") as outfile:
        yaml.dump(config, outfile, default_flow_style=False)
    return None


def get_raw_client_config() -> Optional[dict]:
    with open(DATAHUB_CONFIG_PATH) as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True)
            return None


class DatahubConfig(BaseModel):
    gms: DatahubClientConfig


def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]:
    host = os.environ.get(ENV_METADATA_HOST)
    port = os.environ.get(ENV_METADATA_PORT)
    token = os.environ.get(ENV_METADATA_TOKEN)
    protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http")
    url = os.environ.get(ENV_METADATA_HOST_URL)
    if port is not None:
        url = f"{protocol}://{host}:{port}"
        return url, token
    # The reason for using host as URL is backward compatibility
    # If port is not being used we assume someone is using host env var as URL
    if url is None and host is not None:
        logger.warning(
            f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead"
        )
    return url or host, token


def load_client_config() -> DatahubClientConfig:
    gms_host_env, gms_token_env = _get_config_from_env()
    if gms_host_env:
        # TODO We should also load system auth credentials here.
        return DatahubClientConfig(server=gms_host_env, token=gms_token_env)

    if _should_skip_config():
        raise MissingConfigError(
            "You have set the skip config flag, but no GMS host or token was provided in env variables."
        )

    try:
        _ensure_datahub_config()
        client_config_dict = get_raw_client_config()
        datahub_config: DatahubClientConfig = DatahubConfig.parse_obj(
            client_config_dict
        ).gms

        return datahub_config
    except ValidationError as e:
        click.echo(f"Error loading your {CONDENSED_DATAHUB_CONFIG_PATH}")
        click.echo(e, err=True)
        sys.exit(1)


def _ensure_datahub_config() -> None:
    if not os.path.isfile(DATAHUB_CONFIG_PATH):
        raise MissingConfigError(
            f"No {CONDENSED_DATAHUB_CONFIG_PATH} file found, and no configuration was found in environment variables. "
            f"Run `datahub init` to create a {CONDENSED_DATAHUB_CONFIG_PATH} file."
        )


def write_gms_config(
    host: str, token: Optional[str], merge_with_previous: bool = True
) -> None:
    config = DatahubConfig(gms=DatahubClientConfig(server=host, token=token))
    if merge_with_previous:
        try:
            previous_config = get_raw_client_config()
            assert isinstance(previous_config, dict)
        except Exception as e:
            # ok to fail on this
            previous_config = {}
            logger.debug(
                f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal."
            )
        config_dict = {**previous_config, **config.dict()}
    else:
        config_dict = config.dict()
    persist_raw_datahub_config(config_dict)
12 changes: 6 additions & 6 deletions metadata-ingestion/src/datahub/cli/lite_cli.py
@@ -11,12 +11,12 @@

from datahub.cli.config_utils import (
    DATAHUB_ROOT_FOLDER,
    get_client_config,
    persist_datahub_config,
    DatahubConfig,
    get_raw_client_config,
    persist_raw_datahub_config,
)
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.graph.client import DatahubConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.lite.duckdb_lite_config import DuckDBLiteConfig
@@ -45,7 +45,7 @@ class LiteCliConfig(DatahubConfig):


def get_lite_config() -> LiteLocalConfig:
    client_config_dict = get_client_config()
    client_config_dict = get_raw_client_config()
    lite_config = LiteCliConfig.parse_obj(client_config_dict)
    return lite_config.lite

@@ -309,10 +309,10 @@ def search(


def write_lite_config(lite_config: LiteLocalConfig) -> None:
    cli_config = get_client_config()
    cli_config = get_raw_client_config()
    assert isinstance(cli_config, dict)
    cli_config["lite"] = lite_config.dict()
    persist_datahub_config(cli_config)
    persist_raw_datahub_config(cli_config)


@lite.command(context_settings=dict(allow_extra_args=True))
14 changes: 8 additions & 6 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -10,11 +10,8 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub.cli.cli_utils import (
    ensure_has_system_metadata,
    fixup_gms_url,
    get_system_auth,
)
from datahub.cli import config_utils
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -91,7 +88,12 @@ def __init__(
        if token:
            self._session.headers.update({"Authorization": f"Bearer {token}"})
        else:
            system_auth = get_system_auth()
            # HACK: When no token is provided but system auth env variables are set, we use them.
            # Ideally this should simply get passed in as config, instead of being sneakily injected
            # in as part of this constructor.
            # It works because everything goes through here. The DatahubGraph inherits from the
            # rest emitter, and the rest sink uses the rest emitter under the hood.
            system_auth = config_utils.get_system_auth()
            if system_auth is not None:
                self._session.headers.update({"Authorization": system_auth})

