forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pythongh-128002: fix many thread safety issues in asyncio (python#128147
) * Makes `_asyncio.Task` and `_asyncio.Future` thread-safe by adding critical sections * Add assertions to check for thread safety checking locking of object by critical sections in internal functions * Make `_asyncio.all_tasks` thread safe when eager tasks are used * Add a thread safety test
- Loading branch information
1 parent
f157485
commit 513a4ef
Showing
3 changed files
with
951 additions
and
175 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import asyncio | ||
import unittest | ||
from threading import Thread | ||
from unittest import TestCase | ||
|
||
from test.support import threading_helper | ||
|
||
threading_helper.requires_working_threading(module=True) | ||
|
||
def tearDownModule(): | ||
asyncio._set_event_loop_policy(None) | ||
|
||
|
||
class TestFreeThreading: | ||
def test_all_tasks_race(self) -> None: | ||
async def main(): | ||
loop = asyncio.get_running_loop() | ||
future = loop.create_future() | ||
|
||
async def coro(): | ||
await future | ||
|
||
tasks = set() | ||
|
||
async with asyncio.TaskGroup() as tg: | ||
for _ in range(100): | ||
tasks.add(tg.create_task(coro())) | ||
|
||
all_tasks = self.all_tasks(loop) | ||
self.assertEqual(len(all_tasks), 101) | ||
|
||
for task in all_tasks: | ||
self.assertEqual(task.get_loop(), loop) | ||
self.assertFalse(task.done()) | ||
|
||
current = self.current_task() | ||
self.assertEqual(current.get_loop(), loop) | ||
self.assertSetEqual(all_tasks, tasks | {current}) | ||
future.set_result(None) | ||
|
||
def runner(): | ||
with asyncio.Runner() as runner: | ||
loop = runner.get_loop() | ||
loop.set_task_factory(self.factory) | ||
runner.run(main()) | ||
|
||
threads = [] | ||
|
||
for _ in range(10): | ||
thread = Thread(target=runner) | ||
threads.append(thread) | ||
|
||
with threading_helper.start_threads(threads): | ||
pass | ||
|
||
|
||
class TestPyFreeThreading(TestFreeThreading, TestCase): | ||
all_tasks = staticmethod(asyncio.tasks._py_all_tasks) | ||
current_task = staticmethod(asyncio.tasks._py_current_task) | ||
|
||
def factory(self, loop, coro, context=None): | ||
return asyncio.tasks._PyTask(coro, loop=loop, context=context) | ||
|
||
|
||
@unittest.skipUnless(hasattr(asyncio.tasks, "_c_all_tasks"), "requires _asyncio") | ||
class TestCFreeThreading(TestFreeThreading, TestCase): | ||
all_tasks = staticmethod(getattr(asyncio.tasks, "_c_all_tasks", None)) | ||
current_task = staticmethod(getattr(asyncio.tasks, "_c_current_task", None)) | ||
|
||
def factory(self, loop, coro, context=None): | ||
return asyncio.tasks._CTask(coro, loop=loop, context=context) | ||
|
||
|
||
class TestEagerPyFreeThreading(TestPyFreeThreading): | ||
def factory(self, loop, coro, context=None): | ||
return asyncio.tasks._PyTask(coro, loop=loop, context=context, eager_start=True) | ||
|
||
|
||
@unittest.skipUnless(hasattr(asyncio.tasks, "_c_all_tasks"), "requires _asyncio") | ||
class TestEagerCFreeThreading(TestCFreeThreading, TestCase): | ||
def factory(self, loop, coro, context=None): | ||
return asyncio.tasks._CTask(coro, loop=loop, context=context, eager_start=True) |
Oops, something went wrong.