Validate reserved metadata value types #26746

Open · wants to merge 1 commit into base: master
@@ -177,6 +177,7 @@ def __new__(
         **kwargs,
     ):
         from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates
+        from dagster._core.definitions.metadata.metadata_set import validate_metadata_values

         only_allow_hidden_params_in_kwargs(AssetSpec, kwargs)

@@ -199,12 +200,15 @@ def __new__(
         }
         validate_kind_tags(kind_tags)

+        metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
+        validate_metadata_values(metadata)
+
         return super().__new__(
             cls,
             key=key,
             deps=asset_deps,
             description=check.opt_str_param(description, "description"),
-            metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
+            metadata=metadata,
             skippable=check.bool_param(skippable, "skippable"),
             group_name=check.opt_str_param(group_name, "group_name"),
             code_version=check.opt_str_param(code_version, "code_version"),
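With this change, a mistyped reserved "dagster/" metadata value fails when the AssetSpec is constructed instead of surfacing downstream. A minimal sketch of the resulting behavior on a build that includes this branch; the key name and error wording come from validate_metadata_values further down in this diff, and the asset key "events" is just an example:

    import pytest
    from dagster import AssetSpec
    from dagster._check import CheckError

    # A correctly typed reserved value is accepted as before.
    AssetSpec(key="events", metadata={"dagster/row_count": 1000})

    # A wrong type for a reserved key now raises immediately via check.invariant.
    with pytest.raises(CheckError, match="must be an int"):
        AssetSpec(key="events", metadata={"dagster/row_count": "1000"})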
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 from functools import lru_cache
-from typing import AbstractSet, Any, Iterable, Mapping, Optional, Type
+from typing import TYPE_CHECKING, AbstractSet, Any, Iterable, Mapping, Optional, Type

 from typing_extensions import TypeVar

@@ -11,11 +11,15 @@
     TableColumnConstraints as TableColumnConstraints,
     TableColumnLineage,
     TableSchema,
+    TableSchemaMetadataValue,
 )
 from dagster._model import DagsterModel
 from dagster._model.pydantic_compat_layer import model_fields
 from dagster._utils.typing_api import flatten_unions

+if TYPE_CHECKING:
+    from dagster._core.definitions.metadata import RawMetadataMapping
+
 # Python types that have a MetadataValue types that directly wraps them
 DIRECTLY_WRAPPED_METADATA_TYPES = {
     str,
@@ -197,6 +201,14 @@ def current_key_by_legacy_key(cls) -> Mapping[str, str]:
         return {"relation_identifier": "table_name"}


+METADATA_KEY_PREFIX = f"{TableMetadataSet.namespace()}/"
+COLUMN_SCHEMA_METADATA_KEY = f"{METADATA_KEY_PREFIX}column_schema"
+TABLE_NAME_METADATA_KEY = f"{METADATA_KEY_PREFIX}table_name"
+ROW_COUNT_METADATA_KEY = f"{METADATA_KEY_PREFIX}row_count"
+PARTITION_ROW_COUNT_METADATA_KEY = f"{METADATA_KEY_PREFIX}partition_row_count"
+COLUMN_LINEAGE_METADATA_KEY = f"{METADATA_KEY_PREFIX}column_lineage"
+
+
 class UriMetadataSet(NamespacedMetadataSet):
     """Metadata entry which supplies a URI address for an asset.
     For example, the S3 address of a file or bucket.
@@ -210,3 +222,41 @@ class UriMetadataSet(NamespacedMetadataSet):
     @classmethod
     def namespace(cls) -> str:
         return "dagster"
+
+
+def validate_metadata_values(metadata: "RawMetadataMapping") -> None:
+    if metadata.get(COLUMN_SCHEMA_METADATA_KEY):
+        value = metadata[COLUMN_SCHEMA_METADATA_KEY]
+        check.invariant(
+            isinstance(value, TableSchema) or isinstance(value, TableSchemaMetadataValue),
+            f"Value for metadata key '{COLUMN_SCHEMA_METADATA_KEY}' must be a TableSchema "
+            f"or TableSchemaMetadataValue, received {type(value).__name__}",
+        )
+    if metadata.get(TABLE_NAME_METADATA_KEY):
+        value = metadata[TABLE_NAME_METADATA_KEY]
+        check.invariant(
+            isinstance(value, str),
+            f"Value for metadata key '{TABLE_NAME_METADATA_KEY}' must be a string, received "
+            f"{type(value).__name__}",
+        )
+    if metadata.get(ROW_COUNT_METADATA_KEY):
+        value = metadata[ROW_COUNT_METADATA_KEY]
+        check.invariant(
+            isinstance(value, int),
+            f"Value for metadata key '{ROW_COUNT_METADATA_KEY}' must be an int, received "
+            f"{type(value).__name__}",
+        )
+    if metadata.get(PARTITION_ROW_COUNT_METADATA_KEY):
+        value = metadata[PARTITION_ROW_COUNT_METADATA_KEY]
+        check.invariant(
+            isinstance(value, int),
+            f"Value for metadata key '{PARTITION_ROW_COUNT_METADATA_KEY}' must be an int, received "
+            f"{type(value).__name__}",
+        )
+    if metadata.get(COLUMN_LINEAGE_METADATA_KEY):
+        value = metadata[COLUMN_LINEAGE_METADATA_KEY]
+        check.invariant(
+            isinstance(value, TableColumnLineage),
+            f"Value for metadata key '{COLUMN_LINEAGE_METADATA_KEY}' must be a TableColumnLineage, "
+            f"received {type(value).__name__}",
+        )
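The helper only looks at the reserved "dagster/" keys defined above and ignores every other entry. A rough usage sketch, based only on the checks shown in this diff; the example values ("events", "data-eng", etc.) are hypothetical:

    import pytest
    from dagster import TableColumn, TableSchema
    from dagster._check import CheckError
    from dagster._core.definitions.metadata.metadata_set import validate_metadata_values

    schema = TableSchema(columns=[TableColumn(name="id", type="int")])

    # Correctly typed reserved values, plus arbitrary unreserved keys, pass silently.
    validate_metadata_values(
        {"dagster/column_schema": schema, "dagster/table_name": "events", "team": "data-eng"}
    )

    # A wrong type for a reserved key raises CheckError via check.invariant.
    with pytest.raises(CheckError, match="must be an int"):
        validate_metadata_values({"dagster/row_count": "not-a-number"})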
16 changes: 11 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/result.py
@@ -6,6 +6,7 @@
 from dagster._core.definitions.data_version import DataVersion
 from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
 from dagster._core.definitions.metadata import RawMetadataMapping
+from dagster._core.definitions.metadata.metadata_set import validate_metadata_values


 class AssetResult(
@@ -35,14 +36,19 @@ def __new__(

         asset_key = AssetKey.from_coercible(asset_key) if asset_key else None

+        metadata = check.opt_nullable_mapping_param(
+            metadata,
+            "metadata",
+            key_type=str,
+        )
+
+        if metadata:
+            validate_metadata_values(metadata)
+
         return super().__new__(
             cls,
             asset_key=asset_key,
-            metadata=check.opt_nullable_mapping_param(
-                metadata,
-                "metadata",
-                key_type=str,
-            ),
+            metadata=metadata,
             check_results=check.opt_sequence_param(
                 check_results, "check_results", of_type=AssetCheckResult
             ),
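Since MaterializeResult goes through this AssetResult constructor (the new test at the bottom of the PR exercises exactly that path), a mistyped reserved value should now be rejected the moment the result object is created inside the asset body. A hedged sketch, assuming MaterializeResult inherits this __new__ unchanged:

    import pytest
    from dagster import MaterializeResult
    from dagster._check import CheckError

    # Valid reserved values construct normally.
    MaterializeResult(metadata={"dagster/row_count": 42})

    # A mistyped reserved value fails as soon as the result object is built.
    with pytest.raises(CheckError, match="must be an int"):
        MaterializeResult(metadata={"dagster/row_count": "42"})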
@@ -1,5 +1,13 @@
 import pytest
-from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema
+from dagster import (
+    AssetKey,
+    AssetMaterialization,
+    Definitions,
+    MaterializeResult,
+    TableColumn,
+    TableSchema,
+    asset,
+)
 from dagster._check import CheckError
 from dagster._core.definitions.metadata import TableMetadataSet
 from dagster._core.definitions.metadata.table import TableColumnDep, TableColumnLineage
@@ -122,3 +130,27 @@ def test_column_lineage() -> None:
     materialization = AssetMaterialization(asset_key="foo", metadata=splat_table_metadata)
     extracted_table_metadata = TableMetadataSet.extract(materialization.metadata)
     assert extracted_table_metadata.column_lineage == expected_column_lineage
+
+
+def test_error_on_invalid_reserved_metadata_value():
+    with pytest.raises(
+        CheckError, match="Value for metadata key 'dagster/column_schema' must be a TableSchema"
+    ):
+
+        @asset(
+            metadata={
+                "dagster/column_schema": "invalid",
+            }
+        )
+        def foo():
+            pass
+
+    @asset()
+    def bar():
+        yield MaterializeResult(metadata={"dagster/column_schema": "invalid"})
+
+    defs = Definitions(assets=[bar])
+    with pytest.raises(
+        CheckError, match="Value for metadata key 'dagster/column_schema' must be a TableSchema"
+    ):
+        defs.get_implicit_global_asset_job_def().execute_in_process()
Comment on lines +135 to +156
Member
should we include tests that cover the other values we added validation for?
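For reference, coverage for the remaining reserved keys could reuse the pattern of test_error_on_invalid_reserved_metadata_value. One possible shape, not part of this PR, using only the key names and error messages defined in metadata_set.py above:

    @pytest.mark.parametrize(
        "key, bad_value, expected_match",
        [
            ("dagster/table_name", 123, "must be a string"),
            ("dagster/row_count", "100", "must be an int"),
            ("dagster/partition_row_count", "100", "must be an int"),
            ("dagster/column_lineage", "invalid", "must be a TableColumnLineage"),
        ],
    )
    def test_error_on_other_invalid_reserved_metadata_values(key, bad_value, expected_match):
        with pytest.raises(CheckError, match=expected_match):

            @asset(metadata={key: bad_value})
            def bad_asset():
                pass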