tutorial programas ejercicios ejemplos ejemplo comunicadores aplicaciones c++ count thread-safety mpi mpi-rma

c++ - programas - mpi pdf



Crear un contador que se mantenga sincronizado a través de procesos MPI (4)

Tengo bastante experiencia en el uso de los métodos básicos de comunicación y de grupo MPI2, y hago un poco de trabajo de simulación paralelo embarazoso usando MPI. Hasta ahora, he estructurado mi código para tener un nodo de despacho y un grupo de nodos de trabajadores. El nodo de envío tiene una lista de archivos de parámetros que se ejecutarán con el simulador. Semilla cada nodo de trabajador con un archivo de parámetros. Los nodos de trabajadores ejecutan su simulación y luego solicitan otro archivo de parámetros, que proporciona el nodo de despacho. Una vez que se han ejecutado todos los archivos de parámetros, el nodo de envío apaga cada nodo de trabajo, antes de apagarse.

Los archivos de parámetros se suelen llamar "Par_N.txt", donde N es el número de identificación (por ejemplo, N = 1-1000). Así que estaba pensando, si pudiera crear un contador, y podría tener este contador sincronizado en todos mis nodos, podría eliminar la necesidad de tener un nodo de envío y hacer que el sistema sea un poco más simple. Tan simple como esto suena en teoría, en la práctica sospecho que es un poco más difícil, ya que necesitaría asegurar que el contador esté bloqueado mientras se cambia, etc. Y pensé que podría haber una forma incorporada para que MPI maneja esto. ¿Alguna idea? ¿Estoy más pensando en esto?


No puedo pensar en ningún mecanismo incorporado para resolver ese problema, tendrías que implementarlo manualmente. A juzgar por sus comentarios, quiere descentralizar el programa, en cuyo caso cada proceso (o al menos grupos de procesos) debería mantener sus propios valores del contador y mantenerlo sincronizado. Esto probablemente podría hacerse con un uso inteligente de envíos / recepciones no bloqueantes, pero la semántica de estos no es trivial.

En cambio, resolvería el problema de la saturación simplemente emitiendo varios archivos a la vez a los procesos de los trabajadores. Esto reduciría el tráfico de la red y le permitirá mantener su configuración simple de despachador único.


No está claro si es necesario revisar los archivos en estricto orden o no. Si no es así, ¿por qué no simplemente hacer que cada nodo maneje todos los archivos donde N % total_workers == i --es decir, distribución cíclica del trabajo?


Parece que está utilizando su nodo de distribución para realizar el equilibrio dinámico de carga (asignando trabajo a los procesadores cuando estén disponibles). Un contador compartido que no requiera que todos los procesadores detengan no lo hará. Le recomendaría que se quede con lo que tiene ahora o haga lo que sugiere suszterpatt, envíe lotes de archivos a la vez.


Implementar un contador compartido no es trivial, pero una vez que lo haces y lo tienes en una biblioteca, puedes hacer mucho con él.

En el libro Using MPI-2 , que debería tener a mano si va a implementar esto, uno de los ejemplos (el código está disponible en línea ) es un contador compartido. El "no escalable" debería funcionar bien en varias docenas de procesos: el contador es una matriz de 0..size-1 de enteros, uno por rango, y luego la operación `obtener siguiente artículo de trabajo ''consiste en bloquear la ventana, leer la contribución de todos los demás al mostrador (en este caso, cuántos elementos han tomado), actualizar la suya (++), cerrar la ventana y calcular el total. Todo esto se hace con operaciones pasivas de un solo lado. (El de mayor escala simplemente usa un árbol en lugar de un conjunto de 1 día).

Entonces, el uso sería si dijeras rango 0 host el contador, y todos siguen haciendo unidades de trabajo y actualizando el contador para obtener el siguiente hasta que no haya más trabajo; luego esperas en una barrera o algo y finalizas.

Una vez que tenga algo como esto -utilizando un valor compartido para obtener la siguiente unidad de trabajo disponible- trabajando, entonces puede generalizar a un enfoque más sofisticado. Así que, como sugirió suzterpatt, todos los que toman "su parte" de las unidades de trabajo al principio funcionan de maravilla, pero ¿qué hacer si algunos finalizan más rápido que otros? La respuesta habitual ahora es el robo de trabajo; todos guardan su lista de unidades de trabajo en una cola, y luego, cuando uno se queda sin trabajo, roba unidades de trabajo del otro lado de alguien más dequeue, hasta que no queda más trabajo. Esta es realmente la versión completamente distribuida del maestro trabajador, donde no hay más trabajo de partición maestro único. Una vez que haya funcionado un solo contador compartido, puede crear mutex a partir de ellos, y desde allí puede implementar el dequeue. Pero si el contador compartido simple funciona lo suficientemente bien, es posible que no necesite ir allí.

Actualización: Ok, aquí hay un hacky-intento de hacer el contador compartido: mi versión del sencillo en el libro MPI-2 parece funcionar, pero no diría nada mucho más fuerte que eso (no he jugado con esto por mucho tiempo). Hay una implementación de contador simple (correspondiente a la versión sin escala en el libro MPI-2) con dos pruebas simples, una correspondiente aproximadamente a su caso de trabajo; cada elemento actualiza el contador para obtener un elemento de trabajo, luego hace el "trabajo" (duerme durante un período de tiempo aleatorio). Al final de cada prueba, se imprime la estructura de datos del contador, que es el número de incrementos que cada rango ha hecho.

#include <mpi.h> #include <stdlib.h> #include <stdio.h> #include <unistd.h> struct mpi_counter_t { MPI_Win win; int hostrank ; int myval; int *data; int rank, size; }; struct mpi_counter_t *create_counter(int hostrank) { struct mpi_counter_t *count; count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t)); count->hostrank = hostrank; MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank)); MPI_Comm_size(MPI_COMM_WORLD, &(count->size)); if (count->rank == hostrank) { MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data)); for (int i=0; i<count->size; i++) count->data[i] = 0; MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win)); } else { count->data = NULL; MPI_Win_create(count->data, 0, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win)); } count -> myval = 0; return count; } int increment_counter(struct mpi_counter_t *count, int increment) { int *vals = (int *)malloc( count->size * sizeof(int) ); int val; MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win); for (int i=0; i<count->size; i++) { if (i == count->rank) { MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM, count->win); } else { MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win); } } MPI_Win_unlock(0, count->win); count->myval += increment; vals[count->rank] = count->myval; val = 0; for (int i=0; i<count->size; i++) val += vals[i]; free(vals); return val; } void delete_counter(struct mpi_counter_t **count) { if ((*count)->rank == (*count)->hostrank) { MPI_Free_mem((*count)->data); } MPI_Win_free(&((*count)->win)); free((*count)); *count = NULL; return; } void print_counter(struct mpi_counter_t *count) { if (count->rank == count->hostrank) { for (int i=0; i<count->size; i++) { printf("%2d ", count->data[i]); } puts(""); } } int test1() { struct mpi_counter_t *c; int rank; int result; c = create_counter(0); MPI_Comm_rank(MPI_COMM_WORLD, &rank); result = increment_counter(c, 1); printf("%d got counter %d/n", rank, result); MPI_Barrier(MPI_COMM_WORLD); print_counter(c); delete_counter(&c); } int test2() { const int WORKITEMS=50; struct mpi_counter_t *c; int rank; int result = 0; c = create_counter(0); MPI_Comm_rank(MPI_COMM_WORLD, &rank); srandom(rank); while (result < WORKITEMS) { result = increment_counter(c, 1); if (result <= WORKITEMS) { printf("%d working on item %d.../n", rank, result); sleep(random() % 10); } else { printf("%d done/n", rank); } } MPI_Barrier(MPI_COMM_WORLD); print_counter(c); delete_counter(&c); } int main(int argc, char **argv) { MPI_Init(&argc, &argv); test1(); test2(); MPI_Finalize(); }