starmap set_start_method parallelize how from python multiprocessing dictionary

set_start_method - Cómo sincronizar un dict de python con multiprocesamiento



python pool process (4)

Estoy usando Python 2.6 y el módulo de multiprocesamiento para subprocesos múltiples. Ahora me gustaría tener un dict sincronizado (donde la única operación atómica que realmente necesito es el operador + = en un valor).

¿Debería envolver el dict con una llamada multiprocessing.sharedctypes.synchronized ()? ¿O es otra manera el camino a seguir?


Introducción

Parece que hay muchas sugerencias de sillones y no hay ejemplos prácticos. Ninguna de las respuestas enumeradas aquí sugiere el uso de multiprocesamiento y esto es bastante decepcionante e inquietante. Como amantes de Python debemos apoyar nuestras bibliotecas integradas, y si bien el procesamiento y la sincronización en paralelo nunca es un asunto trivial, creo que puede hacerse trivial con un diseño adecuado. ¡Esto se está volviendo extremadamente importante en las arquitecturas modernas de múltiples núcleos y no se puede enfatizar lo suficiente! Dicho esto, estoy muy lejos de estar satisfecho con la biblioteca de multiprocesamiento, ya que todavía está en sus etapas iniciales con bastantes escollos, errores y está orientado hacia la programación funcional (lo que detesto). Actualmente todavía prefiero el módulo Pyro (que está muy adelantado a su tiempo) sobre el multiprocesamiento debido a la severa limitación del multiprocesamiento al no poder compartir objetos recién creados mientras el servidor se está ejecutando. El método de clase "registro" de los objetos del administrador solo registrará realmente un objeto ANTES de que se inicie el administrador (o su servidor). Basta de charla, más código:

Servidor.py

from multiprocessing.managers import SyncManager class MyManager(SyncManager): pass syncdict = {} def get_dict(): return syncdict if __name__ == "__main__": MyManager.register("syncdict", get_dict) manager = MyManager(("127.0.0.1", 5000), authkey="password") manager.start() raw_input("Press any key to kill server".center(50, "-")) manager.shutdown()

En el ejemplo de código anterior, Server.py utiliza el SyncManager de multiproceso que puede suministrar objetos compartidos sincronizados. Este código no funcionará ejecutándose en el intérprete porque la biblioteca de multiprocesamiento es bastante delicada sobre cómo encontrar el "llamable" para cada objeto registrado. Al ejecutar Server.py se iniciará un SyncManager personalizado que comparte el diccionario syncdict para el uso de múltiples procesos y se puede conectar a los clientes en la misma máquina, o si se ejecuta en una dirección IP que no sea loopback, otras máquinas. En este caso, el servidor se ejecuta en bucle de retorno (127.0.0.1) en el puerto 5000. El uso del parámetro authkey usa conexiones seguras al manipular syncdict. Cuando se presiona alguna tecla, el administrador se apaga.

Client.py

from multiprocessing.managers import SyncManager import sys, time class MyManager(SyncManager): pass MyManager.register("syncdict") if __name__ == "__main__": manager = MyManager(("127.0.0.1", 5000), authkey="password") manager.connect() syncdict = manager.syncdict() print "dict = %s" % (dir(syncdict)) key = raw_input("Enter key to update: ") inc = float(raw_input("Enter increment: ")) sleep = float(raw_input("Enter sleep time (sec): ")) try: #if the key doesn''t exist create it if not syncdict.has_key(key): syncdict.update([(key, 0)]) #increment key value every sleep seconds #then print syncdict while True: syncdict.update([(key, syncdict.get(key) + inc)]) time.sleep(sleep) print "%s" % (syncdict) except KeyboardInterrupt: print "Killed client"

El cliente también debe crear un SyncManager personalizado, registrando "syncdict", esta vez sin pasar un llamador para recuperar el dictado compartido. Luego utiliza el SycnManager personalizado para conectarse utilizando la dirección IP de bucle de retorno (127.0.0.1) en el puerto 5000 y una clave de autenticación que establece una conexión segura con el administrador iniciado en Server.py. Recupera el dictado de dictado compartido llamando al nombre registrado en el administrador. Solicita al usuario lo siguiente:

  1. La clave en Syncdict para operar en
  2. La cantidad para incrementar el valor accedido por la clave en cada ciclo.
  3. La cantidad de tiempo para dormir por ciclo en segundos

El cliente luego verifica si existe la clave. Si no lo hace, crea la clave en el syncdict. Luego, el cliente ingresa un bucle "sin fin" en el que actualiza el valor de la clave por el incremento, duerme la cantidad especificada e imprime el syncdict solo para repetir este proceso hasta que se produzca una Interrupción de teclado (Ctrl + C).

Problemas molestos

  1. Los métodos de registro del Gerente DEBEN ser llamados antes de que se inicie el administrador, de lo contrario obtendrá excepciones aunque una llamada de directorio en el Administrador revelará que efectivamente tiene el método que se registró.
  2. Todas las manipulaciones del dict deben realizarse con métodos y no con asignaciones de dict (syncdict ["blast"] = 2 fallará miserablemente debido a la forma en que el multiprocesamiento comparte objetos personalizados)
  3. Usar el método dict de SyncManager aliviaría el molesto problema # 2, excepto que el molesto problema # 1 evita que el proxy devuelto por SyncManager.dict () se registre y comparta. (SyncManager.dict () solo se puede llamar DESPUÉS de que se inicie el administrador, y el registro solo funcionará ANTES de que se inicie el administrador, por lo que SyncManager.dict () solo es útil cuando se realiza la programación funcional y se pasa el proxy a Procesos como un argumento como el doc ejemplos hacen)
  4. El servidor Y el cliente tienen que registrarse, aunque intuitivamente parece que el cliente solo podrá resolverlo después de conectarse al administrador (agregue esto a sus desarrolladores de multiprocesamiento de lista de deseos)

Clausura

Espero que hayas disfrutado esta respuesta tan minuciosa y lenta como yo. Estaba teniendo muchos problemas para pensar directamente por qué luchaba tanto con el módulo de multiprocesamiento en el que Pyro hace que sea una brisa y ahora, gracias a esta respuesta, he acertado en la cabeza. Espero que esto sea útil para la comunidad de Python sobre cómo mejorar el módulo de multiprocesamiento ya que creo que tiene una gran promesa, pero en su infancia no llega a lo que es posible. A pesar de los molestos problemas descritos, creo que esta es una alternativa bastante viable y es bastante simple. También puede usar SyncManager.dict () y pasarlo a Procesos como un argumento de la forma en que se muestran los documentos y probablemente sería una solución aún más simple, dependiendo de sus requisitos, simplemente me parece antinatural.


¿Hay alguna razón por la que el diccionario deba compartirse en primer lugar? ¿Podría hacer que cada hilo mantenga su propia instancia de un diccionario y combinarlo al final del procesamiento del hilo o usar periódicamente una devolución de llamada para fusionar copias de los diccionarios de hilos individuales juntos?

No sé exactamente lo que está haciendo, así que tenga en cuenta que es posible que mi plan escrito no funcione de manera textual. Lo que sugiero es más una idea de diseño de alto nivel.


En respuesta a una solución apropiada para el problema de escritura concurrente. Hice una investigación muy rápida y descubrí que este artículo sugiere una solución de bloqueo / semáforo. ( http://effbot.org/zone/thread-synchronization.htm )

Si bien el ejemplo no es específico en un diccionario, estoy bastante seguro de que podría codificar un objeto envoltorio basado en clases para ayudarlo a trabajar con diccionarios basados ​​en esta idea.

Si tuviera el requisito de implementar algo como esto de una manera segura para el subproceso, probablemente usaría la solución Python Semaphore. (Suponiendo que mi técnica de fusión anterior no funcionaría). Creo que los semáforos generalmente ralentizan las eficiencias de los hilos debido a su naturaleza de bloqueo.

Desde el sitio:

Un semáforo es un mecanismo de bloqueo más avanzado. Un semáforo tiene un contador interno en lugar de un indicador de bloqueo, y solo se bloquea si más de un número dado de hilos ha intentado mantener el semáforo. Dependiendo de cómo se inicialice el semáforo, esto permite que múltiples hilos accedan a la misma sección de código simultáneamente.

semaphore = threading.BoundedSemaphore() semaphore.acquire() # decrements the counter ... access the shared resource; work with dictionary, add item or whatever. semaphore.release() # increments the counter


xmlrpclib un proceso separado para mantener el "dictado compartido": simplemente use, por ejemplo, xmlrpclib para hacer que esa pequeña cantidad de código esté disponible para los otros procesos, exponiendo a través de xmlrpclib, por ejemplo, una función que tome la key, increment para realizar el incremento y uno que tome solo el key y devolver el valor, con detalles semánticos (hay un valor predeterminado para las claves que faltan, etc., etc.) según las necesidades de su aplicación.

Luego, puede utilizar cualquier enfoque que desee para implementar el proceso dedicado de dictado compartido: desde un servidor de un solo hilo con un dict simple en la memoria, a un DB de sqlite simple, etc. Le sugiero que comience con el código " tan simple como pueda salirse con la suya "(dependiendo de si necesita un dictado compartido persistente , o la persistencia no es necesaria para usted), luego mida y optimice según sea necesario.