|
Server : LiteSpeed System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64 User : nandedex ( 1060) PHP Version : 8.1.33 Disable Function : NONE Directory : /opt/cppython/lib/python3.8/test/test_asyncio/ |
import asyncio
import unittest
import time
def tearDownModule():
asyncio.set_event_loop_policy(None)
class SlowTask:
""" Task will run for this defined time, ignoring cancel requests """
TASK_TIMEOUT = 0.2
def __init__(self):
self.exited = False
async def run(self):
exitat = time.monotonic() + self.TASK_TIMEOUT
while True:
tosleep = exitat - time.monotonic()
if tosleep <= 0:
break
try:
await asyncio.sleep(tosleep)
except asyncio.CancelledError:
pass
self.exited = True
class AsyncioWaitForTest(unittest.TestCase):
async def atest_asyncio_wait_for_cancelled(self):
t = SlowTask()
waitfortask = asyncio.create_task(asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
await asyncio.sleep(0)
waitfortask.cancel()
await asyncio.wait({waitfortask})
self.assertTrue(t.exited)
def test_asyncio_wait_for_cancelled(self):
asyncio.run(self.atest_asyncio_wait_for_cancelled())
async def atest_asyncio_wait_for_timeout(self):
t = SlowTask()
try:
await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
except asyncio.TimeoutError:
pass
self.assertTrue(t.exited)
def test_asyncio_wait_for_timeout(self):
asyncio.run(self.atest_asyncio_wait_for_timeout())
if __name__ == '__main__':
unittest.main()