example ejemplos concurrency mapreduce

concurrency - ejemplos - ¿Cómo le digo a una máquina multi-core/multi-CPU que procese llamadas de función en un bucle en paralelo?



mongodb map reduce example (9)

Actualmente estoy diseñando una aplicación que tiene un módulo que cargará grandes cantidades de datos de una base de datos y los reducirá a un conjunto mucho más pequeño según varios cálculos, según las circunstancias.

Muchas de las operaciones más intensivas se comportan de manera determinista y se prestan a un procesamiento paralelo.

Siempre que tenga un bucle que itere sobre una gran cantidad de fragmentos de datos que llegan del DB y para cada uno llame a una función determinística sin efectos secundarios, ¿cómo lo haré para que el programa no espere a que la función regrese sino que establece las próximas llamadas van para que puedan procesarse en paralelo? Un enfoque ingenuo para demostrar el principio me haría por ahora.

He leído el documento de MapReduce de Google y si bien podría utilizar el principio general en varios lugares, por ahora, no me enfocaré en clusters grandes, sino que será una máquina de varios núcleos o multi-CPU para la versión 1.0 . Por lo tanto, actualmente no estoy seguro de si realmente puedo usar la biblioteca o tendré que rodar una versión básica embrutecida.

Estoy en una etapa inicial del proceso de diseño y hasta ahora me estoy enfocando en C-algo (para los bits críticos de velocidad) y Python (para los bits críticos de productividad) como mis idiomas. Si hay razones convincentes, podría cambiar, pero hasta ahora estoy contento con mi elección.

Tenga en cuenta que soy consciente del hecho de que podría llevar más tiempo recuperar el siguiente fragmento de la base de datos que procesar el actual y todo el proceso estaría vinculado a E / S. Sin embargo, supongo que por el momento no es así y, en la práctica, utilizo un clúster de db o almacenamiento en memoria caché u otra cosa para no estar enlazado a E / S en este punto.



Si está trabajando con un compilador que lo respaldará, le sugiero que consulte http://www.openmp.org para obtener una forma de anotar su código de modo que ciertos bucles se paralelicen.

También hace mucho más, y puede que le resulte muy útil.

Su página web informa que gcc4.2 admitirá openmp, por ejemplo.


Me puede estar perdiendo algo aquí, pero esto parece bastante sencillo usando pthreads.

Configure un pequeño grupo de subprocesos con N subprocesos y tenga un subproceso para controlarlos todos.

El hilo maestro simplemente se sienta en un bucle haciendo algo como:

  1. Obtener fragmento de datos de DB
  2. Encuentre el siguiente hilo libre Si ningún hilo es gratis, espere
  3. Entregar pedazo a hilo de trabajador
  4. Regrese y obtenga el siguiente fragmento de DB

Mientras tanto, los hilos del trabajador se sientan y hacen:

  1. Marcarme como libre
  2. Espere a que el hilo del mástil me dé una gran cantidad de datos
  3. Procesar el fragmento de datos
  4. Marcarme como libre de nuevo

El método por el cual implementa esto puede ser tan simple como dos matrices controladas por mutex. Uno tiene los hilos trabajados en él (el grupo de hilos) y el otro indicado si cada hilo correspondiente está libre u ocupado.

Ajustar N a su gusto ...


Si aún planea usar Python, es posible que desee echarle un vistazo a Processing . Utiliza procesos en lugar de hilos para computación paralela (debido a Python GIL) y proporciona clases para distribuir "elementos de trabajo" en varios procesos. Usando la clase de grupo, puede escribir código como el siguiente:

import processing def worker(i): return i*i num_workers = 2 pool = processing.Pool(num_workers) result = pool.imap(worker, range(100000))

Esta es una versión paralela de itertools.imap, que distribuye las llamadas a los procesos. También puede usar los métodos apply_async del grupo y almacenar objetos de resultado diferidos en una lista:

results = [] for i in range(10000): results.append(pool.apply_async(worker, i))

Para obtener más referencias, consulte la documentación de la clase Pool .

Gotchas:

  • el procesamiento utiliza fork (), por lo que debe tener cuidado con Win32
  • los objetos transferidos entre procesos deben ser elegibles
  • si los trabajadores son relativamente rápidos, puede modificar el tamaño en trozos, es decir, la cantidad de elementos de trabajo enviados a un proceso de trabajo en un solo lote
  • processing.Pool usa un hilo de fondo

Puede implementar el algoritmo de MapReduce de Google sin tener máquinas físicamente separadas. Simplemente considere cada una de esas "máquinas" como "hilos". Los hilos se distribuyen automáticamente en máquinas multi-core.


El mismo grupo de subprocesos se usa en Java. Pero los subprocesos de los grupos de subprocesos son serializables y se envían a otras computadoras y se deserializan para ejecutarse.


Desarrollé una biblioteca MapReduce para uso de múltiples subprocesos / multi-core en un único servidor. La biblioteca se encarga de todo y el usuario solo debe implementar Map and Reduce. Se posiciona como una biblioteca de Boost, pero todavía no se acepta como una lib formal. Consulte http://www.craighenderson.co.uk/mapreduce


El TBB de Intel o boost :: mpi también pueden ser de su interés.


Puede que le interese examinar el código de libdispatch , que es la implementación de código abierto del Grand Central Dispatch de Apple.