parallel - python multiprocessor
Multiprocessing with the "fork" context hangs on Linux/Intel Xeon with Python 3.6.1?
As @jxh hinted, the differences between fork and spawn are important. The multiprocessing documentation (section 17.2.1.2, "Contexts and start methods") describes the difference. With fork, the child starts as an effectively identical copy of the parent and inherits its environment and resources, such as standard input/output and other open file descriptors. With spawn, a fresh Python interpreter process is started. I suspect something in your environment causes problems for the worker function, probably in the code hidden behind your comments about other processing. Spawning gives you a clean slate, and under those conditions things work fine.
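To make the "fork preserves the environment" point concrete, here is a minimal sketch (POSIX only, since it forces the "fork" start method) in which the parent changes a module-level global at run time before creating the pool. A forked worker sees the changed value because it starts as a copy of the parent. A spawned worker re-imports the module and sees only the import-time default. The names SETTINGS, read_setting and run_with are hypothetical.

```python
import multiprocessing as mp

# Import-time default. A "spawn" child re-imports the module and sees only this.
SETTINGS = {"mode": "default"}


def read_setting(_):
    # Runs in the worker process and reports the value it inherited.
    return SETTINGS["mode"]


def run_with(method):
    # Change the global at run time, after the module has been imported.
    SETTINGS["mode"] = "changed-in-parent"
    ctx = mp.get_context(method)  # "fork" is not available on Windows
    with ctx.Pool(2) as pool:
        return pool.map(read_setting, range(2))


if __name__ == "__main__":
    # Forked workers start as copies of the parent, so they see the new value.
    print(run_with("fork"))
```

When the script is run as a file, calling run_with("spawn") instead should return ['default', 'default']. Any hidden state the worker depends on (open connections, locks, globals set up by "other processing") behaves differently under the two start methods in exactly this way.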
To find out what is going on, I would have each worker print diagnostic messages, probably written to a file unique to each worker. Open and close that file every time you want to write a message, so that its contents are updated and flushed.
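A minimal sketch of that per-worker diagnostic file, assuming the process ID is an acceptable way to make the file name unique. The helper diag and the function worker_with_diagnostics are hypothetical names, not part of the question's code.

```python
import os
import time


def diag(message, log_dir="."):
    """Append one diagnostic line to a file unique to the calling process.

    The file is opened and closed on every call, so each message reaches the
    disk even if the worker later hangs or is killed.
    """
    path = os.path.join(log_dir, "worker-%d.log" % os.getpid())
    with open(path, "a", encoding="utf-8") as f:
        f.write("%.3f %s\n" % (time.time(), message))
    return path


def worker_with_diagnostics(row, log_dir="."):
    # Hypothetical worker: log before and after each step you suspect.
    diag("start row=%r" % (row,), log_dir)
    result = row  # ... data processing / FTP download would go here ...
    diag("done row=%r" % (row,), log_dir)
    return result
```

The last line in each worker's file then shows the step that worker was on when the script stopped making progress.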
Forking is not necessarily much faster than spawning: the new process still has to receive a copy of the parent's state (on Linux this copy is made lazily, copy-on-write). In any case, this is only a start-up cost, and I would expect it to be minimal, because the workers are presumably doing CPU-bound or I/O-bound work that you want to parallelize.
Problem description
I adjusted the code from this answer a little (see below). However, when I run this script on Linux (i.e. from the command line: python script_name.py), it prints "jobs running: x" for all jobs, but then it seems to get stuck. When I use the spawn method (mp.set_start_method('spawn')), however, it works fine and immediately starts printing the value of the counter variable (see the listener function).
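Because the script appears to hang while waiting for jobs to finish, one way to see which jobs never complete is to pass a timeout to AsyncResult.get(). With a timeout it raises multiprocessing.TimeoutError instead of blocking forever. Below is a minimal sketch using the "fork" context. slow_task, the durations and the timeout value are hypothetical stand-ins.

```python
import multiprocessing as mp
import time


def slow_task(seconds):
    # Stand-in for a worker that may block, e.g. on a network call.
    time.sleep(seconds)
    return seconds


def find_stuck_jobs(durations, timeout):
    """Return the indices of jobs that did not finish within `timeout` seconds."""
    ctx = mp.get_context("fork")
    pool = ctx.Pool(2)
    jobs = [pool.apply_async(slow_task, (d,)) for d in durations]
    stuck = []
    for i, job in enumerate(jobs):
        try:
            job.get(timeout=timeout)  # raises mp.TimeoutError instead of hanging
        except mp.TimeoutError:
            stuck.append(i)
    pool.terminate()  # do not wait for the stuck workers
    pool.join()
    return stuck


if __name__ == "__main__":
    print(find_stuck_jobs([0, 0, 5], timeout=1))
```

In the real script, logging the row of every job that times out narrows the problem down to specific inputs or to the worker code as a whole.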
Question
- Why does it only work when spawning processes?
- How can I adjust the code so that it works with fork? (because it is probably faster)
Código
import io
import csv
import multiprocessing as mp

NEWLINE = '\n'


def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8', newline=''), delimiter='\t')
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first (it occupies one pool worker)
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()


def worker(genome_row, q):
    complete_data = []
    # data processing
    # ftp connection to retrieve data
    # etc.
    q.put(complete_data)
    return complete_data


def listener(q):
    """Listen for messages on q and write them to a file."""
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while True:
        m = q.get()
        counter += 1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()


if __name__ == "__main__":
    file_searcher('path_to_some_tab_del_file.txt')
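For comparison, here is a variant without the Manager queue and the listener that occupies one pool slot. Workers simply return their results, and the parent writes them as they arrive using Pool.imap_unordered. This is a swapped-in design, not the code from the question, and it is not guaranteed to cure the fork hang. It does take the Manager process and the listener out of the picture, which narrows down where the problem can be. process_row is a hypothetical placeholder for the real processing/FTP work.

```python
import csv
import multiprocessing as mp

NEWLINE = "\n"


def process_row(row):
    # Hypothetical placeholder for the real data processing / FTP download.
    # Must return a list of strings to be written to the output file.
    return ["\t".join(row.values())]


def file_searcher(in_path, out_path, processes=None, timeout=None):
    """Process every row of a tab-delimited file; return the number of rows done."""
    ctx = mp.get_context("fork")
    with open(in_path, "r", encoding="utf-8", newline="") as src:
        with open(out_path, "w", encoding="utf-8") as dst:
            rows = csv.DictReader(src, delimiter="\t")
            with ctx.Pool(processes) as pool:
                results = pool.imap_unordered(process_row, rows)
                counter = 0
                while True:
                    try:
                        # Raises mp.TimeoutError if no result arrives in time.
                        lines = results.next(timeout)
                    except StopIteration:
                        break
                    counter += 1
                    for line in lines:
                        dst.write(line + NEWLINE)
                    dst.flush()
    return counter
```

Call it from inside the if __name__ == "__main__": guard, for example file_searcher('path_to_some_tab_del_file.txt', 'output.txt', timeout=600). Results are written in completion order, not input order.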
Processor information
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 20
On-line CPU(s) list: 0-19
Thread(s) per core: 1
Core(s) per socket: 1
Socket(s): 20
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 45
Model name: Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
Stepping: 2
CPU MHz: 2596.501
BogoMIPS: 5193.98
Hypervisor vendor: VMware
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 25600K
NUMA node0 CPU(s): 0-19
Linux kernel version
3.10.0-514.26.2.el7.x86_64
Python version
Python 3.6.1 :: Continuum Analytics, Inc.
Log
I added the code suggested by @yacc, which produces the following log:
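The "[LEVEL/ProcessName] message" format of the log below is what multiprocessing.log_to_stderr() produces. The exact code @yacc suggested is not shown here. Presumably it was something like the following minimal sketch, which should run once before the Manager and Pool are created. enable_mp_debug_logging is a hypothetical helper name.

```python
import logging
import multiprocessing as mp


def enable_mp_debug_logging(level=logging.DEBUG):
    # log_to_stderr() attaches a stderr handler to the multiprocessing logger
    # using the format "[%(levelname)s/%(processName)s] %(message)s".
    logger = mp.log_to_stderr()
    logger.setLevel(level)
    return logger


if __name__ == "__main__":
    enable_mp_debug_logging()
    # ... then call file_searcher(...) as in the question ...
```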
[server scripts]$ python main_v3.py
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
[DEBUG/MainProcess] INCREF '7f0842da56a0'
[DEBUG/MainProcess] created semlock with handle 139673691570176
[DEBUG/MainProcess] created semlock with handle 139673691566080
[DEBUG/MainProcess] created semlock with handle 139673691561984
[DEBUG/MainProcess] created semlock with handle 139673691557888
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-6] child process calling self.run()
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-7] child process calling self.run()
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-17] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-18] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-19] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-20] child process calling self.run()
jobs running: 1
jobs running: 2
jobs running: 3
jobs running: 4
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-21] child process calling self.run()
jobs running: 5
jobs running: 6
jobs running: 7
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
jobs running: 8
written to file
jobs running: 9
jobs running: 10
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
[DEBUG/ForkPoolWorker-2] making connection to manager
jobs running: 11
jobs running: 12
jobs running: 13
jobs running: 14
jobs running: 15
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
jobs running: 16
jobs running: 17
jobs running: 18
jobs running: 19
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'