terminar - Cómo enhebrar una operación dentro de un bucle en Python
for i in range python español (3)
Digamos que tengo una lista muy grande y estoy realizando una operación como esta:
for item in items:
try:
api.my_operation(item)
except:
print ''error with item''
Mi problema es doble:
- Hay muchos artículos
- api.my_operation tarda una eternidad en regresar
Me gustaría utilizar multi-threading para hacer girar un montón de api.my_operations a la vez, así puedo procesar tal vez 5 o 10 o incluso 100 elementos a la vez.
Si my_operation () devuelve una excepción (porque quizás ya procesé ese elemento), está bien. No romperá nada. El ciclo puede continuar al siguiente elemento.
Nota : esto es para Python 2.7.3
Puede dividir el procesamiento en un número específico de subprocesos usando un enfoque como este:
import threading
def process(items, start, end):
for item in items[start:end]:
try:
api.my_operation(item)
except Exception:
print(''error with item'')
def split_processing(items, num_splits=4):
split_size = len(items) // num_splits
threads = []
for i in range(num_splits):
# determine the indices of the list this thread will handle
start = i * split_size
# special case on the last chunk to account for uneven splits
end = None if i+1 == num_splits else (i+1) * split_size
# create the thread
threads.append(
threading.Thread(target=process, args=(items, start, end)))
threads[-1].start() # start the thread we just created
# wait for all threads to finish
for t in threads:
t.join()
split_processing(items)
Editar : olvidé mencionar que esto funciona en Python 2.7.x.
Hay multiprocesamiento.pool, y el siguiente ejemplo ilustra cómo usar uno de ellos:
from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
pool_size = 5 # your "parallelness"
pool = Pool(pool_size)
def worker(item):
try:
api.my_operation(item)
except:
print(''error with item'')
for item in items:
pool.apply_async(worker, (item,))
pool.close()
pool.join()
Ahora bien, si efectivamente identifica que su proceso está vinculado a la CPU como se mencionó en @abarnert, cambie ThreadPool a la implementación del grupo de procesos (comentado en la importación de ThreadPool). Puede encontrar más detalles aquí: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
Primero, en Python, si su código está vinculado a la CPU, el multihilo no ayudará, ya que solo un hilo puede contener el bloqueo de intérprete global y, por lo tanto, ejecutar el código de Python a la vez. Entonces, necesitas usar procesos, no hilos.
Esto no es cierto si su operación "demora mucho en regresar" porque está vinculada a IO, es decir, esperando en la red o copias de discos o similares. Volveré a eso más tarde.
A continuación, la forma de procesar 5 o 10 o 100 elementos a la vez es crear un grupo de 5 o 10 o 100 trabajadores, y poner los artículos en una cola que el servicio de los trabajadores. Afortunadamente, las bibliotecas stdlib multiprocessing
y concurrent.futures
envuelven la mayoría de los detalles por usted.
El primero es más poderoso y flexible para la programación tradicional; el último es más simple si necesita componer esperando en el futuro; para casos triviales, realmente no importa cuál elijas. (En este caso, la implementación más obvia con cada toma 3 líneas con futures
, 4 líneas con multiprocessing
).
Si usa 2.6-2.7 o 3.0-3.1, los futures
no están incorporados, pero puede instalarlos desde PyPI ( pip install futures
).
Finalmente, es mucho más simple paralizar las cosas si puedes convertir toda la iteración del ciclo en una llamada a función (algo que podrías pasar, por ejemplo, pasar al map
), así que hagámoslo primero:
def try_my_operation(item):
try:
api.my_operation(item)
except:
print(''error with item'')
Poniendolo todo junto:
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)
Si tiene muchos trabajos relativamente pequeños, la sobrecarga del multiprocesamiento puede dañar las ganancias. La forma de resolver eso es combinar el trabajo en trabajos más grandes. Por ejemplo (usando grouper
de las recetas itertools
, que puedes copiar y pegar en tu código, o obtener del proyecto more-itertools
en PyPI):
def try_multiple_operations(items):
for item in items:
try:
api.my_operation(item)
except:
print(''error with item'')
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group)
for group in grouper(5, items)]
concurrent.futures.wait(futures)
Finalmente, ¿qué ocurre si tu código está vinculado a IO? Entonces, los hilos son tan buenos como los procesos, y con menos sobrecarga (y menos limitaciones, pero esas limitaciones generalmente no le afectarán en casos como este). A veces, esa "menor sobrecarga" es suficiente para que no necesites lotes con subprocesos, pero lo haces con los procesos, lo cual es una buena victoria.
Entonces, ¿cómo se usan los hilos en lugar de los procesos? Simplemente cambie ProcessPoolExecutor
a ThreadPoolExecutor
.
Si no está seguro de si su código está vinculado a la CPU o a la IO, intente hacerlo de ambas formas.
¿Puedo hacer esto para múltiples funciones en mi script python? Por ejemplo, si tuviera otro bucle for en otra parte del código que quisiera paralelizar. ¿Es posible hacer dos funciones multihilo en el mismo script?
Sí. De hecho, hay dos formas diferentes de hacerlo.
Primero, puedes compartir el mismo ejecutor (hilo o proceso) y usarlo desde múltiples lugares sin ningún problema. El objetivo de las tareas y los futuros es que son autónomos; no te importa dónde se ejecutan, solo para ponerlos en cola y, finalmente, obtener la respuesta.
Alternativamente, puede tener dos ejecutores en el mismo programa sin problemas. Esto tiene un costo de rendimiento: si está utilizando ambos ejecutores al mismo tiempo, terminará tratando de ejecutar (por ejemplo) 16 hilos ocupados en 8 núcleos, lo que significa que habrá un cambio de contexto. Pero a veces vale la pena hacerlo porque, por ejemplo, los dos ejecutores rara vez están ocupados al mismo tiempo, y hace que su código sea mucho más simple. O tal vez un ejecutor está ejecutando tareas muy grandes que puede llevar un tiempo completar, y el otro ejecuta tareas muy pequeñas que deben completarse lo más rápido posible, porque la capacidad de respuesta es más importante que el rendimiento de parte de su programa.
Si no sabe cuál es apropiado para su programa, generalmente es el primero.