python multithreading python-3.x python-multithreading concurrent.futures

python - ¿En qué se diferencia ThreadPoolExecutor(). Map de ThreadPoolExecutor(). Submit?



multithreading python-3.x (2)

A continuación se muestra un ejemplo de envío vs. mapa. Ambos aceptan los trabajos inmediatamente (enviados | asignados - inicio). Tardan el mismo tiempo en completarse, 11 segundos (tiempo del último resultado - inicio). Sin embargo, el envío proporciona resultados tan pronto como se completa cualquier subproceso en ThreadPoolExecutor maxThreads = 2. El mapa da los resultados en el orden en que se envían.

import time import concurrent.futures def worker(i): time.sleep(i) return i,time.time() e = concurrent.futures.ThreadPoolExecutor(2) arrIn = range(1,7)[::-1] print arrIn f = [] print ''start submit'',time.time() for i in arrIn: f.append(e.submit(worker,i)) print ''submitted'',time.time() for r in concurrent.futures.as_completed(f): print r.result(),time.time() print f = [] print ''start map'',time.time() f = e.map(worker,arrIn) print ''mapped'',time.time() for r in f: print r,time.time()

Salida:

[6, 5, 4, 3, 2, 1] start submit 1543473934.47 submitted 1543473934.47 (5, 1543473939.473743) 1543473939.47 (6, 1543473940.471591) 1543473940.47 (3, 1543473943.473639) 1543473943.47 (4, 1543473943.474192) 1543473943.47 (1, 1543473944.474617) 1543473944.47 (2, 1543473945.477609) 1543473945.48 start map 1543473945.48 mapped 1543473945.48 (6, 1543473951.483908) 1543473951.48 (5, 1543473950.484109) 1543473951.48 (4, 1543473954.48858) 1543473954.49 (3, 1543473954.488384) 1543473954.49 (2, 1543473956.493789) 1543473956.49 (1, 1543473955.493888) 1543473956.49

Simplemente estaba muy confundido por algún código que escribí. Me sorprendió descubrir que:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(f, iterable))

y

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: results = list(map(lambda x: executor.submit(f, x), iterable))

Producir diferentes resultados. El primero produce una lista del tipo que devuelve f , el segundo produce una lista de objetos concurrent.futures.Future que luego deben evaluarse con su método result() para obtener el valor que devolvió f .

Mi principal preocupación es que esto significa que executor.map no puede aprovechar concurrent.futures.as_completed , lo que parece ser una forma extremadamente conveniente de evaluar los resultados de algunas llamadas de larga duración a una base de datos que estoy realizando. volverse disponible.

No estoy del todo claro sobre cómo funcionan los objetos concurrent.futures.ThreadPoolExecutor - ingenuamente, preferiría (algo más detallado):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: result_futures = list(map(lambda x: executor.submit(f, x), iterable)) results = [f.result() for f in futures.as_completed(result_futures)]

sobre el executor.map más conciso para aprovechar una posible ganancia en el rendimiento. ¿Me equivoco al hacerlo?


El problema es que transformas el resultado de ThreadPoolExecutor.map en una lista. Si no hace esto y en su lugar itera directamente sobre el generador resultante, los resultados todavía se muestran en el orden correcto, pero el ciclo continúa antes de que todos los resultados estén listos. Puedes probar esto con este ejemplo:

import time import concurrent.futures e = concurrent.futures.ThreadPoolExecutor(4) s = range(10) for i in e.map(time.sleep, s): print(i)

El hecho de que se mantenga el orden puede ser porque a veces es importante que obtenga los resultados en el mismo orden en que los asignó. Y es probable que los resultados no se envuelvan en objetos futuros porque en algunas situaciones puede tomar demasiado tiempo hacer otro mapa en la lista para obtener todos los resultados si los necesita. Y después de todo, en la mayoría de los casos es muy probable que el siguiente valor esté listo antes de que el bucle procese el primer valor. Esto se demuestra en este ejemplo:

import concurrent.futures executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor data = some_huge_list() results = executor.map(crunch_number, data) finals = [] for value in results: finals.append(do_some_stuff(value))

En este ejemplo, es probable que do_some_stuff tome más tiempo que crunch_number y, si este es realmente el caso, no es realmente una gran pérdida de rendimiento mientras aún mantiene el uso fácil del mapa.

Además, dado que los subprocesos de trabajo (/ procesos) comienzan a procesarse al principio de la lista y continúan hasta el final de la lista a la que envió, los resultados deben terminarse en el orden en que el iterador ya los proporcionó. Lo que significa que en la mayoría de los casos, executor.map está bien, pero en algunos casos, por ejemplo, si no importa en qué orden se procesan los valores y la función que se pasó al map toma tiempos muy diferentes para ejecutarse, el future.as_completed puede ser mas rapido