procesos multitarea ejemplos crear concurrentes con python logging multiprocessing

ejemplos - multitarea en python



¿Cómo debo registrarme al usar multiprocesamiento en Python? (19)

En este momento tengo un módulo central en un marco que genera múltiples procesos utilizando el módulo de multiprocessing Python 2.6. Debido a que usa multiprocessing , existe un registro compatible con multiprocesamiento a nivel de módulo, LOG = multiprocessing.get_logger() . De acuerdo con los documentos , este registrador tiene bloqueos de proceso compartido para que usted no distorsione las cosas en sys.stderr (o en cualquier manejador de archivos) al tener múltiples procesos escribiendo en él simultáneamente.

El problema que tengo ahora es que los otros módulos en el marco no son compatibles con multiprocesamiento. Tal como lo veo, necesito hacer que todas las dependencias de este módulo central utilicen el registro con multiprocesamiento. Eso es molesto dentro del marco, y mucho menos para todos los clientes del framework. ¿Hay alternativas en las que no estoy pensando?


¿Qué le parece delegar todo el registro en otro proceso que lee todas las entradas de registro de una cola?

LOG_QUEUE = multiprocessing.JoinableQueue() class CentralLogger(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue self.log = logger.getLogger(''some_config'') self.log.info("Started Central Logging process") def run(self): while True: log_level, message = self.queue.get() if log_level is None: self.log.info("Shutting down Central Logging process") break else: self.log.log(log_level, message) central_logger_process = CentralLogger(LOG_QUEUE) central_logger_process.start()

Simplemente comparta LOG_QUEUE a través de cualquiera de los mecanismos multiproceso o incluso herencia y ¡todo funciona bien!


A continuación hay otra solución con un enfoque en la simplicidad para cualquier otra persona (como yo) que esté aquí desde Google. ¡La tala debe ser fácil! Solo para 3.2 o superior.

import multiprocessing import logging from logging.handlers import QueueHandler, QueueListener import time import random def f(i): time.sleep(random.uniform(.01, .05)) logging.info(''function called with {} in worker thread.''.format(i)) time.sleep(random.uniform(.01, .05)) return i def worker_init(q): # all records from worker processes go to qh and then into q qh = QueueHandler(q) logger = logging.getLogger() logger.setLevel(logging.DEBUG) logger.addHandler(qh) def logger_init(): q = multiprocessing.Queue() # this is the handler for all log records handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s")) # ql gets records from the queue and sends them to the handler ql = QueueListener(q, handler) ql.start() logger = logging.getLogger() logger.setLevel(logging.DEBUG) # add the handler to the logger so records from this process are handled logger.addHandler(handler) return ql, q def main(): q_listener, q = logger_init() logging.info(''hello from main thread'') pool = multiprocessing.Pool(4, worker_init, [q]) for result in pool.map(f, range(10)): pass pool.close() pool.join() q_listener.stop() if __name__ == ''__main__'': main()


A continuación se muestra una clase que se puede utilizar en el entorno de Windows, requiere ActivePython. También puede heredar para otros manejadores de registro (StreamHandler, etc.)

class SyncronizedFileHandler(logging.FileHandler): MUTEX_NAME = ''logging_mutex'' def __init__(self , *args , **kwargs): self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME) return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs) def emit(self, *args , **kwargs): try: win32event.WaitForSingleObject(self.mutex , win32event.INFINITE) ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs) finally: win32event.ReleaseMutex(self.mutex) return ret

Y aquí hay un ejemplo que demuestra el uso:

import logging import random , time , os , sys , datetime from string import letters import win32api , win32event from multiprocessing import Pool def f(i): time.sleep(random.randint(0,10) * 0.1) ch = random.choice(letters) logging.info( ch * 30) def init_logging(): '''''' initilize the loggers '''''' formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s") logger = logging.getLogger() logger.setLevel(logging.INFO) file_handler = SyncronizedFileHandler(sys.argv[1]) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) logger.addHandler(file_handler) #must be called in the parent and in every worker process init_logging() if __name__ == ''__main__'': #multiprocessing stuff pool = Pool(processes=10) imap_result = pool.imap(f , range(30)) for i , _ in enumerate(imap_result): pass


Acabo de escribir un controlador de registro propio que solo alimenta todo al proceso principal a través de una tubería. Solo lo he probado durante diez minutos, pero parece funcionar bastante bien.

( Nota: Esto está codificado en forma rígida a RotatingFileHandler , que es mi propio caso de uso).

Actualización: @javier ahora mantiene este enfoque como un paquete disponible en Pypi - ver multiprocessing-logging en Pypi, github en https://github.com/jruere/multiprocessing-logging

Actualización: Implementación!

Esto ahora usa una cola para el manejo correcto de concurrencia, y también se recupera de los errores correctamente. He estado usando esto en producción durante varios meses, y la versión actual a continuación funciona sin problemas.

from logging.handlers import RotatingFileHandler import multiprocessing, threading, logging, sys, traceback class MultiProcessingLog(logging.Handler): def __init__(self, name, mode, maxsize, rotate): logging.Handler.__init__(self) self._handler = RotatingFileHandler(name, mode, maxsize, rotate) self.queue = multiprocessing.Queue(-1) t = threading.Thread(target=self.receive) t.daemon = True t.start() def setFormatter(self, fmt): logging.Handler.setFormatter(self, fmt) self._handler.setFormatter(fmt) def receive(self): while True: try: record = self.queue.get() self._handler.emit(record) except (KeyboardInterrupt, SystemExit): raise except EOFError: break except: traceback.print_exc(file=sys.stderr) def send(self, s): self.queue.put_nowait(s) def _format_record(self, record): # ensure that exc_info and args # have been stringified. Removes any chance of # unpickleable things inside and possibly reduces # message size sent over the pipe if record.args: record.msg = record.msg % record.args record.args = None if record.exc_info: dummy = self.format(record) record.exc_info = None return record def emit(self, record): try: s = self._format_record(record) self.send(s) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) def close(self): self._handler.close() logging.Handler.close(self)


Aquí está mi truco simple / solución alternativa ... no el más completo, pero fácilmente modificable y más fácil de leer y entender que otras respuestas que encontré antes de escribir esto:

import logging import multiprocessing class FakeLogger(object): def __init__(self, q): self.q = q def info(self, item): self.q.put(''INFO - {}''.format(item)) def debug(self, item): self.q.put(''DEBUG - {}''.format(item)) def critical(self, item): self.q.put(''CRITICAL - {}''.format(item)) def warning(self, item): self.q.put(''WARNING - {}''.format(item)) def some_other_func_that_gets_logger_and_logs(num): # notice the name get''s discarded # of course you can easily add this to your FakeLogger class local_logger = logging.getLogger(''local'') local_logger.info(''Hey I am logging this: {} and working on it to make this {}!''.format(num, num*2)) local_logger.debug(''hmm, something may need debugging here'') return num*2 def func_to_parallelize(data_chunk): # unpack our args the_num, logger_q = data_chunk # since we''re now in a new process, let''s monkeypatch the logging module logging.getLogger = lambda name=None: FakeLogger(logger_q) # now do the actual work that happens to log stuff too new_num = some_other_func_that_gets_logger_and_logs(the_num) return (the_num, new_num) if __name__ == ''__main__'': multiprocessing.freeze_support() m = multiprocessing.Manager() logger_q = m.Queue() # we have to pass our data to be parallel-processed # we also need to pass the Queue object so we can retrieve the logs parallelable_data = [(1, logger_q), (2, logger_q)] # set up a pool of processes so we can take advantage of multiple CPU cores pool_size = multiprocessing.cpu_count() * 2 pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4) worker_output = pool.map(func_to_parallelize, parallelable_data) pool.close() # no more tasks pool.join() # wrap up current tasks # get the contents of our FakeLogger object while not logger_q.empty(): print logger_q.get() print ''worker output contained: {}''.format(worker_output)


Dado que podemos representar el registro de multiprocesos como muchos editores y un suscriptor (oyente), utilizar ZeroMQ para implementar los mensajes de PUB-SUB es una opción.

Además, el módulo PyZMQ , los enlaces de Python para ZMQ, implementa PUBHandler , que es un objeto para publicar mensajes de registro en un socket zmq.PUB.

Existe una solución en la web , para el registro centralizado desde una aplicación distribuida utilizando PyZMQ y PUBHandler, que se puede adoptar fácilmente para trabajar localmente con múltiples procesos de publicación.

formatters = { logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"), logging.INFO: logging.Formatter("[%(name)s] %(message)s"), logging.WARN: logging.Formatter("[%(name)s] %(message)s"), logging.ERROR: logging.Formatter("[%(name)s] %(message)s"), logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s") } # This one will be used by publishing processes class PUBLogger: def __init__(self, host, port=config.PUBSUB_LOGGER_PORT): self._logger = logging.getLogger(__name__) self._logger.setLevel(logging.DEBUG) self.ctx = zmq.Context() self.pub = self.ctx.socket(zmq.PUB) self.pub.connect(''tcp://{0}:{1}''.format(socket.gethostbyname(host), port)) self._handler = PUBHandler(self.pub) self._handler.formatters = formatters self._logger.addHandler(self._handler) @property def logger(self): return self._logger # This one will be used by listener process class SUBLogger: def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT): self.output_dir = output_dir self._logger = logging.getLogger() self._logger.setLevel(logging.DEBUG) self.ctx = zmq.Context() self._sub = self.ctx.socket(zmq.SUB) self._sub.bind(''tcp://*:{1}''.format(ip, port)) self._sub.setsockopt(zmq.SUBSCRIBE, "") handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10) handler.setLevel(logging.DEBUG) formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s") handler.setFormatter(formatter) self._logger.addHandler(handler) @property def sub(self): return self._sub @property def logger(self): return self._logger # And that''s the way we actually run things: # Listener process will forever listen on SUB socket for incoming messages def run_sub_logger(ip, event): sub_logger = SUBLogger(ip) while not event.is_set(): try: topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK) log_msg = getattr(logging, topic.lower()) log_msg(message) except zmq.ZMQError as zmq_error: if zmq_error.errno == zmq.EAGAIN: pass # Publisher processes loggers should be initialized as follows: class Publisher: def __init__(self, stop_event, proc_id): self.stop_event = stop_event self.proc_id = proc_id self._logger = pub_logger.PUBLogger(''127.0.0.1'').logger def run(self): self._logger.info("{0} - Sending message".format(proc_id)) def run_worker(event, proc_id): worker = Publisher(event, proc_id) worker.run() # Starting subscriber process so we won''t loose publisher''s messages sub_logger_process = Process(target=run_sub_logger, args=(''127.0.0.1''), stop_event,)) sub_logger_process.start() #Starting publisher processes for i in range(MAX_WORKERS_PER_CLIENT): processes.append(Process(target=run_worker, args=(stop_event, i,))) for p in processes: p.start()


El libro de cocina de inicio de sesión de Python tiene dos ejemplos completos aquí: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

Utiliza QueueHandler , que es nuevo en Python 3.2 pero fácil de copiar en tu propio código (como hice yo mismo en python 2.7) desde: https://gist.github.com/vsajip/591589

Cada proceso pone su registro en la Queue , y luego un subproceso o proceso de listener (se proporciona un ejemplo para cada uno) los recoge y los escribe todos en un archivo, sin riesgo de corrupción o desfiguración.


La única forma de lidiar con esto de forma no intrusiva es:

  1. Genere cada proceso de trabajo de modo que su registro vaya a un descriptor de archivo diferente (al disco o a la tubería). Idealmente, todas las entradas de registro deben tener una marca de tiempo.
  2. El proceso de su controlador puede hacer una de las siguientes cosas:
    • Si usa archivos de disco: combine los archivos de registro al final de la ejecución, ordenados por marca de tiempo
    • Si usa tubos (recomendado): combine entradas de registro sobre la marcha desde todas las tuberías, en un archivo de registro central. (Por ejemplo, select periódicamente de los descriptores de archivos de las tuberías, realice merge-sort en las entradas de registro disponibles, y al mismo nivel que el registro centralizado. Repita).

Me gustó la respuesta de Zzzeek. Simplemente sustituiría a Pipe for a Queue, ya que si hay varios procesos / subprocesos que usan el mismo extremo de canal para generar mensajes de registro, quedarán distorsionados.


Otra alternativa más podrían ser los distintos manejadores de registro no basados ​​en archivos en el paquete de logging :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(y otros)

De esta forma, podría tener fácilmente un daemon de registro en algún lugar donde pueda escribir de manera segura y manejar los resultados correctamente. (Por ejemplo, un servidor de socket simple que simplemente deshace el mensaje y lo emite a su propio manejador de archivos rotativo).

SyslogHandler se encargaría de esto por ti. Por supuesto, podría usar su propia instancia de syslog , no la del sistema.


También me gusta la respuesta de zzzeek, ​​pero Andre tiene razón al decir que se requiere una cola para evitar que se doblen. Tuve un poco de suerte con la tubería, pero vi que el cableado era algo esperado. Implementarlo resultó ser más difícil de lo que pensaba, particularmente debido a la ejecución en Windows, donde hay algunas restricciones adicionales sobre variables globales y cosas así (ver: ¿Cómo se implementa Python Multiprocesamiento en Windows? )

Pero, finalmente lo conseguí trabajando. Este ejemplo probablemente no sea perfecto, por lo que los comentarios y sugerencias son bienvenidos. Tampoco es compatible con la configuración del formateador ni nada que no sea el registrador de raíz. Básicamente, debe reiniciar el registrador en cada uno de los procesos del grupo con la cola y configurar los otros atributos en el registrador.

De nuevo, cualquier sugerencia sobre cómo mejorar el código es bienvenida. Ciertamente todavía no conozco todos los trucos de Python :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue class MultiProcessingLogHandler(logging.Handler): def __init__(self, handler, queue, child=False): logging.Handler.__init__(self) self._handler = handler self.queue = queue # we only want one of the loggers to be pulling from the queue. # If there is a way to do this without needing to be passed this # information, that would be great! if child == False: self.shutdown = False self.polltime = 1 t = threading.Thread(target=self.receive) t.daemon = True t.start() def setFormatter(self, fmt): logging.Handler.setFormatter(self, fmt) self._handler.setFormatter(fmt) def receive(self): #print "receive on" while (self.shutdown == False) or (self.queue.empty() == False): # so we block for a short period of time so that we can # check for the shutdown cases. try: record = self.queue.get(True, self.polltime) self._handler.emit(record) except Queue.Empty, e: pass def send(self, s): # send just puts it in the queue for the server to retrieve self.queue.put(s) def _format_record(self, record): ei = record.exc_info if ei: dummy = self.format(record) # just to get traceback text into record.exc_text record.exc_info = None # to avoid Unpickleable error return record def emit(self, record): try: s = self._format_record(record) self.send(s) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) def close(self): time.sleep(self.polltime+1) # give some time for messages to enter the queue. self.shutdown = True time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown def __del__(self): self.close() # hopefully this aids in orderly shutdown when things are going poorly. def f(x): # just a logging command... logging.critical(''function number: '' + str(x)) # to make some calls take longer than others, so the output is "jumbled" as real MP programs are. time.sleep(x % 3) def initPool(queue, level): """ This causes the logging module to be initialized with the necessary info in pool threads to work correctly. """ logging.getLogger('''').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True)) logging.getLogger('''').setLevel(level) if __name__ == ''__main__'': stream = StringIO.StringIO() logQueue = multiprocessing.Queue(100) handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue) logging.getLogger('''').addHandler(handler) logging.getLogger('''').setLevel(logging.DEBUG) logging.debug(''starting main'') # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging. pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('''').getEffectiveLevel()] ) # start worker processes pool.map(f, range(0,50)) pool.close() logging.debug(''done'') logging.shutdown() print "stream output is:" print stream.getvalue()


Tengo una solución que es similar a la de ironhacker, excepto que utilizo logging.exception en algunos de mis códigos y descubrí que necesitaba formatear la excepción antes de pasarla de nuevo a través de la cola, ya que los tracebacks no son elegibles:

class QueueHandler(logging.Handler): def __init__(self, queue): logging.Handler.__init__(self) self.queue = queue def emit(self, record): if record.exc_info: # can''t pass exc_info across processes so just format now record.exc_text = self.formatException(record.exc_info) record.exc_info = None self.queue.put(record) def formatException(self, ei): sio = cStringIO.StringIO() traceback.print_exception(ei[0], ei[1], ei[2], None, sio) s = sio.getvalue() sio.close() if s[-1] == "/n": s = s[:-1] return s


Todas las soluciones actuales están demasiado acopladas a la configuración de registro mediante el uso de un controlador. Mi solución tiene la siguiente arquitectura y características:

  • Puede usar cualquier configuración de registro que desee
  • El registro se realiza en un hilo de daemon
  • Apagado seguro del daemon mediante el uso de un administrador de contexto
  • La comunicación con el hilo de registro se realiza mediante multiprocessing.Queue
  • En los subprocesos, logging.Logger (y las instancias ya definidas) son parcheados para enviar todos los registros a la cola
  • Nuevo : rastreo y mensaje de formato antes de enviar a la cola para evitar errores de decapado

El código con ejemplos de uso y resultados se puede encontrar en la siguiente Gist: https://gist.github.com/schlamar/7003737


Una variante de las otras que mantiene el hilo de registro y cola separados.

"""sample code for logging in subprocesses using multiprocessing * Little handler magic - The main process uses loggers and handlers as normal. * Only a simple handler is needed in the subprocess that feeds the queue. * Original logger name from subprocess is preserved when logged in main process. * As in the other implementations, a thread reads the queue and calls the handlers. Except in this implementation, the thread is defined outside of a handler, which makes the logger definitions simpler. * Works with multiple handlers. If the logger in the main process defines multiple handlers, they will all be fed records generated by the subprocesses loggers. tested with Python 2.5 and 2.6 on Linux and Windows """ import os import sys import time import traceback import multiprocessing, threading, logging, sys DEFAULT_LEVEL = logging.DEBUG formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s") class SubProcessLogHandler(logging.Handler): """handler used by subprocesses It simply puts items on a Queue for the main process to log. """ def __init__(self, queue): logging.Handler.__init__(self) self.queue = queue def emit(self, record): self.queue.put(record) class LogQueueReader(threading.Thread): """thread to write subprocesses log records to main process log This thread reads the records written by subprocesses and writes them to the handlers defined in the main process''s handlers. """ def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.daemon = True def run(self): """read from the queue and write to the log handlers The logging documentation says logging is thread safe, so there shouldn''t be contention between normal logging (from the main process) and this thread. Note that we''re using the name of the original logger. """ # Thanks Mike for the error checking code. while True: try: record = self.queue.get() # get the logger for this record logger = logging.getLogger(record.name) logger.callHandlers(record) except (KeyboardInterrupt, SystemExit): raise except EOFError: break except: traceback.print_exc(file=sys.stderr) class LoggingProcess(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue def _setupLogger(self): # create the logger to use. logger = logging.getLogger(''test.subprocess'') # The only handler desired is the SubProcessLogHandler. If any others # exist, remove them. In this case, on Unix and Linux the StreamHandler # will be inherited. for handler in logger.handlers: # just a check for my sanity assert not isinstance(handler, SubProcessLogHandler) logger.removeHandler(handler) # add the handler handler = SubProcessLogHandler(self.queue) handler.setFormatter(formatter) logger.addHandler(handler) # On Windows, the level will not be inherited. Also, we could just # set the level to log everything here and filter it in the main # process handlers. For now, just set it from the global default. logger.setLevel(DEFAULT_LEVEL) self.logger = logger def run(self): self._setupLogger() logger = self.logger # and here goes the logging p = multiprocessing.current_process() logger.info(''hello from process %s with pid %s'' % (p.name, p.pid)) if __name__ == ''__main__'': # queue used by the subprocess loggers queue = multiprocessing.Queue() # Just a normal logger logger = logging.getLogger(''test'') handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(DEFAULT_LEVEL) logger.info(''hello from the main process'') # This thread will read from the subprocesses and write to the main log''s # handlers. log_queue_reader = LogQueueReader(queue) log_queue_reader.start() # create the processes. for i in range(10): p = LoggingProcess(queue) p.start() # The way I read the multiprocessing warning about Queue, joining a # process before it has finished feeding the Queue can cause a deadlock. # Also, Queue.empty() is not realiable, so just make sure all processes # are finished. # active_children joins subprocesses when they''re finished. while multiprocessing.active_children(): time.sleep(.1)


solo publique en alguna parte su instancia del registrador. De esta forma, los otros módulos y clientes pueden usar su API para obtener el registrador sin tener que import multiprocessing .



If you have deadlocks occurring in a combination of locks, threads and forks in the logging module, that is reported in bug report 6721 (see also related SO question ).

There is a small fixup solution posted here .

However, that will just fix any potential deadlocks in logging . That will not fix that things are maybe garbled up. See the other answers presented here.


One of the alternatives is to write the mutliprocessing logging to a known file and register an atexit handler to join on those processes read it back on stderr; however, you won''t get a real-time flow to the output messages on stderr that way.


To my children who meet the same issue in decades and found this question on this site I leave this answer.

Simplicity vs overcomplicating. Just use other tools. Python is awesome, but it was not designed to do some things.

The following snippet for logrotate daemon works for me and does not overcomplicate things. Schedule it to run hourly and

/var/log/mylogfile.log { size 1 copytruncate create rotate 10 missingok postrotate timeext=`date -d ''1 hour ago'' "+%Y-%m-%d_%H"` mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log endscript }

This is how I install it (symlinks do not work for logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate