execute_k8s_job does not handle watch client stale state #26626

Open · OrenLederman opened this issue Dec 20, 2024 · 1 comment · May be fixed by #26760
Labels: area: execution · deployment: k8s · type: bug

Comments

OrenLederman (Contributor) commented Dec 20, 2024

What's the issue?

Long-running calls to execute_k8s_job sometimes fail while reading pod logs. The method retries around next(log_stream), but if the watch client enters a stale state, retrying next() on the same stream keeps failing and the op eventually errors out. Example log:

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "run_generic_training":

  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 245, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 499, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 183, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 87, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute.py", line 193, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute.py", line 162, in _yield_compute_results
    for event in iterate_with_context(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 480, in iterate_with_context
    with context_fn():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
urllib3.exceptions.ProtocolError: Response ended prematurely

  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 482, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute_generator.py", line 140, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute_generator.py", line 128, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/app/generic_ml_training/dags/ops.py", line 117, in run_generic_training
    execute_k8s_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/decorator_utils.py", line 203, in wrapped_with_pre_call_fn
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dagster_k8s/ops/k8s_job_op.py", line 424, in execute_k8s_job
    raise e
  File "/usr/local/lib/python3.10/dist-packages/dagster_k8s/ops/k8s_job_op.py", line 389, in execute_k8s_job
    log_entry = k8s_api_retry(
  File "/usr/local/lib/python3.10/dist-packages/dagster_k8s/client.py", line 144, in k8s_api_retry
    return fn()
  File "/usr/local/lib/python3.10/dist-packages/dagster_k8s/ops/k8s_job_op.py", line 390, in <lambda>
    lambda: next(log_stream),
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/watch/watch.py", line 178, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for segment in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.10/dist-packages/urllib3/response.py", line 1057, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/response.py", line 1206, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.10/dist-packages/urllib3/response.py", line 1136, in _update_chunk_length
    raise ProtocolError("Response ended prematurely") from None

The above exception occurred during handling of the following exception:
ValueError: invalid literal for int() with base 16: b''

  File "/usr/local/lib/python3.10/dist-packages/urllib3/response.py", line 1128, in _update_chunk_length
    self.chunk_left = int(line, 16)

I found similar issues reported against ansible-playbook, as well as the relevant issue in the kubernetes Python client. The fix is to wrap the watch stream creation (log_stream = watch.stream(...)) in a loop as well, so that a fresh stream is created after a connection failure. I'm trying it out in my repo and will post a PR with a fix after I confirm that it's working (or at least not introducing new issues).
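
For illustration, here is a minimal sketch of just that change, extracted from the full workaround posted below (the helper name stream_pod_logs is illustrative; the real code also keeps the overall timeout check and the configurable retry environment variables):

import time

import kubernetes.watch
from urllib3.exceptions import ProtocolError

from dagster_k8s.client import DagsterKubernetesClient, k8s_api_retry


def stream_pod_logs(pod_name: str, namespace: str, container_name: str) -> None:
    """Stream pod logs, recreating the watch stream whenever the connection drops."""
    api_client = DagsterKubernetesClient.production_client()
    watch = kubernetes.watch.Watch()
    while True:
        # Recreate the stream on every (re)connect so a stale watch client is discarded.
        log_stream = watch.stream(
            api_client.core_api.read_namespaced_pod_log,
            name=pod_name,
            namespace=namespace,
            container=container_name,
        )
        try:
            while True:
                log_entry = k8s_api_retry(lambda: next(log_stream), max_retries=3, timeout=5)
                print(log_entry)
        except StopIteration:
            break  # the stream is exhausted; the pod finished
        except ProtocolError:
            # Connection dropped mid-stream; pause briefly, then reconnect with a fresh stream.
            time.sleep(3)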

What did you expect to happen?

The code shouldn't fail because of intermittent connection errors.

How to reproduce?

This is difficult to reproduce. The error originates in the underlying Kubernetes client and only happens rarely, but often enough to fail long-running, expensive training jobs.

Dagster version

1.9.3

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
By submitting this issue, you agree to follow Dagster's Code of Conduct.

OrenLederman added the "type: bug" label on Dec 20, 2024
garethbrickman added the "deployment: k8s" and "area: execution" labels on Dec 20, 2024
OrenLederman (Contributor, Author) commented

As a workaround, this seems to be working (again, hard to confirm because I can't easily recreate the issue). I just created a copy of the execute_k8s_job method with a few small modifications. I'll create a PR soon.

# mypy: ignore-errors

# Code taken from https://github.com/dagster-io/dagster/blob/a14cb87a2caa69ce10d27674e7de518721bc5cfd/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py

# The only change is in the loop that streams logs from the pod: an outer loop ensures
# that watch.stream() is called again if the connection fails.

import os
import time
from typing import Any, Dict, List, Optional

from urllib3.exceptions import ProtocolError

import kubernetes.config
import kubernetes.watch
from dagster import (
    OpExecutionContext,
)
from dagster._annotations import experimental
from dagster._core.errors import DagsterExecutionInterruptedError

from dagster_k8s.client import DEFAULT_JOB_POD_COUNT, DagsterKubernetesClient, k8s_api_retry
from dagster_k8s.container_context import K8sContainerContext
from dagster_k8s.job import (
    DagsterK8sJobConfig,
    K8sConfigMergeBehavior,
    UserDefinedDagsterK8sConfig,
    construct_dagster_k8s_job,
    get_k8s_job_name,
)
from dagster_k8s.launcher import K8sRunLauncher


@experimental
def execute_k8s_job(
    context: OpExecutionContext,
    image: str,
    command: Optional[List[str]] = None,
    args: Optional[List[str]] = None,
    namespace: Optional[str] = None,
    image_pull_policy: Optional[str] = None,
    image_pull_secrets: Optional[List[Dict[str, str]]] = None,
    service_account_name: Optional[str] = None,
    env_config_maps: Optional[List[str]] = None,
    env_secrets: Optional[List[str]] = None,
    env_vars: Optional[List[str]] = None,
    volume_mounts: Optional[List[Dict[str, Any]]] = None,
    volumes: Optional[List[Dict[str, Any]]] = None,
    labels: Optional[Dict[str, str]] = None,
    resources: Optional[Dict[str, Any]] = None,
    scheduler_name: Optional[str] = None,
    load_incluster_config: bool = True,
    kubeconfig_file: Optional[str] = None,
    timeout: Optional[int] = None,
    container_config: Optional[Dict[str, Any]] = None,
    pod_template_spec_metadata: Optional[Dict[str, Any]] = None,
    pod_spec_config: Optional[Dict[str, Any]] = None,
    job_metadata: Optional[Dict[str, Any]] = None,
    job_spec_config: Optional[Dict[str, Any]] = None,
    k8s_job_name: Optional[str] = None,
    merge_behavior: K8sConfigMergeBehavior = K8sConfigMergeBehavior.DEEP,
    delete_failed_k8s_jobs: Optional[bool] = True,
    _kubeconfig_file_context: Optional[str] = None,
):
    """This function is a utility for executing a Kubernetes job from within a Dagster op.

    Args:
        image (str): The image in which to launch the k8s job.
        command (Optional[List[str]]): The command to run in the container within the launched
            k8s job. Default: None.
        args (Optional[List[str]]): The args for the command for the container. Default: None.
        namespace (Optional[str]): Override the kubernetes namespace in which to run the k8s job.
            Default: None.
        image_pull_policy (Optional[str]): Allows the image pull policy to be overridden, e.g. to
            facilitate local testing with `kind <https://kind.sigs.k8s.io/>`_. Default:
            ``"Always"``. See:
            https://kubernetes.io/docs/concepts/containers/images/#updating-images.
        image_pull_secrets (Optional[List[Dict[str, str]]]): Optionally, a list of dicts, each of
            which corresponds to a Kubernetes ``LocalObjectReference`` (e.g.,
            ``{'name': 'myRegistryName'}``). This allows you to specify the ```imagePullSecrets`` on
            a pod basis. Typically, these will be provided through the service account, when needed,
            and you will not need to pass this argument. See:
            https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
            and https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core
        service_account_name (Optional[str]): The name of the Kubernetes service account under which
            to run the Job. Defaults to "default"        env_config_maps (Optional[List[str]]): A list of custom ConfigMapEnvSource names from which to
            draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See:
            https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container
        env_secrets (Optional[List[str]]): A list of custom Secret names from which to
            draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See:
            https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables
        env_vars (Optional[List[str]]): A list of environment variables to inject into the Job.
            Default: ``[]``. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables
        volume_mounts (Optional[List[Permissive]]): A list of volume mounts to include in the job's
            container. Default: ``[]``. See:
            https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volumemount-v1-core
        volumes (Optional[List[Permissive]]): A list of volumes to include in the Job's Pod. Default: ``[]``. See:
            https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volume-v1-core
        labels (Optional[Dict[str, str]]): Additional labels that should be included in the Job's Pod. See:
            https://kubernetes.io/docs/concepts/overview/working-with-objects/labels
        resources (Optional[Dict[str, Any]]) Compute resource requirements for the container. See:
            https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
        scheduler_name (Optional[str]): Use a custom Kubernetes scheduler for launched Pods. See:
            https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/
        load_incluster_config (bool): Whether the op is running within a k8s cluster. If ``True``,
            we assume the launcher is running within the target cluster and load config using
            ``kubernetes.config.load_incluster_config``. Otherwise, we will use the k8s config
            specified in ``kubeconfig_file`` (using ``kubernetes.config.load_kube_config``) or fall
            back to the default kubeconfig. Default: True.
        kubeconfig_file (Optional[str]): The kubeconfig file from which to load config. Defaults to
            using the default kubeconfig. Default: None.
        timeout (Optional[int]): Raise an exception if the op takes longer than this timeout in
            seconds to execute. Default: None.
        container_config (Optional[Dict[str, Any]]): Raw k8s config for the k8s pod's main container
            (https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#container-v1-core).
            Keys can be either snake_case or camelCase. Default: None.
        pod_template_spec_metadata (Optional[Dict[str, Any]]): Raw k8s config for the k8s pod's
            metadata (https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta).
            Keys can be either snake_case or camelCase. Default: None.
        pod_spec_config (Optional[Dict[str, Any]]): Raw k8s config for the k8s pod's pod spec
            (https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#PodSpec).
            Keys can be either snake_case or camelCase. Default: None.
        job_metadata (Optional[Dict[str, Any]]): Raw k8s config for the k8s job's metadata
            (https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta).
            Keys can be either snake_case or camelCase. Default: None.
        job_spec_config (Optional[Dict[str, Any]]): Raw k8s config for the k8s job's job spec
            (https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#jobspec-v1-batch).
            Keys can be either snake_case or camelCase. Default: None.
        k8s_job_name (Optional[str]): Overrides the name of the k8s job. If not set, will be set
            to a unique name based on the current run ID and the name of the calling op. If set,
            make sure that the passed in name is a valid Kubernetes job name that does not
            already exist in the cluster.
        merge_behavior (Optional[K8sConfigMergeBehavior]): How raw k8s config set on this op should
            be merged with any raw k8s config set on the code location that launched the op. By
            default, the value is K8sConfigMergeBehavior.DEEP, meaning that the two dictionaries
            are recursively merged, appending list fields together and merging dictionary fields.
            Setting it to SHALLOW will make the dictionaries shallowly merged - any shared values
            in the dictionaries will be replaced by the values set on this op.
        delete_failed_k8s_jobs (bool): Whether to immediately delete failed Kubernetes jobs. If False,
            failed jobs will remain accessible through the Kubernetes API until deleted by a user or cleaned up by the
            .spec.ttlSecondsAfterFinished parameter of the job.
            (https://kubernetes.io/docs/concepts/workloads/controllers/ttlafterfinished/).
            Defaults to True.
    """
    run_container_context = K8sContainerContext.create_for_run(
        context.dagster_run,
        (
            context.instance.run_launcher
            if isinstance(context.instance.run_launcher, K8sRunLauncher)
            else None
        ),
        include_run_tags=False,
    )

    container_config = container_config.copy() if container_config else {}
    if command:
        container_config["command"] = command

    op_container_context = K8sContainerContext(
        image_pull_policy=image_pull_policy,
        image_pull_secrets=image_pull_secrets,
        service_account_name=service_account_name,
        env_config_maps=env_config_maps,
        env_secrets=env_secrets,
        env_vars=env_vars,
        volume_mounts=volume_mounts,
        volumes=volumes,
        labels=labels,
        namespace=namespace,
        resources=resources,
        scheduler_name=scheduler_name,
        run_k8s_config=UserDefinedDagsterK8sConfig.from_dict(
            {
                "container_config": container_config,
                "pod_template_spec_metadata": pod_template_spec_metadata,
                "pod_spec_config": pod_spec_config,
                "job_metadata": job_metadata,
                "job_spec_config": job_spec_config,
                "merge_behavior": merge_behavior.value,
            }
        ),
    )

    container_context = run_container_context.merge(op_container_context)

    namespace = container_context.namespace

    user_defined_k8s_config = container_context.run_k8s_config

    k8s_job_config = DagsterK8sJobConfig(
        job_image=image,
        dagster_home=None,
    )

    job_name = k8s_job_name or get_k8s_job_name(
        context.run_id, context.get_step_execution_context().step.key
    )

    retry_number = context.retry_number
    if retry_number > 0:
        job_name = f"{job_name}-{retry_number}"

    labels = {
        "dagster/job": context.dagster_run.job_name,
        "dagster/op": context.op.name,
        "dagster/run-id": context.dagster_run.run_id,
    }
    if context.dagster_run.remote_job_origin:
        labels[
            "dagster/code-location"
        ] = context.dagster_run.remote_job_origin.repository_origin.code_location_origin.location_name

    job = construct_dagster_k8s_job(
        job_config=k8s_job_config,
        args=args,
        job_name=job_name,
        pod_name=job_name,
        component="k8s_job_op",
        user_defined_k8s_config=user_defined_k8s_config,
        labels=labels,
    )

    if load_incluster_config:
        kubernetes.config.load_incluster_config()
    else:
        kubernetes.config.load_kube_config(kubeconfig_file, context=_kubeconfig_file_context)

    # allowing the api_client to be passed in would enable unit testing
    api_client = DagsterKubernetesClient.production_client()

    context.log.info(f"Creating Kubernetes job {job_name} in namespace {namespace}...")

    start_time = time.time()

    api_client.batch_api.create_namespaced_job(namespace, job)

    context.log.info("Waiting for Kubernetes job to finish...")

    timeout = timeout or 0

    try:
        api_client.wait_for_job(
            job_name=job_name,
            namespace=namespace,
            wait_timeout=timeout,
            start_time=start_time,
        )

        restart_policy = user_defined_k8s_config.pod_spec_config.get("restart_policy", "Never")

        if restart_policy == "Never":
            container_name = container_config.get("name", "dagster")

            pods = api_client.wait_for_job_to_have_pods(
                job_name,
                namespace,
                wait_timeout=timeout,
                start_time=start_time,
            )

            pod_names = [p.metadata.name for p in pods]

            if not pod_names:
                raise Exception("No pod names in job after it started")

            pod_to_watch = pod_names[0]
            watch = kubernetes.watch.Watch()  # consider moving into api_client

            api_client.wait_for_pod(
                pod_to_watch,
                namespace,  # pyright: ignore[reportArgumentType]
                wait_timeout=timeout,
                start_time=start_time,  # pyright: ignore[reportArgumentType]
            )

            # Outer loop: recreate the watch stream whenever the connection drops,
            # so a stale watch client is replaced instead of retried indefinitely.
            while True:
                log_stream = watch.stream(
                    api_client.core_api.read_namespaced_pod_log,
                    name=pod_to_watch,
                    namespace=namespace,
                    container=container_name,
                )
                try:
                    while True:
                        if timeout and time.time() - start_time > timeout:
                            watch.stop()
                            raise Exception("Timed out waiting for pod to finish")

                        log_entry = k8s_api_retry(
                            lambda: next(log_stream),
                            max_retries=int(
                                os.getenv("DAGSTER_EXECUTE_K8S_JOB_STREAM_LOGS_RETRIES", "3")
                            ),
                            timeout=int(
                                os.getenv(
                                    "DAGSTER_EXECUTE_K8S_JOB_STREAM_LOGS_WAIT_BETWEEN_ATTEMPTS", "5"
                                )
                            ),
                        )
                        print(log_entry)  # noqa: T201
                except StopIteration:
                    break
                except ProtocolError as e:
                    context.log.info(
                        f"urllib3.exceptions.ProtocolError. Pausing and will reconnect. {str(e)}"
                    )
                    time.sleep(3)
        else:
            context.log.info("Pod logs are disabled, because restart_policy is not Never")

        if job_spec_config and job_spec_config.get("parallelism"):
            num_pods_to_wait_for = job_spec_config["parallelism"]
        else:
            num_pods_to_wait_for = DEFAULT_JOB_POD_COUNT

        api_client.wait_for_running_job_to_succeed(
            job_name=job_name,
            namespace=namespace,
            wait_timeout=timeout,
            start_time=start_time,
            num_pods_to_wait_for=num_pods_to_wait_for,
        )
    except (DagsterExecutionInterruptedError, Exception) as e:
        try:
            pods = api_client.get_pod_names_in_job(job_name=job_name, namespace=namespace)
            pod_debug_info = "\n\n".join(
                [api_client.get_pod_debug_info(pod_name, namespace) for pod_name in pods]
            )
        except Exception:
            context.log.exception(
                f"Error trying to get pod debug information for failed k8s job {job_name}"
            )
        else:
            context.log.error(
                f"Debug information for failed k8s job {job_name}:\n\n{pod_debug_info}"
            )

        if delete_failed_k8s_jobs:
            context.log.info(
                f"Deleting Kubernetes job {job_name} in namespace {namespace} due to exception"
            )
            api_client.delete_job(job_name=job_name, namespace=namespace)
        raise e
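
For reference, a hypothetical way to wire the patched copy into an op (the module path my_project.k8s_job_op_patched and the image/command values are placeholders; adjust the import to wherever the copy lives in your project):

from dagster import OpExecutionContext, op

# Hypothetical module holding the patched copy above; adjust the import path.
from my_project.k8s_job_op_patched import execute_k8s_job


@op
def run_generic_training(context: OpExecutionContext):
    # Same call signature as dagster_k8s.ops.k8s_job_op.execute_k8s_job; only the
    # log-streaming loop behaves differently when the connection drops.
    execute_k8s_job(
        context=context,
        image="my-training-image:latest",  # placeholder image
        command=["python", "-m", "train"],  # placeholder command
        timeout=6 * 60 * 60,  # fail if the job runs longer than 6 hours
    )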
