psutil - Procesamiento de un solo archivo desde múltiples procesos en python
psutil python (3)
Tengo un único archivo de texto grande en el que quiero procesar cada línea (realizar algunas operaciones) y almacenarlas en una base de datos. Dado que un solo programa simple tarda demasiado, quiero que se realice a través de múltiples procesos o hilos. Cada hilo / proceso debe leer los datos DIFERENTES (diferentes líneas) de ese único archivo y hacer algunas operaciones en su pedazo de datos (líneas) y ponerlos en la base de datos para que al final, tenga todos los datos procesados y mi la base de datos se descarga con los datos que necesito.
Pero no puedo descifrar cómo abordar esto.
Aquí hay un ejemplo realmente estúpido que cociné:
import os.path
import multiprocessing
def newlinebefore(f,n):
f.seek(n)
c=f.read(1)
while c!=''/n'' and n > 0:
n-=1
f.seek(n)
c=f.read(1)
f.seek(n)
return n
filename=''gpdata.dat'' #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)
#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)
#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.
with open(filename,''r'') as f:
start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))
end_byte=[i-1 for i in start_byte] [1:] + [None]
def process_piece(filename,start,end):
with open(filename,''r'') as f:
f.seek(start+1)
if(end is None):
text=f.read()
else:
nbytes=end-start+1
text=f.read(nbytes)
# process text here. createing some object to be returned
# You could wrap text into a StringIO object if you want to be able to
# read from it the way you would a file.
returnobj=text
return returnobj
def wrapper(args):
return process_piece(*args)
filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)
pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)
#Now take your results and write them to the database.
print "".join(result) #I just print it to make sure I get my file back ...
La parte difícil aquí es asegurarnos de dividir el archivo en caracteres de nueva línea para que no se pierda ninguna línea (o solo lea líneas parciales). Luego, cada proceso lee que es parte del archivo y devuelve un objeto que se puede poner en la base de datos por el hilo principal. Por supuesto, puede que incluso necesite hacer esta parte en fragmentos para que no tenga que mantener toda la información en la memoria a la vez. (Esto se logra con bastante facilidad: simplemente divida la lista "args" en X fragmentos y llame a pool.map(wrapper,chunk)
- Vea here )
Lo que está buscando es un patrón Productor / Consumidor
Ejemplo de subprocesamiento básico
Aquí hay un ejemplo básico usando el módulo de subprocesamiento (en lugar de multiprocesamiento)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
No compartiría el objeto de archivo con los hilos. Haría un trabajo para ellos suministrando la queue con líneas de datos. Luego, cada hilo recogería una línea, la procesaría y luego la devolvería en la cola.
Hay algunas instalaciones más avanzadas integradas en el módulo de multiprocesamiento para compartir datos, como listas y tipos especiales de cola . Hay compensaciones al uso de multiprocesamiento frente a subprocesos y depende de si su trabajo está vinculado a la CPU o a la IO.
Ejemplo básico de multiprocesamiento.Pool
Aquí hay un ejemplo realmente básico de un conjunto de multiprocesamiento
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open(''file.txt'') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
Un Pool es un objeto de conveniencia que administra sus propios procesos. Como un archivo abierto puede iterar sobre sus líneas, puede pasarlo a pool.map()
, que lo recorrerá y entregará líneas a la función de trabajador. Map bloques y devuelve el resultado completo cuando está hecho. Tenga en cuenta que este es un ejemplo excesivamente simplificado, y que el pool.map()
va a leer todo su archivo en la memoria de una sola vez antes de repartir el trabajo. Si espera tener archivos grandes, tenga esto en cuenta. Hay formas más avanzadas de diseñar una configuración de productor / consumidor.
Manual "grupo" con límite y reordenamiento de línea
Este es un ejemplo manual del Map , pero en lugar de consumir todo un iterable de una sola vez, puede establecer un tamaño de cola para que solo lo alimente pieza por pieza tan rápido como pueda procesarlo. También agregué los números de línea para que pueda rastrearlos y referirse a ellos si lo desea, más adelante.
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
rompa el archivo grande en varios archivos más pequeños y haga que cada uno de ellos se procese en hilos separados.