Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rfc] initial partitioned asset checks exploration #26745

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from typing import Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity
from dagster._core.definitions.events import AssetKey, MetadataValue
from dagster._core.definitions.metadata import normalize_metadata
from dagster._serdes import whitelist_for_serdes

if TYPE_CHECKING:
from dagster._core.definitions.partition import PartitionsSubset


@whitelist_for_serdes
class AssetCheckEvaluationPlanned(
Expand All @@ -14,16 +17,30 @@ class AssetCheckEvaluationPlanned(
[
("asset_key", AssetKey),
("check_name", str),
("partition_key", Optional[str]),
("partition_subset", Optional["PartitionsSubset"]),
],
)
):
"""Metadata for the event when an asset check is launched."""

def __new__(cls, asset_key: AssetKey, check_name: str):
def __new__(
    cls,
    asset_key: AssetKey,
    check_name: str,
    partition_key: Optional[str] = None,
    partition_subset: Optional["PartitionsSubset"] = None,
):
    """Validate the arguments and build the planned-evaluation record.

    Both partition arguments default to ``None`` so that the earlier
    two-argument form ``AssetCheckEvaluationPlanned(asset_key, check_name)``
    keeps working, and so this constructor matches the other check-related
    records, which all treat their partition argument as optional.
    NOTE(review): the defaults are presumably also needed to load events
    that were serialized before these fields existed -- confirm against
    the serdes layer.

    Args:
        asset_key: Must be an ``AssetKey`` instance.
        check_name: Must be a ``str``.
        partition_key: Optional ``str``; ``None`` when not provided.
        partition_subset: Optional ``PartitionsSubset``; ``None`` when not
            provided.
    """
    # Imported at call time because the module-level import of
    # PartitionsSubset is guarded by TYPE_CHECKING and is therefore not
    # available at runtime for the isinstance check below.
    from dagster._core.definitions.partition import PartitionsSubset

    return super(AssetCheckEvaluationPlanned, cls).__new__(
        cls,
        asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
        check_name=check.str_param(check_name, "check_name"),
        partition_key=check.opt_str_param(partition_key, "partition_key"),
        partition_subset=check.opt_inst_param(
            partition_subset, "partition_subset", PartitionsSubset
        ),
    )

@property
Expand Down Expand Up @@ -68,6 +85,7 @@ class AssetCheckEvaluation(
),
("severity", AssetCheckSeverity),
("description", Optional[str]),
("partition_key", Optional[str]),
],
)
):
Expand Down Expand Up @@ -101,6 +119,7 @@ def __new__(
target_materialization_data: Optional[AssetCheckEvaluationTargetMaterializationData] = None,
severity: AssetCheckSeverity = AssetCheckSeverity.ERROR,
description: Optional[str] = None,
partition_key: Optional[str] = None,
):
normed_metadata = normalize_metadata(
check.dict_param(metadata, "metadata", key_type=str),
Expand All @@ -119,6 +138,7 @@ def __new__(
),
severity=check.inst_param(severity, "severity", AssetCheckSeverity),
description=check.opt_str_param(description, "description"),
partition_key=check.opt_str_param(partition_key, "partition_key"),
)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class AssetCheckResult(
("metadata", PublicAttr[Mapping[str, MetadataValue]]),
("severity", PublicAttr[AssetCheckSeverity]),
("description", PublicAttr[Optional[str]]),
("partition_key", PublicAttr[Optional[str]]),
],
),
EventWithMetadata,
Expand Down Expand Up @@ -63,6 +64,7 @@ def __new__(
metadata: Optional[Mapping[str, RawMetadataValue]] = None,
severity: AssetCheckSeverity = AssetCheckSeverity.ERROR,
description: Optional[str] = None,
partition_key: Optional[str] = None,
):
normalized_metadata = normalize_metadata(
check.opt_mapping_param(metadata, "metadata", key_type=str),
Expand All @@ -75,6 +77,7 @@ def __new__(
metadata=normalized_metadata,
severity=check.inst_param(severity, "severity", AssetCheckSeverity),
description=check.opt_str_param(description, "description"),
partition_key=check.opt_str_param(partition_key, "partition_key"),
)

def resolve_target_check_key(
Expand Down Expand Up @@ -169,6 +172,7 @@ def to_asset_check_evaluation(
target_materialization_data=target_materialization_data,
severity=self.severity,
description=self.description,
partition_key=self.partition_key,
)

def with_metadata(self, metadata: Mapping[str, RawMetadataValue]) -> "AssetCheckResult":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,7 @@ def resources(self) -> Any:
@_copy_docs_from_op_execution_context
def is_subset(self):
return self.op_execution_context.is_subset

@property
def partition_keys(self):
    """Return ``partition_keys`` from the wrapped op execution context."""
    # Pure delegation: the value is read from op_execution_context unchanged.
    return self.op_execution_context.partition_keys
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def create_step_outputs(

step_outputs: List[StepOutput] = []
for name, output_def in node.definition.output_dict.items():
asset_key = asset_layer.asset_key_for_output(handle, name)
asset_check_key = asset_layer.asset_check_key_for_output(handle, name)
asset_key = asset_layer.asset_key_for_output(handle, name) or asset_check_key.asset_key
asset_node = asset_layer.asset_graph.get(asset_key) if asset_key else None

step_outputs.append(
Expand All @@ -81,7 +82,7 @@ def create_step_outputs(
if asset_node and asset_node.key in asset_layer.asset_keys_for_node(handle)
else None,
is_asset_partitioned=bool(asset_node.partitions_def) if asset_node else False,
asset_check_key=asset_layer.asset_check_key_for_output(handle, name),
asset_check_key=asset_check_key,
),
)
)
Expand Down
44 changes: 35 additions & 9 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.repository_definition.repository_definition import (
RepositoryLoadData,
)
Expand Down Expand Up @@ -1353,17 +1353,16 @@ def _ensure_persisted_execution_plan_snapshot(

return execution_plan_snapshot_id

def _log_materialization_planned_event_for_asset(
def _extract_partition_key_and_subset_for_asset(
self,
dagster_run: DagsterRun,
asset_key: AssetKey,
job_name: str,
step: "ExecutionStepSnap",
output: "ExecutionStepOutputSnap",
tags: Mapping[str, str],
asset_graph: Optional["BaseAssetGraph"],
) -> None:
dagster_run: DagsterRun,
output: "ExecutionStepOutputSnap",
step: "ExecutionStepSnap",
) -> Tuple[Optional[str], Optional["PartitionsSubset"]]:
from dagster._core.definitions.partition import DynamicPartitionsDefinition
from dagster._core.events import AssetMaterializationPlannedData, DagsterEvent

partition_tag = dagster_run.tags.get(PARTITION_NAME_TAG)
partition_range_start, partition_range_end = (
Expand Down Expand Up @@ -1412,6 +1411,22 @@ def _log_materialization_planned_event_for_asset(
partition = (
partition_tag if check.not_none(output.properties).is_asset_partitioned else None
)
return partition, partitions_subset

def _log_materialization_planned_event_for_asset(
self,
dagster_run: DagsterRun,
asset_key: AssetKey,
job_name: str,
step: "ExecutionStepSnap",
output: "ExecutionStepOutputSnap",
asset_graph: Optional["BaseAssetGraph"],
) -> None:
from dagster._core.events import AssetMaterializationPlannedData, DagsterEvent

partition, partitions_subset = self._extract_partition_key_and_subset_for_asset(
asset_key, dagster_run.tags, asset_graph, dagster_run, output, step
)
materialization_planned = DagsterEvent.build_asset_materialization_planned_event(
job_name,
step.key,
Expand Down Expand Up @@ -1446,7 +1461,16 @@ def _log_asset_planned_events(
)
target_asset_key = asset_check_key.asset_key
check_name = asset_check_key.name

partition, partition_subset = (
self._extract_partition_key_and_subset_for_asset(
target_asset_key,
dagster_run.tags,
asset_graph,
dagster_run,
output,
step,
)
)
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
job_name=job_name,
Expand All @@ -1457,6 +1481,8 @@ def _log_asset_planned_events(
event_specific_data=AssetCheckEvaluationPlanned(
target_asset_key,
check_name=check_name,
partition_key=partition,
partition_subset=partition_subset,
),
step_key=step.key,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class AssetCheckExecutionRecord(
# Old records won't have an event if the status is PLANNED.
("event", Optional[EventLogEntry]),
("create_timestamp", float),
("partition", Optional[str]),
],
),
LoadableBy[AssetCheckKey],
Expand All @@ -68,13 +69,15 @@ def __new__(
status: AssetCheckExecutionRecordStatus,
event: Optional[EventLogEntry],
create_timestamp: float,
partition: Optional[str] = None,
):
check.inst_param(key, "key", AssetCheckKey)
check.int_param(id, "id")
check.str_param(run_id, "run_id")
check.inst_param(status, "status", AssetCheckExecutionRecordStatus)
check.opt_inst_param(event, "event", EventLogEntry)
check.float_param(create_timestamp, "create_timestamp")
check.opt_str_param(partition, "partition")

event_type = event.dagster_event_type if event else None
if status == AssetCheckExecutionRecordStatus.PLANNED:
Expand All @@ -101,6 +104,7 @@ def __new__(
status=status,
event=event,
create_timestamp=create_timestamp,
partition=partition,
)

@property
Expand All @@ -125,6 +129,7 @@ def from_db_row(cls, row, key: AssetCheckKey) -> "AssetCheckExecutionRecord":
else None
),
create_timestamp=utc_datetime_from_naive(row["create_timestamp"]).timestamp(),
partition=row["partition"],
)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@
),
db.Column("asset_key", db.Text),
db.Column("check_name", db.Text),
db.Column("partition", db.Text), # Currently unused. Planned for future partition support
db.Column("partition", db.Text),
db.Column("run_id", db.String(255)),
db.Column("execution_status", db.String(255)), # Planned, Success, or Failure
# Either an AssetCheckEvaluationPlanned or AssetCheckEvaluation event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2819,16 +2819,31 @@ def _store_asset_check_evaluation_planned(
AssetCheckEvaluationPlanned, check.not_none(event.dagster_event).event_specific_data
)
with self.index_connection() as conn:
conn.execute(
AssetCheckExecutionsTable.insert().values(
asset_key=planned.asset_key.to_string(),
check_name=planned.check_name,
run_id=event.run_id,
execution_status=AssetCheckExecutionRecordStatus.PLANNED.value,
evaluation_event=serialize_value(event),
evaluation_event_timestamp=self._event_insert_timestamp(event),
if planned.partition_subset is None:
conn.execute(
AssetCheckExecutionsTable.insert().values(
asset_key=planned.asset_key.to_string(),
check_name=planned.check_name,
run_id=event.run_id,
execution_status=AssetCheckExecutionRecordStatus.PLANNED.value,
evaluation_event=serialize_value(event),
evaluation_event_timestamp=self._event_insert_timestamp(event),
partition=planned.partition_key,
)
)
)
else:
for partition in planned.partition_subset.get_partition_keys():
conn.execute(
AssetCheckExecutionsTable.insert().values(
asset_key=planned.asset_key.to_string(),
check_name=planned.check_name,
run_id=event.run_id,
execution_status=AssetCheckExecutionRecordStatus.PLANNED.value,
evaluation_event=serialize_value(event),
evaluation_event_timestamp=self._event_insert_timestamp(event),
partition=partition,
)
)

def _event_insert_timestamp(self, event):
# Postgres requires a datetime that is in UTC but has no timezone info
Expand Down Expand Up @@ -2859,6 +2874,7 @@ def _store_runless_asset_check_evaluation(
if evaluation.target_materialization_data
else None
),
partition=evaluation.partition_key,
)
)

Expand All @@ -2875,6 +2891,7 @@ def _update_asset_check_evaluation(self, event: EventLogEntry, event_id: Optiona
AssetCheckExecutionsTable.c.asset_key == evaluation.asset_key.to_string(),
AssetCheckExecutionsTable.c.check_name == evaluation.check_name,
AssetCheckExecutionsTable.c.run_id == event.run_id,
AssetCheckExecutionsTable.c.partition == evaluation.partition_key,
)
)
.values(
Expand Down Expand Up @@ -2921,6 +2938,7 @@ def get_asset_check_execution_history(
AssetCheckExecutionsTable.c.execution_status,
AssetCheckExecutionsTable.c.evaluation_event,
AssetCheckExecutionsTable.c.create_timestamp,
AssetCheckExecutionsTable.c.partition,
]
)
.where(
Expand Down Expand Up @@ -2980,6 +2998,7 @@ def get_latest_asset_check_execution_by_key(
AssetCheckExecutionsTable.c.execution_status,
AssetCheckExecutionsTable.c.evaluation_event,
AssetCheckExecutionsTable.c.create_timestamp,
AssetCheckExecutionsTable.c.partition,
]
).select_from(
AssetCheckExecutionsTable.join(
Expand Down
Loading