Skip to content

Commit

Permalink
Fix Hybrid Container Healthcheck Throttling (aws#136)
Browse files Browse the repository at this point in the history
*Issue #, if available:* N/A

*Description of changes:*

Discovered an issue where the worker healthchecks were not being
performed in the hybrid container when the worker subprocess was run
after the scheduler subprocess. Upon further investigation the root
cause was determined to be the throttling conditions for the method
`_check_process_conditions` which restricted the container to one call
of the function per 60 seconds. This PR removes calls to
`_check_process_conditions` in the subprocess `execution_loop_iter` loop
if the subprocess does not have conditions. Also changes the throttle
settings of `_check_process_conditions` to be instance level rather than
global so that it is throttled per subprocess.

---

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
kuyperse authored Sep 4, 2024
1 parent b9bca58 commit 568ab1d
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def start(
exc_info=sys.exc_info(),
)

@throttle(60) # so we don't make excessive calls to process conditions
@throttle(seconds=60, instance_level_throttling=True) # avoid excessive calls to process conditions
def _check_process_conditions(self) -> List[ProcessConditionResponse]:
# Evaluate all conditions
checked_conditions = [c.check(self.process_status) for c in self.conditions]
Expand Down Expand Up @@ -246,7 +246,7 @@ def execution_loop_iter(self):
if self.process_status == ProcessStatus.FINISHED:
# We are done; call shutdown to ensure that we free all resources.
self.shutdown()
elif self.process_status == ProcessStatus.RUNNING:
elif self.process_status == ProcessStatus.RUNNING and self.conditions:
# The process is still running, so we need to check conditions.
failed_conditions = self._check_process_conditions()

Expand Down

0 comments on commit 568ab1d

Please sign in to comment.