xticks barplot python io multiprocessing mutex

barplot - Multiproceso de Python escribiendo de forma segura en un archivo



pandas plot (2)

@ GP89 mencionó una buena solución. Use una cola para enviar las tareas de escritura a un proceso dedicado que tenga acceso exclusivo de escritura al archivo. Todos los demás trabajadores tienen acceso de solo lectura. Esto eliminará las colisiones. Aquí hay un ejemplo que usa apply_async, pero también funcionará con el mapa:

import multiprocessing as mp import time fn = ''c:/temp/temp.txt'' def worker(arg, q): ''''''stupidly simulates long running process'''''' start = time.clock() s = ''this is a test'' txt = s for i in xrange(200000): txt += s done = time.clock() - start with open(fn, ''rb'') as f: size = len(f.read()) res = ''Process'' + str(arg), str(size), done q.put(res) return res def listener(q): ''''''listens for messages on the q, writes to file. '''''' f = open(fn, ''wb'') while 1: m = q.get() if m == ''kill'': f.write(''killed'') break f.write(str(m) + ''/n'') f.flush() f.close() def main(): #must use Manager queue here, or will not work manager = mp.Manager() q = manager.Queue() pool = mp.Pool(mp.cpu_count() + 2) #put listener to work first watcher = pool.apply_async(listener, (q,)) #fire off workers jobs = [] for i in range(80): job = pool.apply_async(worker, (i, q)) jobs.append(job) # collect results from the workers through the pool result queue for job in jobs: job.get() #now we are done, kill the listener q.put(''kill'') pool.close() if __name__ == "__main__": main()

Estoy tratando de resolver un gran problema numérico que implica muchos subproblemas, y estoy usando el módulo de multiprocesamiento de Python (específicamente Pool.map) para dividir diferentes subproblemas independientes en diferentes núcleos. Cada subproblema involucra el cálculo de muchos subproblemas secundarios, y estoy tratando de memorizar efectivamente estos resultados almacenándolos en un archivo si aún no han sido calculados por ningún proceso, de lo contrario omita el cálculo y simplemente lea los resultados del archivo.

Tengo problemas de simultaneidad con los archivos: diferentes procesos a veces comprueban si ya se ha calculado un subproblema secundario (al buscar el archivo donde se almacenarán los resultados), ver que no ha ejecutado el cálculo, luego intente escribir los resultados en el mismo archivo al mismo tiempo. ¿Cómo evito escribir colisiones como esta?


Me parece que debe usar el Administrador para guardar temporalmente sus resultados en una lista y luego escribir los resultados de la lista en un archivo. Además, use starmap para pasar el objeto que desea procesar y la lista administrada. El primer paso es construir el parámetro que se pasará a starmap, que incluye la lista administrada.

from multiprocessing import Manager from multiprocessing import Pool import pandas as pd``` def worker(row, param): # do something here and then append it to row x = param**2 row.append(x) if __name__ == ''__main__'': pool_parameter = [] # list of objects to process with Manager() as mgr: row = mgr.list([]) # build list of parameters to send to starmap for param in pool_parameter: params.append([row,param]) with Pool() as p: p.starmap(worker, params)

A partir de este punto, debe decidir cómo manejará la lista. Si tiene toneladas de RAM y un gran conjunto de datos, siéntase libre de concatenar usando pandas. Luego puede guardar el archivo muy fácilmente como un csv o un pickle.

df = pd.concat(row, ignore_index=True) df.to_pickle(''data.pickle'') df.to_csv(''data.csv'')