ejemplos - multitarea en python
¿Cómo realizar mejor el multiprocesamiento dentro de las solicitudes con el servidor Tornado de Python? (3)
Estoy utilizando el servidor de Python sin bloqueo de E / S Tornado. Tengo una clase de solicitudes GET
que pueden tardar una cantidad de tiempo significativa en completarse (piense en el rango de 5 a 10 segundos). El problema es que el Tornado bloquea estas solicitudes para que las solicitudes rápidas subsiguientes se retengan hasta que se complete la solicitud lenta.
Miré: https://github.com/facebook/tornado/wiki/Threading-and-concurrency y llegué a la conclusión de que quería una combinación de # 3 (otros procesos) y # 4 (otras hebras). El # 4 solo tuvo problemas y no pude obtener un control confiable de vuelta al ioloop cuando había otro subproceso haciendo "heavy_lifting". (Supongo que esto se debió a la GIL y al hecho de que la tarea heavy_lifting tiene una alta carga de CPU y sigue alejando el control del ioloop principal, pero eso es una suposición).
Así que he estado haciendo un prototipo de cómo resolver esto haciendo tareas de "levantamiento pesado" dentro de estas solicitudes GET
lentas en un proceso separado y luego devolví una llamada al Tornado ioloop cuando el proceso se realiza para finalizar la solicitud. Esto libera el ioloop para manejar otras solicitudes.
He creado un ejemplo simple que demuestra una posible solución, pero tengo curiosidad por obtener comentarios de la comunidad sobre ella.
Mi pregunta es doble: ¿Cómo se puede simplificar este enfoque actual? ¿Qué trampas potencialmente existen con él?
El enfoque
Utilice el decorador
asynchronous
integrado de Tornado que permite que una solicitud permanezca abierta y que el ioloop continúe.Genere un proceso separado para las tareas de "trabajo pesado" utilizando el módulo de
multiprocessing
de python. Primero intenté usar el módulo dethreading
, pero no pude devolver el control al ioloop. También parece que elmutliprocessing
también aprovecharía los multinúcleos.Inicie un subproceso de "observador" en el proceso ioloop principal usando el módulo de
threading
, cuya tarea es observar unmultiprocessing.Queue
Cita los resultados de la tarea de "levantamiento pesado" cuando finaliza. Esto fue necesario porque necesitaba una manera de saber que la tarea de levantamiento pesado se había completado al mismo tiempo que podía notificar al ioloop que esta solicitud había finalizado.Asegúrese de que el hilo del "observador" renuncie al control del ioloop principal a menudo con las
time.sleep(0)
para que otras solicitudes continúen siendo procesadas fácilmente.Cuando haya un resultado en la cola, a continuación, agregue una devolución de llamada desde el subproceso del "observador" utilizando
tornado.ioloop.IOLoop.instance().add_callback()
que está documentada como la única forma segura de llamar a instancias de ioloop desde otros subprocesos.Asegúrese de llamar a
finish()
en la devolución de llamada para completar la solicitud y entregar una respuesta.
A continuación se muestra un código de ejemplo que muestra este enfoque. multi_tornado.py
es el servidor que implementa el esquema anterior y call_multi.py
es un script de ejemplo que llama al servidor de dos maneras diferentes para probar el servidor. Ambas pruebas llaman al servidor con 3 solicitudes GET
lentas seguidas por 20 solicitudes GET
rápidas. Los resultados se muestran tanto para la ejecución con como sin el subproceso activado.
En el caso de ejecutarlo sin "subprocesos", el bloque de 3 solicitudes lentas (cada una demora un poco más en completarse). Algunas de las 20 solicitudes rápidas se reparten entre algunas de las solicitudes lentas dentro del ioloop (no estoy totalmente seguro de cómo ocurre eso, pero podría ser un artefacto de que estoy ejecutando el script de prueba del servidor y del cliente en la misma máquina). El punto aquí es que todas las solicitudes rápidas se llevan a cabo en diversos grados.
En el caso de ejecutarlo con subprocesos habilitados, las 20 solicitudes rápidas se completan primero inmediatamente y las tres solicitudes lentas se completan casi al mismo tiempo después, ya que cada una se ha ejecutado en paralelo. Este es el comportamiento deseado. Las tres solicitudes lentas tardan 2,5 segundos en completarse en paralelo, mientras que en el caso de las tres solicitudes lentas tardan aproximadamente 3,5 segundos en total. Así que hay un 35% de aceleración general (supongo que debido a la compartición multinúcleo). Pero, lo que es más importante, las solicitudes rápidas se manejaron de inmediato en relación con las lentas.
No tengo mucha experiencia con la programación multiproceso, así que mientras esto parece funcionar aquí, tengo curiosidad por aprender:
¿Hay una manera más sencilla de lograr esto? ¿Qué monstruo puede estar al acecho en este enfoque?
(Nota: una solución de compromiso futura puede ser simplemente ejecutar más instancias de Tornado con un proxy inverso como nginx haciendo balanceo de carga. No importa lo que ejecutaré varias instancias con un equilibrador de carga, pero me preocupa simplemente lanzar hardware a este problema ya que parece que el hardware está tan directamente acoplado al problema en términos del bloqueo.)
Código de muestra
multi_tornado.py
(servidor de muestra):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = ''fast result '' + self.get_argument(''id'')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument(''id'')
res = ''slow result {} <--- {:0.3f} s''.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py
(cliente tester):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = ''http://localhost:8888/{}?id={}''
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = ''http://localhost:8888/fast?id={}''
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print ''Scheduling Get Requests:''
print ''------------------------''
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print ''/nStart sending requests....''
IOLoop.instance().start()
if __name__ == ''__main__'':
scenario = sys.argv[1]
if scenario == ''slow'' or scenario == ''slow_threaded'':
run(scenario)
Resultados de la prueba
Al ejecutar python call_multi.py slow
(el comportamiento de bloqueo):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
Al ejecutar python call_multi.py slow_threaded
(el comportamiento deseado):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
Si está dispuesto a usar concurrent.futures.ProcessPoolExecutor
lugar de multiprocessing
, esto es realmente muy simple. El ioloop de Tornado ya admite concurrent.futures.Future
, por lo que jugarán muy bien juntos fuera de la caja. concurrent.futures
está incluido en Python 3.2+, y ha sido portado a Python 2.x.
Aquí hay un ejemplo:
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen
def f(a, b, c, blah=None):
print "got %s %s %s and %s" % (a, b, c, blah)
time.sleep(5)
return "hey there"
@gen.coroutine
def test_it():
pool = ProcessPoolExecutor(max_workers=1)
fut = pool.submit(f, 1, 2, 3, blah="ok") # This returns a concurrent.futures.Future
print("running it asynchronously")
ret = yield fut
print("it returned %s" % ret)
pool.shutdown()
IOLoop.instance().run_sync(test_it)
Salida:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor
tiene una API más limitada que multiprocessing.Pool
, pero si no necesita las funciones más avanzadas de multiprocessing.Pool
, vale la pena utilizarla porque la integración es mucho más sencilla.
Si sus solicitudes de obtención tardan tanto, entonces el tornado es el marco incorrecto.
Le sugiero que use nginx para enrutar los torneos rápidos al tornado y los más lentos a un servidor diferente.
PeterBe tiene un artículo interesante en el que ejecuta múltiples servidores Tornado y establece que uno de ellos es "el más lento" para manejar las solicitudes de larga duración.
multiprocessing.Pool
se puede integrar en el bucle de E / S de tornado
, pero es un poco desordenado. Se puede hacer una integración mucho más limpia usando concurrent.futures
(vea mi otra respuesta para más detalles), pero si está atascado en Python 2.xy no puede instalar el backport concurrent.futures
, aquí le explicamos cómo puede hacerlo estrictamente. utilizando multiprocessing
:
Los métodos multiprocessing.Pool.apply_async
y multiprocessing.Pool.map_async
tienen un parámetro de callback
opcional, lo que significa que ambos pueden potencialmente conectarse a un tornado.gen.Task
. Entonces, en la mayoría de los casos, ejecutar el código de forma asíncrona en un subproceso es tan simple como esto:
import multiprocessing
import contextlib
from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial
def worker():
print "async work here"
@gen.coroutine
def async_run(func, *args, **kwargs):
result = yield gen.Task(pool.apply_async, func, args, kwargs)
raise Return(result)
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())
func = partial(async_run, worker)
IOLoop().run_sync(func)
Como mencioné, esto funciona bien en la mayoría de los casos. Pero si worker()
lanza una excepción, la callback
nunca se llama, lo que significa que la gen.Task
termina, y se cuelga para siempre. Ahora, si sabe que su trabajo nunca arrojará una excepción (porque envolvió todo en un try
/ except
, por ejemplo), puede utilizar este enfoque felizmente. Sin embargo, si desea dejar que las excepciones se escapen de su trabajador, la única solución que encontré fue subclasificar algunos componentes de multiprocesamiento y hacer que devuelvan la callback
incluso si el subproceso de trabajo generó una excepción:
from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
'''''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
''''''
assert self._state == RUN
result = TornadoApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
...
if __name__ == "__main__":
pool = TornadoPool(multiprocessing.cpu_count())
...
Con estos cambios, el objeto de excepción será devuelto por gen.Task
, en lugar de gen.Task
cuelga indefinidamente. También actualicé mi método async_run
para volver a generar la excepción cuando se devolvió, e hice algunos otros cambios para proporcionar mejores rastreos para las excepciones lanzadas en los subprocesos de trabajo. Aquí está el código completo:
import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps
import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen
class WrapException(Exception):
def __init__(self):
exc_type, exc_value, exc_tb = sys.exc_info()
self.exception = exc_value
self.formatted = ''''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
def __str__(self):
return ''/n%s/nOriginal traceback:/n%s'' % (Exception.__str__(self), self.formatted)
class TornadoApplyResult(ApplyResult):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
'''''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
''''''
assert self._state == RUN
result = TornadoApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
@gen.coroutine
def async_run(func, *args, **kwargs):
""" Runs the given function in a subprocess.
This wraps the given function in a gen.Task and runs it
in a multiprocessing.Pool. It is meant to be used as a
Tornado co-routine. Note that if func returns an Exception
(or an Exception sub-class), this function will raise the
Exception, rather than return it.
"""
result = yield gen.Task(pool.apply_async, func, args, kwargs)
if isinstance(result, Exception):
raise result
raise Return(result)
def handle_exceptions(func):
""" Raise a WrapException so we get a more meaningful traceback"""
@wraps(func)
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
raise WrapException()
return inner
# Test worker functions
@handle_exceptions
def test2(x):
raise Exception("eeee")
@handle_exceptions
def test(x):
print x
time.sleep(2)
return "done"
class TestHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
try:
result = yield async_run(test, "inside get")
self.write("%s/n" % result)
result = yield async_run(test2, "hi2")
except Exception as e:
print("caught exception in get")
self.write("Caught an exception: %s" % e)
finally:
self.finish()
app = tornado.web.Application([
(r"/test", TestHandler),
])
if __name__ == "__main__":
pool = TornadoPool(4)
app.listen(8888)
IOLoop.instance().start()
Así es como se comporta para el cliente:
dan@dan:~$ curl localhost:8888/test
done
Caught an exception:
Original traceback:
Traceback (most recent call last):
File "./mutli.py", line 123, in inner
return func(*args, **kwargs)
File "./mutli.py", line 131, in test2
raise Exception("eeee")
Exception: eeee
Y si envío dos solicitudes de curvatura simultáneas, podemos ver que se manejan de forma asíncrona en el lado del servidor:
dan@dan:~$ ./mutli.py
inside get
inside get
caught exception inside get
caught exception inside get
Editar:
Tenga en cuenta que este código se vuelve más sencillo con Python 3, porque introduce un argumento de palabra clave error_callback
en todos multiprocessing.Pool
métodos de multiprocessing.Pool
asíncrono.Pool. Esto hace que sea mucho más fácil de integrar con Tornado:
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
'''''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
''''''
super().apply_async(func, args, kwds, callback=callback,
error_callback=callback)
@gen.coroutine
def async_run(func, *args, **kwargs):
""" Runs the given function in a subprocess.
This wraps the given function in a gen.Task and runs it
in a multiprocessing.Pool. It is meant to be used as a
Tornado co-routine. Note that if func returns an Exception
(or an Exception sub-class), this function will raise the
Exception, rather than return it.
"""
result = yield gen.Task(pool.apply_async, func, args, kwargs)
raise Return(result)
Todo lo que debemos hacer en nuestro apply_async
invalidado es llamar al padre con el argumento de palabra clave error_callback
, además del kwarg de callback
. No hay necesidad de anular ApplyResult
.
Podemos llegar a ser más sofisticados usando una MetaClass en nuestro TornadoPool
, para permitir que sus métodos *_async
sean llamados directamente como si fueran coroutines:
import time
from functools import wraps
from multiprocessing.pool import Pool
import tornado.web
from tornado import gen
from tornado.gen import Return
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future
def _argument_adapter(callback):
def wrapper(*args, **kwargs):
if kwargs or len(args) > 1:
callback(Arguments(args, kwargs))
elif args:
callback(args[0])
else:
callback(None)
return wrapper
def PoolTask(func, *args, **kwargs):
""" Task function for use with multiprocessing.Pool methods.
This is very similar to tornado.gen.Task, except it sets the
error_callback kwarg in addition to the callback kwarg. This
way exceptions raised in pool worker methods get raised in the
parent when the Task is yielded from.
"""
future = Future()
def handle_exception(typ, value, tb):
if future.done():
return False
future.set_exc_info((typ, value, tb))
return True
def set_result(result):
if future.done():
return
if isinstance(result, Exception):
future.set_exception(result)
else:
future.set_result(result)
with stack_context.ExceptionStackContext(handle_exception):
cb = _argument_adapter(set_result)
func(*args, callback=cb, error_callback=cb)
return future
def coro_runner(func):
""" Wraps the given func in a PoolTask and returns it. """
@wraps(func)
def wrapper(*args, **kwargs):
return PoolTask(func, *args, **kwargs)
return wrapper
class MetaPool(type):
""" Wrap all *_async methods in Pool with coro_runner. """
def __new__(cls, clsname, bases, dct):
pdct = bases[0].__dict__
for attr in pdct:
if attr.endswith("async") and not attr.startswith(''_''):
setattr(bases[0], attr, coro_runner(pdct[attr]))
return super().__new__(cls, clsname, bases, dct)
class TornadoPool(Pool, metaclass=MetaPool):
pass
# Test worker functions
def test2(x):
print("hi2")
raise Exception("eeee")
def test(x):
print(x)
time.sleep(2)
return "done"
class TestHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
try:
result = yield pool.apply_async(test, ("inside get",))
self.write("%s/n" % result)
result = yield pool.apply_async(test2, ("hi2",))
self.write("%s/n" % result)
except Exception as e:
print("caught exception in get")
self.write("Caught an exception: %s" % e)
raise
finally:
self.finish()
app = tornado.web.Application([
(r"/test", TestHandler),
])
if __name__ == "__main__":
pool = TornadoPool()
app.listen(8888)
IOLoop.instance().start()