From 4ec3208918791b517a6d18c41905ee2dbe189a12 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 4 Dec 2023 14:31:58 -0500 Subject: [PATCH] feat(ingest): support stdin in `datahub put` (#9359) --- .../src/datahub/cli/ingest_cli.py | 4 +++ metadata-ingestion/src/datahub/cli/put_cli.py | 27 ++++++++++--------- .../src/datahub/cli/specific/file_loader.py | 1 + .../datahub/configuration/config_loader.py | 22 ++++++++++----- .../src/datahub/configuration/json_loader.py | 11 ++++++++ .../source/metadata/business_glossary.py | 2 +- .../ingestion/source/metadata/lineage.py | 2 +- .../tests/unit/config/test_config_loader.py | 9 +++++-- 8 files changed, 55 insertions(+), 23 deletions(-) create mode 100644 metadata-ingestion/src/datahub/configuration/json_loader.py diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index dd0287004a3686..b7827ec9f050b4 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -147,6 +147,9 @@ async def run_pipeline_to_completion(pipeline: Pipeline) -> int: squirrel_original_config=True, squirrel_field="__raw_config", allow_stdin=True, + allow_remote=True, + process_directives=True, + resolve_env_vars=True, ) raw_pipeline_config = pipeline_config.pop("__raw_config") @@ -268,6 +271,7 @@ def deploy( pipeline_config = load_config_file( config, allow_stdin=True, + allow_remote=True, resolve_env_vars=False, ) diff --git a/metadata-ingestion/src/datahub/cli/put_cli.py b/metadata-ingestion/src/datahub/cli/put_cli.py index 6a1d82388dc2a1..324d7f94db258e 100644 --- a/metadata-ingestion/src/datahub/cli/put_cli.py +++ b/metadata-ingestion/src/datahub/cli/put_cli.py @@ -1,11 +1,11 @@ -import json import logging -from typing import Any, Optional +from typing import Optional import click from click_default_group import DefaultGroup from datahub.cli.cli_utils import post_entity +from datahub.configuration.config_loader import load_config_file from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import get_default_graph from datahub.metadata.schema_classes import ( @@ -36,22 +36,23 @@ def put() -> None: @click.option("--urn", required=True, type=str) @click.option("-a", "--aspect", required=True, type=str) @click.option("-d", "--aspect-data", required=True, type=str) -@click.pass_context @upgrade.check_upgrade @telemetry.with_telemetry() -def aspect(ctx: Any, urn: str, aspect: str, aspect_data: str) -> None: +def aspect(urn: str, aspect: str, aspect_data: str) -> None: """Update a single aspect of an entity""" entity_type = guess_entity_type(urn) - with open(aspect_data) as fp: - aspect_obj = json.load(fp) - status = post_entity( - urn=urn, - aspect_name=aspect, - entity_type=entity_type, - aspect_value=aspect_obj, - ) - click.secho(f"Update succeeded with status {status}", fg="green") + aspect_obj = load_config_file( + aspect_data, allow_stdin=True, resolve_env_vars=False, process_directives=False + ) + + status = post_entity( + urn=urn, + aspect_name=aspect, + entity_type=entity_type, + aspect_value=aspect_obj, + ) + click.secho(f"Update succeeded with status {status}", fg="green") @put.command() diff --git a/metadata-ingestion/src/datahub/cli/specific/file_loader.py b/metadata-ingestion/src/datahub/cli/specific/file_loader.py index a9787343fdb911..cad32eb0a22a18 100644 --- a/metadata-ingestion/src/datahub/cli/specific/file_loader.py +++ b/metadata-ingestion/src/datahub/cli/specific/file_loader.py @@ -21,5 +21,6 @@ def load_file(config_file: Path) -> Union[dict, list]: squirrel_original_config=False, resolve_env_vars=False, allow_stdin=False, + process_directives=False, ) return res diff --git a/metadata-ingestion/src/datahub/configuration/config_loader.py b/metadata-ingestion/src/datahub/configuration/config_loader.py index 30ca4ff6aed2d1..2f41af6f7286e6 100644 --- a/metadata-ingestion/src/datahub/configuration/config_loader.py +++ b/metadata-ingestion/src/datahub/configuration/config_loader.py @@ -11,6 +11,7 @@ from expandvars import UnboundVariable, expandvars from datahub.configuration.common import ConfigurationError, ConfigurationMechanism +from datahub.configuration.json_loader import JsonConfigurationMechanism from datahub.configuration.toml import TomlConfigurationMechanism from datahub.configuration.yaml import YamlConfigurationMechanism @@ -100,33 +101,42 @@ def load_config_file( squirrel_original_config: bool = False, squirrel_field: str = "__orig_config", allow_stdin: bool = False, - resolve_env_vars: bool = True, - process_directives: bool = True, + allow_remote: bool = True, # TODO: Change the default to False. + resolve_env_vars: bool = True, # TODO: Change the default to False. + process_directives: bool = False, ) -> dict: config_mech: ConfigurationMechanism if allow_stdin and config_file == "-": # If we're reading from stdin, we assume that the input is a YAML file. + # Note that YAML is a superset of JSON, so this will also read JSON files. config_mech = YamlConfigurationMechanism() raw_config_file = sys.stdin.read() else: config_file_path = pathlib.Path(config_file) if config_file_path.suffix in {".yaml", ".yml"}: config_mech = YamlConfigurationMechanism() + elif config_file_path.suffix == ".json": + config_mech = JsonConfigurationMechanism() elif config_file_path.suffix == ".toml": config_mech = TomlConfigurationMechanism() else: raise ConfigurationError( - f"Only .toml and .yml are supported. Cannot process file type {config_file_path.suffix}" + f"Only .toml, .yml, and .json are supported. Cannot process file type {config_file_path.suffix}" ) + url_parsed = parse.urlparse(str(config_file)) - if url_parsed.scheme in ("http", "https"): # URLs will return http/https + if allow_remote and url_parsed.scheme in ( + "http", + "https", + ): # URLs will return http/https + # If the URL is remote, we need to fetch it. try: response = requests.get(str(config_file)) raw_config_file = response.text except Exception as e: raise ConfigurationError( - f"Cannot read remote file {config_file_path}, error:{e}" - ) + f"Cannot read remote file {config_file_path}: {e}" + ) from e else: if not config_file_path.is_file(): raise ConfigurationError(f"Cannot open config file {config_file_path}") diff --git a/metadata-ingestion/src/datahub/configuration/json_loader.py b/metadata-ingestion/src/datahub/configuration/json_loader.py new file mode 100644 index 00000000000000..35667eb5951fc7 --- /dev/null +++ b/metadata-ingestion/src/datahub/configuration/json_loader.py @@ -0,0 +1,11 @@ +import json +from typing import IO + +from datahub.configuration import ConfigurationMechanism + + +class JsonConfigurationMechanism(ConfigurationMechanism): + """Ability to load configuration from json files""" + + def load_config(self, config_fp: IO) -> dict: + return json.load(config_fp) diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py index 97877df63707f5..6baa70aa581d62 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py @@ -495,7 +495,7 @@ def create(cls, config_dict, ctx): def load_glossary_config( cls, file_name: Union[str, pathlib.Path] ) -> BusinessGlossaryConfig: - config = load_config_file(file_name) + config = load_config_file(file_name, resolve_env_vars=True) glossary_cfg = BusinessGlossaryConfig.parse_obj(config) return glossary_cfg diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py index f33c6e0edae3dc..659444fe610e03 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py @@ -147,7 +147,7 @@ def create( @staticmethod def load_lineage_config(file_name: str) -> LineageConfig: - config = load_config_file(file_name) + config = load_config_file(file_name, resolve_env_vars=True) lineage_config = LineageConfig.parse_obj(config) return lineage_config diff --git a/metadata-ingestion/tests/unit/config/test_config_loader.py b/metadata-ingestion/tests/unit/config/test_config_loader.py index 3253c96b876aa9..f9a4076e18363d 100644 --- a/metadata-ingestion/tests/unit/config/test_config_loader.py +++ b/metadata-ingestion/tests/unit/config/test_config_loader.py @@ -134,7 +134,7 @@ def test_load_success(pytestconfig, filename, golden_config, env, referenced_env assert list_referenced_env_variables(raw_config) == referenced_env_vars with mock.patch.dict(os.environ, env): - loaded_config = load_config_file(filepath) + loaded_config = load_config_file(filepath, resolve_env_vars=True) assert loaded_config == golden_config # TODO check referenced env vars @@ -183,7 +183,12 @@ def test_write_file_directive(pytestconfig): fake_ssl_key = "my-secret-key-value" with mock.patch.dict(os.environ, {"DATAHUB_SSL_KEY": fake_ssl_key}): - loaded_config = load_config_file(filepath, squirrel_original_config=False) + loaded_config = load_config_file( + filepath, + squirrel_original_config=False, + resolve_env_vars=True, + process_directives=True, + ) # Check that the rest of the dict is unmodified. diff = deepdiff.DeepDiff(