Skip to content

Commit

Permalink
claire/validate-reserved-metadata-types
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Dec 27, 2024
1 parent 3bbf3af commit 4755709
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def __new__(
**kwargs,
):
from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates
from dagster._core.definitions.metadata.metadata_set import validate_metadata_values

only_allow_hidden_params_in_kwargs(AssetSpec, kwargs)

Expand All @@ -199,12 +200,15 @@ def __new__(
}
validate_kind_tags(kind_tags)

metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
validate_metadata_values(metadata)

return super().__new__(
cls,
key=key,
deps=asset_deps,
description=check.opt_str_param(description, "description"),
metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
metadata=metadata,
skippable=check.bool_param(skippable, "skippable"),
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import AbstractSet, Any, Iterable, Mapping, Optional, Type
from typing import TYPE_CHECKING, AbstractSet, Any, Iterable, Mapping, Optional, Type

from typing_extensions import TypeVar

Expand All @@ -11,11 +11,15 @@
TableColumnConstraints as TableColumnConstraints,
TableColumnLineage,
TableSchema,
TableSchemaMetadataValue,
)
from dagster._model import DagsterModel
from dagster._model.pydantic_compat_layer import model_fields
from dagster._utils.typing_api import flatten_unions

if TYPE_CHECKING:
from dagster._core.definitions.metadata import RawMetadataMapping

# Python types that have a MetadataValue types that directly wraps them
DIRECTLY_WRAPPED_METADATA_TYPES = {
str,
Expand Down Expand Up @@ -197,6 +201,13 @@ def current_key_by_legacy_key(cls) -> Mapping[str, str]:
return {"relation_identifier": "table_name"}


METADATA_KEY_PREFIX = f"{TableMetadataSet.namespace()}/"
COLUMN_SCHEMA_METADATA_KEY = f"{METADATA_KEY_PREFIX}column_schema"
TABLE_NAME_METADATA_KEY = f"{METADATA_KEY_PREFIX}table_name"
ROW_COUNT_METADATA_KEY = f"{METADATA_KEY_PREFIX}row_count"
COLUMN_LINEAGE_METADATA_KEY = f"{METADATA_KEY_PREFIX}column_lineage"


class UriMetadataSet(NamespacedMetadataSet):
"""Metadata entry which supplies a URI address for an asset.
For example, the S3 address of a file or bucket.
Expand All @@ -210,3 +221,34 @@ class UriMetadataSet(NamespacedMetadataSet):
@classmethod
def namespace(cls) -> str:
return "dagster"


def validate_metadata_values(metadata: "RawMetadataMapping") -> None:
if metadata.get(COLUMN_SCHEMA_METADATA_KEY):
value = metadata[COLUMN_SCHEMA_METADATA_KEY]
check.invariant(
isinstance(value, TableSchema) or isinstance(value, TableSchemaMetadataValue),
f"Value for metadata key '{COLUMN_SCHEMA_METADATA_KEY}' must be a TableSchema, "
f"received {type(value).__name__}",
)
if metadata.get(TABLE_NAME_METADATA_KEY):
value = metadata[TABLE_NAME_METADATA_KEY]
check.invariant(
isinstance(metadata[TABLE_NAME_METADATA_KEY], str),
f"Value for metadata key '{TABLE_NAME_METADATA_KEY}' must be a string, received "
f"{type(value).__name__}",
)
if metadata.get(ROW_COUNT_METADATA_KEY):
value = metadata[ROW_COUNT_METADATA_KEY]
check.invariant(
isinstance(metadata[ROW_COUNT_METADATA_KEY], int),
f"Value for metadata key '{ROW_COUNT_METADATA_KEY}' must be an int, received "
f"{type(value).__name__}",
)
if metadata.get(COLUMN_LINEAGE_METADATA_KEY):
value = metadata[COLUMN_SCHEMA_METADATA_KEY]
check.invariant(
isinstance(metadata[ROW_COUNT_METADATA_KEY], TableColumnLineage),
f"Value for metadata key '{ROW_COUNT_METADATA_KEY}' must be a TableColumnLineage, "
f"received {type(value).__name__}",
)
16 changes: 11 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.metadata.metadata_set import validate_metadata_values


class AssetResult(
Expand Down Expand Up @@ -35,14 +36,19 @@ def __new__(

asset_key = AssetKey.from_coercible(asset_key) if asset_key else None

metadata = check.opt_nullable_mapping_param(
metadata,
"metadata",
key_type=str,
)

if metadata:
validate_metadata_values(metadata)

return super().__new__(
cls,
asset_key=asset_key,
metadata=check.opt_nullable_mapping_param(
metadata,
"metadata",
key_type=str,
),
metadata=metadata,
check_results=check.opt_sequence_param(
check_results, "check_results", of_type=AssetCheckResult
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import pytest
from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema
from dagster import (
AssetKey,
AssetMaterialization,
Definitions,
MaterializeResult,
TableColumn,
TableSchema,
asset,
)
from dagster._check import CheckError
from dagster._core.definitions.metadata import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumnDep, TableColumnLineage
Expand Down Expand Up @@ -122,3 +130,27 @@ def test_column_lineage() -> None:
materialization = AssetMaterialization(asset_key="foo", metadata=splat_table_metadata)
extracted_table_metadata = TableMetadataSet.extract(materialization.metadata)
assert extracted_table_metadata.column_lineage == expected_column_lineage


def test_error_on_invalid_reserved_metadata_value():
with pytest.raises(
CheckError, match="Value for metadata key 'dagster/column_schema' must be a TableSchema"
):

@asset(
metadata={
"dagster/column_schema": "invalid",
}
)
def foo():
pass

@asset()
def bar():
yield MaterializeResult(metadata={"dagster/column_schema": "invalid"})

defs = Definitions(assets=[bar])
with pytest.raises(
CheckError, match="Value for metadata key 'dagster/column_schema' must be a TableSchema"
):
defs.get_implicit_global_asset_job_def().execute_in_process()

0 comments on commit 4755709

Please sign in to comment.