tutorial make loop how español asyncio async python multithreading python-asyncio aiohttp

make - ¿Cómo combinar asyncio python con hilos?



process async python (2)

He construido con éxito un microservicio RESTful con Python asyncio y aiohttp que escucha un evento POST para recopilar eventos en tiempo real de varios alimentadores.

A continuación, crea una estructura en memoria para almacenar en caché las últimas 24 horas de eventos en una estructura antedicha defaultdict / deque.

Ahora me gustaría revisar periódicamente la estructura del disco, preferiblemente con pickle.

Dado que la estructura de la memoria puede ser> 100 MB, me gustaría evitar retener el procesamiento de eventos entrantes durante el tiempo que lleva controlar la estructura.

Prefiero crear una copia de instantánea (por ejemplo, copia profunda) de la estructura y luego tomarme mi tiempo para escribirla en el disco y repetirla en un intervalo de tiempo preestablecido.

He estado buscando ejemplos sobre cómo combinar subprocesos (¿y es un subproceso incluso la mejor solución para esto?) Y asyncio para ese propósito, pero no pude encontrar algo que me ayudara.

Cualquier puntero para empezar es muy apreciado!


También utilicé run_in_executor , pero encontré esta función un poco burda en la mayoría de las circunstancias, ya que requiere partial() para los argumentos de palabras clave y nunca la llamo con otro que no sea un ejecutor único y el bucle de eventos predeterminado. Así que hice una envoltura de conveniencia a su alrededor con valores predeterminados razonables y manejo automático de argumentos de palabras clave.

from time import sleep import asyncio as aio loop = aio.get_event_loop() class Executor: """In most cases, you can just use the ''execute'' instance as a function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in the executor, assign result to y. The defaults can be changed, though, with your own instantiation of Executor, i.e. execute = Executor(nthreads=4)""" def __init__(self, loop=loop, nthreads=1): from concurrent.futures import ThreadPoolExecutor self._ex = ThreadPoolExecutor(nthreads) self._loop = loop def __call__(self, f, *args, **kw): from functools import partial return self._loop.run_in_executor(self._ex, partial(f, *args, **kw)) execute = Executor() ... def cpu_bound_operation(t, alpha=30): sleep(t) return 20*alpha async def main(): y = await execute(cpu_bound_operation, 5, alpha=-2) loop.run_until_complete(main())


Es bastante simple delegar un método a un subproceso o subproceso utilizando BaseEventLoop.run_in_executor :

import asyncio import time from concurrent.futures import ProcessPoolExecutor def cpu_bound_operation(x): time.sleep(x) # This is some operation that is CPU-bound @asyncio.coroutine def main(): # Run cpu_bound_operation in the ProcessPoolExecutor # This will make your coroutine block, but won''t block # the event loop; other coroutines can run in meantime. yield from loop.run_in_executor(p, cpu_bound_operation, 5) loop = asyncio.get_event_loop() p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes loop.run_until_complete(main())

En cuanto a si usar un ProcessPoolExecutor o ThreadPoolExecutor , es algo difícil de decir; El decapado de un objeto grande definitivamente consumirá algunos ciclos de CPU, lo que inicialmente pensaría que ProcessPoolExecutor es el camino a seguir. Sin embargo, pasar su objeto de 100 MB a un Process en el grupo requeriría escabullir la instancia en su proceso principal, enviar los bytes al proceso hijo a través de IPC, desentrañarlo en el niño, y luego volverlo a encerrar para que pueda escribirlo en el disco . Teniendo en cuenta eso, supongo que la sobrecarga de decapado / despeje será lo suficientemente grande como para que estés mejor usando un ThreadPoolExecutor , a pesar de que vas a tener un impacto en el rendimiento debido a la GIL.

Dicho esto, es muy sencillo probar ambas formas y descubrirlo con seguridad, por lo que también puede hacerlo.