procesos proceso paralelo multitarea multiprocesamiento crear concurrentes con python multiprocessing pool keyboardinterrupt

proceso - python 2 multiprocessing



El teclado interrumpe con el grupo de multiprocesamiento de python (9)

¿Cómo puedo manejar los eventos de KeyboardInterrupt con los Pools de multiprocesamiento de python? Aquí hay un ejemplo simple:

from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i): sleep(1) return i*i def go(): pool = Pool(8) try: results = pool.map(slowly_square, range(40)) except KeyboardInterrupt: # **** THIS PART NEVER EXECUTES. **** pool.terminate() print "You cancelled the program!" sys.exit(1) print "/nFinally, here are the results: ", results if __name__ == "__main__": go()

Cuando ejecuto el código anterior, KeyboardInterrupt se levanta cuando presiono ^C , pero el proceso simplemente se cuelga en ese punto y tengo que matarlo externamente.

Quiero poder presionar ^C en cualquier momento y hacer que todos los procesos salgan con gracia.


Aunque parezca extraño, parece que también tienes que manejar KeyboardInterrupt en los niños. Hubiera esperado que esto funcionara como está escrito ... intente cambiar slowly_square a:

def slowly_square(i): try: sleep(1) return i * i except KeyboardInterrupt: print ''You EVIL bastard!'' return 0

Eso debería funcionar como esperabas.



Este es un error de Python. Cuando se espera una condición en threading.Condition.wait (), KeyboardInterrupt nunca se envía. Repro:

import threading cond = threading.Condition(threading.Lock()) cond.acquire() cond.wait(None) print "done"

La excepción KeyboardInterrupt no se entregará hasta que wait () vuelva y nunca vuelva, por lo que la interrupción nunca ocurre. KeyboardInterrupt casi seguramente interrumpirá una espera de condición.

Tenga en cuenta que esto no ocurre si se especifica un tiempo de espera; cond.wait (1) recibirá la interrupción de inmediato. Entonces, una solución es especificar un tiempo de espera. Para hacer eso, reemplace

results = pool.map(slowly_square, range(40))

con

results = pool.map_async(slowly_square, range(40)).get(9999999)

o similar.


La respuesta votada no aborda el problema central, sino un efecto secundario similar.

Jesse Noller, el autor de la biblioteca de multiprocesamiento, explica cómo lidiar correctamente con CTRL + C al usar multiprocessing.Pool en una publicación de blog anterior .

import signal from multiprocessing import Pool def initializer(): """Ignore CTRL+C in the worker process.""" signal.signal(signal.SIGINT, signal.SIG_IGN) pool = Pool(initializer=initializer) try: pool.map(perform_download, dowloads) except KeyboardInterrupt: pool.terminate() pool.join()


Parece que hay dos problemas que hacen que las excepciones resulten molestas al multiprocesamiento. La primera (señalada por Glenn) es que debe usar map_async con un tiempo de espera en lugar de un map para obtener una respuesta inmediata (es decir, no termine de procesar toda la lista). El segundo (señalado por Andrey) es que el multiprocesamiento no SystemExit excepciones que no heredan de Exception (p. SystemExit ., SystemExit ). Así que aquí está mi solución que trata con ambos:

import sys import functools import traceback import multiprocessing def _poolFunctionWrapper(function, arg): """Run function under the pool Wrapper around function to catch exceptions that don''t inherit from Exception (which aren''t caught by multiprocessing, so that you end up hitting the timeout). """ try: return function(arg) except: cls, exc, tb = sys.exc_info() if issubclass(cls, Exception): raise # No worries # Need to wrap the exception with something multiprocessing will recognise import traceback print "Unhandled exception %s (%s):/n%s" % (cls.__name__, exc, traceback.format_exc()) raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc)) def _runPool(pool, timeout, function, iterable): """Run the pool Wrapper around pool.map_async, to handle timeout. This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see http://.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool Further wraps the function in _poolFunctionWrapper to catch exceptions that don''t inherit from Exception. """ return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout) def myMap(function, iterable, numProcesses=1, timeout=9999): """Run the function on the iterable, optionally with multiprocessing""" if numProcesses > 1: pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1) mapFunc = functools.partial(_runPool, pool, timeout) else: pool = None mapFunc = map results = mapFunc(function, iterable) if pool is not None: pool.close() pool.join() return results


Por alguna razón, solo las excepciones heredadas de la clase Exception base se manejan normalmente. Como solución alternativa, puede volver a subir su KeyboardInterrupt como una instancia de Exception :

from multiprocessing import Pool import time class KeyboardInterruptError(Exception): pass def f(x): try: time.sleep(x) return x except KeyboardInterrupt: raise KeyboardInterruptError() def main(): p = Pool(processes=4) try: print ''starting the pool map'' print p.map(f, range(10)) p.close() print ''pool map complete'' except KeyboardInterrupt: print ''got ^C while pool mapping, terminating the pool'' p.terminate() print ''pool is terminated'' except Exception, e: print ''got exception: %r, terminating the pool'' % (e,) p.terminate() print ''pool is terminated'' finally: print ''joining pool processes'' p.join() print ''join complete'' print ''the end'' if __name__ == ''__main__'': main()

Normalmente obtendrías la siguiente salida:

staring the pool map [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] pool map complete joining pool processes join complete the end

Entonces, si presionas ^C , obtendrás:

staring the pool map got ^C while pool mapping, terminating the pool pool is terminated joining pool processes join complete the end



Según lo que he descubierto recientemente, la mejor solución es configurar los procesos de trabajo para ignorar por completo a SIGINT, y limitar todo el código de limpieza al proceso principal. Esto soluciona el problema de los procesos de trabajo inactivo y ocupado, y no requiere código de manejo de errores en los procesos secundarios.

import signal ... def init_worker(): signal.signal(signal.SIGINT, signal.SIG_IGN) ... def main() pool = multiprocessing.Pool(size, init_worker) ... except KeyboardInterrupt: pool.terminate() pool.join()

La explicación y el código de ejemplo completo se pueden encontrar en http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ y http://github.com/jreese/multiprocessing-keyboardinterrupt respectivamente.


Soy un novato en Python. Estaba buscando respuestas y tropecé con este y algunos otros blogs y videos de youtube. He intentado copiar pegar el código del autor anterior y reproducirlo en mi pitón 2.7.13 en Windows 7 de 64 bits. Está cerca de lo que quiero lograr.

Hice que mi hijo procese para ignorar el ControlC y hacer que el proceso principal finalice. Parece que eludir el proceso del niño evita este problema para mí.

#!/usr/bin/python from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i): try: print "<slowly_square> Sleeping and later running a square calculation..." sleep(1) return i * i except KeyboardInterrupt: print "<child processor> Don''t care if you say CtrlC" pass def go(): pool = Pool(8) try: results = pool.map(slowly_square, range(40)) except KeyboardInterrupt: # *** THIS PART NEVER EXECUTES. *** :( pool.terminate() pool.close() print "You cancelled the program!" exit(1) print "Finally, here are the results", results if __name__ == ''__main__'': go()