programming make loop how asyncio async python python-3.x python-asyncio

python - make - ¿Cómo funciona realmente el asyncio?



python async api (4)

Esta pregunta está motivada por mi otra pregunta: ¿Cómo esperar en cdef?

Hay toneladas de artículos y publicaciones en el blog sobre asyncio , pero todos son muy superficiales. No pude encontrar ninguna información sobre cómo se implementa realmente asyncio y qué hace que la E / S sea asíncrona. Estaba tratando de leer el código fuente, pero son miles de líneas que no son del código C de grado más alto, muchas de las cuales tratan con objetos auxiliares, pero lo más importante es que es difícil establecer una conexión entre la sintaxis de Python y el código C que se traduciría. dentro.

La propia documentación de Asycnio es aún menos útil. No hay información sobre cómo funciona, solo algunas pautas sobre cómo usarlo, que a veces también son engañosas o están mal escritas.

Estoy familiarizado con la implementación de coroutines de Go, y tenía la esperanza de que Python hiciera lo mismo. Si ese fuera el caso, el código que encontré en la publicación vinculada anteriormente habría funcionado. Como no lo hizo, ahora estoy tratando de averiguar por qué. Mi mejor conjetura hasta ahora es la siguiente, corríjame en el lugar en el que estoy equivocado:

  1. Las definiciones de procedimiento de la forma async def foo(): ... realidad se interpretan como métodos de una clase hereditaria de coroutine .
  2. Tal vez, async def se divide realmente en múltiples métodos por las instrucciones de await , donde el objeto, en el que se llama a estos métodos, es capaz de realizar un seguimiento del progreso realizado a través de la ejecución hasta el momento.
  3. Si lo anterior es cierto, entonces, esencialmente, la ejecución de un coroutine se reduce a métodos de llamada del objeto de coroutine por algún administrador global (loop?).
  4. El administrador global es consciente de alguna manera (¿cómo?) Cuando las operaciones de E / S se realizan mediante el código Python (¿solo?) Y es capaz de elegir uno de los métodos actuales pendientes para ejecutar después de que el método de ejecución actual renuncie al control (toque en la await declaración).

En otras palabras, aquí está mi intento de "desugaring" de alguna sintaxis de asyncio en algo más comprensible:

async def coro(name): print(''before'', name) await asyncio.sleep() print(''after'', name) asyncio.gather(coro(''first''), coro(''second'')) # translated from async def coro(name) class Coro(coroutine): def before(self, name): print(''before'', name) def after(self, name): print(''after'', name) def __init__(self, name): self.name = name self.parts = self.before, self.after self.pos = 0 def __call__(): self.parts[self.pos](self.name) self.pos += 1 def done(self): return self.pos == len(self.parts) # translated from asyncio.gather() class AsyncIOManager: def gather(*coros): while not every(c.done() for c in coros): coro = random.choice(coros) coro()

Si mi suposición es correcta, entonces tengo un problema. ¿Cómo ocurre realmente la E / S en este escenario? ¿En un hilo aparte? ¿Se suspende todo el intérprete y la E / S ocurre fuera del intérprete? ¿Qué se entiende exactamente por E / S? Si mi procedimiento de Python se llama C open() y, a su vez, envía una interrupción al kernel, renunciando al control, ¿cómo sabe esto el intérprete de Python y puede continuar ejecutando otro código, mientras que el kernel hace el I / real? ¿O y hasta que despierte el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, ser consciente de que esto está sucediendo?


¿Cómo funciona el asyncio?

Antes de responder a esta pregunta, necesitamos entender algunos términos básicos, omítalos si ya conoce alguno de ellos.

Generators

Los generadores son objetos que nos permiten suspender la ejecución de una función python. Los generadores curados por el usuario se implementan utilizando el yield palabras clave. Al crear una función normal que contiene la palabra clave de yield , convertimos esa función en un generador:

>>> def test(): ... yield 1 ... yield 2 ... >>> gen = test() >>> next(gen) 1 >>> next(gen) 2 >>> next(gen) Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration

Como puede ver, al llamar a next() en el generador, el intérprete carga el marco de la prueba y devuelve el valor de yield . Si se vuelve a llamar a next() , el marco se carga de nuevo en la pila de intérpretes y continúa obteniendo otro valor.

La tercera vez que se llama a next() , nuestro generador se terminó y se lanzó StopIteration .

Comunicándose con un generador.

Una característica menos conocida de los generadores, es el hecho de que puede comunicarse con ellos mediante dos métodos: send() y throw() .

>>> def test(): ... val = yield 1 ... print(val) ... yield 2 ... yield 3 ... >>> gen = test() >>> next(gen) 1 >>> gen.send("abc") abc 2 >>> gen.throw(Exception()) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 4, in test Exception

Al llamar a gen.send() , el valor se pasa como un valor de retorno de la palabra clave de yield .

gen.throw() por otro lado, permite lanzar Excepciones dentro de los generadores, con la excepción planteada en el mismo punto en que se llamó el yield .

Devolviendo valores desde generadores

Al StopIteration un valor de un generador, el valor se coloca dentro de la excepción StopIteration . Posteriormente, podemos recuperar el valor de la excepción y utilizarlo según nuestras necesidades.

>>> def test(): ... yield 1 ... return "abc" ... >>> gen = test() >>> next(gen) 1 >>> try: ... next(gen) ... except StopIteration as exc: ... print(exc.value) ... abc

He aquí, una nueva palabra clave: yield from

Python 3.4 vino con la adición de una nueva palabra clave: yield . Lo que esa palabra clave nos permite hacer es transmitir cualquier next() , send() y throw() en un generador anidado más interno. Si el generador interno devuelve un valor, también es el valor de retorno del yield from :

>>> def inner(): ... print((yield 2)) ... return 3 ... >>> def outer(): ... yield 1 ... val = yield from inner() ... print(val) ... yield 4 ... >>> gen = outer() >>> next(gen) 1 >>> next(gen) 2 >>> gen.send("abc") abc 3 4

Poniendolo todo junto

Al introducir el yield from la nueva palabra clave en Python 3.4, ahora pudimos crear generadores dentro de generadores que, al igual que un túnel, pasan los datos de un lado a otro desde los generadores más internos a los más externos. Esto ha generado un nuevo significado para los generadores: las coroutinas .

Coroutines son funciones que se pueden detener y reanudar mientras se ejecutan. En Python, se definen utilizando la palabra clave async def . Al igual que los generadores, ellos también usan su propia forma de yield from que está a la await . Antes de que se introdujeran async y await en Python 3.5, creamos coroutines exactamente de la misma manera que se crearon los generadores (con yield from lugar de await ).

async def inner(): return 1 async def outer(): await inner()

Como todos los iteradores o generadores que implementan el __iter__() , coroutines implementa __await__() que les permite continuar cada vez await coro se llama a await coro .

Hay un buen diagrama de secuencia dentro de los documentos de Python que deberías revisar.

En asyncio, aparte de las funciones de coroutine, tenemos 2 objetos importantes: tareas y futuros .

Futures

Los futuros son objetos que tienen el __await__() implementado, y su trabajo es mantener cierto estado y resultado. El estado puede ser uno de los siguientes:

  1. PENDIENTE: el futuro no tiene ningún resultado o excepción establecida.
  2. CANCELADO - el futuro fue cancelado usando fut.cancel()
  3. FINALIZADO: el futuro se terminó, ya sea por un conjunto de resultados utilizando fut.set_result() o por un conjunto de excepciones utilizando fut.set_exception()

El resultado, tal como lo ha adivinado, puede ser un objeto de Python, que se devolverá, o una excepción que puede ser provocada.

Otra característica importante de future objetos future es que contienen un método llamado add_done_callback() . Este método permite llamar a las funciones tan pronto como se realiza la tarea, ya sea que haya generado una excepción o haya finalizado.

Tasks

Los objetos de tareas son futuros especiales, que se envuelven alrededor de las coroutinas y se comunican con las coroutinas más internas y externas. Cada vez que un coroutine await un futuro, el futuro vuelve a la tarea (al igual que en el yield from ), y la tarea lo recibe.

A continuación, la tarea se une al futuro. Lo hace llamando a add_done_callback() en el futuro. De ahora en adelante, si se hará el futuro, ya sea cancelando, pasando una excepción o pasando un objeto Python como resultado, se llamará la devolución de llamada de la tarea y volverá a existir.

Asyncio

La pregunta final que debemos responder es: ¿cómo se implementa el IO?

Profundo dentro de asyncio, tenemos un bucle de eventos. Un evento de bucle de tareas. El trabajo del bucle de eventos es llamar a las tareas cada vez que estén listas y coordinar todo ese esfuerzo en una sola máquina de trabajo.

La parte IO del bucle de eventos se basa en una única función crucial llamada select . Select es una función de bloqueo, implementada por el sistema operativo que se encuentra debajo, que permite esperar en los sockets los datos entrantes o salientes. Al recibir los datos, se activa y devuelve los sockets que recibieron los datos o los sockets que están listos para escribir.

Cuando intenta recibir o enviar datos a través de un socket a través de asyncio, lo que realmente sucede a continuación es que el socket primero se comprueba si tiene algún dato que pueda leerse o enviarse inmediatamente. Si el .send() está lleno, o el .recv() está vacío, el socket se registra en la función de select (simplemente agregándolo a una de las listas, rlist para recv y wlist para send ) y el correspondiente La función await un nuevo objeto future creado, vinculado a ese zócalo.

Cuando todas las tareas disponibles están a la espera de futuros, el bucle de eventos se select y espera. Cuando el uno de los sockets tiene datos entrantes, o se ha agotado el búfer de send , asyncio verifica el objeto futuro vinculado a ese socket y lo configura en listo.

Ahora toda la magia pasa. El futuro está listo, la tarea que se agregó antes con add_done_callback() vuelve a la vida, y llama a .send() en la rutina que reanuda la más interna (debido a la cadena de await ) y lees los datos recién recibidos de un búfer cercano fueron derramados.

Método de la cadena de nuevo, en caso de recv() :

  1. select.select espera.
  2. Se devuelve un socket listo, con datos.
  3. Los datos del socket se mueven a un búfer.
  4. future.set_result() .
  5. La tarea que se agregó con add_done_callback() ahora está add_done_callback() .
  6. La tarea llama a .send() en la coroutine que va hasta la coroutine más interna y la despierta.
  7. Los datos se leen desde el búfer y se devuelven a nuestro humilde usuario.

En resumen, asyncio utiliza las capacidades del generador, que permiten pausar y reanudar funciones. Utiliza el yield from capacidades que permiten pasar datos de un lado a otro desde el generador más interno hasta el más externo. Utiliza todos esos para detener la ejecución de la función mientras espera que se complete la E / S (mediante la función de select del sistema operativo).

¿Y lo mejor de todo? Mientras una de las funciones está en pausa, otra puede ejecutarse e intercalarse con el delicado tejido, que es asincio.


Hablar de async/await asyncio y asyncio no es lo mismo. La primera es una construcción fundamental de bajo nivel (coroutines), mientras que la última es una biblioteca que utiliza estas construcciones. Por el contrario, no hay una única respuesta final.

La siguiente es una descripción general de cómo funcionan las bibliotecas async/await asyncio y asyncio - asyncio . Es decir, puede haber otros trucos en la parte superior (hay ...) pero son intrascendentes a menos que los construyas tú mismo. La diferencia debe ser despreciable a menos que ya sepa lo suficiente como para no tener que hacer una pregunta de este tipo.

1. Coroutines versus subrutinas en una cáscara de nuez

Al igual que las subrutinas (funciones, procedimientos, ...), coroutines (generadores, ...) son una abstracción de la pila de llamadas y el puntero de instrucción: hay una pila de piezas de código en ejecución, y cada una está en una instrucción específica.

La distinción de def contra async def es meramente por claridad. La diferencia real es el return versus el yield . A partir de esto, await o yield from la diferencia de llamadas individuales a pilas completas.

1.1. Subrutinas

Una subrutina representa un nuevo nivel de pila para contener variables locales y un recorrido único de sus instrucciones para llegar a su fin. Considere una subrutina como esta:

def subfoo(bar): qux = 3 return qux * bar

Cuando lo ejecutas, eso significa

  1. asignar espacio de pila para bar y qux
  2. Ejecutar recursivamente la primera instrucción y saltar a la siguiente instrucción
  3. Una vez en un return , empuje su valor a la pila que llama
  4. Borrar la pila (1.) y el puntero de instrucciones (2.)

Notablemente, 4. significa que una subrutina siempre comienza en el mismo estado. Todo lo exclusivo de la función en sí se pierde al finalizar. Una función no se puede reanudar, incluso si hay instrucciones después de la return .

root -/ : /- subfoo --/ :/--<---return --/ | V

1.2. Coroutines como subrutinas persistentes.

Una coroutina es como una subrutina, pero puede salir sin destruir su estado. Considera una coroutina como esta:

def cofoo(bar): qux = yield bar # yield marks a break point return qux

Cuando lo ejecutas, eso significa

  1. asignar espacio de pila para bar y qux
  2. Ejecutar recursivamente la primera instrucción y saltar a la siguiente instrucción
    1. una vez en un yield , empuje su valor a la pila de llamada pero almacene la pila y el puntero de instrucción
    2. una vez que llama al yield , restaure la pila y el puntero de instrucción y presione los argumentos a qux
  3. Una vez en un return , empuje su valor a la pila que llama
  4. Borrar la pila (1.) y el puntero de instrucciones (2.)

Tenga en cuenta la adición de 2.1 y 2.2: una coroutina se puede suspender y reanudar en puntos predefinidos. Esto es similar a cómo se suspende una subrutina al llamar a otra subrutina. La diferencia es que la coroutina activa no está estrictamente vinculada a su pila de llamadas. En su lugar, una coroutina suspendida es parte de una pila separada y aislada.

root -/ : /- cofoo --/ :/--<+--yield --/ | : V :

Esto significa que las coroutinas suspendidas pueden almacenarse libremente o moverse entre pilas. Cualquier pila de llamadas que tenga acceso a una rutina puede decidir reanudarla.

1.3. Atravesando la pila de llamadas

Hasta ahora, nuestra coroutine solo baja la pila de llamadas con yield . Una subrutina puede subir y bajar la pila de llamadas con return y () . Para completar, las rutinas también necesitan un mecanismo para subir la pila de llamadas. Considera una coroutina como esta:

def wrap(): yield ''before'' yield from cofoo() yield ''after''

Cuando lo ejecutas, eso significa que todavía asigna la pila y el puntero de instrucciones como una subrutina. Cuando se suspende, todavía es como almacenar una subrutina.

Sin embargo, el yield from ambos hace. Suspende la pila y el puntero de instrucción de la wrap y ejecuta el cofoo . Tenga en cuenta que la wrap permanece suspendida hasta que el cofoo termine por completo. Cada vez que cofoo suspende o se envía algo, cofoo se conecta directamente a la pila de llamadas.

1.4. Coroutines hasta el fondo.

Según lo establecido, el yield from permite conectar dos ámbitos a través de otro intermedio. Cuando se aplica recursivamente, eso significa que la parte superior de la pila se puede conectar a la parte inferior de la pila.

root -/ : /-> coro_a -yield-from-> coro_b --/ :/ <-+------------------------yield ---/ | : :/ --+-- coro_a.send----------yield ---/ : coro_b <-/

Tenga en cuenta que root y coro_b no se conocen entre sí. Esto hace que las coroutinas sean mucho más limpias que las devoluciones de llamada: las coroutinas aún se basan en una relación 1: 1 como las subrutinas. Coroutines suspende y reanuda toda su pila de ejecución existente hasta un punto de llamada regular.

Cabe destacar que la root podría tener un número arbitrario de coroutines para reanudar. Sin embargo, nunca se puede reanudar más de uno al mismo tiempo. ¡Los coroutines de la misma raíz son concurrentes pero no paralelos!

1.5. Python async y await

Hasta ahora, la explicación ha utilizado explícitamente el yield y el yield from vocabulario de los generadores: la funcionalidad subyacente es la misma. La nueva sintaxis Python3.5 async y await existe principalmente para mayor claridad.

def foo(): # subroutine? return None def foo(): # coroutine? yield from foofoo() # generator? coroutine? async def foo(): # coroutine! await foofoo() # coroutine! return None

Se necesita async for y async with sentencias porque se rompería el yield from/await cadena yield from/await con las afirmaciones simples for y with .

2. Anatomía de un evento simple.

Por sí misma, una coroutine no tiene ningún concepto de ceder el control a otra coroutine. Solo puede ceder el control a la persona que llama en la parte inferior de una pila de coroutine. Esta persona que llama puede cambiar a otra computadora central y ejecutarla.

Este nodo raíz de varias coroutinas es comúnmente un bucle de eventos : en suspensión, una corutina produce un evento en el que desea reanudar. A su vez, el bucle de eventos es capaz de esperar eficientemente a que estos eventos ocurran. Esto le permite decidir qué rutina ejecutará a continuación, o cómo esperar antes de reanudar.

Tal diseño implica que hay un conjunto de eventos predefinidos que el bucle comprende. Varios coroutines se await unos a otros, hasta que finalmente se await un evento. Este evento puede comunicarse directamente con el bucle de eventos mediante el control.

loop -/ : /-> coroutine --await--> event --/ :/ <-+----------------------- yield --/ | : | : # loop waits for event to happen | : :/ --+-- send(reply) -------- yield --/ : coroutine <--yield-- event <-/

La clave es que la suspensión correcta permite que el bucle de eventos y los eventos se comuniquen directamente. La pila intermedia de coroutine no requiere ningún conocimiento sobre qué bucle lo está ejecutando, ni cómo funcionan los eventos.

2.1.1. Eventos en el tiempo

El evento más simple de manejar es alcanzar un punto en el tiempo. Este es también un bloque fundamental de código roscado: un hilo sleep repetidamente hasta que se cumple una condición. Sin embargo, una ejecución regular bloquea la ejecución por sí misma: queremos que otras coroutinas no se bloqueen. En su lugar, queremos decirle al ciclo de eventos cuándo debe reanudar la pila actual de coroutine.

2.1.2. Definiendo un evento

Un evento es simplemente un valor que podemos identificar, ya sea a través de una enumeración, un tipo u otra identidad. Podemos definir esto con una clase simple que almacena nuestro tiempo objetivo. Además de almacenar la información del evento, podemos permitirle await una clase directamente.

class AsyncSleep: """Event to sleep until a point in time""" def __init__(self, until: float): self.until = until # used whenever someone ``await``s an instance of this Event def __await__(self): # yield this Event to the loop yield self def __repr__(self): return ''%s(until=%.1f)'' % (self.__class__.__name__, self.until)

Esta clase solo almacena el evento, no dice cómo manejarlo realmente.

La única característica especial es __await__ : es lo que busca la palabra clave __await__ . Prácticamente, es un iterador pero no está disponible para la maquinaria de iteración regular.

2.2.1. A la espera de un evento

Ahora que tenemos un evento, ¿cómo reaccionan los coroutines? Deberíamos poder expresar el equivalente de sleep await nuestro evento. Para ver mejor lo que está pasando, esperamos dos veces durante la mitad del tiempo:

import time async def asleep(duration: float): """await that ``duration`` seconds pass""" await AsyncSleep(time.time() + duration / 2) await AsyncSleep(time.time() + duration / 2)

Podemos instanciar y ejecutar directamente esta coroutine. Similar a un generador, el uso de coroutine.send ejecuta la coroutine hasta que yield un resultado.

coroutine = asleep(100) while True: print(coroutine.send(None)) time.sleep(0.1)

Esto nos da dos eventos AsyncSleep y luego una StopIteration cuando se realiza la rutina. Tenga en cuenta que el único retraso es de time.sleep en el bucle! Cada AsyncSleep solo almacena un desplazamiento de la hora actual.

2.2.2. Evento + sueño

En este punto, tenemos dos mecanismos separados a nuestra disposición:

  • AsyncSleep Eventos que pueden producirse desde el interior de una coroutine.
  • time.sleep que puede esperar sin impactar a las coroutinas.

Cabe destacar que estos dos son ortogonales: ninguno afecta o activa al otro. Como resultado, podemos idear nuestra propia estrategia para sleep para enfrentar el retraso de un AsyncSleep .

2.3. Un bucle de eventos ingenuo.

Si tenemos varias coroutinas, cada una nos puede decir cuándo quiere que nos despierten. Entonces podemos esperar hasta que el primero de ellos quiera reanudarse, luego el siguiente, y así sucesivamente. Cabe destacar que en cada punto solo nos importa cuál es el siguiente .

Esto hace que para una programación sencilla:

  1. Clasificar coroutines por su tiempo de despertar deseado
  2. Elige el primero que quiera despertar.
  3. espera hasta este punto en el tiempo
  4. corre esta coroutine
  5. repetir desde 1.

Una implementación trivial no necesita conceptos avanzados. Una list permite ordenar coroutines por fecha. La espera es un tiempo regular. time.sleep . Ejecutar coroutines funciona igual que antes con coroutine.send .

def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" # store wake-up-time and coroutines waiting = [(0, coroutine) for coroutine in coroutines] while waiting: # 2. pick the first coroutine that wants to wake up until, coroutine = waiting.pop(0) # 3. wait until this point in time time.sleep(max(0.0, until - time.time())) # 4. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0])

Por supuesto, esto tiene un amplio margen de mejora. Podemos usar un montón para la cola de espera o una tabla de despacho para eventos. También podríamos obtener valores de retorno del StopIteration y asignarlos a la rutina. Sin embargo, el principio fundamental sigue siendo el mismo.

2.4. Espera cooperativa

El evento AsyncSleep y el evento run loop son una implementación completamente funcional de eventos cronometrados.

async def sleepy(identifier: str = "coroutine", count=5): for i in range(count): print(identifier, ''step'', i + 1, ''at %.2f'' % time.time()) await asleep(0.1) run(*(sleepy("coroutine %d" % j) for j in range(5)))

Esto cambia de manera cooperativa entre cada una de las cinco coroutinas, suspendiendo cada una por 0.1 segundos. Aunque el bucle de eventos es síncrono, sigue ejecutando el trabajo en 0,5 segundos en lugar de 2,5 segundos. Cada coroutine mantiene el estado y actúa independientemente.

3. Bucle de eventos de E / S

Un bucle de eventos que admite la sleep es adecuado para el sondeo . Sin embargo, la espera de E / S en un identificador de archivo se puede hacer de manera más eficiente: el sistema operativo implementa E / S y, por lo tanto, sabe qué controladores están listos. Idealmente, un bucle de eventos debería admitir un evento explícito "listo para E / S".

3.1. La llamada select

Python ya tiene una interfaz para consultar el sistema operativo para leer los identificadores de E / S. Cuando se le llama con identificadores para leer o escribir, devuelve los manejadores listos para leer o escribir:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

Por ejemplo, podemos open un archivo para escribir y esperar a que esté listo:

write_target = open(''/tmp/foo'') readable, writeable, _ = select.select([], [write_target], [])

Una vez que seleccione devoluciones, writeable contiene nuestro archivo abierto.

3.2. Evento básico de E / S

De manera similar a la solicitud de AsyncSleep , necesitamos definir un evento para E / S. Con la lógica de select subyacente, el evento debe referirse a un objeto legible, digamos un archivo open . Además, almacenamos la cantidad de datos para leer.

class AsyncRead: def __init__(self, file, amount=1): self.file = file self.amount = amount self._buffer = '''' def __await__(self): while len(self._buffer) < self.amount: yield self # we only get here if ``read`` should not block self._buffer += self.file.read(1) return self._buffer def __repr__(self): return ''%s(file=%s, amount=%d, progress=%d)'' % ( self.__class__.__name__, self.file, self.amount, len(self._buffer) )

Al igual que con AsyncSleep , principalmente almacenamos los datos necesarios para la llamada al sistema subyacente. Esta vez, __await__ se puede reanudar varias veces, hasta que se haya leído nuestra amount deseada. Además, return el resultado de E / S en lugar de simplemente reanudarlo.

3.3. Aumentar un bucle de eventos con E / S de lectura

La base de nuestro bucle de eventos sigue siendo la run definida anteriormente. Primero, necesitamos rastrear las solicitudes de lectura. Esto ya no es un programa ordenado, solo mapeamos las solicitudes de lectura a las rutinarias.

# new waiting_read = {} # type: Dict[file, coroutine]

Como select.select toma un parámetro de tiempo de espera, podemos usarlo en lugar de time.sleep .

# old time.sleep(max(0.0, until - time.time())) # new readable, _, _ = select.select(list(reads), [], [])

Esto nos da todos los archivos legibles - si hay alguno, ejecutamos la correspondiente coroutine. Si no hay ninguno, hemos esperado el tiempo suficiente para que se ejecute nuestra rutina actual.

# new - reschedule waiting coroutine, run readable coroutine if readable: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read[readable[0]]

Finalmente, tenemos que escuchar las solicitudes de lectura.

# new if isinstance(command, AsyncSleep): ... elif isinstance(command, AsyncRead): ...

3.4. Poniendo todo junto

Lo anterior fue un poco de una simplificación. Necesitamos hacer algunos cambios para no morir de hambre en las coroutinas si siempre podemos leer. Necesitamos manejar tener nada que leer o nada que esperar. Sin embargo, el resultado final todavía se ajusta a 30 LOC.

def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" waiting_read = {} # type: Dict[file, coroutine] waiting = [(0, coroutine) for coroutine in coroutines] while waiting or waiting_read: # 2. wait until the next coroutine may run or read ... try: until, coroutine = waiting.pop(0) except IndexError: until, coroutine = float(''inf''), None readable, _, _ = select.select(list(waiting_read), [], []) else: readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time())) # ... and select the appropriate one if readable and time.time() < until: if until and coroutine: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read.pop(readable[0]) # 3. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension ... if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0]) # ... or register reads elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine

3.5. Cooperativa de E / S

AsyncSleep , AsyncRead y las implementaciones de ejecución ahora son completamente funcionales para dormir y / o leer. Igual que para sleepy , podemos definir un ayudante para probar la lectura:

async def ready(path, amount=1024*32): print(''read'', path, ''at'', ''%d'' % time.time()) with open(path, ''rb'') as file: result = return await AsyncRead(file, amount) print(''done'', path, ''at'', ''%d'' % time.time()) print(''got'', len(result), ''B'') run(sleepy(''background'', 5), ready(''/dev/urandom''))

Al ejecutar esto, podemos ver que nuestra E / S está intercalada con la tarea en espera:

id background round 1 read /dev/urandom at 1530721148 id background round 2 id background round 3 id background round 4 id background round 5 done /dev/urandom at 1530721148 got 1024 B

4. E / S sin bloqueo

Si bien I / O en los archivos entiende el concepto, no es realmente adecuado para una biblioteca como asyncio : la llamada de select siempre vuelve para los archivos , y tanto la open como la read pueden bloquearse indefinidamente . Esto bloquea todas las secuencias de un ciclo de eventos, lo cual es malo. Las bibliotecas como aiofiles utilizan subprocesos y sincronización para falsificar E / S sin bloqueo y eventos en el archivo.

Sin embargo, los sockets permiten la E / S sin bloqueo, y su latencia inherente lo hace mucho más crítico. Cuando se usa en un bucle de eventos, la espera de datos y el reintento se pueden ajustar sin bloquear nada.

4.1. Evento de I / O sin bloqueo

Similar a nuestro AsyncRead , podemos definir un evento de suspender y leer para sockets. En lugar de tomar un archivo, tomamos un socket, que no debe ser bloqueado. Además, nuestro __await__ usa socket.recv lugar de file.read .

class AsyncRecv: def __init__(self, connection, amount=1, read_buffer=1024): assert not connection.getblocking(), ''connection must be non-blocking for async recv'' self.connection = connection self.amount = amount self.read_buffer = read_buffer self._buffer = b'''' def __await__(self): while len(self._buffer) < self.amount: try: self._buffer += self.connection.recv(self.read_buffer) except BlockingIOError: yield self return self._buffer def __repr__(self): return ''%s(file=%s, amount=%d, progress=%d)'' % ( self.__class__.__name__, self.connection, self.amount, len(self._buffer) )

A diferencia de AsyncRead , __await__ realiza una E / S realmente sin bloqueo. Cuando los datos están disponibles, siempre se lee. Cuando no hay datos disponibles, siempre se suspende. Eso significa que el bucle de eventos solo se bloquea mientras realizamos un trabajo útil.

4.2. Desbloqueo del bucle de eventos

En lo que respecta al bucle de eventos, nada cambia mucho. El evento a escuchar sigue siendo el mismo que para los archivos: un descriptor de archivos marcado como listo para select .

# old elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine # new elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine elif isinstance(command, AsyncRecv): waiting_read[command.connection] = coroutine

En este punto, debería ser obvio que AsyncRead y AsyncRecv son el mismo tipo de evento. Podríamos fácilmente refactorizarlos para que sean un evento con un componente de E / S intercambiable. En efecto, el bucle de eventos, las guías y los eventos separan claramente un planificador, un código intermedio arbitrario y la E / S real.

4.3. El lado feo de la E / S sin bloqueo

En principio, lo que debe hacer en este punto es replicar la lógica de read como un recv para AsyncRecv . Sin embargo, esto es mucho más feo ahora: tienes que manejar los rendimientos anticipados cuando las funciones se bloquean dentro del kernel, pero el control de rendimiento te lo permite. Por ejemplo, abrir una conexión en lugar de abrir un archivo es mucho más largo:

# file file = open(path, ''rb'') # non-blocking socket connection = socket.socket() connection.setblocking(False) # open without blocking - retry on failure try: connection.connect((url, port)) except BlockingIOError: pass

En pocas palabras, lo que queda son unas pocas docenas de líneas de manejo de Excepciones. Los eventos y el bucle de eventos ya funcionan en este punto.

id background round 1 read localhost:25000 at 1530783569 read /dev/urandom at 1530783569 done localhost:25000 at 1530783569 got 32768 B id background round 2 id background round 3 id background round 4 done /dev/urandom at 1530783569 got 4096 B id background round 5

Apéndice

Código de ejemplo en github


Todo se reduce a los dos desafíos principales que asyncio está abordando:

  • ¿Cómo realizar múltiples E / S en un solo hilo?
  • ¿Cómo implementar la multitarea cooperativa?

La respuesta al primer punto ha existido durante mucho tiempo y se llama un bucle de selección . En Python, se implementa en el módulo de selectores .

La segunda pregunta está relacionada con el concepto de coroutine , es decir, funciones que pueden detener su ejecución y ser restauradas más adelante. En Python, las coroutinas se implementan utilizando Generators y el rendimiento de la declaración. Eso es lo que se esconde detrás de la sintaxis async / await .

Más recursos en esta answer .

EDIT: Dirigiendo su comentario sobre goroutines:

El equivalente más cercano a un goroutine en asyncio no es realmente una coroutine sino una tarea (vea la diferencia en la documentation ). En Python, una coroutine (o un generador) no sabe nada sobre los conceptos de bucle de eventos o E / S. Simplemente es una función que puede detener su ejecución yieldmientras se mantiene su estado actual, por lo que puede restaurarse más adelante. La yield fromsintaxis permite encadenarlos de forma transparente.

Ahora, dentro de una tarea asíntica, la corutina en el extremo más bajo de la cadena siempre termina dando un future . Este futuro luego burbujea hasta el bucle de eventos y se integra en la maquinaria interna. Cuando el futuro está configurado para ser realizado por alguna otra devolución de llamada interna, el bucle de eventos puede restaurar la tarea enviando el futuro nuevamente a la cadena básica.

EDITAR: abordar algunas de las preguntas en su post:

¿Cómo ocurre realmente la E / S en este escenario? ¿En un hilo aparte? ¿Se suspende todo el intérprete y la E / S ocurre fuera del intérprete?

No, no pasa nada en un hilo. La E / S siempre se administra mediante el bucle de eventos, principalmente a través de descriptores de archivos. Sin embargo, el registro de esos descriptores de archivos suele estar oculto por las coroutinas de alto nivel, lo que hace el trabajo sucio para usted.

¿Qué se entiende exactamente por E / S? Si mi procedimiento de Python se llama C open () y, a su vez, envía una interrupción al kernel, renunciando al control, ¿cómo sabe esto el intérprete de Python y puede continuar ejecutando otro código, mientras que el kernel hace el I / ¿O y hasta que despierte el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, ser consciente de que esto está sucediendo?

Una E / S es cualquier llamada de bloqueo. En asyncio, todas las operaciones de E / S deben pasar por el bucle de eventos, porque, como usted dijo, el bucle de eventos no tiene forma de saber que se está realizando una llamada de bloqueo en algún código síncrono. Eso significa que no debes usar un síncrono openen el contexto de una coroutine. En su lugar, use una biblioteca dedicada como aiofiles que proporciona una versión asíncrona de open.


Su corodesugaring es conceptualmente correcta, pero un poco incompleta.

awaitno se suspende incondicionalmente, pero solo si se encuentra con una llamada de bloqueo. ¿Cómo saber que una llamada está bloqueando? Esto se decide por el código que se espera. Por ejemplo, una implementación esperada de la lectura de sockets podría desugarse para:

def read(sock, n): # sock must be in non-blocking mode try: return sock.recv(n) except EWOULDBLOCK: event_loop.add_reader(sock.fileno, current_task()) return SUSPEND

En asyncio real, el código equivalente modifica el estado de a en Futurelugar de devolver valores mágicos, pero el concepto es el mismo. Cuando se adapta adecuadamente a un objeto similar a un generador, el código anterior puede ser awaited.

En el lado de la persona que llama, cuando su coroutine contiene:

data = await read(sock, 1024)

Se desgasifica en algo cercano a:

data = read(sock, 1024) if data is SUSPEND: return SUSPEND self.pos += 1 self.parts[self.pos](...)

Las personas familiarizadas con los generadores tienden a describir lo anterior en términos de lo yield fromque hace la suspensión automáticamente.

La cadena de suspensión continúa hasta el bucle de eventos, que advierte que la coroutine está suspendida, la elimina del conjunto ejecutable y continúa ejecutando las corrutinas ejecutables, si las hubiera. Si no se pueden ejecutar coroutines, el bucle espera select()hasta que un descriptor de archivos en el que esté interesada una coroutine esté listo para IO. (El bucle de eventos mantiene una asignación de descriptor de archivo a correlación).

En el ejemplo anterior, una vez que select()el ciclo del evento sockes legible, se volverá a agregar coroal conjunto ejecutable, por lo que se continuará desde el punto de suspensión.

En otras palabras:

  1. Todo sucede en el mismo hilo por defecto.

  2. El bucle de eventos es responsable de programar las llamadas rutinarias y despertarlas cuando esté listo lo que estaban esperando (normalmente, una llamada IO que normalmente se bloquearía o se agotaría el tiempo de espera).

Para obtener información sobre los bucles de eventos de conducción de coroutine, recomiendo esta charla de Dave Beazley, donde demuestra la codificación de un bucle de eventos desde cero frente a la audiencia en vivo.