tutorial and python multithreading python-multithreading

python multithreading and multiprocessing tutorial



Python-agregando al mismo archivo desde mĂșltiples hilos (3)

Estoy escribiendo una aplicación que agrega líneas al mismo archivo desde varios subprocesos.

Tengo un problema en el que algunas líneas se agregan sin una nueva línea.

¿Alguna solución para esto?

class PathThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def printfiles(self, p): for path, dirs, files in os.walk(p): for f in files: print(f, file=output) def run(self): while True: path = self.queue.get() self.printfiles(path) self.queue.task_done() pathqueue = Queue.Queue() paths = getThisFromSomeWhere() output = codecs.open(''file'', ''a'') # spawn threads for i in range(0, 5): t = PathThread(pathqueue) t.setDaemon(True) t.start() # add paths to queue for path in paths: pathqueue.put(path) # wait for queue to get empty pathqueue.join()


¿Y tal vez algunas nuevas líneas donde no deberían estar?

Debe tener en cuenta el hecho de que no se puede acceder a un recurso compartido por más de un subproceso a la vez o, de lo contrario, podrían producirse consecuencias imprevisibles (se denomina "operaciones atómicas" al utilizar subprocesos).

Eche un vistazo a esta página para una pequeña intuición: Mecanismos de sincronización de subprocesos en Python


La solución es escribir en el archivo en un solo hilo.

import Queue # or queue in Python 3 import threading class PrintThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def printfiles(self, p): for path, dirs, files in os.walk(p): for f in files: print(f, file=output) def run(self): while True: result = self.queue.get() self.printfiles(result) self.queue.task_done() class ProcessThread(threading.Thread): def __init__(self, in_queue, out_queue): threading.Thread.__init__(self) self.in_queue = in_queue self.out_queue = out_queue def run(self): while True: path = self.in_queue.get() result = self.process(path) self.out_queue.put(result) self.in_queue.task_done() def process(self, path): # Do the processing job here pathqueue = Queue.Queue() resultqueue = Queue.Queue() paths = getThisFromSomeWhere() output = codecs.open(''file'', ''a'') # spawn threads to process for i in range(0, 5): t = ProcessThread(pathqueue, resultqueue) t.setDaemon(True) t.start() # spawn threads to print t = PrintThread(resultqueue) t.setDaemon(True) t.start() # add paths to queue for path in paths: pathqueue.put(path) # wait for queue to get empty pathqueue.join() resultqueue.join()


el hecho de que nunca vea texto mezclado en la misma línea o nuevas líneas en el medio de una línea es una pista que realmente no necesita sincronizar adjuntando al archivo. el problema es que utiliza imprimir para escribir en un solo identificador de archivo. sospecho que la print realidad está haciendo 2 operaciones en el identificador de archivo en una llamada y esas operaciones están compitiendo entre los hilos. básicamente print está haciendo algo como:

file_handle.write(''whatever_text_you_pass_it'') file_handle.write(os.linesep)

y debido a que diferentes subprocesos hacen esto simultáneamente en el mismo identificador de archivo, a veces un subproceso recibe la primera escritura y el otro subproceso recibe la primera escritura y luego obtiene dos retornos de carro seguidos. o realmente cualquier permutación de estos.

la forma más sencilla de evitar esto es dejar de usar la print y simplemente write directamente. prueba algo como esto:

output.write(f + os.linesep)

Esto todavía me parece peligroso. No estoy seguro de qué garantías puede esperar con todos los subprocesos que utilizan el mismo objeto de identificador de archivo y compiten por su búfer interno. Personalmente identifiqué todo el problema y solo haga que cada hilo tenga su propio identificador de archivo. También tenga en cuenta que esto funciona porque el valor predeterminado para los vaciados del búfer de escritura es el búfer de línea, por lo que cuando hace un os.linesep del archivo finaliza en un os.linesep . para forzarlo a usar un búfer de línea envía un 1 como el tercer argumento de open . Puedes probarlo así:

#!/usr/bin/env python import os import sys import threading def hello(file_name, message, count): with open(file_name, ''a'', 1) as f: for i in range(0, count): f.write(message + os.linesep) if __name__ == ''__main__'': #start a file with open(''some.txt'', ''w'') as f: f.write(''this is the beginning'' + os.linesep) #make 10 threads write a million lines to the same file at the same time threads = [] for i in range(0, 10): threads.append(threading.Thread(target=hello, args=(''some.txt'', ''hey im thread %d'' % i, 1000000))) threads[-1].start() for t in threads: t.join() #check what the heck the file had uniq_lines = set() with open(''some.txt'', ''r'') as f: for l in f: uniq_lines.add(l) for u in uniq_lines: sys.stdout.write(u)

La salida se ve así:

hey im thread 6 hey im thread 7 hey im thread 9 hey im thread 8 hey im thread 3 this is the beginning hey im thread 5 hey im thread 4 hey im thread 1 hey im thread 0 hey im thread 2