una requisitos reglas qué para necesito hsbc cuenta ahorro abrir java multithreading concurrency countdownlatch phaser

requisitos - reglas de java



¿Cuenta flexible para abajo? (6)

He encontrado un problema dos veces por el cual un hilo productor produce N elementos de trabajo, los envía a un ExecutorService y luego tiene que esperar hasta que se hayan procesado todos los N elementos.

Advertencias

  • N no se conoce de antemano . Si fuera, simplemente crearía CountDownLatch y luego tendría un hilo de productor await() hasta que todo el trabajo estuviera completo.
  • El uso de CompletionService es apropiado porque, aunque mi hilo productor debe bloquear (es decir, llamando a take() ), no hay forma de indicar que todo el trabajo está completo , para que el hilo productor deje de esperar.

Mi solución preferida actual es usar un contador de enteros, e incrementarlo cada vez que se envíe un elemento de trabajo y disminuirlo cuando se procesa un elemento de trabajo. Después de la suscripción de todas las N tareas, mi hilo productor deberá esperar en un bloqueo, verificando si el counter == 0 cada vez que se notifica. El / los hilo (s) del consumidor deberán notificar al productor si ha disminuido el contador y el nuevo valor es 0.

¿Hay un mejor enfoque para este problema o hay una construcción adecuada en java.util.concurrent que debería usar en lugar de "rodar la mía"?

Gracias por adelantado.


He usado un ExecutorCompletionService para algo como esto:

ExecutorCompletionService executor = ...; int count = 0; while (...) { executor.submit(new Processor()); count++; } //Now, pull the futures out of the queue: for (int i = 0; i < count; i++) { executor.take().get(); }

Esto implica mantener una cola de tareas que se han enviado, por lo que si su lista es arbitrariamente larga, su método podría ser mejor.

Pero asegúrese de usar un AtomicInteger para la coordinación, de modo que pueda incrementarlo en un subproceso y decrementarlo en los subprocesos de trabajo.


Lo que describiste es muy similar a usar un semáforo estándar pero usado ''hacia atrás''.

  • Tu semáforo comienza con 0 permisos.
  • Cada unidad de trabajo libera un permiso cuando se completa.
  • Bloqueas a la espera de adquirir N permisos.

Además, tiene la flexibilidad de adquirir solo M <N permisos, lo que es útil si desea verificar el estado intermedio. Por ejemplo, estoy probando una cola de mensajes acotados asíncronos, por lo que espero que la cola esté llena para algunos M <N, por lo que puedo adquirir M y comprobar que la cola está realmente llena y luego adquirir los permisos N - M restantes después de consumir los mensajes del cola.


Método independiente de Java 8+ que hace exactamente esto para un Stream utilizando un Phaser un solo paso. Iterator variantes de Iterator / Iterable son exactamente las mismas.

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) { Phaser phaser = new Phaser(1); // 1 = register ourselves items.forEach(item -> { phaser.register(); // new task executor.execute(() -> { handleItem.accept(item); phaser.arrive(); // completed task }); }); phaser.arriveAndAwaitAdvance(); // block until all tasks are complete return phaser.getRegisteredParties()-1; // number of items } ... int recognised = submitAndWait(facesInPicture, this::detectFace, executor)

Para tu información Esto está bien para eventos singulares pero, si esto se llama simultáneamente, una solución que involucre a ForkJoinPool detendrá el bloqueo del hilo primario.


Por supuesto, podría usar un CountDownLatch protegido por una AtomicReference para que sus tareas se AtomicReference así:

public class MyTask extends Runnable { private final Runnable r; public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; } public void run() { r.run(); while (l.get() == null) Thread.sleep(1000L); //handle Interrupted l.get().countDown(); } }

Observe que las tareas ejecutan su trabajo y luego giran hasta que se establece la cuenta regresiva (es decir, se conoce el número total de tareas). Tan pronto como se establece la cuenta atrás, la cuenta regresiva y sale. Estos se envían de la siguiente manera:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>(); executor.submit(new MyTask(r, l));

Después del punto de creación / envío de su trabajo, cuando sepa cuántas tareas ha creado :

latch.set(new CountDownLatch(nTasks)); latch.get().await();


Supongo que su productor no necesita saber cuándo la cola está vacía, sino que debe saber cuándo se ha completado la última tarea.

Yo agregaría un waitforWorkDone(producer) al consumidor. El productor puede agregar sus N tareas y llamar al método de espera. El método de espera bloquea el subproceso entrante si la cola de trabajo no está vacía y no hay tareas ejecutándose en este momento.

Las hebras del consumidor notifyAll() en el bloqueo de espera si su tarea se ha completado, la cola está vacía y no se está ejecutando ninguna otra tarea.


java.util.concurrent.Phaser parece que funcionaría bien para ti. Está previsto que se lance en Java 7, pero la versión más estable se puede encontrar en el sitio web del grupo de interés de jsr166 .

El phaser es una barrera cíclica glorificada. Puede registrar N números de participantes y, cuando esté listo, esperar su avance en la fase específica.

Un rápido ejemplo de cómo funcionaría:

final Phaser phaser = new Phaser(); public Runnable getRunnable(){ return new Runnable(){ public void run(){ ..do stuff... phaser.arriveAndDeregister(); } }; } public void doWork(){ phaser.register();//register self for(int i=0 ; i < N; i++){ phaser.register(); // register this task prior to execution executor.submit( getRunnable()); } phaser.arriveAndAwaitAdvance(); }