asyncio n'est pas le seul moteur asynchrone
async et await ne lui sont pas intrinsèquement liés
Comment réécrire asyncio ?
https://github.com/entwanne/presentation_python_plongee_asynchrone
async def simple_print(msg):
    """Print ``msg``.

    Being declared with ``async def``, calling this function does not print
    anything by itself: it returns a coroutine that prints when it is run.
    """
    print(msg)
simple_print est une fonction renvoyant une coroutine
simple_print
<function __main__.simple_print(msg)>
simple_print('Hello')
<coroutine object simple_print at 0x7fc51869a440>
await
await simple_print('Hello')
Hello
asyncio.run
asyncio.run(simple_print('Hello'))
loop = asyncio.new_event_loop()
loop.run_until_complete(simple_print('Hello'))
__await__
coro = simple_print('Hello')
dir(coro)
['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_origin', 'cr_running', 'send', 'throw']
coroutine_wrapper)
aw = coro.__await__()
aw
<coroutine_wrapper at 0x7fc5186adf50>
dir(aw)
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'send', 'throw']
for _ in simple_print('Hello').__await__():
pass
Hello
async def complex_work():
    """Print 'Hello', suspend once, then print 'World'."""
    await simple_print('Hello')
    # Awaiting sleep(0) hands control back to whoever drives the coroutine.
    await asyncio.sleep(0)
    await simple_print('World')
for _ in complex_work().__await__():
pass
Hello World
it = complex_work().__await__()
next(it)
Hello
next(it)
World
--------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-12-bc1ab118995a> in <module> ----> 1 next(it) StopIteration:
await asyncio.sleep(0) a pour effet de yield
await est équivalent à yield from
__await__
complex_work
class ComplexWork:
    """Hand-written awaitable: prints 'Hello', suspends once, prints 'World'."""

    def __await__(self):
        """Return the iterator that is stepped through while awaiting."""
        print('Hello')
        # The bare yield turns this method into a generator function, so
        # calling it returns an iterator; each yield is one suspension point.
        yield
        print('World')
yield rend la méthode génératrice, qui renvoie donc un itérateur
await ComplexWork()
Hello World
it = ComplexWork().__await__()
next(it)
Hello
next(it)
World
--------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-16-bc1ab118995a> in <module> ----> 1 next(it) StopIteration:
class Waiter:
    """Awaitable that keeps suspending until its ``done`` flag is set."""

    def __init__(self):
        # Flipped to True by whoever wants to release the awaiting task.
        self.done = False

    def __await__(self):
        """Yield once per step for as long as ``done`` is false."""
        while not self.done:
            yield
Waiter permet à deux tâches de se synchroniser
waiter = Waiter()
async def wait_job(waiter):
    """Print 'start', wait on ``waiter``, then print 'finished'.

    ``waiter`` is any awaitable; the coroutine stays suspended for as long
    as that awaitable keeps yielding.
    """
    print('start')
    await waiter  # wait for count_up_to to be finished
    print('finished')
async def count_up_to(waiter, n):
    """Print 0 .. n-1, suspending after each number, then set ``waiter.done``.

    Setting ``waiter.done`` to True is what signals completion to any task
    awaiting ``waiter``.
    """
    for i in range(n):
        print(i)
        # Suspend so that other tasks get a turn between two numbers.
        await asyncio.sleep(0)
    waiter.done = True
await asyncio.gather(wait_job(waiter), count_up_to(waiter, 10))
start 0 1 2 3 4 5 6 7 8 9 finished
[None, None]
def run_task(task):
    """Drive a single awaitable to completion.

    The iterator returned by ``task.__await__()`` is advanced step by step
    until it raises StopIteration; yielded values are ignored.
    """
    it = task.__await__()
    while True:
        try:
            next(it)
        except StopIteration:
            break
def run_tasks(*tasks):
    """Run several awaitables concurrently until all of them are finished.

    Tasks are advanced one step at a time in round-robin order: the task at
    the head of the queue runs until its next suspension, then goes back to
    the tail unless it has completed.
    """
    # Local import keeps this snippet usable on its own; a deque gives O(1)
    # removal at the head where list.pop(0) is O(n).
    from collections import deque

    pending = deque(task.__await__() for task in tasks)
    while pending:
        # Take the first available task.
        task = pending.popleft()
        try:
            next(task)
        except StopIteration:
            # The task is finished: do not re-queue it.
            continue
        # The task is still running: put it back at the end of the queue.
        pending.append(task)
run_tasks(simple_print(1), ComplexWork(), simple_print(2), simple_print(3))
1 Hello 2 3 World
waiter = Waiter()
run_tasks(wait_job(waiter), count_up_to(waiter, 5))
start 0 1 2 3 4 finished
class interrupt:
    """Awaitable that suspends the current task exactly once."""

    def __await__(self):
        yield
import time
async def sleep_until(t):
    """Suspend repeatedly until ``time.time()`` reaches ``t``."""
    while time.time() < t:
        # Give the other tasks a turn, then check the clock again.
        await interrupt()
async def sleep(duration):
    """Suspend for ``duration`` seconds, counted from the moment of the call."""
    await sleep_until(time.time() + duration)
async def print_messages(*messages, sleep_time=1):
    """Print each message in turn, sleeping ``sleep_time`` after each one."""
    for msg in messages:
        print(msg)
        await sleep(sleep_time)
run_tasks(print_messages('foo', 'bar', 'baz'),
print_messages('aaa', 'bbb', 'ccc', sleep_time=0.7))
foo aaa bbb bar ccc baz
add_task
class Loop:
    """Minimal event loop running its tasks in round-robin order."""

    def __init__(self):
        # Queue of iterators, one per task still running.
        self.tasks = []

    def add_task(self, task):
        """Queue ``task``.

        Awaitables are converted to their iterator first; anything without
        an ``__await__`` method is queued unchanged.
        """
        if hasattr(task, '__await__'):
            task = task.__await__()
        self.tasks.append(task)

    def run(self):
        """Advance the queued tasks one step at a time until none is left."""
        while self.tasks:
            task = self.tasks.pop(0)
            try:
                next(task)
            except StopIteration:
                # Finished task: simply drop it.
                pass
            else:
                # Still running: back to the end of the queue.
                self.add_task(task)
class Loop:
    [...]  # placeholder kept from the source: other members are elided

    def run_task(self, task):
        """Queue ``task`` and run the loop until every queued task is done."""
        self.add_task(task)
        self.run()
loop = Loop()
loop.run_task(print_messages('foo', 'bar', 'baz'))
foo bar baz
Loop.current pour rendre la boucle accessible depuis nos tâches
class Loop:
[...]
current = None
def run(self):
    """Run the queued tasks, publishing this loop as ``Loop.current``."""
    # Recorded on the class so running code can reach the active loop.
    Loop.current = self
    while self.tasks:
        task = self.tasks.pop(0)
        try:
            next(task)
        except StopIteration:
            # Finished task: simply drop it.
            pass
        else:
            # Still running: back to the end of the queue.
            self.add_task(task)
Implémentation de gather, utilitaire permettant d'attendre simultanément plusieurs tâches
Amélioration de notre classe Waiter pour attendre plusieurs validations
class Waiter:
    """Awaitable released after ``set()`` has been called ``n`` times."""

    def __init__(self, n=1):
        # Number of set() calls still expected.
        self.i = n

    def set(self):
        """Record one validation."""
        self.i -= 1

    def __await__(self):
        """Yield once per step while validations are still missing."""
        while self.i > 0:
            yield
gather pour attendre N tâches
async def gather(*tasks):
waiter = Waiter(len(tasks))
async def task_wrapper(task):
await task
waiter.set()
for t in tasks:
Loop.current.add_task(task_wrapper(t))
await waiter
loop = Loop()
loop.run_task(gather(print_messages('foo', 'bar', 'baz'),
print_messages('aaa', 'bbb', 'ccc', sleep_time=0.7)))
foo aaa bbb bar ccc baz
select pour savoir quand la socket est disponible
import select
class AIOSocket:
    """Wrap a socket together with two epoll objects used to test readiness.

    ``pollin`` watches the socket for EPOLLIN and ``pollout`` for EPOLLOUT.
    The wrapper is a context manager that closes everything on exit.
    """

    def __init__(self, socket):
        self.socket = socket
        # ``self`` can be registered directly because it exposes fileno().
        self.pollin = select.epoll()
        self.pollin.register(self, select.EPOLLIN)
        self.pollout = select.epoll()
        self.pollout.register(self, select.EPOLLOUT)

    def close(self):
        """Close both epoll objects, then the wrapped socket."""
        # Each epoll object owns a file descriptor of its own; leaving them
        # open would leak two descriptors per socket.
        self.pollin.close()
        self.pollout.close()
        self.socket.close()

    def fileno(self):
        """Return the file descriptor of the wrapped socket."""
        return self.socket.fileno()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
class AIOSocket:
    [...]  # placeholder kept from the source: other members are elided

    # Every wait below polls with a zero timeout: poll() without argument
    # would block the whole loop instead of letting the other tasks run.

    async def bind(self, addr):
        """Wait until ``pollin`` reports an event, then bind to ``addr``."""
        while not self.pollin.poll(0):
            await interrupt()
        self.socket.bind(addr)

    async def listen(self):
        """Wait until ``pollin`` reports an event, then start listening."""
        while not self.pollin.poll(0):
            await interrupt()
        self.socket.listen()

    async def connect(self, addr):
        """Wait until ``pollin`` reports an event, then connect to ``addr``."""
        while not self.pollin.poll(0):
            await interrupt()
        self.socket.connect(addr)
class AIOSocket:
    [...]  # placeholder kept from the source: other members are elided

    async def accept(self):
        """Wait for ``pollin`` to report an event, then accept a client.

        Returns the accepted socket wrapped in the same class as ``self``.
        """
        while not self.pollin.poll(0):
            await interrupt()
        client, _ = self.socket.accept()
        return self.__class__(client)

    async def recv(self, bufsize):
        """Wait for ``pollin`` to report an event, then receive data."""
        while not self.pollin.poll(0):
            await interrupt()
        return self.socket.recv(bufsize)

    async def send(self, bytes):
        """Wait for ``pollout`` to report an event, then send ``bytes``.

        Returns whatever the wrapped socket's send() returns.
        """
        while not self.pollout.poll(0):
            await interrupt()
        return self.socket.send(bytes)
import socket
def aiosocket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
    """Create a socket and return it wrapped in an AIOSocket.

    The parameters are forwarded unchanged to ``socket.socket``.
    """
    return AIOSocket(socket.socket(family, type, proto, fileno))
async def server_coro():
    """Accept one client on localhost:8080 and send its message back reversed."""
    with aiosocket() as server:
        await server.bind(('localhost', 8080))
        await server.listen()
        with await server.accept() as client:
            msg = await client.recv(1024)
            print('Received from client', msg)
            await client.send(msg[::-1])
async def client_coro():
    """Connect to localhost:8080, send a greeting and print the reply."""
    with aiosocket() as client:
        await client.connect(('localhost', 8080))
        await client.send(b'Hello World!')
        msg = await client.recv(1024)
        print('Received from server', msg)
loop = Loop()
loop.run_task(gather(server_coro(), client_coro()))
Received from client b'Hello World!' Received from server b'!dlroW olleH'
sleep est inefficace
Waiter qui n'a besoin d'être lancée qu'une fois sa condition validée
asyncio utilise un mécanisme de futures :
async def test():
await asyncio.sleep(1)
loop = Loop()
loop.run_task(test())
--------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-39-c4e5f116b58d> in <module> 3 4 loop = Loop() ----> 5 loop.run_task(test()) <ipython-input-28-be3b5982fb63> in run_task(self, task) 4 def run_task(self, task): 5 self.add_task(task) ----> 6 self.run() <ipython-input-30-0d24482e31f8> in run(self) 9 task = self.tasks.pop(0) 10 try: ---> 11 next(task) 12 except StopIteration: 13 pass <ipython-input-39-c4e5f116b58d> in test() 1 async def test(): ----> 2 await asyncio.sleep(1) 3 4 loop = Loop() 5 loop.run_task(test()) /usr/lib64/python3.7/asyncio/tasks.py in sleep(delay, result, loop) 593 future, result) 594 try: --> 595 return await future 596 finally: 597 h.cancel() RuntimeError: await wasn't used with future
yield utilisé pour rendre la main à la boucle peut être accompagné d'une valeur
class Future:
def __await__(self):
yield self
assert self.done
class Future:
    """Awaitable that suspends its task once, until ``set()`` is called."""

    def __init__(self):
        self._done = False
        # Task to re-queue when set() is called; None until one is attached.
        self.task = None

    def __await__(self):
        """Yield the future itself, then check that it was set before resuming."""
        # Yielding ``self`` hands the future to whoever is driving the task.
        yield self
        assert self._done

    def set(self):
        """Mark the future as done and re-queue the attached task, if any."""
        self._done = True
        if self.task is not None:
            Loop.current.add_task(self.task)
class Loop:
    [...]  # placeholder kept from the source: other members are elided

    def run(self):
        """Run the queued tasks, parking those that yield a Future."""
        Loop.current = self
        while self.tasks:
            task = self.tasks.pop(0)
            try:
                result = next(task)
            except StopIteration:
                continue
            if isinstance(result, Future):
                # Parked: the task is attached to the future and is not
                # re-queued here; Future.set() puts it back in the queue.
                result.task = task
            else:
                self.tasks.append(task)
from functools import total_ordering
@total_ordering
class TimeEvent:
    """Pair a timestamp ``t`` with the future associated with it.

    Events compare by timestamp only, which is what lets them be kept in a
    heap; the remaining comparison operators come from ``total_ordering``.
    """

    def __init__(self, t, future):
        self.t = t
        self.future = future

    def __eq__(self, rhs):
        if not isinstance(rhs, TimeEvent):
            return NotImplemented
        return self.t == rhs.t

    def __lt__(self, rhs):
        if not isinstance(rhs, TimeEvent):
            return NotImplemented
        # Compare timestamps on both sides, not a timestamp with an event.
        return self.t < rhs.t
call_later
import heapq
class Loop:
    [...]  # placeholder kept from the source: other members are elided

    # Heap of TimeEvent objects, ordered by timestamp.
    # NOTE(review): this list lives on the class, so it is shared by every
    # Loop instance unless an instance rebinds ``handlers``.
    handlers = []

    def call_later(self, t, future):
        """Push a ``TimeEvent(t, future)`` onto the handlers heap."""
        heapq.heappush(self.handlers, TimeEvent(t, future))
class Loop:
    [...]  # placeholder kept from the source: other members are elided

    def run(self):
        """Run until no task is queued and no timed handler is pending."""
        Loop.current = self
        while self.tasks or self.handlers:
            # Fire at most one due handler per iteration: the earliest one
            # sits at the head of the heap.
            if self.handlers and self.handlers[0].t <= time.time():
                handler = heapq.heappop(self.handlers)
                handler.future.set()

            if not self.tasks:
                # Only timers are left: loop again (without sleeping) until
                # one of them becomes due.
                continue
            task = self.tasks.pop(0)
            try:
                result = next(task)
            except StopIteration:
                continue
            if isinstance(result, Future):
                # Parked: the task is attached to the future and is not
                # re-queued here.
                result.task = task
            else:
                self.tasks.append(task)
sleep
import time
async def sleep(t):
    """Suspend the current task for ``t`` seconds using a Future.

    The future is registered on the current loop with the target time
    ``time.time() + t`` and then awaited.
    """
    future = Future()
    Loop.current.call_later(time.time() + t, future)
    await future
async def foo():
    """Print 'before', sleep for five seconds, then print 'after'."""
    print('before')
    await sleep(5)
    print('after')
loop = Loop()
loop.run_task(foo())
before after
for et with asynchrones (async for, async with)
__aiter__ renvoyant un itérateur asynchrone
__anext__ renvoyant le prochain élément
StopAsyncIteration en fin d'itération
range
class ARange:
    """Asynchronous iterable over the integers 0 .. stop-1."""

    def __init__(self, stop):
        self.stop = stop

    def __aiter__(self):
        """Return a fresh asynchronous iterator over this range."""
        return ARangeIterator(self)
class ARangeIterator:
    """Asynchronous iterator yielding 0 .. arange.stop-1, sleeping before each."""

    def __init__(self, arange):
        self.arange = arange
        # Next value to produce.
        self.i = 0

    def __aiter__(self):
        """Return self, so the iterator can itself be used in ``async for``."""
        return self

    async def __anext__(self):
        """Return the next integer, or raise StopAsyncIteration at the end."""
        if self.i >= self.arange.stop:
            raise StopAsyncIteration
        await sleep(1)
        i = self.i
        self.i += 1
        return i
async def test_for():
    """Print every value produced by ``ARange(5)`` using ``async for``."""
    async for val in ARange(5):
        print(val)
loop = Loop()
loop.run_task(test_for())
0 1 2 3 4
async def arange(stop):
    """Asynchronous generator yielding 0 .. stop-1, sleeping before each value."""
    for i in range(stop):
        await sleep(1)
        yield i
__aenter__
et __aexit__
class Server:
    """Asynchronous context manager providing a listening socket on ``addr``."""

    def __init__(self, addr):
        self.socket = aiosocket()
        self.addr = addr

    async def __aenter__(self):
        """Bind and listen, then return the underlying socket wrapper."""
        await self.socket.bind(self.addr)
        await self.socket.listen()
        return self.socket

    async def __aexit__(self, *args):
        """Close the socket when the ``async with`` block is left."""
        self.socket.close()
async def test_with():
    """Serve one client on localhost:8080 and send its message back reversed."""
    async with Server(('localhost', 8080)) as server:
        with await server.accept() as client:
            msg = await client.recv(1024)
            print('Received from client', msg)
            await client.send(msg[::-1])
loop = Loop()
loop.run_task(gather(test_with(), client_coro()))
Received from client b'Hello World!' Received from server b'!dlroW olleH'
contextlib (Python 3.7)
from contextlib import asynccontextmanager
@asynccontextmanager
async def server(addr):
    """Asynchronous context manager yielding a socket listening on ``addr``.

    The socket is closed when the ``async with`` block is left, including
    when binding, listening or the block itself raises.
    """
    # Named ``sock`` so that the ``socket`` module is not shadowed.
    sock = aiosocket()
    try:
        await sock.bind(addr)
        await sock.listen()
        yield sock
    finally:
        sock.close()
asyncio
par ces exemples