diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index b2e52fe8bf5d9..1a64cc22ea090 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -177,6 +177,7 @@ def __new__( **kwargs, ): from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates + from dagster._core.definitions.metadata.metadata_set import validate_metadata_values only_allow_hidden_params_in_kwargs(AssetSpec, kwargs) @@ -199,12 +200,15 @@ def __new__( } validate_kind_tags(kind_tags) + metadata = check.opt_mapping_param(metadata, "metadata", key_type=str) + validate_metadata_values(metadata) + return super().__new__( cls, key=key, deps=asset_deps, description=check.opt_str_param(description, "description"), - metadata=check.opt_mapping_param(metadata, "metadata", key_type=str), + metadata=metadata, skippable=check.bool_param(skippable, "skippable"), group_name=check.opt_str_param(group_name, "group_name"), code_version=check.opt_str_param(code_version, "code_version"), diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/metadata_set.py b/python_modules/dagster/dagster/_core/definitions/metadata/metadata_set.py index 79baf830ac2dd..1a031e1bc3527 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/metadata_set.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/metadata_set.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from functools import lru_cache -from typing import AbstractSet, Any, Iterable, Mapping, Optional, Type +from typing import TYPE_CHECKING, AbstractSet, Any, Iterable, Mapping, Optional, Type from typing_extensions import TypeVar @@ -11,11 +11,15 @@ TableColumnConstraints as TableColumnConstraints, TableColumnLineage, TableSchema, + TableSchemaMetadataValue, ) from dagster._model import DagsterModel from dagster._model.pydantic_compat_layer import model_fields from dagster._utils.typing_api import flatten_unions +if TYPE_CHECKING: + from dagster._core.definitions.metadata import RawMetadataMapping + # Python types that have a MetadataValue types that directly wraps them DIRECTLY_WRAPPED_METADATA_TYPES = { str, @@ -197,6 +201,14 @@ def current_key_by_legacy_key(cls) -> Mapping[str, str]: return {"relation_identifier": "table_name"} +METADATA_KEY_PREFIX = f"{TableMetadataSet.namespace()}/" +COLUMN_SCHEMA_METADATA_KEY = f"{METADATA_KEY_PREFIX}column_schema" +TABLE_NAME_METADATA_KEY = f"{METADATA_KEY_PREFIX}table_name" +ROW_COUNT_METADATA_KEY = f"{METADATA_KEY_PREFIX}row_count" +PARTITION_ROW_COUNT_METADATA_KEY = f"{METADATA_KEY_PREFIX}partition_row_count" +COLUMN_LINEAGE_METADATA_KEY = f"{METADATA_KEY_PREFIX}column_lineage" + + class UriMetadataSet(NamespacedMetadataSet): """Metadata entry which supplies a URI address for an asset. For example, the S3 address of a file or bucket. @@ -210,3 +222,41 @@ class UriMetadataSet(NamespacedMetadataSet): @classmethod def namespace(cls) -> str: return "dagster" + + +def validate_metadata_values(metadata: "RawMetadataMapping") -> None: + if metadata.get(COLUMN_SCHEMA_METADATA_KEY): + value = metadata[COLUMN_SCHEMA_METADATA_KEY] + check.invariant( + isinstance(value, TableSchema) or isinstance(value, TableSchemaMetadataValue), + f"Value for metadata key '{COLUMN_SCHEMA_METADATA_KEY}' must be a TableSchema " + f"or TableSchemaMetadataValue, received {type(value).__name__}", + ) + if metadata.get(TABLE_NAME_METADATA_KEY): + value = metadata[TABLE_NAME_METADATA_KEY] + check.invariant( + isinstance(value, str), + f"Value for metadata key '{TABLE_NAME_METADATA_KEY}' must be a string, received " + f"{type(value).__name__}", + ) + if metadata.get(ROW_COUNT_METADATA_KEY): + value = metadata[ROW_COUNT_METADATA_KEY] + check.invariant( + isinstance(value, int), + f"Value for metadata key '{ROW_COUNT_METADATA_KEY}' must be an int, received " + f"{type(value).__name__}", + ) + if metadata.get(PARTITION_ROW_COUNT_METADATA_KEY): + value = metadata[PARTITION_ROW_COUNT_METADATA_KEY] + check.invariant( + isinstance(value, int), + f"Value for metadata key '{PARTITION_ROW_COUNT_METADATA_KEY}' must be an int, received " + f"{type(value).__name__}", + ) + if metadata.get(COLUMN_LINEAGE_METADATA_KEY): + value = metadata[COLUMN_LINEAGE_METADATA_KEY] + check.invariant( + isinstance(value, TableColumnLineage), + f"Value for metadata key '{COLUMN_LINEAGE_METADATA_KEY}' must be a TableColumnLineage, " + f"received {type(value).__name__}", + ) diff --git a/python_modules/dagster/dagster/_core/definitions/result.py b/python_modules/dagster/dagster/_core/definitions/result.py index 9ddae017d8f96..cb1a799b13be8 100644 --- a/python_modules/dagster/dagster/_core/definitions/result.py +++ b/python_modules/dagster/dagster/_core/definitions/result.py @@ -6,6 +6,7 @@ from dagster._core.definitions.data_version import DataVersion from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey from dagster._core.definitions.metadata import RawMetadataMapping +from dagster._core.definitions.metadata.metadata_set import validate_metadata_values class AssetResult( @@ -35,14 +36,19 @@ def __new__( asset_key = AssetKey.from_coercible(asset_key) if asset_key else None + metadata = check.opt_nullable_mapping_param( + metadata, + "metadata", + key_type=str, + ) + + if metadata: + validate_metadata_values(metadata) + return super().__new__( cls, asset_key=asset_key, - metadata=check.opt_nullable_mapping_param( - metadata, - "metadata", - key_type=str, - ), + metadata=metadata, check_results=check.opt_sequence_param( check_results, "check_results", of_type=AssetCheckResult ), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_set.py b/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_set.py index 65f6e49b1058a..727648fb3647d 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_set.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_set.py @@ -1,5 +1,13 @@ import pytest -from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema +from dagster import ( + AssetKey, + AssetMaterialization, + Definitions, + MaterializeResult, + TableColumn, + TableSchema, + asset, +) from dagster._check import CheckError from dagster._core.definitions.metadata import TableMetadataSet from dagster._core.definitions.metadata.table import TableColumnDep, TableColumnLineage @@ -122,3 +130,27 @@ def test_column_lineage() -> None: materialization = AssetMaterialization(asset_key="foo", metadata=splat_table_metadata) extracted_table_metadata = TableMetadataSet.extract(materialization.metadata) assert extracted_table_metadata.column_lineage == expected_column_lineage + + +def test_error_on_invalid_reserved_metadata_value(): + with pytest.raises( + CheckError, match="Value for metadata key 'dagster/column_schema' must be a TableSchema" + ): + + @asset( + metadata={ + "dagster/column_schema": "invalid", + } + ) + def foo(): + pass + + @asset() + def bar(): + yield MaterializeResult(metadata={"dagster/column_schema": "invalid"}) + + defs = Definitions(assets=[bar]) + with pytest.raises( + CheckError, match="Value for metadata key 'dagster/column_schema' must be a TableSchema" + ): + defs.get_implicit_global_asset_job_def().execute_in_process()