pelicula - tornado python mac
¿Cómo detengo el servidor web Tornado? (9)
He estado jugando un poco con el servidor web Tornado y he llegado a un punto en el que quiero detener el servidor web (por ejemplo, durante las pruebas de unidad). El siguiente ejemplo simple existe en la página web de Tornado :
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
Una vez que se tornado.ioloop.IOLoop.instance().start()
, bloquea el programa (o el hilo actual). Al leer el código fuente del objeto IOLoop
este ejemplo en la documentación de la función de stop
:
To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this:
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
ioloop.start() will return after async_method has run its callback,
whether that callback was invoked before or after ioloop.start.
Sin embargo, no tengo idea de cómo integrar esto en mi programa. De hecho, tengo una clase que encapsula el servidor web (que tiene sus propias funciones de start
y stop
), pero tan pronto como lo llamo inicio, el programa (o pruebas) se bloqueará de todos modos.
He intentado iniciar el servidor web en otro proceso (utilizando el paquete de multiprocessing
). Esta es la clase que está envolviendo el servidor web:
class Server:
def __init__(self, port=8888):
self.application = tornado.web.Application([ (r"/", Handler) ])
def server_thread(application, port):
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(port)
tornado.ioloop.IOLoop.instance().start()
self.process = Process(target=server_thread,
args=(self.application, port,))
def start(self):
self.process.start()
def stop(self):
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
Sin embargo, la detención no parece detener completamente el servidor web, ya que aún se está ejecutando en la próxima prueba, incluso con esta configuración de prueba:
def setup_method(self, _function):
self.server = Server()
self.server.start()
time.sleep(0.5) # Wait for web server to start
def teardown_method(self, _function):
self.kstore.stop()
time.sleep(0.5)
¿Cómo puedo iniciar y detener un servidor web Tornado desde un programa Python?
Aquí está la solución de cómo detener a Torando de otro hilo. Schildmeijer proporcionó una buena pista, pero me tomó un tiempo para imaginar realmente el ejemplo final que funciona.
Por favor ver más abajo:
import threading
import tornado.ioloop
import tornado.web
import time
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world!/n")
def start_tornado(*args, **kwargs):
application = tornado.web.Application([
(r"/", MainHandler),
])
application.listen(8888)
print "Starting Torando"
tornado.ioloop.IOLoop.instance().start()
print "Tornado finished"
def stop_tornado():
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
print "Asked Tornado to exit"
def main():
t = threading.Thread(target=start_tornado)
t.start()
time.sleep(5)
stop_tornado()
t.join()
if __name__ == "__main__":
main()
En caso de que no quiera molestarse con los hilos, podría detectar una señal de interrupción del teclado:
try:
tornado.ioloop.IOLoop.instance().start()
# signal : CTRL + BREAK on windows or CTRL + C on linux
except KeyboardInterrupt:
tornado.ioloop.IOLoop.instance().stop()
Hay un problema con la solución de Zaar Hai, a saber, que deja el zócalo abierto. La razón por la que buscaba una solución para detener Tornado es que estoy ejecutando pruebas unitarias contra mi servidor de aplicaciones y necesitaba una forma de iniciar / detener el servidor entre pruebas para tener un estado claro (sesión vacía, etc.). Al dejar el zócalo abierto, la segunda prueba siempre se encontró con un error de Address already in use
. Entonces se me ocurrió lo siguiente:
import logging as log
from time import sleep
from threading import Thread
import tornado
from tornado.httpserver import HTTPServer
server = None
thread = None
def start_app():
def start():
global server
server = HTTPServer(create_app())
server.listen(TEST_PORT, TEST_HOST)
tornado.ioloop.IOLoop.instance().start()
global thread
thread = Thread(target=start)
thread.start()
# wait for the server to fully initialize
sleep(0.5)
def stop_app():
server.stop()
# silence StreamClosedError Tornado is throwing after it is stopped
log.getLogger().setLevel(log.FATAL)
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
thread.join()
Entonces, la idea principal aquí es mantener una referencia a la instancia de HTTPServer
y llamar a su método stop()
. Y create_app()
simplemente devuelve una instancia de la Application
configurada con controladores. Ahora puedes usar estos métodos en tus pruebas unitarias de esta manera:
class FoobarTest(unittest.TestCase):
def setUp(self):
start_app()
def tearDown(self):
stop_app()
def test_foobar(self):
# here the server is up and running, so you can make requests to it
pass
La IOloop.instance () de Tornado tiene problemas para detener una señal externa cuando se ejecuta en multiprocesamiento. Proceso.
La única solución que se me ocurrió que funciona de forma coherente es mediante el uso de Process.terminate ():
import tornado.ioloop, tornado.web
import time
import multiprocessing
class Handler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([ (r"/", Handler) ])
class TornadoStop(Exception):
pass
def stop():
raise TornadoStop
class worker(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
application.listen(8888)
self.ioloop = tornado.ioloop.IOLoop.instance()
def run(self):
self.ioloop.start()
def stop(self, timeout = 0):
self.ioloop.stop()
time.sleep(timeout)
self.terminate()
if __name__ == "__main__":
w = worker()
print ''starting server''
w.start()
t = 2
print ''waiting {} seconds before stopping''.format(t)
for i in range(t):
time.sleep(1)
print i
print ''stopping''
w.stop(1)
print ''stopped''
Me topé con esto y encontré este problema yo mismo, y al usar la información de este hilo surgió lo siguiente. Simplemente tomé mi código de Tornado autónomo de trabajo (copiado de todos los ejemplos) y moví el código de inicio real a una función. Entonces llamé a la función como un hilo de rosca. Mi caso fue diferente, ya que la llamada de subprocesos se realizó desde mi código existente, donde recién importé las rutinas startTornado y stopTornado.
La sugerencia anterior parecía funcionar muy bien, así que pensé que iba a proporcionar el código de ejemplo faltante. Probé este código en Linux en un sistema FC16 (y arreglé mi tipo inicial o).
import tornado.ioloop, tornado.web
class Handler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([ (r"/", Handler) ])
def startTornado():
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
def stopTornado():
tornado.ioloop.IOLoop.instance().stop()
if __name__ == "__main__":
import time, threading
threading.Thread(target=startTornado).start()
print "Your web server will self destruct in 2 minutes"
time.sleep(120)
stopTornado()
Espero que esto ayude a la siguiente persona.
Para detener todo el ioloop, simplemente invoque el método ioloop.stop cuando haya terminado la prueba de la unidad. (Recuerde que el único método seguro de subprocesos (documentado) es ioloop.add_callback, es decir, si la prueba de la unidad es ejecutada por otro subproceso, podría incluir la invocación de detención en una devolución de llamada)
Si es suficiente para detener el servidor web http, invoque el httpserver httpserver .stop ()
Queremos utilizar un proceso de multiprocessing.Process
con un tornado.ioloop.IOLoop
para trabajar alrededor de la cPython GIL por desempeño e independencia. Para obtener acceso al IOLoop, necesitamos usar la Queue
para pasar la señal de apagado.
Aquí hay un ejemplo minimalista:
class Server(BokehServer)
def start(self, signal=None):
logger.info(''Starting server on http://localhost:%d''
% (self.port))
if signal is not None:
def shutdown():
if not signal.empty():
self.stop()
tornado.ioloop.PeriodicCallback(shutdown, 1000).start()
BokehServer.start(self)
self.ioloop.start()
def stop(self, *args, **kwargs): # args important for signals
logger.info(''Stopping server...'')
BokehServer.stop(self)
self.ioloop.stop()
El proceso
import multiprocessing as mp
import signal
from server import Server # noqa
class ServerProcess(mp.Process):
def __init__(self, *args, **kwargs):
self.server = Server(*args, **kwargs)
self.shutdown_signal = _mp.Queue(1)
mp.Process.__init__(self)
signal.signal(signal.SIGTERM, self.server.stop)
signal.signal(signal.SIGINT, self.server.stop)
def run(self):
self.server.start(signal=self.shutdown_signal)
def stop(self):
self.shutdown_signal.put(True)
if __name__ == ''__main__'':
p = ServerProcess()
p.start()
¡Aclamaciones!
Si necesita este comportamiento para la prueba de unidad, eche un vistazo a tornado.testing.AsyncTestCase .
De forma predeterminada, se construye un nuevo IOLoop para cada prueba y está disponible como self.io_loop. Este IOLoop se debe utilizar en la construcción de clientes / servidores HTTP, etc. Si el código que se está probando requiere un IOLoop global, las subclases deben reemplazar a get_new_ioloop para devolverlo.
Si necesita iniciar y detener una IOLoop para algún otro propósito y no puede llamar a ioloop.stop () desde una devolución de llamada por alguna razón, es posible una implementación de múltiples subprocesos. Sin embargo, para evitar las condiciones de carrera, debe sincronizar el acceso al ioloop, lo que en realidad es imposible. Algo como lo siguiente resultará en un punto muerto:
Hilo 1:
with lock:
ioloop.start()
Hilo 2:
with lock:
ioloop.stop()
porque el hilo 1 nunca liberará el bloqueo (inicio () está bloqueando) y el hilo 2 esperará hasta que se libere el bloqueo para detener el ioloop.
La única forma de hacerlo es que el subproceso 2 llame a ioloop.add_callback (ioloop.stop), que llamará a stop () en el subproceso 1 en la siguiente iteración del bucle de eventos. Tenga la seguridad de que ioloop.add_callback () es thread-safe .
Solo agrega esto antes del inicio ():
IOLoop.instance (). Add_timeout (10, IOLoop.instance (). Stop)
Registrará la función de parada como una devolución de llamada en el bucle y la ejecutará 10 segundos después del inicio