Skip to content

Commit

Permalink
[components] Replace RequiredScope with RenderedModel (#26759)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This changes how we mark objects as having deferred fields. The result is a lot more terse, and generally easier to work with.

The base class exposes a method to get all of the rendered properties, which can be overridden in scenarios where we want to customize this. For now, our only existing usage can use this raw properties dictionary directly.

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Dec 31, 2024
1 parent 6ed178e commit 1665f69
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
import functools
import json
import os
from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Type, TypeVar, Union
import typing
from typing import (
Annotated,
Any,
Callable,
Mapping,
Optional,
Sequence,
Type,
TypeVar,
Union,
get_origin,
)

import dagster._check as check
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._record import record
from jinja2.nativetypes import NativeTemplate
from pydantic import BaseModel, Field
from pydantic.fields import FieldInfo
from pydantic import BaseModel, ConfigDict, TypeAdapter

T = TypeVar("T")

REF_BASE = "#/$defs/"
REF_TEMPLATE = f"{REF_BASE}{{model}}"

CONTEXT_KEY = "required_rendering_scope"
JSON_SCHEMA_EXTRA_KEY = "requires_rendering_scope"


def automation_condition_scope() -> Mapping[str, Any]:
Expand All @@ -27,31 +37,64 @@ def automation_condition_scope() -> Mapping[str, Any]:
}


def RenderingScope(field: Optional[FieldInfo] = None, *, required_scope: AbstractSet[str]) -> Any:
"""Defines a Pydantic Field that requires a specific scope to be available before rendering.
def requires_additional_scope(subschema: Mapping[str, Any]) -> bool:
    """Report whether ``subschema`` is flagged as needing extra rendering scope.

    Looks up ``JSON_SCHEMA_EXTRA_KEY`` in the subschema and passes the value
    through ``check.opt_inst(..., bool)``. A missing (``None``) or ``False``
    flag yields ``False``; only a ``True`` flag yields ``True``.
    """
    flag = check.opt_inst(subschema.get(JSON_SCHEMA_EXTRA_KEY), bool)
    return bool(flag)


def _env(key: str) -> Optional[str]:
return os.environ.get(key)


ShouldRenderFn = Callable[[Sequence[Union[str, int]]], bool]


@record(checked=False)
class RenderingMetadata:
"""Stores metadata about how a field should be rendered.
Examples:
```python
class Schema(BaseModel):
a: str = RenderingScope(required_scope={"foo", "bar"})
b: Optional[int] = RenderingScope(Field(default=None), required_scope={"baz"})
class MyModel(BaseModel):
some_field: Annotated[str, RenderingMetadata(output_type=MyOtherModel)]
opt_field: Annotated[Optional[str], RenderingMetadata(output_type=(None, MyOtherModel))]
```
"""
return FieldInfo.merge_field_infos(
field or Field(), Field(json_schema_extra={CONTEXT_KEY: json.dumps(list(required_scope))})
)

output_type: Type

def get_required_rendering_context(subschema: Mapping[str, Any]) -> Optional[AbstractSet[str]]:
raw = check.opt_inst(subschema.get(CONTEXT_KEY), str)
return set(json.loads(raw)) if raw else None

def _get_expected_type(annotation: Type) -> Optional[Type]:
origin = get_origin(annotation)
if origin is Annotated:
_, f_metadata, *_ = typing.get_args(annotation)
if isinstance(f_metadata, RenderingMetadata):
return f_metadata.output_type
else:
return annotation
return None

def _env(key: str) -> Optional[str]:
return os.environ.get(key)

class RenderedModel(BaseModel):
"""Base class for models that get rendered lazily."""

ShouldRenderFn = Callable[[Sequence[Union[str, int]]], bool]
model_config = ConfigDict(json_schema_extra={JSON_SCHEMA_EXTRA_KEY: True})

def render_properties(self, value_resolver: "TemplatedValueResolver") -> Mapping[str, Any]:
    """Render this model's explicitly-set fields and validate the results.

    The model is dumped with ``exclude_unset=True``, the dump is passed to
    ``value_resolver.render_obj``, and each rendered value is then validated
    against the type reported by ``_get_expected_type`` for that field.
    Fields for which no expected type is reported are not validated.

    Returns:
        The mapping of rendered properties produced by the resolver.
    """
    explicitly_set = self.model_dump(exclude_unset=True)
    rendered = value_resolver.render_obj(explicitly_set)

    for field_name, rendered_value in rendered.items():
        # NOTE(review): __annotations__ appears to cover only fields declared on
        # the concrete class; confirm behaviour for inherited fields.
        expected_type = _get_expected_type(self.__annotations__[field_name])
        if expected_type is None:
            continue
        # Delegate to pydantic so nested types such as
        # Optional[Mapping[str, int]] are checked properly.
        adapter = TypeAdapter(expected_type, config={"arbitrary_types_allowed": True})
        adapter.validate_python(rendered_value)

    return rendered


@record
Expand Down Expand Up @@ -110,12 +153,12 @@ def render_params(self, val: T, target_type: Type) -> T:
should_render = lambda _: True
else:
should_render = functools.partial(
has_rendering_scope, json_schema=json_schema, subschema=json_schema
can_render_with_default_scope, json_schema=json_schema, subschema=json_schema
)
return self._render_obj(val, [], should_render=should_render)


def has_rendering_scope(
def can_render_with_default_scope(
valpath: Sequence[Union[str, int]], json_schema: Mapping[str, Any], subschema: Mapping[str, Any]
) -> bool:
"""Given a valpath and the json schema of a given target type, determines if there is a rendering scope
Expand All @@ -126,14 +169,17 @@ def has_rendering_scope(
if "$ref" in subschema:
subschema = json_schema["$defs"].get(subschema["$ref"][len(REF_BASE) :])

if get_required_rendering_context(subschema) is not None:
if requires_additional_scope(subschema):
return False
elif len(valpath) == 0:
return True

# Optional[ComplexType] (e.g.) will contain multiple schemas in the "anyOf" field
if "anyOf" in subschema:
return all(has_rendering_scope(valpath, json_schema, inner) for inner in subschema["anyOf"])
return all(
can_render_with_default_scope(valpath, json_schema, inner)
for inner in subschema["anyOf"]
)

el = valpath[0]
if isinstance(el, str):
Expand All @@ -152,4 +198,4 @@ def has_rendering_scope(
return subschema.get("additionalProperties", True)

_, *rest = valpath
return has_rendering_scope(rest, json_schema, inner)
return can_render_with_default_scope(rest, json_schema, inner)
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,35 @@
from dagster._record import replace
from pydantic import BaseModel, Field

from dagster_components.core.component_rendering import RenderingScope, TemplatedValueResolver
from dagster_components.core.component_rendering import (
RenderedModel,
RenderingMetadata,
TemplatedValueResolver,
)


# Pydantic model with two optional fields, both defaulting to None.
# NOTE(review): presumably describes op-level spec settings (inferred from the name) — confirm.
class OpSpecBaseModel(BaseModel):
# Optional name; None when not provided.
name: Optional[str] = None
# Optional string-to-string tag mapping; None when not provided.
tags: Optional[Dict[str, str]] = None


class AssetAttributesModel(BaseModel):
class AssetAttributesModel(RenderedModel):
key: Optional[str] = None
deps: Sequence[str] = []
description: Optional[str] = None
metadata: Union[str, Mapping[str, Any]] = {}
metadata: Annotated[
Union[str, Mapping[str, Any]], RenderingMetadata(output_type=Mapping[str, Any])
] = {}
group_name: Optional[str] = None
skippable: bool = False
code_version: Optional[str] = None
owners: Sequence[str] = []
tags: Union[str, Mapping[str, str]] = {}
automation_condition: Optional[Union[str, AutomationCondition]] = RenderingScope(
Field(None), required_scope={"automation_condition"}
)

class Config:
# required for AutomationCondition
arbitrary_types_allowed = True

def get_resolved_attributes(self, value_resolver: TemplatedValueResolver) -> Mapping[str, Any]:
return value_resolver.render_obj(self.model_dump(exclude_unset=True))
tags: Annotated[
Union[str, Mapping[str, str]], RenderingMetadata(output_type=Mapping[str, str])
] = {}
automation_condition: Annotated[
Optional[str], RenderingMetadata(output_type=Optional[AutomationCondition])
] = None


class AssetSpecProcessor(ABC, BaseModel):
Expand All @@ -62,7 +63,7 @@ def apply_to_spec(

# add the original spec to the context and resolve values
return self._apply_to_spec(
spec, self.attributes.get_resolved_attributes(value_resolver.with_context(asset=spec))
spec, self.attributes.render_properties(value_resolver.with_context(asset=spec))
)

def apply(self, defs: Definitions, value_resolver: TemplatedValueResolver) -> Definitions:
Expand Down Expand Up @@ -102,8 +103,5 @@ def _apply_to_spec(self, spec: AssetSpec, attributes: Mapping[str, Any]) -> Asse


AssetAttributes = Sequence[
Annotated[
Union[MergeAttributes, ReplaceAttributes],
RenderingScope(Field(union_mode="left_to_right"), required_scope={"asset"}),
]
Annotated[Union[MergeAttributes, ReplaceAttributes], Field(union_mode="left_to_right")]
]
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@
TemplatedValueResolver,
component_type,
)
from dagster_components.core.component_rendering import RenderingScope
from dagster_components.core.component_rendering import RenderedModel
from dagster_components.core.dsl_schema import AssetAttributes, AssetSpecProcessor, OpSpecBaseModel
from dagster_components.generate import generate_component_yaml


class DbtNodeTranslatorParams(BaseModel):
class DbtNodeTranslatorParams(RenderedModel):
key: Optional[str] = None
group: Optional[str] = None


class DbtProjectParams(BaseModel):
dbt: DbtCliResource
op: Optional[OpSpecBaseModel] = None
translator: Optional[DbtNodeTranslatorParams] = RenderingScope(
Field(default=None), required_scope={"node"}
)
translator: Optional[DbtNodeTranslatorParams] = None
asset_attributes: Optional[AssetAttributes] = None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def load(cls, context: ComponentLoadContext) -> "PipesSubprocessScriptCollection
if not script_path.exists():
raise FileNotFoundError(f"Script {script_path} does not exist")
path_specs[script_path] = [
AssetSpec(**asset.get_resolved_attributes(context.templated_value_resolver))
AssetSpec(**asset.render_properties(context.templated_value_resolver))
for asset in script.assets
]

Expand Down
Loading

0 comments on commit 1665f69

Please sign in to comment.