Skip to content

Commit

Permalink
Don't dispose pools when using internal api (apache#38552)
Browse files Browse the repository at this point in the history
Needed for AIP-44
  • Loading branch information
dstandish authored Mar 28, 2024
1 parent 4e070ef commit 5de9075
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions airflow/task/task_runner/standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import psutil
from setproctitle import setproctitle

from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.models.taskinstance import TaskReturnCode
from airflow.settings import CAN_FORK
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
Expand Down Expand Up @@ -74,11 +75,12 @@ def _start_by_fork(self):
from airflow.cli.cli_parser import get_parser
from airflow.sentry import Sentry

# Force a new SQLAlchemy session. We can't share open DB handles
# between processes. The CLI code will re-create this as part of its
# normal startup
settings.engine.pool.dispose()
settings.engine.dispose()
if not InternalApiConfig.get_use_internal_api():
# Force a new SQLAlchemy session. We can't share open DB handles
# between processes. The CLI code will re-create this as part of its
# normal startup
settings.engine.pool.dispose()
settings.engine.dispose()

parser = get_parser()
# [1:] - remove "airflow" from the start of the command
Expand Down Expand Up @@ -107,7 +109,7 @@ def _start_by_fork(self):
except Exception as exc:
return_code = 1

self.log.error(
self.log.exception(
"Failed to execute job %s for task %s (%s; %r)",
job_id,
self._task_instance.task_id,
Expand Down

0 comments on commit 5de9075

Please sign in to comment.