Skip to content

Commit

Permalink
0.2.2 release (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
boetro authored Aug 24, 2023
1 parent 0ccf8e6 commit f62b3c1
Show file tree
Hide file tree
Showing 25 changed files with 434 additions and 281 deletions.
218 changes: 157 additions & 61 deletions buildflow/core/app/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,32 @@
from buildflow.io.strategies._strategy import StategyType


@dataclasses.dataclass
class _PrimitiveCacheEntry:
primitive: Primitive
pulumi_resource: pulumi.Resource


@dataclasses.dataclass
class _PrimitiveCache:
cache: List[_PrimitiveCacheEntry] = dataclasses.field(default_factory=list)

def __contains__(self, primitive: Primitive):
return self.get(primitive) is not None

def append(self, entry: _PrimitiveCacheEntry):
self.cache.append(entry)

def get(self, primitive: Primitive):
for entry in self.cache:
if entry.primitive == primitive:
return entry.pulumi_resource
return None

def clear(self):
self.cache.clear()


def _get_directory_path_of_caller():
# NOTE: This function is used to get the file path of the caller of the
# Flow(). This is used to determine the directory to look for a BuildFlow
Expand All @@ -45,6 +71,53 @@ def _get_directory_path_of_caller():
FlowID = str


def _traverse_primitive_for_pulumi(
primitive: Primitive,
type_: Type,
credentials: CredentialType,
initial_opts: pulumi.ResourceOptions,
visited_primitives: _PrimitiveCache,
) -> pulumi.Resource:
pulumi_provider = primitive.pulumi_provider()
if pulumi_provider is None:
raise ValueError(
"_traverse_primitive_for_pulumi should never be called with "
"an unmanaged primitive."
)
fields = dataclasses.fields(primitive)
parent_resources = []
for field in fields:
field_value = getattr(primitive, field.name)
if (
field_value is not None
and isinstance(field_value, Primitive)
and field_value.pulumi_provider() is not None
):
visited_resource = visited_primitives.get(field_value)
# Visit all non managed parent primitives to create the parent resources
if visited_resource is None:
parent_resources.append(
_traverse_primitive_for_pulumi(
field_value,
type_,
credentials,
initial_opts,
visited_primitives,
)
)
else:
parent_resources.append(visited_resource)

opts = pulumi.ResourceOptions.merge(
initial_opts, pulumi.ResourceOptions(depends_on=parent_resources)
)
resource = pulumi_provider.pulumi_resource(
type_=type_, credentials=credentials, opts=opts
)
visited_primitives.append(_PrimitiveCacheEntry(primitive, resource))
return resource


@dataclasses.dataclass
class PrimitiveState:
primitive_class: str
Expand Down Expand Up @@ -91,25 +164,28 @@ def parse_resource_states(
last_updated: datetime.datetime,
pulumi_stack_name: str,
):
def find_child_resources(
def find_attached_resources(
parent_resource: ResourceState,
) -> Dict[str, ResourceState]:
"""Find direct child resources for a given URN."""
"""Find all attached resources for a given URN."""
resources = {}
dependencies = []
for resource in resource_states:
if (
parent_resource.resource_type in resource.resource_urn
and resource.resource_type != parent_resource.resource_type
):
if resource.parent == parent_resource.resource_urn:
if (
resource.cloud_console_url is None
and parent_resource.cloud_console_url is not None
):
resource.cloud_console_url = parent_resource.cloud_console_url
child_resources = find_child_resources(resource)
if not child_resources:
resources[resource.resource_urn] = resource
resources.update(child_resources)
resources[resource.resource_urn] = resource
if resource.resource_urn in parent_resource.dependencies:
dependencies.append(resource)

for dependency in dependencies:
for resource in resource_states:
if dependency.parent == resource.resource_urn:
resources.update(find_attached_resources(resource))

return resources

def find_processor_resource(processor_id: str) -> Optional[ResourceState]:
Expand Down Expand Up @@ -159,7 +235,7 @@ def find_processor_resource(processor_id: str) -> Optional[ResourceState]:

if source_resource:
source_child_resources = list(
find_child_resources(source_resource).values()
find_attached_resources(source_resource).values()
)
tracked_resources.add(source_urn)
tracked_resources.update(
Expand All @@ -172,7 +248,7 @@ def find_processor_resource(processor_id: str) -> Optional[ResourceState]:

if sink_resource:
sink_child_resources = list(
find_child_resources(sink_resource).values()
find_attached_resources(sink_resource).values()
)
tracked_resources.add(sink_urn)
tracked_resources.update(
Expand Down Expand Up @@ -255,7 +331,7 @@ def __init__(
self._infra_actor_ref: Optional[InfraActor] = None
# NOTE: we use a list here instead of a set because we have no
# guarantee that primitives will be cachable.
self._primitive_cache: List[Primitive] = []
self._primitive_cache: _PrimitiveCache = _PrimitiveCache()

def _get_infra_actor(self) -> InfraActor:
if self._infra_actor_ref is None:
Expand Down Expand Up @@ -313,7 +389,15 @@ def pipeline(
autoscale_options: AutoscalerOptions = AutoscalerOptions.default(),
log_level: str = "INFO",
):
if sink is None:
if not dataclasses.is_dataclass(source):
raise ValueError(
f"source must be a dataclass. Received: {type(source).__name__}"
)
if sink is not None and not dataclasses.is_dataclass(sink):
raise ValueError(
f"sink must be a dataclass. Received: {type(sink).__name__}"
)
elif sink is None:
sink = Empty()

# Convert any Portableprimitives into cloud-specific primitives
Expand Down Expand Up @@ -496,6 +580,24 @@ def apply(self):

async def _apply(self):
logging.debug(f"Setting up Infra for Flow({self.flow_id})...")
if self.options.infra_options.require_confirmation:
await self._get_infra_actor().plan(processors=self._processors)
print("Would you like to apply these changes?")
response = input('Enter "y (yes)" to confirm, "n (no) to reject": ')
while True:
if response.lower() in ["n", "no"]:
print("User rejected Infra changes. Aborting.")
return
elif response.lower() in ["y", "yes"]:
print("User confirmed Infra changes. Applying.")
# Reset the primitive cache
self._primitive_cache.clear()
break
else:
response = input(
'Invalid response. Enter "y (yes)" to '
'confirm, "n (no) to reject": '
)
await self._get_infra_actor().apply(processors=self._processors)
logging.debug(f"...Finished setting up Infra for Flow({self.flow_id})")

Expand Down Expand Up @@ -585,18 +687,8 @@ def decorator_function(original_process_fn_or_class):
)
input_type, output_type = self._input_output_type(full_arg_spec)

# NOTE: We only create Pulumi resources for managed primitives and only
# for the first time we see a primitive.
include_source_primitive = False
if source_primitive not in self._primitive_cache:
self._primitive_cache.append(source_primitive)
include_source_primitive = True
include_sink_primitive = False
if sink_primitive not in self._primitive_cache:
self._primitive_cache.append(sink_primitive)
include_sink_primitive = True

processor_id = original_process_fn_or_class.__name__
primitive_cache = self._primitive_cache

def pulumi_resources_for_pipeline():
class PipelineComponentResource(pulumi.ComponentResource):
Expand All @@ -622,28 +714,34 @@ def __init__(
# usage will not, so the urn will not be included under this
# Processor's ComponentResource. Builds the source's
# pulumi.CompositeResource (if it exists)
if include_source_primitive:
source_pulumi_provider = source_primitive.pulumi_provider()
if source_pulumi_provider is not None:
source_resource = (
source_pulumi_provider.pulumi_resource(
type_=input_type,
credentials=source_credentials,
opts=child_opts,
)
)
outputs["source_urn"] = source_resource.urn
source_pulumi_provider = source_primitive.pulumi_provider()
if (
source_pulumi_provider is not None
and source_primitive not in primitive_cache
):
source_resource = _traverse_primitive_for_pulumi(
primitive=source_primitive,
type_=input_type,
credentials=source_credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs["source_urn"] = source_resource.urn

# Builds the sink's pulumi.CompositeResource (if it exists)
if include_sink_primitive:
sink_pulumi_provider = sink_primitive.pulumi_provider()
if sink_pulumi_provider is not None:
sink_resource = sink_pulumi_provider.pulumi_resource(
type_=output_type,
credentials=sink_credentials,
opts=child_opts,
)
outputs["sink_urn"] = sink_resource.urn
sink_pulumi_provider = sink_primitive.pulumi_provider()
if (
sink_pulumi_provider is not None
and sink_primitive not in primitive_cache
):
sink_resource = _traverse_primitive_for_pulumi(
primitive=sink_primitive,
type_=output_type,
credentials=sink_credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs["sink_urn"] = sink_resource.urn

self.register_outputs(outputs)

Expand Down Expand Up @@ -725,14 +823,8 @@ def decorator_function(original_process_fn_or_class):
)
_, output_type = self._input_output_type(full_arg_spec)

# NOTE: We only create Pulumi resources for managed primitives and only
# for the first time we see a primitive.
include_sink_primitive = False
if sink_primitive not in self._primitive_cache:
self._primitive_cache.append(sink_primitive)
include_sink_primitive = True

processor_id = original_process_fn_or_class.__name__
primitive_cache = self._primitive_cache

def pulumi_resources_for_collector():
class CollectorComponentResource(pulumi.ComponentResource):
Expand All @@ -759,15 +851,19 @@ def __init__(
# pulumi.CompositeResource (if it exists)

# Builds the sink's pulumi.CompositeResource (if it exists)
if include_sink_primitive:
sink_pulumi_provider = sink_primitive.pulumi_provider()
if sink_pulumi_provider is not None:
sink_resource = sink_pulumi_provider.pulumi_resource(
type_=output_type,
credentials=sink_credentials,
opts=child_opts,
)
outputs["sink_urn"] = sink_resource.urn
sink_pulumi_provider = sink_primitive.pulumi_provider()
if (
sink_pulumi_provider is not None
and sink_primitive not in primitive_cache
):
sink_resource = _traverse_primitive_for_pulumi(
sink_primitive,
output_type,
sink_credentials,
child_opts,
visited_primitives=primitive_cache,
)
outputs["sink_urn"] = sink_resource.urn

self.register_outputs(outputs)

Expand Down
Loading

0 comments on commit f62b3c1

Please sign in to comment.