make - ¿Cómo combinar asyncio python con hilos?
process async python (2)
He construido con éxito un microservicio RESTful con Python asyncio y aiohttp que escucha un evento POST para recopilar eventos en tiempo real de varios alimentadores.
A continuación, crea una estructura en memoria para almacenar en caché las últimas 24 horas de eventos en una estructura antedicha defaultdict / deque.
Ahora me gustaría revisar periódicamente la estructura del disco, preferiblemente con pickle.
Dado que la estructura de la memoria puede ser> 100 MB, me gustaría evitar retener el procesamiento de eventos entrantes durante el tiempo que lleva controlar la estructura.
Prefiero crear una copia de instantánea (por ejemplo, copia profunda) de la estructura y luego tomarme mi tiempo para escribirla en el disco y repetirla en un intervalo de tiempo preestablecido.
He estado buscando ejemplos sobre cómo combinar subprocesos (¿y es un subproceso incluso la mejor solución para esto?) Y asyncio para ese propósito, pero no pude encontrar algo que me ayudara.
Cualquier puntero para empezar es muy apreciado!
También utilicé run_in_executor
, pero encontré esta función un poco burda en la mayoría de las circunstancias, ya que requiere partial()
para los argumentos de palabras clave y nunca la llamo con otro que no sea un ejecutor único y el bucle de eventos predeterminado. Así que hice una envoltura de conveniencia a su alrededor con valores predeterminados razonables y manejo automático de argumentos de palabras clave.
from time import sleep
import asyncio as aio
loop = aio.get_event_loop()
class Executor:
"""In most cases, you can just use the ''execute'' instance as a
function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
the executor, assign result to y. The defaults can be changed, though,
with your own instantiation of Executor, i.e. execute =
Executor(nthreads=4)"""
def __init__(self, loop=loop, nthreads=1):
from concurrent.futures import ThreadPoolExecutor
self._ex = ThreadPoolExecutor(nthreads)
self._loop = loop
def __call__(self, f, *args, **kw):
from functools import partial
return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
execute = Executor()
...
def cpu_bound_operation(t, alpha=30):
sleep(t)
return 20*alpha
async def main():
y = await execute(cpu_bound_operation, 5, alpha=-2)
loop.run_until_complete(main())
Es bastante simple delegar un método a un subproceso o subproceso utilizando BaseEventLoop.run_in_executor
:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_operation(x):
time.sleep(x) # This is some operation that is CPU-bound
@asyncio.coroutine
def main():
# Run cpu_bound_operation in the ProcessPoolExecutor
# This will make your coroutine block, but won''t block
# the event loop; other coroutines can run in meantime.
yield from loop.run_in_executor(p, cpu_bound_operation, 5)
loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())
En cuanto a si usar un ProcessPoolExecutor
o ThreadPoolExecutor
, es algo difícil de decir; El decapado de un objeto grande definitivamente consumirá algunos ciclos de CPU, lo que inicialmente pensaría que ProcessPoolExecutor
es el camino a seguir. Sin embargo, pasar su objeto de 100 MB a un Process
en el grupo requeriría escabullir la instancia en su proceso principal, enviar los bytes al proceso hijo a través de IPC, desentrañarlo en el niño, y luego volverlo a encerrar para que pueda escribirlo en el disco . Teniendo en cuenta eso, supongo que la sobrecarga de decapado / despeje será lo suficientemente grande como para que estés mejor usando un ThreadPoolExecutor
, a pesar de que vas a tener un impacto en el rendimiento debido a la GIL.
Dicho esto, es muy sencillo probar ambas formas y descubrirlo con seguridad, por lo que también puede hacerlo.