python iterator async-await coroutine

Python: cómo ejecutar dos funciones "agregadas"(como suma) al mismo tiempo, alimentándolas desde el mismo iterador



iterator async-await (2)

Consideremos cómo aplicar dos funciones agregadas al mismo iterador, que solo podemos agotar una vez. El intento inicial (cuya sum códigos duros y el max para la brevedad, pero es trivialmente generalizable a un número arbitrario de funciones agregadas) podría tener este aspecto:

def max_and_sum_buffer(it): content = list(it) p = sum(content) m = max(content) return p, m

Esta implementación tiene el inconveniente de que almacena todos los elementos generados en la memoria a la vez, a pesar de que ambas funciones son perfectamente capaces de procesar el flujo. La pregunta anticipa esta exclusión y solicita explícitamente que se produzca el resultado sin amortiguar la salida del iterador. ¿Es posible hacer esto?

Ejecución en serie: itertools.tee

Ciertamente parece posible. Después de todo, los iteradores de Python son external , por lo que cada iterador ya es capaz de suspenderse. ¿Qué tan difícil puede ser proporcionar un adaptador que divide un iterador en dos nuevos iteradores que proporcionan el mismo contenido? De hecho, esta es exactamente la descripción de itertools.tee , que parece perfectamente adecuada para la iteración paralela:

def max_and_sum_tee(it): it1, it2 = itertools.tee(it) p = sum(it1) # XXX m = max(it2) return p, m

Lo anterior produce el resultado correcto, pero no funciona de la manera que nos gustaría. El problema es que no estamos iterando en paralelo. Las funciones agregadas como sum y max nunca se suspenden: cada una insiste en consumir todo el contenido del iterador antes de producir el resultado. Así que la sum lo it1 antes de que max haya tenido la oportunidad de correr. it1 elementos de it1 mientras lo deja solo, esto hará que esos elementos se acumulen dentro de un FIFO interno compartido entre los dos iteradores. Eso es inevitable aquí, ya que max(it2) debe ver los mismos elementos, tee no tiene más remedio que acumularlos. (Para más detalles interesantes sobre el tee , refiérase a esta publicación ) .

En otras palabras, no hay diferencia entre esta implementación y la primera, excepto que la primera al menos hace que el almacenamiento en búfer sea explícito. Para eliminar el almacenamiento en búfer, la sum y el max deben ejecutarse en paralelo, no uno tras otro.

Temas: concurrent.futures

Veamos qué sucede si ejecutamos las funciones agregadas en subprocesos separados, aún utilizando tee para duplicar el iterador original:

def max_and_sum_threads_simple(it): it1, it2 = itertools.tee(it) with concurrent.futures.ThreadPoolExecutor(2) as executor: sum_future = executor.submit(lambda: sum(it1)) max_future = executor.submit(lambda: max(it2)) return sum_future.result(), max_future.result()

Ahora, sum y max realmente se ejecutan en paralelo (tanto como lo permite la GIL ), los hilos están siendo gestionados por el excelente módulo concurrent.futures . Sin embargo, tiene un error fatal: para que el tee no almacene los datos, sum y max deben procesar sus artículos exactamente a la misma velocidad. Si uno es incluso un poco más rápido que el otro, se separarán y el tee amortiguará todos los elementos intermedios. Dado que no hay manera de predecir qué tan rápido se ejecutará cada uno, la cantidad de almacenamiento en búfer es impredecible y tiene el peor caso de almacenar todo en búfer.

Para asegurarse de que no se produzca el almacenamiento en búfer, se debe reemplazar tee con un generador personalizado que no almacene en búfer y bloquee hasta que todos los consumidores hayan observado el valor anterior antes de pasar al siguiente. Como antes, cada consumidor se ejecuta en su propio subproceso, pero ahora el subproceso de llamada está ocupado ejecutando un productor, un bucle que en realidad itera sobre el iterador de origen e indica que hay un nuevo valor disponible. Aquí hay una implementación:

def max_and_sum_threads(it): STOP = object() next_val = None consumed = threading.Barrier(2 + 1) # 2 consumers + 1 producer val_id = 0 got_val = threading.Condition() def send(val): nonlocal next_val, val_id consumed.wait() with got_val: next_val = val val_id += 1 got_val.notify_all() def produce(): for elem in it: send(elem) send(STOP) def consume(): last_val_id = -1 while True: consumed.wait() with got_val: got_val.wait_for(lambda: val_id != last_val_id) if next_val is STOP: return yield next_val last_val_id = val_id with concurrent.futures.ThreadPoolExecutor(2) as executor: sum_future = executor.submit(lambda: sum(consume())) max_future = executor.submit(lambda: max(consume())) produce() return sum_future.result(), max_future.result()

Esto es bastante una cantidad de código para algo tan simple conceptualmente, pero es necesario para el correcto funcionamiento.

produce() desplaza sobre el iterador externo y envía los artículos a los consumidores, un valor a la vez. Utiliza una barrier , una conveniente primitiva de sincronización agregada en Python 3.2, para esperar hasta que todos los consumidores next_val con el valor anterior antes de sobrescribirlo con el nuevo en next_val . Una vez que el nuevo valor está realmente listo, se emite una condition . consume() es un generador que transmite los valores producidos a medida que llegan, hasta la detección de STOP . El código se puede generalizar y ejecutar cualquier número de funciones agregadas en paralelo creando consumidores en un bucle y ajustando su número al crear la barrera.

El inconveniente de esta implementación es que requiere la creación de subprocesos (posiblemente aliviados al hacer que el conjunto de subprocesos sea global) y una gran cantidad de sincronización muy cuidadosa en cada paso de iteración. Esta sincronización destruye el rendimiento: esta versión es casi 2000 veces más lenta que la tee solo hilo y 475 veces más lenta que la versión simple pero no determinista.

Sin embargo, mientras se utilicen subprocesos, no se puede evitar la sincronización de alguna forma. Para eliminar completamente la sincronización, debemos abandonar los hilos y cambiar a la multitarea cooperativa. La pregunta es: ¿es posible suspender la ejecución de funciones síncronas ordinarias como sum y max para cambiar entre ellas?

Fibras: Greenlet

Resulta que el módulo de extensión de terceros de greenlet permite exactamente eso. Los Greenlets son una implementación de fibers , micro-hilos ligeros que cambian entre sí explícitamente. Esto es algo así como los generadores de Python, que usan el yield para suspender, excepto que los greenlets ofrecen un mecanismo de suspensión mucho más flexible, lo que permite a uno elegir a quién suspender.

Esto hace que sea bastante fácil portar la versión de max_and_sum a greenlets:

def max_and_sum_greenlet(it): STOP = object() consumers = None def send(val): for g in consumers: g.switch(val) def produce(): for elem in it: send(elem) send(STOP) def consume(): g_produce = greenlet.getcurrent().parent while True: val = g_produce.switch() if val is STOP: return yield val sum_result = [] max_result = [] gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume()))) gsum.switch() gmax = greenlet.greenlet(lambda: max_result.append(max(consume()))) gmax.switch() consumers = (gsum, gmax) produce() return sum_result[0], max_result[0]

La lógica es la misma, pero con menos código. Como antes, produce produce valores recuperados del iterador de origen, pero su send no se preocupa por la sincronización, ya que no es necesario hacerlo cuando todo es de un solo hilo. En su lugar, cambia explícitamente a cada consumidor a su vez para hacer lo suyo, mientras que el consumidor cambia debidamente de vuelta. Después de pasar por todos los consumidores, el productor está listo para la próxima iteración.

Los resultados se recuperan utilizando una lista intermedia de un solo elemento porque greenlet no proporciona acceso al valor de retorno de la función de destino (y tampoco lo hace threading.Thread , por lo que optamos por concurrent.futures arriba).

Sin embargo, hay desventajas de usar greenlets. Primero, no vienen con la biblioteca estándar, necesita instalar la extensión greenlet. Entonces, greenlet es intrínsecamente no portátil porque el código de cambio de pila no es compatible con el sistema operativo y el compilador y puede considerarse algo así como un hack (aunque extremadamente inteligente ). Un Python dirigido a WebAssembly o JVM o GraalVM sería muy poco probable que soporte Greenlet. Este no es un tema apremiante, pero definitivamente es algo que se debe tener en cuenta a largo plazo.

Coroutines: Asyncio

A partir de Python 3.5, Python proporciona coroutines nativos. A diferencia de los greenlets, y similares a los generadores, las corrutinas son distintas de las funciones regulares y deben definirse mediante la async def . Coroutines no puede ejecutarse fácilmente desde un código síncrono, sino que debe ser procesado por un programador que los lleve a la finalización. El programador también se conoce como un bucle de eventos porque su otro trabajo es recibir eventos IO y pasarlos a devoluciones de llamada y corrutinas apropiadas. En la biblioteca estándar, esta es la función del módulo asyncio .

Antes de implementar un max_and_sum basado en max_and_sum , primero debemos resolver un obstáculo. A diferencia de greenlet, asyncio solo puede suspender la ejecución de coroutines, no de funciones arbitrarias. Así que necesitamos reemplazar sum y max con coroutines que hacen esencialmente lo mismo. Esto es tan simple como implementarlos de la manera obvia, solo reemplazando con async for , permitiendo al iterador asíncrono suspender la rutina mientras espera que llegue el siguiente valor:

async def asum(it): s = 0 async for elem in it: s += elem return s async def amax(it): NONE_YET = object() largest = NONE_YET async for elem in it: if largest is NONE_YET or elem > largest: largest = elem if largest is NONE_YET: raise ValueError("amax() arg is an empty sequence") return largest # or, using https://github.com/vxgmichel/aiostream # #from aiostream.stream import accumulate #def asum(it): # return accumulate(it, initializer=0) #def amax(it): # return accumulate(it, max)

Uno podría razonablemente preguntar si proporcionar un nuevo par de funciones agregadas es hacer trampa; después de todo, las soluciones anteriores fueron cuidadosas al usar sum existentes y max integradas. La respuesta dependerá de la interpretación exacta de la pregunta, pero yo diría que las nuevas funciones están permitidas porque no son en absoluto específicas para la tarea en cuestión. Hacen exactamente lo mismo que hacen los incorporados, pero consumen iteradores asíncronos. Sospecho que la única razón por la que tales funciones no existen en algún lugar de la biblioteca estándar se debe a que las rutinas y los iteradores asíncronos son una característica relativamente nueva.

Con eso fuera del camino, podemos proceder a escribir max_and_sum como coroutine:

async def max_and_sum_asyncio(it): loop = asyncio.get_event_loop() STOP = object() next_val = loop.create_future() consumed = loop.create_future() used_cnt = 2 # number of consumers async def produce(): for elem in it: next_val.set_result(elem) await consumed next_val.set_result(STOP) async def consume(): nonlocal next_val, consumed, used_cnt while True: val = await next_val if val is STOP: return yield val used_cnt -= 1 if not used_cnt: consumed.set_result(None) consumed = loop.create_future() next_val = loop.create_future() used_cnt = 2 else: await consumed s, m, _ = await asyncio.gather(asum(consume()), amax(consume()), produce()) return s, m

A pesar de que esta versión se basa en cambiar entre coroutines dentro de un solo hilo, al igual que el que usa greenlet, se ve diferente. asyncio no proporciona un cambio explícito de coroutines, basa el cambio de tareas en la primitiva await suspensión / reanudación. El objetivo de await puede ser otra guía, pero también un "futuro" abstracto, un marcador de posición de valor que se completará más adelante con alguna otra fuente. Una vez que el valor esperado está disponible, el bucle de eventos reanuda automáticamente la ejecución de la rutina, con la expresión de await evaluándose al valor proporcionado. Entonces, en lugar de produce cambios hacia los consumidores, se suspende a la espera de un futuro que llegará una vez que todos los consumidores hayan observado el valor producido.

consume() es un generador asíncrono , que es como un generador ordinario, excepto que crea un iterador asíncrono, que nuestras comunidades agregadas ya están preparadas para aceptar utilizando async for . El equivalente de un iterador asíncrono de __next__ se llama __anext__ y es una coroutine, permitiendo que la coroutine que agota el iterador async se suspenda mientras espera que llegue el nuevo valor. Cuando un generador asíncrono en ejecución se suspende en await , eso es observado por async for como una suspensión de la invocación implícita __anext__ . consume() hace exactamente eso cuando espera los valores proporcionados por el produce y, a medida que están disponibles, los transmite para agrupar coroutines como asum y amax . La espera se realiza utilizando el futuro next_val , que lleva el siguiente elemento desde it . La espera de ese futuro en el interior consume() suspende el generador asíncrono, y con ello el agregado de coroutine.

La ventaja de este enfoque en comparación con el cambio explícito de greenlets es que facilita mucho la combinación de coroutines que no se conocen entre sí en el mismo bucle de eventos. Por ejemplo, uno podría tener dos instancias de max_and_sum ejecutándose en paralelo (en el mismo hilo), o ejecutar una función agregada más compleja que invocó más código asíncrono para realizar cálculos.

La siguiente función de conveniencia muestra cómo ejecutar lo anterior desde un código que no es asyncio:

def max_and_sum_asyncio_sync(it): # trivially instantiate the coroutine and execute it in the # default event loop coro = max_and_sum_asyncio(it) return asyncio.get_event_loop().run_until_complete(coro)

Actuación

Medir y comparar el rendimiento de estos enfoques para la ejecución en paralelo puede ser engañoso porque sum y max casi no procesan, lo que sobrecarga la sobrecarga de la paralelización. Trátelos como trataría cualquier marca microbiológica, con un gran grano de sal. Dicho esto, ¡veamos los números de todos modos!

Las mediciones se produjeron utilizando Python 3.6. Las funciones se ejecutaron solo una vez y se dieron un range(10000) , su tiempo se midió restando time.time() antes y después de la ejecución. Aquí están los resultados:

  • max_and_sum_buffer y max_and_sum_tee : 0.66 ms: casi exactamente el mismo tiempo para ambos, con la versión tee un poco más rápida.

  • max_and_sum_threads_simple : 2.7 ms. Este tiempo significa muy poco debido al almacenamiento en búfer no determinístico, por lo que podría estar midiendo el tiempo para iniciar dos subprocesos y la sincronización realizada internamente por Python.

  • max_and_sum_threads : 1.29 segundos , con mucho, la opción más lenta, ~ 2000 veces más lenta que la más rápida. Este horrible resultado es probablemente causado por una combinación de las múltiples sincronizaciones realizadas en cada paso de la iteración y su interacción con la GIL.

  • max_and_sum_greenlet : 25.5 ms, lento en comparación con la versión inicial, pero mucho más rápido que la versión con hilos. Con una función agregada suficientemente compleja, uno puede imaginar el uso de esta versión en producción.

  • max_and_sum_asyncio : 351 ms, casi 14 veces más lento que la versión greenlet. Este es un resultado decepcionante debido a que Asyncio Coroutines es más liviano que los Greenlets, y el cambio entre ellos debería ser mucho más rápido que el cambio entre fibras. Es probable que la sobrecarga de ejecutar el planificador de rutina y el bucle de eventos (que en este caso es excesivo dado que el código no hace E / S) está destruyendo el rendimiento en este micro-punto de referencia.

  • max_and_sum_asyncio usando uvloop : 125 ms. Esto es más del doble de la velocidad de asyncio regular, pero sigue siendo casi 5 veces más lento que el de greenlet.

La ejecución de los ejemplos en PyPy no trae una aceleración significativa, de hecho, la mayoría de los ejemplos se ejecutan un poco más lentamente, incluso después de ejecutarlos varias veces para garantizar el calentamiento del JIT. La función asyncio requiere una rewrite para no usar generadores asíncronos (ya que PyPy a partir de esta escritura implementa Python 3.5), y se ejecuta en algo menos de 100 ms. Esto es comparable al rendimiento de CPython + uvloop, es decir, mejor, pero no espectacular en comparación con greenlet.

Imagine que tenemos un iterador, digamos iter(range(1, 1000)) . Y tenemos dos funciones, cada una de las cuales acepta un iterador como único parámetro, digamos sum() y max() . En el mundo SQL los llamaríamos funciones agregadas.

¿Hay alguna forma de obtener resultados de ambos sin amortiguar la salida del iterador?

Para hacerlo, tendríamos que pausar y reanudar la ejecución de la función agregada, para alimentar a ambos con los mismos valores sin almacenarlos. Tal vez hay una manera de expresarlo utilizando async cosas sin dormir?


Si se cumple para sus funciones agregadas que f(a,b,c,...) == f(a, f(b, f(c, ...))) , entonces simplemente podría alternar entre sus funciones y aliméntelos un elemento a la vez, combinándolos cada vez con el resultado de la aplicación anterior, como lo haría reduce , por ejemplo, de esta manera:

def aggregate(iterator, *functions): first = next(iterator) result = [first] * len(functions) for item in iterator: for i, f in enumerate(functions): result[i] = f((result[i], item)) return result

Esto es considerablemente más lento (alrededor de 10 a 20 veces) que solo materializando el iterador en una lista y aplicando la función agregada en la lista en su totalidad, o usando itertools.tee (que básicamente hace lo mismo, internamente), pero tiene El beneficio de no usar memoria adicional.

Tenga en cuenta, sin embargo, que mientras esto funciona bien para funciones como sum , min o max , no funciona para otras funciones de agregación, por ejemplo, encontrando la media o el elemento de la mediana de un iterador, como mean(a, b, c) != mean(a, mean(b, c)) . (Para la mean , por supuesto, puede obtener la sum y dividirla por el número de elementos, pero calcular, por ejemplo, la mediana tomando solo un elemento a la vez será más difícil).