Skip to content

Commit

Permalink
feat(ingest): support stdin in datahub put (datahub-project#9359)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Dec 4, 2023
1 parent c0ef728 commit 4ec3208
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 23 deletions.
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ async def run_pipeline_to_completion(pipeline: Pipeline) -> int:
squirrel_original_config=True,
squirrel_field="__raw_config",
allow_stdin=True,
allow_remote=True,
process_directives=True,
resolve_env_vars=True,
)
raw_pipeline_config = pipeline_config.pop("__raw_config")

Expand Down Expand Up @@ -268,6 +271,7 @@ def deploy(
pipeline_config = load_config_file(
config,
allow_stdin=True,
allow_remote=True,
resolve_env_vars=False,
)

Expand Down
27 changes: 14 additions & 13 deletions metadata-ingestion/src/datahub/cli/put_cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
import logging
from typing import Any, Optional
from typing import Optional

import click
from click_default_group import DefaultGroup

from datahub.cli.cli_utils import post_entity
from datahub.configuration.config_loader import load_config_file
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -36,22 +36,23 @@ def put() -> None:
@click.option("--urn", required=True, type=str)
@click.option("-a", "--aspect", required=True, type=str)
@click.option("-d", "--aspect-data", required=True, type=str)
@click.pass_context
@upgrade.check_upgrade
@telemetry.with_telemetry()
def aspect(ctx: Any, urn: str, aspect: str, aspect_data: str) -> None:
def aspect(urn: str, aspect: str, aspect_data: str) -> None:
"""Update a single aspect of an entity"""

entity_type = guess_entity_type(urn)
with open(aspect_data) as fp:
aspect_obj = json.load(fp)
status = post_entity(
urn=urn,
aspect_name=aspect,
entity_type=entity_type,
aspect_value=aspect_obj,
)
click.secho(f"Update succeeded with status {status}", fg="green")
aspect_obj = load_config_file(
aspect_data, allow_stdin=True, resolve_env_vars=False, process_directives=False
)

status = post_entity(
urn=urn,
aspect_name=aspect,
entity_type=entity_type,
aspect_value=aspect_obj,
)
click.secho(f"Update succeeded with status {status}", fg="green")


@put.command()
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/cli/specific/file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ def load_file(config_file: Path) -> Union[dict, list]:
squirrel_original_config=False,
resolve_env_vars=False,
allow_stdin=False,
process_directives=False,
)
return res
22 changes: 16 additions & 6 deletions metadata-ingestion/src/datahub/configuration/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from expandvars import UnboundVariable, expandvars

from datahub.configuration.common import ConfigurationError, ConfigurationMechanism
from datahub.configuration.json_loader import JsonConfigurationMechanism
from datahub.configuration.toml import TomlConfigurationMechanism
from datahub.configuration.yaml import YamlConfigurationMechanism

Expand Down Expand Up @@ -100,33 +101,42 @@ def load_config_file(
squirrel_original_config: bool = False,
squirrel_field: str = "__orig_config",
allow_stdin: bool = False,
resolve_env_vars: bool = True,
process_directives: bool = True,
allow_remote: bool = True, # TODO: Change the default to False.
resolve_env_vars: bool = True, # TODO: Change the default to False.
process_directives: bool = False,
) -> dict:
config_mech: ConfigurationMechanism
if allow_stdin and config_file == "-":
# If we're reading from stdin, we assume that the input is a YAML file.
# Note that YAML is a superset of JSON, so this will also read JSON files.
config_mech = YamlConfigurationMechanism()
raw_config_file = sys.stdin.read()
else:
config_file_path = pathlib.Path(config_file)
if config_file_path.suffix in {".yaml", ".yml"}:
config_mech = YamlConfigurationMechanism()
elif config_file_path.suffix == ".json":
config_mech = JsonConfigurationMechanism()
elif config_file_path.suffix == ".toml":
config_mech = TomlConfigurationMechanism()
else:
raise ConfigurationError(
f"Only .toml and .yml are supported. Cannot process file type {config_file_path.suffix}"
f"Only .toml, .yml, and .json are supported. Cannot process file type {config_file_path.suffix}"
)

url_parsed = parse.urlparse(str(config_file))
if url_parsed.scheme in ("http", "https"): # URLs will return http/https
if allow_remote and url_parsed.scheme in (
"http",
"https",
): # URLs will return http/https
# If the URL is remote, we need to fetch it.
try:
response = requests.get(str(config_file))
raw_config_file = response.text
except Exception as e:
raise ConfigurationError(
f"Cannot read remote file {config_file_path}, error:{e}"
)
f"Cannot read remote file {config_file_path}: {e}"
) from e
else:
if not config_file_path.is_file():
raise ConfigurationError(f"Cannot open config file {config_file_path}")
Expand Down
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/configuration/json_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import json
from typing import IO

from datahub.configuration import ConfigurationMechanism


class JsonConfigurationMechanism(ConfigurationMechanism):
"""Ability to load configuration from json files"""

def load_config(self, config_fp: IO) -> dict:
return json.load(config_fp)
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ def create(cls, config_dict, ctx):
def load_glossary_config(
cls, file_name: Union[str, pathlib.Path]
) -> BusinessGlossaryConfig:
config = load_config_file(file_name)
config = load_config_file(file_name, resolve_env_vars=True)
glossary_cfg = BusinessGlossaryConfig.parse_obj(config)
return glossary_cfg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def create(

@staticmethod
def load_lineage_config(file_name: str) -> LineageConfig:
config = load_config_file(file_name)
config = load_config_file(file_name, resolve_env_vars=True)
lineage_config = LineageConfig.parse_obj(config)
return lineage_config

Expand Down
9 changes: 7 additions & 2 deletions metadata-ingestion/tests/unit/config/test_config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def test_load_success(pytestconfig, filename, golden_config, env, referenced_env
assert list_referenced_env_variables(raw_config) == referenced_env_vars

with mock.patch.dict(os.environ, env):
loaded_config = load_config_file(filepath)
loaded_config = load_config_file(filepath, resolve_env_vars=True)
assert loaded_config == golden_config

# TODO check referenced env vars
Expand Down Expand Up @@ -183,7 +183,12 @@ def test_write_file_directive(pytestconfig):
fake_ssl_key = "my-secret-key-value"

with mock.patch.dict(os.environ, {"DATAHUB_SSL_KEY": fake_ssl_key}):
loaded_config = load_config_file(filepath, squirrel_original_config=False)
loaded_config = load_config_file(
filepath,
squirrel_original_config=False,
resolve_env_vars=True,
process_directives=True,
)

# Check that the rest of the dict is unmodified.
diff = deepdiff.DeepDiff(
Expand Down

0 comments on commit 4ec3208

Please sign in to comment.