Concurrencia en Python: grupo de subprocesos

Supongamos que tuviéramos que crear una gran cantidad de subprocesos para nuestras tareas multiproceso. Computacionalmente sería más costoso ya que puede haber muchos problemas de rendimiento debido a demasiados subprocesos. Un problema importante podría ser que el rendimiento se limite. Podemos resolver este problema creando un grupo de hilos. Un grupo de subprocesos puede definirse como el grupo de subprocesos previamente instanciados e inactivos, que están listos para recibir trabajo. Se prefiere crear un grupo de subprocesos a crear instancias de nuevos subprocesos para cada tarea cuando necesitamos realizar una gran cantidad de tareas. Un grupo de subprocesos puede gestionar la ejecución simultánea de una gran cantidad de subprocesos de la siguiente manera:

  • Si un subproceso en un grupo de subprocesos completa su ejecución, ese subproceso se puede reutilizar.

  • Si se termina un hilo, se creará otro hilo para reemplazar ese hilo.

Módulo Python - Concurrent.futures

La biblioteca estándar de Python incluye concurrent.futuresmódulo. Este módulo se agregó en Python 3.2 para proporcionar a los desarrolladores una interfaz de alto nivel para iniciar tareas asincrónicas. Es una capa de abstracción en la parte superior de los módulos de subprocesamiento y multiprocesamiento de Python para proporcionar la interfaz para ejecutar las tareas utilizando un grupo de subprocesos o procesos.

En nuestras secciones posteriores, aprenderemos sobre las diferentes clases del módulo concurrent.futures.

Clase ejecutor

Executores una clase abstracta del concurrent.futuresMódulo de Python. No se puede usar directamente y necesitamos usar una de las siguientes subclases concretas:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor - Una subclase concreta

Es una de las subclases concretas de la clase Ejecutor. La subclase utiliza subprocesos múltiples y obtenemos un grupo de subprocesos para enviar las tareas. Este grupo asigna tareas a los subprocesos disponibles y los programa para su ejecución.

¿Cómo crear un ThreadPoolExecutor?

Con la ayuda de concurrent.futures módulo y su subclase de hormigón Executor, podemos crear fácilmente un grupo de subprocesos. Para esto, necesitamos construir unThreadPoolExecutorcon la cantidad de subprocesos que queramos en el grupo. Por defecto, el número es 5. Luego, podemos enviar una tarea al grupo de subprocesos. Cuando nosotrossubmit() una tarea, recuperamos una Future. El objeto Future tiene un método llamadodone(), que dice si el futuro se ha resuelto. Con esto, se ha establecido un valor para ese objeto futuro en particular. Cuando finaliza una tarea, el ejecutor del grupo de subprocesos establece el valor del objeto futuro.

Ejemplo

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Salida

False
True
Completed

En el ejemplo anterior, un ThreadPoolExecutorha sido construido con 5 hilos. Luego, se envía una tarea, que esperará 2 segundos antes de dar el mensaje, al ejecutor del grupo de subprocesos. Como se ve en el resultado, la tarea no se completa hasta 2 segundos, por lo que la primera llamada adone()devolverá False. Después de 2 segundos, la tarea está terminada y obtenemos el resultado del futuro llamando alresult() método en él.

Creación de instancias de ThreadPoolExecutor - Administrador de contexto

Otra forma de instanciar ThreadPoolExecutores con la ayuda del administrador de contexto. Funciona de forma similar al método utilizado en el ejemplo anterior. La principal ventaja de usar el administrador de contexto es que se ve bien sintácticamente. La instanciación se puede hacer con la ayuda del siguiente código:

with ThreadPoolExecutor(max_workers = 5) as executor

Ejemplo

El siguiente ejemplo se tomó prestado de los documentos de Python. En este ejemplo, en primer lugarconcurrent.futuresel módulo debe ser importado. Entonces una función llamadaload_url()se crea que cargará la URL solicitada. La función luego creaThreadPoolExecutorcon los 5 hilos en la piscina. losThreadPoolExecutorse ha utilizado como administrador de contexto. Podemos obtener el resultado del futuro llamando alresult() método en él.

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

Salida

Lo siguiente sería el resultado del script de Python anterior:

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Uso de la función Executor.map ()

El pitón map()La función se usa ampliamente en una serie de tareas. Una de esas tareas es aplicar una determinada función a cada elemento dentro de los iterables. De manera similar, podemos asignar todos los elementos de un iterador a una función y enviarlos como trabajos independientes a outThreadPoolExecutor. Considere el siguiente ejemplo de secuencia de comandos de Python para comprender cómo funciona la función.

Ejemplo

En este ejemplo a continuación, la función de mapa se utiliza para aplicar la square() función a cada valor en la matriz de valores.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

Salida

La secuencia de comandos de Python anterior genera la siguiente salida:

4
9
16
25