|
Server : LiteSpeed System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64 User : nandedex ( 1060) PHP Version : 8.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/ |
from typing import List, Union, Tuple
import asyncio
class AsyncIterate:  # named so because Python already uses "AsyncIterable"
    """Asynchronous iterator over an already-available list or tuple.

    Lets in-memory data be consumed with ``async for``. Every element of
    *data* is yielded in order, including falsy values and ``None``.
    """

    # Unique marker meaning "the underlying iterator is exhausted". A
    # dedicated object is used instead of ``None`` so that ``None`` elements
    # in the data are yielded rather than mistaken for end-of-data.
    _EXHAUSTED = object()

    def __init__(self, data: Union[List, Tuple]):
        """Wrap *data* in a plain iterator that the async methods consume."""
        self.queue = iter(data)

    def __aiter__(self):
        """Return self; the object is its own asynchronous iterator."""
        return self

    async def __anext__(self):
        """Return the next element.

        Raises:
            StopAsyncIteration: when all elements have been consumed.
        """
        item = next(self.queue, self._EXHAUSTED)
        if item is self._EXHAUSTED:
            raise StopAsyncIteration
        return item

    async def fetch_data(self):
        """Return the next element, or ``None`` once the data is exhausted.

        A ``None`` result is ambiguous when the data itself contains
        ``None``; iterate with ``async for`` to get every element reliably.
        """
        return next(self.queue, None)
async def gather(*tasks: List) -> AsyncIterate:
    """Await all *tasks* concurrently and expose their results for ``async for``.

    The awaitables are passed to ``asyncio.gather``; the resulting list is
    wrapped in an ``AsyncIterate`` and returned.
    """
    return AsyncIterate(await asyncio.gather(*tasks))