003 File Manager
Current Path:
/usr/local/lib/python3.8/test/test_asyncio
usr
/
local
/
lib
/
python3.8
/
test
/
test_asyncio
/
📁
..
📄
__init__.py
(244 B)
📄
__main__.py
(58 B)
📁
__pycache__
📄
echo.py
(148 B)
📄
echo2.py
(123 B)
📄
echo3.py
(276 B)
📄
functional.py
(7.47 KB)
📄
test_asyncio_waitfor.py
(1.45 KB)
📄
test_base_events.py
(78.14 KB)
📄
test_buffered_proto.py
(2.28 KB)
📄
test_context.py
(1020 B)
📄
test_events.py
(99.68 KB)
📄
test_futures.py
(26.42 KB)
📄
test_futures2.py
(694 B)
📄
test_locks.py
(33.67 KB)
📄
test_pep492.py
(6.04 KB)
📄
test_proactor_events.py
(35.04 KB)
📄
test_protocols.py
(2 KB)
📄
test_queues.py
(21.16 KB)
📄
test_runners.py
(5.06 KB)
📄
test_selector_events.py
(47.17 KB)
📄
test_sendfile.py
(19.67 KB)
📄
test_server.py
(3.93 KB)
📄
test_sock_lowlevel.py
(11.84 KB)
📄
test_sslproto.py
(25.78 KB)
📄
test_streams.py
(36.21 KB)
📄
test_subprocess.py
(25.33 KB)
📄
test_tasks.py
(107.17 KB)
📄
test_transports.py
(3.53 KB)
📄
test_unix_events.py
(66.23 KB)
📄
test_windows_events.py
(8.92 KB)
📄
test_windows_utils.py
(4.07 KB)
📄
utils.py
(16.83 KB)
Editing: test_runners.py
import asyncio
import unittest

from unittest import mock
from . import utils as test_utils


class TestPolicy(asyncio.AbstractEventLoopPolicy):
    """Event loop policy stub used to observe what asyncio.run() does.

    New loops come from the supplied ``loop_factory``; the most recent
    non-None loop passed to ``set_event_loop()`` is kept in ``self.loop``
    so that BaseTest.tearDown can inspect it.
    """

    def __init__(self, loop_factory):
        self.loop_factory = loop_factory
        self.loop = None

    def get_event_loop(self):
        # shouldn't ever be called by asyncio.run()
        raise RuntimeError

    def new_event_loop(self):
        return self.loop_factory()

    def set_event_loop(self, loop):
        if loop is not None:
            # we want to check if the loop is closed
            # in BaseTest.tearDown
            self.loop = loop


class BaseTest(unittest.TestCase):
    """Installs a TestPolicy around every test and checks loop cleanup."""

    def new_loop(self):
        """Return a BaseEventLoop with its I/O hooks replaced by mocks.

        ``_process_events`` and ``_selector`` are mocks, and
        ``_selector.select()`` always returns an empty tuple.
        ``shutdown_asyncgens`` is replaced by a coroutine function that only
        sets ``loop.shutdown_ag_run`` to True, which tearDown checks.
        """
        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = mock.Mock()
        loop._selector.select.return_value = ()
        loop.shutdown_ag_run = False

        async def shutdown_asyncgens():
            loop.shutdown_ag_run = True
        loop.shutdown_asyncgens = shutdown_asyncgens

        return loop

    def setUp(self):
        super().setUp()

        policy = TestPolicy(self.new_loop)
        asyncio.set_event_loop_policy(policy)

    def tearDown(self):
        policy = asyncio.get_event_loop_policy()
        try:
            if policy.loop is not None:
                # A loop was installed during the test: it must have been
                # closed and its shutdown_asyncgens() stub must have run.
                self.assertTrue(policy.loop.is_closed())
                self.assertTrue(policy.loop.shutdown_ag_run)
        finally:
            # Always restore the default policy, even when one of the
            # assertions above fails; otherwise the TestPolicy installed in
            # setUp would stay active for tests that run afterwards.
            asyncio.set_event_loop_policy(None)
            super().tearDown()


class RunTests(BaseTest):
    """Tests for asyncio.run()."""

    def test_asyncio_run_return(self):
        # The value returned by the coroutine is returned by asyncio.run().
        async def main():
            await asyncio.sleep(0)
            return 42

        self.assertEqual(asyncio.run(main()), 42)

    def test_asyncio_run_raises(self):
        # An exception raised by the coroutine propagates to the caller.
        async def main():
            await asyncio.sleep(0)
            raise ValueError('spam')

        with self.assertRaisesRegex(ValueError, 'spam'):
            asyncio.run(main())

    def test_asyncio_run_only_coro(self):
        # Non-coroutine arguments are rejected with ValueError.
        for o in {1, lambda: None}:
            with self.subTest(obj=o), \
                    self.assertRaisesRegex(ValueError,
                                           'a coroutine was expected'):
                asyncio.run(o)

    def test_asyncio_run_debug(self):
        # The loop's debug flag must match what each call below expects.
        async def main(expected):
            loop = asyncio.get_event_loop()
            self.assertIs(loop.get_debug(), expected)

        asyncio.run(main(False))
        asyncio.run(main(True), debug=True)
        with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
            # With debug mode patched on, an explicit debug=False must
            # still be honoured.
            asyncio.run(main(True))
            asyncio.run(main(False), debug=False)

    def test_asyncio_run_from_running_loop(self):
        # Calling asyncio.run() from inside a running loop is an error.
        async def main():
            coro = main()
            try:
                asyncio.run(coro)
            finally:
                coro.close()  # Suppress ResourceWarning

        with self.assertRaisesRegex(RuntimeError,
                                    'cannot be called from a running'):
            asyncio.run(main())

    def test_asyncio_run_cancels_hanging_tasks(self):
        # A task still sleeping when main() returns must be done by the
        # time asyncio.run() returns.
        lo_task = None

        async def leftover():
            await asyncio.sleep(0.1)

        async def main():
            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

    def test_asyncio_run_reports_hanging_tasks_errors(self):
        # A leftover task that raises while being cancelled must be
        # reported through the loop's call_exception_handler().
        lo_task = None
        call_exc_handler_mock = mock.Mock()

        async def leftover():
            try:
                await asyncio.sleep(0.1)
            except asyncio.CancelledError:
                # Deliberately raise ZeroDivisionError during cancellation.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            loop.call_exception_handler = call_exc_handler_mock

            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

        # NOTE(review): MockPattern / MockInstanceOf come from the package's
        # utils module; presumably they compare equal by regex match and by
        # isinstance() respectively -- confirm against utils.py.
        call_exc_handler_mock.assert_called_with({
            'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
            'task': lo_task,
            'exception': test_utils.MockInstanceOf(ZeroDivisionError)
        })

    def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
        # Even when main() raises and a leftover task fails while being
        # cancelled, the async generator it was iterating must end up
        # finished (no frame, not running).
        spinner = None
        lazyboy = None

        class FancyExit(Exception):
            pass

        async def fidget():
            while True:
                yield 1
                await asyncio.sleep(1)

        async def spin():
            nonlocal spinner
            spinner = fidget()
            try:
                async for the_meaning_of_life in spinner:  # NoQA
                    pass
            except asyncio.CancelledError:
                # Deliberately raise ZeroDivisionError during cancellation.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            # Swallow the error report produced for the failing task.
            loop.call_exception_handler = mock.Mock()

            nonlocal lazyboy
            lazyboy = asyncio.create_task(spin())
            raise FancyExit

        with self.assertRaises(FancyExit):
            asyncio.run(main())

        self.assertTrue(lazyboy.done())

        self.assertIsNone(spinner.ag_frame)
        self.assertFalse(spinner.ag_running)
Upload File
Create Folder