Skip to content

Commit

Permalink
fix(backport v3.8.x): backport PR#395: retry_task_map: fix mem leak w…
Browse files Browse the repository at this point in the history
…hen network is totally cut off (#436)

* fix: retry_task_map: fix mem leak when network is totally cut off (#395)

Previously, when handling a failed task, the retry_task_map first released the semaphore and then rescheduled the failed task directly, without acquiring the semaphore again. As a result, the concurrency limit enforced by the semaphore was bypassed when there were a lot of failed tasks.
This is especially critical when the network is totally cut off: failed tasks pile up while new tasks are still being scheduled, resulting in memory leaks.

This PR fixes the above problem by fixing the task_done_cb: now, when handling and re-scheduling failed tasks, the semaphore is kept. The semaphore is only released when the task finishes successfully or when the thread pool shuts down.

Other refinements to the retry_task_map include:
1. watchdog: when the watchdog func fails, the watchdog thread will try to drain the pending workitem queue.
2. fut_gen: when the thread pool shuts down, drain the finished futures queue.
3. dispatcher: exit directly when the thread pool shuts down.
4. watchdog: refine the watchdog thread implementation.
  • Loading branch information
Bodong-Yang authored Nov 29, 2024
1 parent 1a9329a commit 8dbdfb2
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ on:
pull_request:
branches:
- main
- v*
push:
branches:
- main
- v*
paths:
- "src/**"
- "tests/**"
Expand Down
2 changes: 1 addition & 1 deletion src/otaclient/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class BaseConfig(_InternalSettings):
"otaclient": INFO,
"otaclient_api": INFO,
"otaclient_common": INFO,
"otaproxy": INFO,
"ota_proxy": INFO,
}
LOG_FORMAT = (
"[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s"
Expand Down
177 changes: 122 additions & 55 deletions src/otaclient_common/retry_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from queue import Empty, SimpleQueue
from typing import Any, Callable, Generator, Iterable, Optional
from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable, Optional

from otaclient_common.typing import RT, T

Expand All @@ -35,7 +35,7 @@ class TasksEnsureFailed(Exception):
"""Exception for tasks ensuring failed."""


class ThreadPoolExecutorWithRetry(ThreadPoolExecutor):
class _ThreadPoolExecutorWithRetry(ThreadPoolExecutor):

def __init__(
self,
Expand All @@ -48,21 +48,6 @@ def __init__(
initializer: Callable[..., Any] | None = None,
initargs: tuple = (),
) -> None:
"""Initialize a ThreadPoolExecutorWithRetry instance.
Args:
max_concurrent (int): Limit the number pending scheduled tasks.
max_workers (Optional[int], optional): Max number of worker threads in the pool. Defaults to None.
max_total_retry (Optional[int], optional): Max total retry counts before abort. Defaults to None.
thread_name_prefix (str, optional): Defaults to "".
watchdog_func (Optional[Callable]): A custom func to be called on watchdog thread, when
this func raises exception, the watchdog will interrupt the tasks execution. Defaults to None.
watchdog_check_interval (int): Defaults to 3(seconds).
initializer (Callable[..., Any] | None): The same <initializer> param passed through to ThreadPoolExecutor.
Defaults to None.
initargs (tuple): The same <initargs> param passed through to ThreadPoolExecutor.
Defaults to ().
"""
self._start_lock, self._started = threading.Lock(), False
self._total_task_num = 0
"""
Expand All @@ -78,65 +63,80 @@ def __init__(
self._concurrent_semaphore = threading.Semaphore(max_concurrent)
self._fut_queue: SimpleQueue[Future[Any]] = SimpleQueue()

self._watchdog_check_interval = watchdog_check_interval
self._checker_funcs: list[Callable[[], Any]] = []
if isinstance(max_total_retry, int) and max_total_retry > 0:
self._checker_funcs.append(partial(self._max_retry_check, max_total_retry))
if callable(watchdog_func):
self._checker_funcs.append(watchdog_func)

super().__init__(
max_workers=max_workers,
thread_name_prefix=thread_name_prefix,
initializer=initializer,
initargs=initargs,
)

if max_total_retry or callable(watchdog_func):
threading.Thread(
target=self._watchdog,
args=(max_total_retry, watchdog_func, watchdog_check_interval),
daemon=True,
).start()
def _max_retry_check(self, max_total_retry: int) -> None:
if self._retry_count > max_total_retry:
raise TasksEnsureFailed("exceed max retry count, abort")

def _watchdog(
self,
max_retry: int | None,
watchdog_func: Callable[..., Any] | None,
_checker_funcs: list[Callable[[], Any]],
interval: int,
) -> None:
"""Watchdog will shutdown the threadpool on certain conditions being met."""
while not self._shutdown and not concurrent_fut_thread._shutdown:
if max_retry and self._retry_count > max_retry:
logger.warning(f"exceed {max_retry=}, abort")
return self.shutdown(wait=True)

if callable(watchdog_func):
try:
watchdog_func()
except Exception as e:
logger.warning(f"custom watchdog func failed: {e!r}, abort")
return self.shutdown(wait=True)
while not (self._shutdown or self._broken or concurrent_fut_thread._shutdown):
time.sleep(interval)
try:
for _func in _checker_funcs:
_func()
except Exception as e:
logger.warning(
f"watchdog failed: {e!r}, shutdown the pool and draining the workitem queue on shutdown.."
)
self.shutdown(wait=False)
# drain the worker queues to cancel all the futs
with contextlib.suppress(Empty):
while True:
self._work_queue.get_nowait()

def _task_done_cb(
self, fut: Future[Any], /, *, item: T, func: Callable[[T], Any]
) -> None:
self._concurrent_semaphore.release() # always release se first
if self._shutdown or self._broken or concurrent_fut_thread._shutdown:
self._concurrent_semaphore.release() # on shutdown, always release a se
return # on shutdown, no need to put done fut into fut_queue
self._fut_queue.put_nowait(fut)

# ------ on task failed ------ #
if fut.exception():
self._retry_count = next(self._retry_counter)
with contextlib.suppress(Exception): # on threadpool shutdown
try: # try to re-schedule the failed task
self.submit(func, item).add_done_callback(
partial(self._task_done_cb, item=item, func=func)
)
except Exception: # if re-schedule doesn't happen, release se
self._concurrent_semaphore.release()
else: # release semaphore when succeeded
self._concurrent_semaphore.release()

def _fut_gen(self, interval: int) -> Generator[Future[Any], Any, None]:
"""Generator which yields the done future, controlled by the caller."""
finished_tasks = 0
while finished_tasks == 0 or finished_tasks != self._total_task_num:
if self._total_task_num < 0:
return

if self._shutdown or self._broken or concurrent_fut_thread._shutdown:
logger.warning(
f"failed to ensure all tasks, {finished_tasks=}, {self._total_task_num=}"
f"dispatcher exits on threadpool shutdown, {finished_tasks=}, {self._total_task_num=}"
)
raise TasksEnsureFailed
with contextlib.suppress(Empty):
while True: # drain the _fut_queue
self._fut_queue.get_nowait()
raise TasksEnsureFailed # raise exc to upper caller

try:
done_fut = self._fut_queue.get_nowait()
Expand All @@ -153,20 +153,6 @@ def ensure_tasks(
*,
ensure_tasks_pull_interval: int = 1,
) -> Generator[Future[RT], None, None]:
"""Ensure all the items in <iterable> are processed by <func> in the pool.
Args:
func (Callable[[T], RT]): The function to take the item from <iterable>.
iterable (Iterable[T]): The iterable of items to be processed by <func>.
Raises:
ValueError: If the pool is shutdown or broken, or this method has already
being called once.
TasksEnsureFailed: If failed to ensure all the tasks are finished.
Yields:
The Future instance of each processed tasks.
"""
with self._start_lock:
if self._started:
try:
Expand All @@ -175,19 +161,34 @@ def ensure_tasks(
del self, func, iterable
self._started = True

if self._checker_funcs:
threading.Thread(
target=self._watchdog,
args=(self._checker_funcs, self._watchdog_check_interval),
daemon=True,
).start()

# ------ dispatch tasks from iterable ------ #
def _dispatcher() -> None:
_tasks_count = -1 # means no task is scheduled
try:
for _tasks_count, item in enumerate(iterable, start=1):
if (
self._shutdown
or self._broken
or concurrent_fut_thread._shutdown
):
logger.warning("threadpool is closing, exit")
return # directly exit on shutdown

self._concurrent_semaphore.acquire()
fut = self.submit(func, item)
fut.add_done_callback(
partial(self._task_done_cb, item=item, func=func)
)
except Exception as e:
logger.error(f"tasks dispatcher failed: {e!r}, abort")
self.shutdown(wait=True)
self.shutdown(wait=False)
return

self._total_task_num = _tasks_count
Expand All @@ -203,3 +204,69 @@ def _dispatcher() -> None:
# a generator so that the first fut will be dispatched before
# we start to get from fut_queue.
return self._fut_gen(ensure_tasks_pull_interval)


# Only expose the APIs we want to expose: static type checkers see the slim
# stub class below, while at runtime the public name is bound to the real
# implementation class.
if TYPE_CHECKING:

    class ThreadPoolExecutorWithRetry:
        """Typing-only view of the public API of the retry-capable thread pool.

        This class body is only evaluated by static type checkers; at runtime
        the name is an alias of `_ThreadPoolExecutorWithRetry`.
        """

        def __init__(
            self,
            max_concurrent: int,
            max_workers: Optional[int] = None,
            max_total_retry: Optional[int] = None,
            thread_name_prefix: str = "",
            watchdog_func: Optional[Callable] = None,
            watchdog_check_interval: int = 3,  # seconds
            initializer: Callable[..., Any] | None = None,
            initargs: tuple = (),
        ) -> None:
            """Initialize a ThreadPoolExecutorWithRetry instance.

            Args:
                max_concurrent (int): Limit the number of pending scheduled tasks.
                max_workers (Optional[int], optional): Max number of worker threads
                    in the pool. Defaults to None.
                max_total_retry (Optional[int], optional): Max total retry count
                    before abort. Defaults to None.
                thread_name_prefix (str, optional): Defaults to "".
                watchdog_func (Optional[Callable]): A custom func to be called on
                    the watchdog thread; when this func raises an exception, the
                    watchdog will interrupt the tasks execution. Defaults to None.
                watchdog_check_interval (int): Defaults to 3 (seconds).
                initializer (Callable[..., Any] | None): The same <initializer>
                    param passed through to ThreadPoolExecutor. Defaults to None.
                initargs (tuple): The same <initargs> param passed through to
                    ThreadPoolExecutor. Defaults to ().
            """
            raise NotImplementedError

        def ensure_tasks(
            self,
            func: Callable[[T], RT],
            iterable: Iterable[T],
            *,
            ensure_tasks_pull_interval: int = 1,
        ) -> Generator[Future[RT], None, None]:
            """Ensure all the items in <iterable> are processed by <func> in the pool.

            Args:
                func (Callable[[T], RT]): The function to take the item from <iterable>.
                iterable (Iterable[T]): The iterable of items to be processed by <func>.

            Raises:
                ValueError: If the pool is shutdown or broken, or this method has
                    already been called once.
                TasksEnsureFailed: If failed to ensure all the tasks are finished.

            Yields:
                The Future instance of each processed task.
            """
            raise NotImplementedError

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            raise NotImplementedError

else:
    # At runtime, the public name is simply the real implementation.
    ThreadPoolExecutorWithRetry = _ThreadPoolExecutorWithRetry

1 comment on commit 8dbdfb2

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
src/ota_metadata/legacy
   __init__.py110100% 
   parser.py3353888%100, 156, 161, 197–198, 208–209, 212, 224, 282, 292–295, 334–337, 417, 420, 428–430, 443, 452–453, 456–457, 669–670, 673, 700–702, 752, 755–757
   types.py841384%37, 40–42, 112–116, 122–125
src/ota_proxy
   __init__.py361072%59, 61, 63, 72, 81–82, 102, 104–106
   __main__.py770%16–18, 20, 22–23, 25
   _consts.py150100% 
   cache_control_header.py68494%71, 91, 113, 121
   cache_streaming.py1422284%154–156, 184–186, 211, 225, 229–230, 265–266, 268, 280, 348, 354–355, 358, 366–369
   config.py170100% 
   db.py731875%109, 115, 153, 159–160, 163, 169, 171, 192–199, 201–202
   errors.py50100% 
   lru_cache_helper.py47295%84–85
   ota_cache.py2155972%70–71, 140, 151–152, 184–185, 202, 239–243, 247–249, 251, 253–260, 262–264, 267–268, 272–273, 277, 324, 332–334, 413–416, 430, 433–434, 448–449, 451–453, 457–458, 464–465, 496, 502, 529, 581–583
   server_app.py1393971%76, 79, 85, 101, 103, 162, 171, 213–214, 216–218, 221, 226–228, 231–232, 235, 238, 241, 244, 257–258, 261–262, 264, 267, 293–296, 299, 313–315, 321–323
   utils.py140100% 
src/otaclient
   __init__.py5260%17, 19
   __main__.py110%16
   log_setting.py52590%53, 55, 64–66
src/otaclient/app
   __main__.py110%16
   configs.py760100% 
   errors.py1200100% 
   interface.py30100% 
   main.py46589%52–53, 75–77
   ota_client.py37310871%79, 87, 108, 135, 137–138, 140, 144, 148–149, 154–155, 161, 163, 201–204, 210, 214, 220, 339, 351–352, 354, 363, 366, 371–372, 375, 381, 383–387, 406–409, 412–419, 447–450, 496–497, 501, 503–504, 534–535, 544–551, 558, 561–567, 612–615, 623, 659–661, 666–668, 671–672, 674–675, 677, 735–736, 739, 747–748, 751, 762–763, 766, 774–775, 778, 789, 808, 835, 854, 872
   ota_client_stub.py39310972%75–77, 79–80, 88–91, 94–96, 100, 105–106, 108–109, 112, 114–115, 118–120, 123–124, 127–129, 134–139, 143, 146–150, 152–153, 161–163, 166, 203–205, 210, 246, 271, 274, 277, 381, 405, 407, 431, 477, 534, 604–605, 644, 663–665, 671–674, 678–680, 687–689, 692, 696–699, 752, 841–843, 850, 880–881, 884–888, 897–906, 913, 919, 922–923, 927, 930
   update_stats.py104991%57, 103, 105, 114, 116, 125, 127, 148, 179
src/otaclient/boot_control
   __init__.py40100% 
   _common.py24811254%74–75, 96–98, 114–115, 135–136, 155–156, 175–176, 195–196, 218–220, 235–236, 260–266, 287, 295, 313, 321, 340–341, 344–345, 368, 370–379, 381–390, 392–394, 413, 416, 424, 432, 448–450, 452–457, 550, 555, 560, 673, 677–678, 681, 689, 691–692, 718–719, 721–724, 729, 735–736, 739–740, 742, 749–750, 761–767, 777–779, 783–784, 787–788, 791, 797
   _firmware_package.py942276%83, 87, 137, 181, 187, 210–211, 214–219, 221–222, 225–230, 232
   _grub.py41712869%217, 265–268, 274–278, 315–316, 323–328, 331–337, 340, 343–344, 349, 351–353, 362–368, 370–371, 373–375, 384–386, 388–390, 469–470, 474–475, 527, 533, 559, 581, 585–586, 601–603, 627–630, 642, 646–648, 650–652, 711–714, 739–742, 765–768, 780–781, 784–785, 820, 826, 846–847, 849, 861, 864, 867, 870, 874–876, 894–897, 925–928, 933–941, 946–954
   _jetson_cboot.py2622620%20, 22–25, 27–29, 35–38, 40–41, 57–58, 60, 62–63, 69, 73, 132, 135, 137–138, 141, 148–149, 157–158, 161, 167–168, 176, 185–189, 191, 197, 200–201, 207, 210–211, 216–217, 219, 225–226, 229–230, 233–235, 237, 243, 248–250, 252–254, 259, 261–264, 266–267, 276–277, 280–281, 286–287, 290–294, 297–298, 303–304, 307, 310–314, 319–322, 325, 328–329, 332, 335–336, 339, 343–348, 352–353, 357, 360–361, 364, 367–370, 372, 375–376, 380, 383, 386–389, 391, 398, 402–403, 406–407, 413–414, 420, 422–423, 427, 429, 431–433, 436, 440, 443, 446–447, 449, 452, 460–461, 468, 478, 481, 489–490, 495–498, 500, 507, 509–511, 517–518, 522–523, 526, 530, 533, 535, 542–546, 548, 560–563, 566, 569, 571, 578, 582–583, 585–586, 588–590, 592, 594, 597, 600, 603, 605–606, 609–613, 617–619, 621, 629–633, 635, 638, 642, 645, 656–657, 662, 672, 675–683, 687–696, 700–709, 713, 715–717, 719–720, 722–723
   _jetson_common.py1724573%132, 140, 288–291, 294, 311, 319, 354, 359–364, 382, 408–409, 411–413, 417–420, 422–423, 425–429, 431, 438–439, 442–443, 453, 456–457, 460, 462, 506–507
   _jetson_uefi.py39727131%127–129, 134–135, 154–156, 161–164, 331, 449, 451–454, 458, 462–463, 465–473, 475, 487–488, 491–492, 495–496, 499–501, 505–506, 511–513, 517, 521–522, 525–526, 529–530, 534, 537–538, 540, 545–546, 550, 553–554, 559, 563–565, 569–571, 573, 577–580, 582–583, 605–606, 610–611, 613, 617, 621–622, 625–626, 633, 636–638, 641, 643–644, 649–650, 653–656, 658–659, 664, 666–667, 675, 678–681, 683–684, 686, 690–691, 695, 703–707, 710–711, 713, 716–720, 723, 726–730, 734–735, 738–743, 746–747, 750–753, 755–756, 763–764, 774–777, 780, 783–786, 789–793, 796–797, 800, 803–806, 809, 811, 816–817, 820, 823–826, 828, 834, 839–840, 859–860, 863, 871–872, 879, 889, 892, 899–900, 905–908, 916–919, 927–928, 940–943, 945, 948, 951, 959, 970–972, 974–976, 978–982, 987–988, 990, 1003, 1007, 1010, 1020, 1025, 1033–1034, 1037, 1041, 1043–1045, 1051–1052, 1057, 1065–1072, 1077–1085, 1090–1098, 1104–1106
   _rpi_boot.py28713453%55, 58, 122–123, 127, 135–138, 152–155, 162–163, 165–166, 171–172, 175–176, 185–186, 224, 230–234, 237, 255–257, 261–263, 268–270, 274–276, 286–287, 290, 293, 295–296, 298–299, 301–303, 309, 312–313, 323–326, 334–338, 340, 342–343, 348–349, 356–362, 393, 395–398, 408–411, 415–416, 418–422, 450–453, 472–475, 480, 483, 501–504, 509–517, 522–530, 547–550, 556–558, 561, 564
   configs.py550100% 
   protocol.py40100% 
   selecter.py412929%45–47, 50–51, 55–56, 59–61, 64, 66, 70, 78–80, 82–83, 85–86, 90, 92, 94–95, 97, 99–100, 102, 104
src/otaclient/configs
   _common.py80100% 
   ecu_info.py58198%108
   proxy_info.py52296%88, 90
src/otaclient/create_standby
   __init__.py12558%29–31, 33, 35
   common.py2244480%62, 65–66, 70–72, 74, 78–79, 81, 127, 175–177, 179–181, 183, 186–189, 193, 204, 278–279, 281–286, 298, 335, 363, 366–368, 384–385, 399, 403, 425–426
   interface.py50100% 
   rebuild_mode.py97990%93–95, 107–112
src/otaclient_api/v2
   api_caller.py39684%45–47, 83–85
   api_stub.py170100% 
   types.py2562391%86, 89–92, 131, 209–210, 212, 259, 262–263, 506–508, 512–513, 515, 518–519, 522–523, 586
src/otaclient_common
   __init__.py34876%42–44, 61, 63, 69, 76–77
   common.py1561888%47, 202, 205–207, 222, 229–231, 297–299, 309, 318–320, 366, 370
   downloader.py1991094%107–108, 126, 153, 369, 424, 428, 516–517, 526
   linux.py611575%51–53, 59, 69, 74, 76, 108–109, 133–134, 190, 195–196, 198
   logging.py29196%55
   persist_file_handling.py1181884%113, 118, 150–152, 163, 192–193, 228–232, 242–244, 246–247
   proto_streamer.py42880%33, 48, 66–67, 72, 81–82, 100
   proto_wrapper.py3984887%87, 165, 172, 184–186, 205, 210, 221, 257, 263, 268, 299, 303, 307, 402, 462, 469, 472, 492, 499, 501, 526, 532, 535, 537, 562, 568, 571, 573, 605, 609, 611, 625, 642, 669, 672, 676, 707, 713, 760–763, 765, 803–805
   retry_task_map.py105595%158–159, 161, 181–182
   typing.py25388%69–70, 72
TOTAL6298167973% 

Tests Skipped Failures Errors Time
217 0 💤 0 ❌ 0 🔥 12m 55s ⏱️

Please sign in to comment.