Concurrencia en Python: conjunto de procesos

El grupo de procesos se puede crear y usar de la misma manera que hemos creado y usado el grupo de subprocesos. El grupo de procesos se puede definir como el grupo de procesos preinstanciados e inactivos, que están listos para recibir trabajo. Se prefiere la creación de un grupo de procesos a la creación de instancias de nuevos procesos para cada tarea cuando necesitamos realizar una gran cantidad de tareas.

Módulo Python - Concurrent.futures

La biblioteca estándar de Python tiene un módulo llamado concurrent.futures. Este módulo se agregó en Python 3.2 para proporcionar a los desarrolladores una interfaz de alto nivel para iniciar tareas asincrónicas. Es una capa de abstracción en la parte superior de los módulos de subprocesamiento y multiprocesamiento de Python para proporcionar la interfaz para ejecutar las tareas utilizando un grupo de subprocesos o procesos.

En las secciones siguientes, veremos las diferentes subclases del módulo concurrent.futures.

Clase ejecutor

Executor es una clase abstracta del concurrent.futuresMódulo de Python. No se puede usar directamente y necesitamos usar una de las siguientes subclases concretas:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor: una subclase concreta

Es una de las subclases concretas de la clase Ejecutor. Utiliza multiprocesamiento y obtenemos un conjunto de procesos para enviar las tareas. Este grupo asigna tareas a los procesos disponibles y los programa para que se ejecuten.

¿Cómo crear un ProcessPoolExecutor?

Con la ayuda del concurrent.futures módulo y su subclase de hormigón Executor, podemos crear fácilmente un conjunto de procesos. Para esto, necesitamos construir unProcessPoolExecutorcon la cantidad de procesos que queremos en el grupo. De forma predeterminada, el número es 5. A continuación, se envía una tarea al grupo de procesos.

Ejemplo

Ahora consideraremos el mismo ejemplo que usamos al crear el grupo de subprocesos, la única diferencia es que ahora usaremos ProcessPoolExecutor en vez de ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Salida

False
False
Completed

En el ejemplo anterior, un procesoPoolExecutorha sido construido con 5 hilos. Luego, una tarea, que esperará 2 segundos antes de dar el mensaje, se envía al ejecutor del grupo de procesos. Como se ve en el resultado, la tarea no se completa hasta 2 segundos, por lo que la primera llamada adone()devolverá False. Después de 2 segundos, la tarea está terminada y obtenemos el resultado del futuro llamando alresult() método en él.

Creación de instancias ProcessPoolExecutor - Administrador de contexto

Otra forma de crear una instancia de ProcessPoolExecutor es con la ayuda del administrador de contexto. Funciona de forma similar al método utilizado en el ejemplo anterior. La principal ventaja de utilizar el administrador de contexto es que se ve bien sintácticamente. La instanciación se puede hacer con la ayuda del siguiente código:

with ProcessPoolExecutor(max_workers = 5) as executor

Ejemplo

Para una mejor comprensión, tomamos el mismo ejemplo que se usó al crear un grupo de subprocesos. En este ejemplo, debemos comenzar importando elconcurrent.futuresmódulo. Entonces una función llamadaload_url()se crea que cargará la URL solicitada. losProcessPoolExecutorluego se crea con el número de 5 subprocesos en el grupo. El procesoPoolExecutorse ha utilizado como administrador de contexto. Podemos obtener el resultado del futuro llamando alresult() método en él.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Salida

La secuencia de comandos de Python anterior generará la siguiente salida:

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Uso de la función Executor.map ()

El pitón map()La función se utiliza ampliamente para realizar una serie de tareas. Una de esas tareas es aplicar una determinada función a cada elemento dentro de los iterables. De manera similar, podemos asignar todos los elementos de un iterador a una función y enviarlos como trabajos independientes a laProcessPoolExecutor. Considere el siguiente ejemplo de secuencia de comandos de Python para comprender esto.

Ejemplo

Consideraremos el mismo ejemplo que usamos al crear un grupo de subprocesos usando el Executor.map()función. En el ejemplo dado a continuación, la función de mapa se utiliza para aplicarsquare() función a cada valor en la matriz de valores.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Salida

La secuencia de comandos de Python anterior generará la siguiente salida

4
9
16
25

¿Cuándo usar ProcessPoolExecutor y ThreadPoolExecutor?

Ahora que hemos estudiado las dos clases de Ejecutor, ThreadPoolExecutor y ProcessPoolExecutor, necesitamos saber cuándo usar qué ejecutor. Debemos elegir ProcessPoolExecutor en caso de cargas de trabajo vinculadas a la CPU y ThreadPoolExecutor en el caso de cargas de trabajo vinculadas a E / S.

Si usamos ProcessPoolExecutor, entonces no tenemos que preocuparnos por GIL porque utiliza multiprocesamiento. Además, el tiempo de ejecución será menor en comparación conThreadPoolExecution. Considere el siguiente ejemplo de secuencia de comandos de Python para comprender esto.

Ejemplo

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Salida

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Salida

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

De las salidas de los dos programas anteriores, podemos ver la diferencia de tiempo de ejecución mientras usamos ProcessPoolExecutor y ThreadPoolExecutor.