procesos multitarea ejemplos crear concurrentes con python multiprocessing tornado python-multithreading

ejemplos - multitarea en python



¿Cómo realizar mejor el multiprocesamiento dentro de las solicitudes con el servidor Tornado de Python? (3)

Estoy utilizando el servidor de Python sin bloqueo de E / S Tornado. Tengo una clase de solicitudes GET que pueden tardar una cantidad de tiempo significativa en completarse (piense en el rango de 5 a 10 segundos). El problema es que el Tornado bloquea estas solicitudes para que las solicitudes rápidas subsiguientes se retengan hasta que se complete la solicitud lenta.

Miré: https://github.com/facebook/tornado/wiki/Threading-and-concurrency y llegué a la conclusión de que quería una combinación de # 3 (otros procesos) y # 4 (otras hebras). El # 4 solo tuvo problemas y no pude obtener un control confiable de vuelta al ioloop cuando había otro subproceso haciendo "heavy_lifting". (Supongo que esto se debió a la GIL y al hecho de que la tarea heavy_lifting tiene una alta carga de CPU y sigue alejando el control del ioloop principal, pero eso es una suposición).

Así que he estado haciendo un prototipo de cómo resolver esto haciendo tareas de "levantamiento pesado" dentro de estas solicitudes GET lentas en un proceso separado y luego devolví una llamada al Tornado ioloop cuando el proceso se realiza para finalizar la solicitud. Esto libera el ioloop para manejar otras solicitudes.

He creado un ejemplo simple que demuestra una posible solución, pero tengo curiosidad por obtener comentarios de la comunidad sobre ella.

Mi pregunta es doble: ¿Cómo se puede simplificar este enfoque actual? ¿Qué trampas potencialmente existen con él?

El enfoque

  1. Utilice el decorador asynchronous integrado de Tornado que permite que una solicitud permanezca abierta y que el ioloop continúe.

  2. Genere un proceso separado para las tareas de "trabajo pesado" utilizando el módulo de multiprocessing de python. Primero intenté usar el módulo de threading , pero no pude devolver el control al ioloop. También parece que el mutliprocessing también aprovecharía los multinúcleos.

  3. Inicie un subproceso de "observador" en el proceso ioloop principal usando el módulo de threading , cuya tarea es observar un multiprocessing.Queue Cita los resultados de la tarea de "levantamiento pesado" cuando finaliza. Esto fue necesario porque necesitaba una manera de saber que la tarea de levantamiento pesado se había completado al mismo tiempo que podía notificar al ioloop que esta solicitud había finalizado.

  4. Asegúrese de que el hilo del "observador" renuncie al control del ioloop principal a menudo con las time.sleep(0) para que otras solicitudes continúen siendo procesadas fácilmente.

  5. Cuando haya un resultado en la cola, a continuación, agregue una devolución de llamada desde el subproceso del "observador" utilizando tornado.ioloop.IOLoop.instance().add_callback() que está documentada como la única forma segura de llamar a instancias de ioloop desde otros subprocesos.

  6. Asegúrese de llamar a finish() en la devolución de llamada para completar la solicitud y entregar una respuesta.

A continuación se muestra un código de ejemplo que muestra este enfoque. multi_tornado.py es el servidor que implementa el esquema anterior y call_multi.py es un script de ejemplo que llama al servidor de dos maneras diferentes para probar el servidor. Ambas pruebas llaman al servidor con 3 solicitudes GET lentas seguidas por 20 solicitudes GET rápidas. Los resultados se muestran tanto para la ejecución con como sin el subproceso activado.

En el caso de ejecutarlo sin "subprocesos", el bloque de 3 solicitudes lentas (cada una demora un poco más en completarse). Algunas de las 20 solicitudes rápidas se reparten entre algunas de las solicitudes lentas dentro del ioloop (no estoy totalmente seguro de cómo ocurre eso, pero podría ser un artefacto de que estoy ejecutando el script de prueba del servidor y del cliente en la misma máquina). El punto aquí es que todas las solicitudes rápidas se llevan a cabo en diversos grados.

En el caso de ejecutarlo con subprocesos habilitados, las 20 solicitudes rápidas se completan primero inmediatamente y las tres solicitudes lentas se completan casi al mismo tiempo después, ya que cada una se ha ejecutado en paralelo. Este es el comportamiento deseado. Las tres solicitudes lentas tardan 2,5 segundos en completarse en paralelo, mientras que en el caso de las tres solicitudes lentas tardan aproximadamente 3,5 segundos en total. Así que hay un 35% de aceleración general (supongo que debido a la compartición multinúcleo). Pero, lo que es más importante, las solicitudes rápidas se manejaron de inmediato en relación con las lentas.

No tengo mucha experiencia con la programación multiproceso, así que mientras esto parece funcionar aquí, tengo curiosidad por aprender:

¿Hay una manera más sencilla de lograr esto? ¿Qué monstruo puede estar al acecho en este enfoque?

(Nota: una solución de compromiso futura puede ser simplemente ejecutar más instancias de Tornado con un proxy inverso como nginx haciendo balanceo de carga. No importa lo que ejecutaré varias instancias con un equilibrador de carga, pero me preocupa simplemente lanzar hardware a este problema ya que parece que el hardware está tan directamente acoplado al problema en términos del bloqueo.)

Código de muestra

multi_tornado.py (servidor de muestra):

import time import threading import multiprocessing import math from tornado.web import RequestHandler, Application, asynchronous from tornado.ioloop import IOLoop # run in some other process - put result in q def heavy_lifting(q): t0 = time.time() for k in range(2000): math.factorial(k) t = time.time() q.put(t - t0) # report time to compute in queue class FastHandler(RequestHandler): def get(self): res = ''fast result '' + self.get_argument(''id'') print res self.write(res) self.flush() class MultiThreadedHandler(RequestHandler): # Note: This handler can be called with threaded = True or False def initialize(self, threaded=True): self._threaded = threaded self._q = multiprocessing.Queue() def start_process(self, worker, callback): # method to start process and watcher thread self._callback = callback if self._threaded: # launch process multiprocessing.Process(target=worker, args=(self._q,)).start() # start watching for process to finish threading.Thread(target=self._watcher).start() else: # threaded = False just call directly and block worker(self._q) self._watcher() def _watcher(self): # watches the queue for process result while self._q.empty(): time.sleep(0) # relinquish control if not ready # put callback back into the ioloop so we can finish request response = self._q.get(False) IOLoop.instance().add_callback(lambda: self._callback(response)) class SlowHandler(MultiThreadedHandler): @asynchronous def get(self): # start a thread to watch for self.start_process(heavy_lifting, self._on_response) def _on_response(self, delta): _id = self.get_argument(''id'') res = ''slow result {} <--- {:0.3f} s''.format(_id, delta) print res self.write(res) self.flush() self.finish() # be sure to finish request application = Application([ (r"/fast", FastHandler), (r"/slow", SlowHandler, dict(threaded=False)), (r"/slow_threaded", SlowHandler, dict(threaded=True)), ]) if __name__ == "__main__": application.listen(8888) IOLoop.instance().start()

call_multi.py (cliente tester):

import sys from tornado.ioloop import IOLoop from tornado import httpclient def run(slow): def show_response(res): print res.body # make 3 "slow" requests on server requests = [] for k in xrange(3): uri = ''http://localhost:8888/{}?id={}'' requests.append(uri.format(slow, str(k + 1))) # followed by 20 "fast" requests for k in xrange(20): uri = ''http://localhost:8888/fast?id={}'' requests.append(uri.format(k + 1)) # show results as they return http_client = httpclient.AsyncHTTPClient() print ''Scheduling Get Requests:'' print ''------------------------'' for req in requests: print req http_client.fetch(req, show_response) # execute requests on server print ''/nStart sending requests....'' IOLoop.instance().start() if __name__ == ''__main__'': scenario = sys.argv[1] if scenario == ''slow'' or scenario == ''slow_threaded'': run(scenario)

Resultados de la prueba

Al ejecutar python call_multi.py slow (el comportamiento de bloqueo):

Scheduling Get Requests: ------------------------ http://localhost:8888/slow?id=1 http://localhost:8888/slow?id=2 http://localhost:8888/slow?id=3 http://localhost:8888/fast?id=1 http://localhost:8888/fast?id=2 http://localhost:8888/fast?id=3 http://localhost:8888/fast?id=4 http://localhost:8888/fast?id=5 http://localhost:8888/fast?id=6 http://localhost:8888/fast?id=7 http://localhost:8888/fast?id=8 http://localhost:8888/fast?id=9 http://localhost:8888/fast?id=10 http://localhost:8888/fast?id=11 http://localhost:8888/fast?id=12 http://localhost:8888/fast?id=13 http://localhost:8888/fast?id=14 http://localhost:8888/fast?id=15 http://localhost:8888/fast?id=16 http://localhost:8888/fast?id=17 http://localhost:8888/fast?id=18 http://localhost:8888/fast?id=19 http://localhost:8888/fast?id=20 Start sending requests.... slow result 1 <--- 1.338 s fast result 1 fast result 2 fast result 3 fast result 4 fast result 5 fast result 6 fast result 7 slow result 2 <--- 1.169 s slow result 3 <--- 1.130 s fast result 8 fast result 9 fast result 10 fast result 11 fast result 13 fast result 12 fast result 14 fast result 15 fast result 16 fast result 18 fast result 17 fast result 19 fast result 20

Al ejecutar python call_multi.py slow_threaded (el comportamiento deseado):

Scheduling Get Requests: ------------------------ http://localhost:8888/slow_threaded?id=1 http://localhost:8888/slow_threaded?id=2 http://localhost:8888/slow_threaded?id=3 http://localhost:8888/fast?id=1 http://localhost:8888/fast?id=2 http://localhost:8888/fast?id=3 http://localhost:8888/fast?id=4 http://localhost:8888/fast?id=5 http://localhost:8888/fast?id=6 http://localhost:8888/fast?id=7 http://localhost:8888/fast?id=8 http://localhost:8888/fast?id=9 http://localhost:8888/fast?id=10 http://localhost:8888/fast?id=11 http://localhost:8888/fast?id=12 http://localhost:8888/fast?id=13 http://localhost:8888/fast?id=14 http://localhost:8888/fast?id=15 http://localhost:8888/fast?id=16 http://localhost:8888/fast?id=17 http://localhost:8888/fast?id=18 http://localhost:8888/fast?id=19 http://localhost:8888/fast?id=20 Start sending requests.... fast result 1 fast result 2 fast result 3 fast result 4 fast result 5 fast result 6 fast result 7 fast result 8 fast result 9 fast result 10 fast result 11 fast result 12 fast result 13 fast result 14 fast result 15 fast result 19 fast result 20 fast result 17 fast result 16 fast result 18 slow result 2 <--- 2.485 s slow result 3 <--- 2.491 s slow result 1 <--- 2.517 s


Si está dispuesto a usar concurrent.futures.ProcessPoolExecutor lugar de multiprocessing , esto es realmente muy simple. El ioloop de Tornado ya admite concurrent.futures.Future , por lo que jugarán muy bien juntos fuera de la caja. concurrent.futures está incluido en Python 3.2+, y ha sido portado a Python 2.x.

Aquí hay un ejemplo:

import time from concurrent.futures import ProcessPoolExecutor from tornado.ioloop import IOLoop from tornado import gen def f(a, b, c, blah=None): print "got %s %s %s and %s" % (a, b, c, blah) time.sleep(5) return "hey there" @gen.coroutine def test_it(): pool = ProcessPoolExecutor(max_workers=1) fut = pool.submit(f, 1, 2, 3, blah="ok") # This returns a concurrent.futures.Future print("running it asynchronously") ret = yield fut print("it returned %s" % ret) pool.shutdown() IOLoop.instance().run_sync(test_it)

Salida:

running it asynchronously got 1 2 3 and ok it returned hey there

ProcessPoolExecutor tiene una API más limitada que multiprocessing.Pool , pero si no necesita las funciones más avanzadas de multiprocessing.Pool , vale la pena utilizarla porque la integración es mucho más sencilla.


Si sus solicitudes de obtención tardan tanto, entonces el tornado es el marco incorrecto.

Le sugiero que use nginx para enrutar los torneos rápidos al tornado y los más lentos a un servidor diferente.

PeterBe tiene un artículo interesante en el que ejecuta múltiples servidores Tornado y establece que uno de ellos es "el más lento" para manejar las solicitudes de larga duración.


multiprocessing.Pool se puede integrar en el bucle de E / S de tornado , pero es un poco desordenado. Se puede hacer una integración mucho más limpia usando concurrent.futures (vea mi otra respuesta para más detalles), pero si está atascado en Python 2.xy no puede instalar el backport concurrent.futures , aquí le explicamos cómo puede hacerlo estrictamente. utilizando multiprocessing :

Los métodos multiprocessing.Pool.apply_async y multiprocessing.Pool.map_async tienen un parámetro de callback opcional, lo que significa que ambos pueden potencialmente conectarse a un tornado.gen.Task . Entonces, en la mayoría de los casos, ejecutar el código de forma asíncrona en un subproceso es tan simple como esto:

import multiprocessing import contextlib from tornado import gen from tornado.gen import Return from tornado.ioloop import IOLoop from functools import partial def worker(): print "async work here" @gen.coroutine def async_run(func, *args, **kwargs): result = yield gen.Task(pool.apply_async, func, args, kwargs) raise Return(result) if __name__ == "__main__": pool = multiprocessing.Pool(multiprocessing.cpu_count()) func = partial(async_run, worker) IOLoop().run_sync(func)

Como mencioné, esto funciona bien en la mayoría de los casos. Pero si worker() lanza una excepción, la callback nunca se llama, lo que significa que la gen.Task termina, y se cuelga para siempre. Ahora, si sabe que su trabajo nunca arrojará una excepción (porque envolvió todo en un try / except , por ejemplo), puede utilizar este enfoque felizmente. Sin embargo, si desea dejar que las excepciones se escapen de su trabajador, la única solución que encontré fue subclasificar algunos componentes de multiprocesamiento y hacer que devuelvan la callback incluso si el subproceso de trabajo generó una excepción:

from multiprocessing.pool import ApplyResult, Pool, RUN import multiprocessing class TornadoApplyResult(ApplyResult): def _set(self, i, obj): self._success, self._value = obj if self._callback: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job] class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): '''''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. '''''' assert self._state == RUN result = TornadoApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result ... if __name__ == "__main__": pool = TornadoPool(multiprocessing.cpu_count()) ...

Con estos cambios, el objeto de excepción será devuelto por gen.Task , en lugar de gen.Task cuelga indefinidamente. También actualicé mi método async_run para volver a generar la excepción cuando se devolvió, e hice algunos otros cambios para proporcionar mejores rastreos para las excepciones lanzadas en los subprocesos de trabajo. Aquí está el código completo:

import multiprocessing from multiprocessing.pool import Pool, ApplyResult, RUN from functools import wraps import tornado.web from tornado.ioloop import IOLoop from tornado.gen import Return from tornado import gen class WrapException(Exception): def __init__(self): exc_type, exc_value, exc_tb = sys.exc_info() self.exception = exc_value self.formatted = ''''.join(traceback.format_exception(exc_type, exc_value, exc_tb)) def __str__(self): return ''/n%s/nOriginal traceback:/n%s'' % (Exception.__str__(self), self.formatted) class TornadoApplyResult(ApplyResult): def _set(self, i, obj): self._success, self._value = obj if self._callback: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job] class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): '''''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. '''''' assert self._state == RUN result = TornadoApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @gen.coroutine def async_run(func, *args, **kwargs): """ Runs the given function in a subprocess. This wraps the given function in a gen.Task and runs it in a multiprocessing.Pool. It is meant to be used as a Tornado co-routine. Note that if func returns an Exception (or an Exception sub-class), this function will raise the Exception, rather than return it. """ result = yield gen.Task(pool.apply_async, func, args, kwargs) if isinstance(result, Exception): raise result raise Return(result) def handle_exceptions(func): """ Raise a WrapException so we get a more meaningful traceback""" @wraps(func) def inner(*args, **kwargs): try: return func(*args, **kwargs) except Exception: raise WrapException() return inner # Test worker functions @handle_exceptions def test2(x): raise Exception("eeee") @handle_exceptions def test(x): print x time.sleep(2) return "done" class TestHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): try: result = yield async_run(test, "inside get") self.write("%s/n" % result) result = yield async_run(test2, "hi2") except Exception as e: print("caught exception in get") self.write("Caught an exception: %s" % e) finally: self.finish() app = tornado.web.Application([ (r"/test", TestHandler), ]) if __name__ == "__main__": pool = TornadoPool(4) app.listen(8888) IOLoop.instance().start()

Así es como se comporta para el cliente:

dan@dan:~$ curl localhost:8888/test done Caught an exception: Original traceback: Traceback (most recent call last): File "./mutli.py", line 123, in inner return func(*args, **kwargs) File "./mutli.py", line 131, in test2 raise Exception("eeee") Exception: eeee

Y si envío dos solicitudes de curvatura simultáneas, podemos ver que se manejan de forma asíncrona en el lado del servidor:

dan@dan:~$ ./mutli.py inside get inside get caught exception inside get caught exception inside get

Editar:

Tenga en cuenta que este código se vuelve más sencillo con Python 3, porque introduce un argumento de palabra clave error_callback en todos multiprocessing.Pool métodos de multiprocessing.Pool asíncrono.Pool. Esto hace que sea mucho más fácil de integrar con Tornado:

class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): '''''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. '''''' super().apply_async(func, args, kwds, callback=callback, error_callback=callback) @gen.coroutine def async_run(func, *args, **kwargs): """ Runs the given function in a subprocess. This wraps the given function in a gen.Task and runs it in a multiprocessing.Pool. It is meant to be used as a Tornado co-routine. Note that if func returns an Exception (or an Exception sub-class), this function will raise the Exception, rather than return it. """ result = yield gen.Task(pool.apply_async, func, args, kwargs) raise Return(result)

Todo lo que debemos hacer en nuestro apply_async invalidado es llamar al padre con el argumento de palabra clave error_callback , además del kwarg de callback . No hay necesidad de anular ApplyResult .

Podemos llegar a ser más sofisticados usando una MetaClass en nuestro TornadoPool , para permitir que sus métodos *_async sean llamados directamente como si fueran coroutines:

import time from functools import wraps from multiprocessing.pool import Pool import tornado.web from tornado import gen from tornado.gen import Return from tornado import stack_context from tornado.ioloop import IOLoop from tornado.concurrent import Future def _argument_adapter(callback): def wrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper def PoolTask(func, *args, **kwargs): """ Task function for use with multiprocessing.Pool methods. This is very similar to tornado.gen.Task, except it sets the error_callback kwarg in addition to the callback kwarg. This way exceptions raised in pool worker methods get raised in the parent when the Task is yielded from. """ future = Future() def handle_exception(typ, value, tb): if future.done(): return False future.set_exc_info((typ, value, tb)) return True def set_result(result): if future.done(): return if isinstance(result, Exception): future.set_exception(result) else: future.set_result(result) with stack_context.ExceptionStackContext(handle_exception): cb = _argument_adapter(set_result) func(*args, callback=cb, error_callback=cb) return future def coro_runner(func): """ Wraps the given func in a PoolTask and returns it. """ @wraps(func) def wrapper(*args, **kwargs): return PoolTask(func, *args, **kwargs) return wrapper class MetaPool(type): """ Wrap all *_async methods in Pool with coro_runner. """ def __new__(cls, clsname, bases, dct): pdct = bases[0].__dict__ for attr in pdct: if attr.endswith("async") and not attr.startswith(''_''): setattr(bases[0], attr, coro_runner(pdct[attr])) return super().__new__(cls, clsname, bases, dct) class TornadoPool(Pool, metaclass=MetaPool): pass # Test worker functions def test2(x): print("hi2") raise Exception("eeee") def test(x): print(x) time.sleep(2) return "done" class TestHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): try: result = yield pool.apply_async(test, ("inside get",)) self.write("%s/n" % result) result = yield pool.apply_async(test2, ("hi2",)) self.write("%s/n" % result) except Exception as e: print("caught exception in get") self.write("Caught an exception: %s" % e) raise finally: self.finish() app = tornado.web.Application([ (r"/test", TestHandler), ]) if __name__ == "__main__": pool = TornadoPool() app.listen(8888) IOLoop.instance().start()