Skip to content

Commit

Permalink
[dagster-k8s] per_step_k8s_config working with dynamic jobs (#26670)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Allow run-time configuration of dynamic steps using the
`k8s_job_executor`. Previously, dynamic steps would not be configured
when using `per_step_k8s_config`. Now, doing something along the lines of
the following:

```yml
execution:
  config:
    per_step_k8s_config:
      dyn_sink:
         container_config:
           resources:
             requests:
               cpu: '300m'
```

Given the following job:

```python
@job
def dynamic_producer_consumer_job():
    @op(out=DynamicOut(int))
    def dyn_producer():
        yield from (DynamicOutput(i, str(i)) for i in range(3))
    @op
    def dyn_sink(producer: int): ...
```

Will configure **all** `dyn_sink` ops with the modified config shown
above.

cc @Kuhlwein

resolves #26588

## How I Tested These Changes
Red/green test coverage using a dynamic job. Revert the following
[line](https://github.com/dagster-io/dagster/pull/26670/files#diff-e1b8a7f982ea59869272b4bc9529eab7f62d9474f250298d01d9a1ef68a537fbR243)
to watch the new test fail.

# CHANGELOG

- [dagster-k8s] `k8s_job_executor` now supports per-step configuration for
dynamic steps.
  • Loading branch information
abhinavDhulipala authored Dec 26, 2024
1 parent f158a04 commit bc84a5a
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 6 deletions.
5 changes: 3 additions & 2 deletions python_modules/libraries/dagster-k8s/dagster_k8s/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,10 @@ def _get_container_context(
user_defined_k8s_config = get_user_defined_k8s_config(
step_handler_context.step_tags[step_key]
)

step_context = step_handler_context.get_step_context(step_key)
op_name = step_context.step.op_name
per_op_override = UserDefinedDagsterK8sConfig.from_dict(
self._per_step_k8s_config.get(step_key, {})
self._per_step_k8s_config.get(op_name, {})
)

return context.merge(K8sContainerContext(run_k8s_config=user_defined_k8s_config)).merge(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import json
from typing import Optional
from unittest import mock

import pytest
from dagster import job, op, repository
from dagster import (
DagsterInstance,
DynamicOut,
DynamicOutput,
OpExecutionContext,
job,
op,
repository,
)
from dagster._config import process_config, resolve_to_config_type
from dagster._core.definitions.reconstruct import reconstructable
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.context.system import PlanData, PlanOrchestrationContext
from dagster._core.execution.context_creation_job import create_context_free_log_manager
from dagster._core.execution.plan.state import KnownExecutionState
from dagster._core.execution.retries import RetryMode
from dagster._core.executor.init import InitExecutorContext
from dagster._core.executor.step_delegating.step_handler.base import StepHandlerContext
Expand Down Expand Up @@ -115,6 +125,26 @@ def foo():
foo()


# Test fixture job: a dynamic producer fans out to one `dyn_sink` step per
# mapping key, and the mapped results are collected at the end.
# NOTE: comments (not docstrings) are used on purpose so the job/op
# descriptions stay exactly as they were.
@job
def dynamic_producer_consumer_job():
    # Emits one dynamic output per value, keyed by the value's string form.
    @op(out=DynamicOut(int))
    def dyn_producer():
        yield from (DynamicOutput(value, str(value)) for value in [3, 4])

    # Logs its input and hands back the known execution state of the step
    # it is running in.
    @op
    def dyn_sink(context: OpExecutionContext, producer: int) -> KnownExecutionState:
        context.log.info(f"got input {producer}")
        # Hacky way to get the known state
        # (context -> InMemIOManager -> StepOrchestrationContext)
        # so the step handler tests can rebuild a plan for a dynamic step.
        step_execution_context = context.get_step_execution_context()
        return step_execution_context.get_known_state()

    mapped_results = dyn_producer().map(dyn_sink)
    mapped_results.collect()


@repository
def bar_repo():
return [bar]
Expand Down Expand Up @@ -211,8 +241,15 @@ def _get_executor(instance, job_def, executor_config=None):
)


def _step_handler_context(job_def, dagster_run, instance, executor):
execution_plan = create_execution_plan(job_def)
def _step_handler_context(
job_def,
dagster_run,
instance,
executor,
step: str = "foo",
known_state: Optional[KnownExecutionState] = None,
):
execution_plan = create_execution_plan(job_def, known_state=known_state)
log_manager = create_context_free_log_manager(instance, dagster_run)

plan_context = PlanOrchestrationContext(
Expand All @@ -232,7 +269,8 @@ def _step_handler_context(job_def, dagster_run, instance, executor):
execute_step_args = ExecuteStepArgs(
reconstructable(bar).get_python_origin(),
dagster_run.run_id,
["foo"],
# note that k8s_job_executor can only execute one step at a time.
[step],
print_serialized_events=False,
)

Expand Down Expand Up @@ -734,3 +772,49 @@ def test_per_step_k8s_config(k8s_run_launcher_instance, python_origin_with_conta
assert raw_k8s_config.container_config["resources"] == FOURTH_RESOURCES_TAGS
assert raw_k8s_config.container_config["working_dir"] == "MY_WORKING_DIR"
assert raw_k8s_config.container_config["volume_mounts"] == OTHER_VOLUME_MOUNTS_TAGS


def test_per_step_k8s_config_dynamic_job(k8s_run_launcher_instance: DagsterInstance):
    """Check that `per_step_k8s_config` keyed by op name applies to a dynamic step.

    The job is first executed in process so that the known execution state
    returned by a `dyn_sink` step can be fed into the step handler context
    for the dynamic step key `dyn_sink[3]`.
    """
    run_id = "de07af8f-d5f4-4a43-b545-132c3310999d"
    result = dynamic_producer_consumer_job.execute_in_process(
        instance=k8s_run_launcher_instance,
        run_id=run_id,
    )
    assert result.success

    # Injected into every step.
    every_step_config = {
        "container_config": {
            "working_dir": "MY_WORKING_DIR",  # set on every step
            "resources": THIRD_RESOURCES_TAGS,  # overridden by the per_step level, so ignored
        }
    }
    # Keyed by op name, not by the resolved dynamic step key.
    per_step_config = {
        "dyn_sink": {
            "container_config": {
                "resources": FOURTH_RESOURCES_TAGS,
            }
        }
    }

    recon_job = reconstructable(dynamic_producer_consumer_job)
    executor = _get_executor(
        k8s_run_launcher_instance,
        recon_job,
        {
            "step_k8s_config": every_step_config,
            "per_step_k8s_config": per_step_config,
        },
    )

    # The sink outputs are looked up by mapping key; each one is the known
    # state returned by that mapped step.
    dynamic_step = "3"
    sink_outputs = result.output_for_node("dyn_sink")
    dyn_known_state = sink_outputs[dynamic_step]

    step_handler_context = _step_handler_context(
        recon_job,
        result.dagster_run,
        k8s_run_launcher_instance,
        executor,
        step=f"dyn_sink[{dynamic_step}]",
        known_state=dyn_known_state,
    )
    container_context = executor._step_handler._get_container_context(  # noqa: SLF001 # pyright: ignore[reportAttributeAccessIssue]
        step_handler_context
    )
    container_config = container_context.run_k8s_config.container_config

    # The per-step resources win, while the every-step working_dir still applies.
    assert container_config["resources"] == FOURTH_RESOURCES_TAGS
    assert container_config["working_dir"] == "MY_WORKING_DIR"

0 comments on commit bc84a5a

Please sign in to comment.