diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py b/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py index 83562d7f7cd76..ff471ba33f4cd 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py @@ -18,6 +18,7 @@ from dagster._annotations import experimental from dagster._core.errors import DagsterExecutionInterruptedError from dagster._utils.merger import merge_dicts +from urllib3.exceptions import ProtocolError from dagster_k8s.client import DEFAULT_JOB_POD_COUNT, DagsterKubernetesClient, k8s_api_retry from dagster_k8s.container_context import K8sContainerContext @@ -377,33 +378,38 @@ def execute_k8s_job( start_time=start_time, # pyright: ignore[reportArgumentType] ) - log_stream = watch.stream( - api_client.core_api.read_namespaced_pod_log, - name=pod_to_watch, - namespace=namespace, - container=container_name, - ) - while True: - if timeout and time.time() - start_time > timeout: - watch.stop() - raise Exception("Timed out waiting for pod to finish") try: - log_entry = k8s_api_retry( - lambda: next(log_stream), - max_retries=int( - os.getenv("DAGSTER_EXECUTE_K8S_JOB_STREAM_LOGS_RETRIES", "3") - ), - timeout=int( - os.getenv( - "DAGSTER_EXECUTE_K8S_JOB_STREAM_LOGS_WAIT_BETWEEN_ATTEMPTS", "5" - ) - ), + log_stream = watch.stream( + api_client.core_api.read_namespaced_pod_log, + name=pod_to_watch, + namespace=namespace, + container=container_name, ) - print(log_entry) # noqa: T201 + while True: + if timeout and time.time() - start_time > timeout: + watch.stop() + raise Exception("Timed out waiting for pod to finish") + + log_entry = k8s_api_retry( + lambda: next(log_stream), + max_retries=int( + os.getenv("DAGSTER_EXECUTE_K8S_JOB_STREAM_LOGS_RETRIES", "3") + ), + timeout=int( + os.getenv( + "DAGSTER_EXECUTE_K8S_JOB_STREAM_LOGS_WAIT_BETWEEN_ATTEMPTS", "5" + ) + ), + ) + print(log_entry) # noqa: T201 except StopIteration: break - else: + except ProtocolError as e: + context.log.warning( + f"urllib3.exceptions.ProtocolError. Pausing and will reconnect. {e}" + ) + time.sleep(5) context.log.info("Pod logs are disabled, because restart_policy is not Never") if job_spec_config and job_spec_config.get("parallelism"):