003 File Manager
Current Path:
/usr/local/lib/python3.8/test/test_asyncio
usr
/
local
/
lib
/
python3.8
/
test
/
test_asyncio
/
📁
..
📄
__init__.py
(244 B)
📄
__main__.py
(58 B)
📁
__pycache__
📄
echo.py
(148 B)
📄
echo2.py
(123 B)
📄
echo3.py
(276 B)
📄
functional.py
(7.47 KB)
📄
test_asyncio_waitfor.py
(1.45 KB)
📄
test_base_events.py
(78.14 KB)
📄
test_buffered_proto.py
(2.28 KB)
📄
test_context.py
(1020 B)
📄
test_events.py
(99.68 KB)
📄
test_futures.py
(26.42 KB)
📄
test_futures2.py
(694 B)
📄
test_locks.py
(33.67 KB)
📄
test_pep492.py
(6.04 KB)
📄
test_proactor_events.py
(35.04 KB)
📄
test_protocols.py
(2 KB)
📄
test_queues.py
(21.16 KB)
📄
test_runners.py
(5.06 KB)
📄
test_selector_events.py
(47.17 KB)
📄
test_sendfile.py
(19.67 KB)
📄
test_server.py
(3.93 KB)
📄
test_sock_lowlevel.py
(11.84 KB)
📄
test_sslproto.py
(25.78 KB)
📄
test_streams.py
(36.21 KB)
📄
test_subprocess.py
(25.33 KB)
📄
test_tasks.py
(107.17 KB)
📄
test_transports.py
(3.53 KB)
📄
test_unix_events.py
(66.23 KB)
📄
test_windows_events.py
(8.92 KB)
📄
test_windows_utils.py
(4.07 KB)
📄
utils.py
(16.83 KB)
Editing: test_asyncio_waitfor.py
import asyncio import unittest import time def tearDownModule(): asyncio.set_event_loop_policy(None) class SlowTask: """ Task will run for this defined time, ignoring cancel requests """ TASK_TIMEOUT = 0.2 def __init__(self): self.exited = False async def run(self): exitat = time.monotonic() + self.TASK_TIMEOUT while True: tosleep = exitat - time.monotonic() if tosleep <= 0: break try: await asyncio.sleep(tosleep) except asyncio.CancelledError: pass self.exited = True class AsyncioWaitForTest(unittest.TestCase): async def atest_asyncio_wait_for_cancelled(self): t = SlowTask() waitfortask = asyncio.create_task(asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2)) await asyncio.sleep(0) waitfortask.cancel() await asyncio.wait({waitfortask}) self.assertTrue(t.exited) def test_asyncio_wait_for_cancelled(self): asyncio.run(self.atest_asyncio_wait_for_cancelled()) async def atest_asyncio_wait_for_timeout(self): t = SlowTask() try: await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2) except asyncio.TimeoutError: pass self.assertTrue(t.exited) def test_asyncio_wait_for_timeout(self): asyncio.run(self.atest_asyncio_wait_for_timeout()) if __name__ == '__main__': unittest.main()
Upload File
Create Folder