


Multiprocessing with "fork" context blocks on Linux/Intel Xeon with Python 3.6.1?

As @jxh hinted, the differences between fork and spawn matter here. The multiprocessing documentation (section 17.2.1.2) describes the difference: a forked child inherits the parent's environment and resources such as standard input/output, while spawn starts a fresh Python interpreter that inherits only what it needs to run. I think you may have something in your environment that causes problems for the worker function, probably in the code behind your comments about other processing. Spawn gives you a clean slate, and things work fine under those conditions.

To find out what is happening, I would have each worker write diagnostic messages, probably to a file unique to that worker. Open and close the file each time you write a message, so that the contents are flushed and up to date.
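A minimal sketch of that kind of per-worker diagnostic file. The helper name log_diag, the worker_<pid>.log file naming, and the log_dir parameter are assumptions made for illustration; they are not part of the original script.

```python
import os
import time


def log_diag(message, log_dir="."):
    """Append one timestamped line to a log file unique to the calling process.

    Hypothetical helper: the name and the file naming scheme are illustrative.
    """
    # One file per process, keyed by PID, so workers never share a file.
    path = os.path.join(log_dir, "worker_{}.log".format(os.getpid()))
    # Opening and closing on every call flushes each message to the file
    # right away, so the last line written shows where a stuck worker got to.
    with open(path, "a", encoding="utf-8") as f:
        f.write("{:.3f} {}\n".format(time.time(), message))
    return path
```

Calling log_diag("worker started") at the top of worker, and again before and after q.put(...), would show which step each process reached before the script stopped making progress.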

I would not choose fork for speed. Fork usually starts a process somewhat faster than spawn, because spawn has to launch a new interpreter, but either way this is only a startup cost. I would guess it is negligible here, because the worker must be doing some compute-bound or I/O-bound work that you want to parallelize.

Problem description
I adjusted the code from this answer slightly (see below). When I run this script on Linux (command line: python script_name.py ), it prints jobs running: x for all the jobs but then seems to hang. When I use the spawn method ( mp.set_start_method('spawn') ), it works fine and immediately starts printing the value of the counter variable (see the listener function).
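The start method can also be chosen for a single context instead of globally. A minimal sketch, assuming a hypothetical helper named pick_context (not part of the original script); mp.get_all_start_methods() and mp.get_context() are standard multiprocessing functions.

```python
import multiprocessing as mp


def pick_context(preferred="spawn"):
    """Return a context for `preferred`, or the platform default if unavailable.

    Hypothetical helper for illustration only.
    """
    if preferred in mp.get_all_start_methods():
        return mp.get_context(preferred)
    # Fall back to whatever this platform uses by default.
    return mp.get_context()


if __name__ == "__main__":
    ctx = pick_context("spawn")
    print("start method:", ctx.get_start_method())
```

A pool or manager created from the returned object, for example ctx.Pool(...) and ctx.Manager(), uses that start method without changing the global default, which makes it easy to run the same script under fork and under spawn and compare.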

Question

  • Why does it work only when the processes are spawned?
  • How can I adjust the code so that it works with fork ? (because it is probably faster)

Code

    import io
    import csv
    import multiprocessing as mp

    NEWLINE = '\n'

    def file_searcher(file_path):
        parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

        manager = mp.Manager()
        q = manager.Queue()
        pool = mp.Pool(mp.cpu_count())

        # put listener to work first
        watcher = pool.apply_async(listener, (q,))

        jobs = []
        for row in parsed_file:
            print('jobs running: ' + str(len(jobs) + 1))
            job = pool.apply_async(worker, (row, q))
            jobs.append(job)

        # collect results from the workers through the pool result queue
        for job in jobs:
            job.get()

        # now we are done, kill the listener
        q.put('kill')
        pool.close()
        pool.join()

    def worker(genome_row, q):
        complete_data = []
        # data processing
        # ftp connection to retrieve data
        # etc.
        q.put(complete_data)
        return complete_data

    def listener(q):
        '''listens for messages on the q, writes to file. '''
        f = io.open('output.txt', 'w', encoding='utf-8')
        counter = 0
        while 1:
            m = q.get()
            counter += 1
            print(counter)
            if m == 'kill':
                break
            for x in m:
                f.write(x + NEWLINE)
            f.flush()
        f.close()

    if __name__ == "__main__":
        file_searcher('path_to_some_tab_del_file.txt')

Processor information

    Architecture:          x86_64
    CPU op-mode(s):        32-bit, 64-bit
    Byte Order:            Little Endian
    CPU(s):                20
    On-line CPU(s) list:   0-19
    Thread(s) per core:    1
    Core(s) per socket:    1
    Socket(s):             20
    NUMA node(s):          2
    Vendor ID:             GenuineIntel
    CPU family:            6
    Model:                 45
    Model name:            Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
    Stepping:              2
    CPU MHz:               2596.501
    BogoMIPS:              5193.98
    Hypervisor vendor:     VMware
    Virtualization type:   full
    L1d cache:             32K
    L1i cache:             32K
    L2 cache:              256K
    L3 cache:              25600K
    NUMA node0 CPU(s):     0-19

Linux kernel version

3.10.0-514.26.2.el7.x86_64

Python version

Python 3.6.1 :: Continuum Analytics, Inc.

Log
I added the code suggested by @yacc, which gives the following log:

    [server scripts]$ python main_v3.py
    [INFO/SyncManager-1] child process calling self.run()
    [INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
    [INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
    [DEBUG/MainProcess] requesting creation of a shared 'Queue' object
    [DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
    [DEBUG/MainProcess] INCREF '7f0842da56a0'
    [DEBUG/MainProcess] created semlock with handle 139673691570176
    [DEBUG/MainProcess] created semlock with handle 139673691566080
    [DEBUG/MainProcess] created semlock with handle 139673691561984
    [DEBUG/MainProcess] created semlock with handle 139673691557888
    [DEBUG/MainProcess] added worker
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
    [DEBUG/MainProcess] added worker
    [INFO/ForkPoolWorker-2] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-4] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-3] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-6] child process calling self.run()
    [INFO/ForkPoolWorker-5] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-7] child process calling self.run()
    [INFO/ForkPoolWorker-8] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-9] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-10] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-11] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-12] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-13] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-14] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-15] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-16] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-17] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-18] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-19] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-20] child process calling self.run()
    jobs running: 1
    jobs running: 2
    jobs running: 3
    jobs running: 4
    [DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
    [INFO/ForkPoolWorker-21] child process calling self.run()
    jobs running: 5
    jobs running: 6
    jobs running: 7
    [DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
    jobs running: 8
    written to file
    jobs running: 9
    jobs running: 10
    [DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
    [DEBUG/ForkPoolWorker-2] making connection to manager
    jobs running: 11
    jobs running: 12
    jobs running: 13
    jobs running: 14
    jobs running: 15
    [DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
    jobs running: 16
    jobs running: 17
    jobs running: 18
    jobs running: 19
    [DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
    [DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
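
Lines in the [LEVEL/ProcessName] format are what multiprocessing's own logger prints. The exact snippet @yacc suggested is not reproduced here, so the following is only a sketch of how such logging is typically switched on; the wrapper name enable_mp_debug_logging is an assumption, while mp.log_to_stderr() is the standard library call.

```python
import logging
import multiprocessing as mp


def enable_mp_debug_logging(level=logging.DEBUG):
    """Send multiprocessing's internal log messages to stderr.

    Hypothetical wrapper for illustration; call it once, before creating
    the manager and the pool, so that startup messages are captured too.
    """
    # log_to_stderr() attaches a stderr handler to the "multiprocessing"
    # logger and returns that logger.
    logger = mp.log_to_stderr()
    logger.setLevel(level)
    return logger
```

In the script above this would go at the top of file_searcher, before mp.Manager() is called.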