starmap set_start_method parallelize how from python sockets multiprocessing

set_start_method - python pool process



Forma correcta de cancelar aceptar y cerrar una conexión de escucha de procesamiento/multiproceso de Python (4)

(Estoy usando el módulo de pyprocessing en este ejemplo, pero reemplazar el procesamiento con multiprocesamiento probablemente debería funcionar si ejecuta Python 2.6 o utiliza el backport de multiprocesamiento )

Actualmente tengo un programa que escucha un socket de Unix (usando un processing.connection.Listener), acepta conexiones y engendra un hilo que maneja la solicitud. En cierto punto, quiero salir del proceso con elegancia, pero como la llamada accept () - está bloqueando y no veo la forma de cancelarla de una manera agradable. Tengo una manera que funciona aquí (OS X) al menos, estableciendo un manejador de señal y señalizando el proceso desde otro hilo así:

import processing from processing.connection import Listener import threading import time import os import signal import socket import errno # This is actually called by the connection handler. def closeme(): time.sleep(1) print ''Closing socket...'' listener.close() os.kill(processing.currentProcess().getPid(), signal.SIGPIPE) oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None) listener = Listener(''/tmp/asdf'', ''AF_UNIX'') # This is a thread that handles one already accepted connection, left out for brevity threading.Thread(target=closeme).start() print ''Accepting...'' try: listener.accept() except socket.error, e: if e.args[0] != errno.EINTR: raise # Cleanup here... print ''Done...''

La única otra forma en que he pensado es en profundizar en la conexión (listener._listener._socket) y establecer la opción de no bloqueo ... pero eso probablemente tiene algunos efectos secundarios y en general da mucho miedo.

¿Alguien tiene una manera más elegante (y tal vez incluso correcta) de lograr esto? Debe ser portátil para OS X, Linux y BSD, pero la portabilidad de Windows, etc. no es necesaria.

Aclaración : ¡Gracias a todos! Como de costumbre, las ambigüedades en mi pregunta original se revelan :)

  • Necesito realizar una limpieza después de haber cancelado la escucha, y no siempre quiero salir de ese proceso.
  • Necesito poder acceder a este proceso desde otros procesos no generados desde el mismo padre, lo que hace que las Colas sean difíciles de manejar
  • Las razones para los hilos son que:
    • Acceden a un estado compartido. En realidad, más o menos una base de datos en memoria común, así que supongo que podría hacerse de manera diferente.
    • Debo poder tener varias conexiones aceptadas al mismo tiempo, pero los hilos actuales bloquean algo la mayor parte del tiempo. Cada conexión aceptada genera un nuevo hilo; esto para no bloquear a todos los clientes en operaciones de E / S.

En cuanto a los hilos frente a los procesos, utilizo hilos para hacer que mis operaciones de bloqueo no bloqueen y los procesos para permitir el multiprocesamiento.


Probablemente no sea lo ideal, pero puede liberar el bloque enviando al socket algunos datos del manejador de señal o el hilo que está terminando el proceso.

EDITAR: Otra forma de implementar esto podría ser utilizar las colas de conexión , ya que parecen soportar tiempos de espera (disculpas, leí mal su código en mi primera lectura).


Soy nuevo en el módulo de multiprocesamiento, pero me parece que mezclar el módulo de procesamiento y el módulo de subprocesamiento es contrario a la intuición, ¿no están dirigidos a resolver el mismo problema?

De todos modos, ¿qué tal envolver tus funciones de escucha en un proceso en sí mismo? No estoy seguro de cómo esto afecta el resto de tu código, pero esta puede ser una alternativa más limpia.

from multiprocessing import Process from multiprocessing.connection import Listener class ListenForConn(Process): def run(self): listener = Listener(''/tmp/asdf'', ''AF_UNIX'') listener.accept() # do your other handling here listen_process = ListenForConn() listen_process.start() print listen_process.is_alive() listen_process.terminate() listen_process.join() print listen_process.is_alive() print ''No more listen process.''


¿No es eso para lo que es selecto?

Solo llame a accept en el socket si la selección indica que no bloqueará ...

La selección tiene un tiempo de espera, por lo que puede aparecer de vez en cuando de vez en cuando para verificar si es hora de apagarla ...


Pensé que podría evitarlo, pero parece que tengo que hacer algo como esto:

from processing import connection connection.Listener.fileno = lambda self: self._listener._socket.fileno() import select l = connection.Listener(''/tmp/x'', ''AF_UNIX'') r, w, e = select.select((l, ), (), ()) if l in r: print "Accepting..." c = l.accept() # ...

Soy consciente de que esto infringe la ley de demeter y presenta algunos parches de mono malvados, pero parece que esta sería la forma más fácil de lograr esto. Si alguien tiene una solución más elegante, me gustaría escucharla :)