python multithreading flask pipe popen

python - Usar Popen en un hilo bloquea cada solicitud entrante de Flask-SocketIO



multithreading pipe (1)

Tengo la siguiente situación: Recibo una solicitud en un servidor socketio. Respondo (socket.emit (..)) y luego comienzo algo con una pesada carga de cómputo en otro hilo .

Si el cálculo pesado es causado por subprocess.Popen (usando subprocess.PIPE ), bloquea totalmente cada solicitud entrante siempre que se esté ejecutando, aunque suceda en un subproceso separado.

No hay problema: en este hilo se sugirió leer de forma asíncrona el resultado del subproceso con un tamaño de búfer de 1 para que entre estas lecturas otros hilos tengan la oportunidad de hacer algo. Desafortunadamente esto no me ayudó.

También ya monkeypatched puesto a monkeypatched evento y funciona bien, siempre y cuando no use subprocess.Popen con subprocess.PIPE en el subproceso.

En este ejemplo de código, puede ver que solo ocurre utilizando subprocess.Popen with subprocess.PIPE . Cuando #functionWithSimulatedHeavyLoad() comentario de #functionWithSimulatedHeavyLoad() y en su lugar comente functionWithHeavyLoad() todo funciona a la perfección.

from flask import Flask from flask.ext.socketio import SocketIO, emit import eventlet eventlet.monkey_patch() app = Flask(__name__) socketio = SocketIO(app) import time from threading import Thread @socketio.on(''client command'') def response(data, type = None, nonce = None): socketio.emit(''client response'', [''foo'']) thread = Thread(target = testThreadFunction) thread.daemon = True thread.start() def testThreadFunction(): #functionWithSimulatedHeavyLoad() functionWithHeavyLoad() def functionWithSimulatedHeavyLoad(): time.sleep(5) def functionWithHeavyLoad(): from datetime import datetime import subprocess import sys from queue import Queue, Empty ON_POSIX = ''posix'' in sys.builtin_module_names def enqueueOutput(out, queue): for line in iter(out.readline, b''''): if line == '''': break queue.put(line) out.close() # just anything that takes long to be computed shellCommand = ''find / test'' p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX) q = Queue() t = Thread(target = enqueueOutput, args = (p.stdout, q)) t.daemon = True t.start() t.join() text = '''' while True: try: line = q.get_nowait() text += line print(line) except Empty: break socketio.emit(''client response'', {''text'': text}) socketio.run(app)

El cliente recibe el mensaje ''foo'' después de que se complete el trabajo de bloqueo en la función functionWithHeavyLoad (). Sin embargo, debería recibir el mensaje antes.

Esta muestra se puede copiar y pegar en un archivo .py y el comportamiento se puede reproducir instantáneamente.

Estoy usando Python 3.4.3, Flask 0.10.1, flask-socketio1.2, eventlet 0.17.4

Actualizar

Si pongo esto en la función functionWithHeavyLoad, realmente funciona y todo está bien:

import shlex shellCommand = shlex.split(''find / test'') popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE) lines_iterator = iter(popen.stdout.readline, b"") for line in lines_iterator: print(line) eventlet.sleep()

El problema es que utilicé find para cargas pesadas con el fin de hacer que la muestra para usted sea más fácilmente reproducible. Sin embargo, en mi código realmente uso tesseract "{0}" stdout -l deu como comando de venta. Esto (a diferencia de find ) todavía bloquea todo. ¿Es este un problema más bien tesseract que eventlet? Pero aún así: ¿cómo puede este bloqueo si ocurre en un hilo separado donde lee línea por línea con el cambio de contexto cuando la find no bloquea?


Gracias a esta pregunta aprendí algo nuevo hoy. Eventlet ofrece una versión de subproceso y sus funciones amigables con los greenlets, pero por alguna extraña razón, no modifica este módulo en la biblioteca estándar.

Enlace a la implementación eventlet del subproceso: https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py

En cuanto al parche de eventos, los módulos parcheados son os, select, socket, thread, time, MySQLdb, builtins y psycopg2. No hay absolutamente ninguna referencia al subproceso en el parcheador.

La buena noticia es que pude trabajar con Popen() en una aplicación muy similar a la suya, después de reemplazar:

import subprocess

con:

from eventlet.green import subprocess

Pero tenga en cuenta que la versión actual de eventlet (0.17.4) no es compatible con la opción universal_newlines en Popen , obtendrá un error si lo usa. El soporte para esta opción está en el maestro (aquí está la commit que agregó la opción). Tendrá que eliminar esa opción de su llamada o instalar la rama maestra de eventlet directamente desde github.