Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include Time Spine Nodes in Dataflow Plan #1548

Merged
merged 11 commits into from
Dec 11, 2024
Merged
  •  
  •  
  •  
1 change: 1 addition & 0 deletions metricflow-semantics/metricflow_semantics/dag/id_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"
DATAFLOW_NODE_WINDOW_REAGGREGATION_ID_PREFIX = "wr"
DATAFLOW_NODE_ALIAS_SPECS_ID_PREFIX = "as"

SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
Expand Down
74 changes: 74 additions & 0 deletions metricflow-semantics/metricflow_semantics/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT:
"""See Visitable."""
raise NotImplementedError()

def with_new_spec(self, new_spec: SpecT, column_association_resolver: ColumnAssociationResolver) -> MdoInstance:
"""Returns a new instance with the spec replaced."""
raise NotImplementedError()


class LinkableInstance(MdoInstance, Generic[SpecT]):
"""An MdoInstance whose spec is linkable (i.e., it can have entity links)."""
Expand Down Expand Up @@ -105,6 +109,17 @@ class MeasureInstance(MdoInstance[MeasureSpec], SemanticModelElementInstance):
def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_measure_instance(self)

def with_new_spec(
self, new_spec: MeasureSpec, column_association_resolver: ColumnAssociationResolver
) -> MeasureInstance:
"""Returns a new instance with the spec replaced."""
return MeasureInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
aggregation_state=self.aggregation_state,
)


@dataclass(frozen=True)
class DimensionInstance(LinkableInstance[DimensionSpec], SemanticModelElementInstance): # noqa: D101
Expand All @@ -125,6 +140,16 @@ def with_entity_prefix(
spec=transformed_spec,
)

def with_new_spec(
self, new_spec: DimensionSpec, column_association_resolver: ColumnAssociationResolver
) -> DimensionInstance:
"""Returns a new instance with the spec replaced."""
return DimensionInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class TimeDimensionInstance(LinkableInstance[TimeDimensionSpec], SemanticModelElementInstance): # noqa: D101
Expand All @@ -151,6 +176,16 @@ def with_new_defined_from(self, defined_from: Sequence[SemanticModelElementRefer
associated_columns=self.associated_columns, defined_from=tuple(defined_from), spec=self.spec
)

def with_new_spec(
self, new_spec: TimeDimensionSpec, column_association_resolver: ColumnAssociationResolver
) -> TimeDimensionInstance:
"""Returns a new instance with the spec replaced."""
return TimeDimensionInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class EntityInstance(LinkableInstance[EntitySpec], SemanticModelElementInstance): # noqa: D101
Expand All @@ -171,6 +206,16 @@ def with_entity_prefix(
spec=transformed_spec,
)

def with_new_spec(
self, new_spec: EntitySpec, column_association_resolver: ColumnAssociationResolver
) -> EntityInstance:
"""Returns a new instance with the spec replaced."""
return EntityInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class GroupByMetricInstance(LinkableInstance[GroupByMetricSpec], SerializableDataclass): # noqa: D101
Expand All @@ -192,6 +237,16 @@ def with_entity_prefix(
spec=transformed_spec,
)

def with_new_spec(
self, new_spec: GroupByMetricSpec, column_association_resolver: ColumnAssociationResolver
) -> GroupByMetricInstance:
"""Returns a new instance with the spec replaced."""
return GroupByMetricInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class MetricInstance(MdoInstance[MetricSpec], SerializableDataclass): # noqa: D101
Expand All @@ -202,6 +257,16 @@ class MetricInstance(MdoInstance[MetricSpec], SerializableDataclass): # noqa: D
def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_metric_instance(self)

def with_new_spec(
self, new_spec: MetricSpec, column_association_resolver: ColumnAssociationResolver
) -> MetricInstance:
"""Returns a new instance with the spec replaced."""
return MetricInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class MetadataInstance(MdoInstance[MetadataSpec], SerializableDataclass): # noqa: D101
Expand All @@ -211,6 +276,15 @@ class MetadataInstance(MdoInstance[MetadataSpec], SerializableDataclass): # noq
def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_metadata_instance(self)

def with_new_spec(
self, new_spec: MetadataSpec, column_association_resolver: ColumnAssociationResolver
) -> MetadataInstance:
"""Returns a new instance with the spec replaced."""
return MetadataInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
spec=new_spec,
)


# Output type of transform function
TransformOutputT = TypeVar("TransformOutputT")
Expand Down
Loading
Loading