threads thread practice example concurrent java multithreading concurrency

practice - java thread pool



Espere hasta que se complete cualquiera de Future<T> (7)

¿Por qué no crear una cola de resultados y esperar en la cola? O más simplemente, use un CompletionService ya que eso es lo que es: un ExecutorService + cola de resultados.

Tengo pocas tareas asíncronas en ejecución y tengo que esperar hasta que al menos una de ellas haya finalizado (en el futuro probablemente tenga que esperar a que se terminen las tareas de M de las N tareas). Actualmente se presentan como Futuros, entonces necesito algo como

/** * Blocks current thread until one of specified futures is done and returns it. */ public static <T> Future<T> waitForAny(Collection<Future<T>> futures) throws AllFuturesFailedException

¿Hay algo como esto? O algo similar, no es necesario para el futuro. Actualmente recorro la colección de futuros, verifico si uno ha terminado, luego duermo por un tiempo y vuelvo a verificar. Parece que esta no es la mejor solución, porque si duermo durante un período prolongado, entonces se agrega una demora no deseada. Si duermo durante un período corto, puede afectar el rendimiento.

Podría intentar usar

new CountDownLatch(1)

y disminuir la cuenta atrás cuando la tarea está completa y hacer

countdown.await()

, pero lo encontré posible solo si controlo la creación futura. Es posible, pero requiere un rediseño del sistema, porque actualmente la lógica de la creación de tareas (envío de llamadas a ExecutorService) está separada de la decisión de esperar para qué futuro. También podría anular

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

y crear implementación personalizada de RunnableFuture con capacidad de adjuntar listener para recibir notificaciones cuando finalice la tarea, luego adjuntar dicho oyente a las tareas necesarias y usar CountDownLatch, pero eso significa que tengo que anular newTaskFor para cada ExecutorService que uso, y posiblemente habrá implementación. que no extienden AbstractExecutorService. También podría intentar envolver ExecutorService con el mismo propósito, pero luego tengo que decorar todos los métodos que producen Futures.

Todas estas soluciones pueden funcionar pero parecen muy antinaturales. Parece que me falta algo simple, como

WaitHandle.WaitAny(WaitHandle[] waitHandles)

Cª#. ¿Hay alguna solución conocida para este tipo de problema?

ACTUALIZAR:

Originalmente no tenía acceso a la creación Future en absoluto, por lo que no había una solución elegante. Después de rediseñar el sistema, obtuve acceso a la creación futura y pude agregar countDownLatch.countdown () al proceso de ejecución, luego puedo contar DownLatch.await () y todo funciona bien. Gracias por otras respuestas, no sabía acerca de ExecutorCompletionService y de hecho puede ser útil en tareas similares, pero en este caso particular no se pudo usar porque algunos Futures se crean sin ningún ejecutor: la tarea real se envía a otro servidor a través de la red. se completa de forma remota y se recibe la notificación de finalización.


Como no le importa cuál finaliza, ¿por qué no tener un solo WaitHandle para todos los hilos y esperar eso? Cualquiera que termine primero puede establecer el mango.


Esto es bastante fácil con wait () y notifyAll ().

Primero, defina un objeto de bloqueo. (Puedes usar cualquier clase para esto, pero me gusta ser explícito):

package com.javadude.sample; public class Lock {}

Luego, defina su hilo de trabajo. Él debe notificar ese objeto de bloqueo cuando haya terminado su procesamiento. Tenga en cuenta que la notificación debe estar en un bloqueo de bloque sincronizado en el objeto de bloqueo.

package com.javadude.sample; public class Worker extends Thread { private Lock lock_; private long timeToSleep_; private String name_; public Worker(Lock lock, String name, long timeToSleep) { lock_ = lock; timeToSleep_ = timeToSleep; name_ = name; } @Override public void run() { // do real work -- using a sleep here to simulate work try { sleep(timeToSleep_); } catch (InterruptedException e) { interrupt(); } System.out.println(name_ + " is done... notifying"); // notify whoever is waiting, in this case, the client synchronized (lock_) { lock_.notify(); } } }

Finalmente, puedes escribir a tu cliente:

package com.javadude.sample; public class Client { public static void main(String[] args) { Lock lock = new Lock(); Worker worker1 = new Worker(lock, "worker1", 15000); Worker worker2 = new Worker(lock, "worker2", 10000); Worker worker3 = new Worker(lock, "worker3", 5000); Worker worker4 = new Worker(lock, "worker4", 20000); boolean started = false; int numNotifies = 0; while (true) { synchronized (lock) { try { if (!started) { // need to do the start here so we grab the lock, just // in case one of the threads is fast -- if we had done the // starts outside the synchronized block, a fast thread could // get to its notification *before* the client is waiting for it worker1.start(); worker2.start(); worker3.start(); worker4.start(); started = true; } lock.wait(); } catch (InterruptedException e) { break; } numNotifies++; if (numNotifies == 4) { break; } System.out.println("Notified!"); } } System.out.println("Everyone has notified me... I''m done"); } }


Por lo que sé, Java no tiene una estructura análoga al método WaitHandle.WaitAny .

Me parece que esto podría lograrse a través de un decorador "WaitableFuture":

public WaitableFuture<T> extends Future<T> { private CountDownLatch countDownLatch; WaitableFuture(CountDownLatch countDownLatch) { super(); this.countDownLatch = countDownLatch; } void doTask() { super.doTask(); this.countDownLatch.countDown(); } }

Aunque esto solo funcionaría si se puede insertar antes del código de ejecución, ya que de lo contrario el código de ejecución no tendría el nuevo método doTask() . Pero realmente no veo forma de hacer esto sin sondear si de alguna manera no puedes controlar el objeto Future antes de la ejecución.

O si el futuro siempre se ejecuta en su propio hilo, y de alguna manera puede obtener ese hilo. Luego podrías engendrar un nuevo hilo para unir el otro hilo, luego manejar el mecanismo de espera después de que regrese la unión ... Esto sería realmente feo y provocaría muchos sobrecargas. Y si algunos objetos futuros no terminan, puede tener una gran cantidad de hilos bloqueados dependiendo de los hilos muertos. Si no tiene cuidado, esto podría perder la memoria y los recursos del sistema.

/** * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join(). */ public static joinAny(Collection<Thread> threads, int numberToWaitFor) { CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor); foreach(Thread thread in threads) { (new Thread(new JoinThreadHelper(thread, countDownLatch))).start(); } countDownLatch.await(); } class JoinThreadHelper implements Runnable { Thread thread; CountDownLatch countDownLatch; JoinThreadHelper(Thread thread, CountDownLatch countDownLatch) { this.thread = thread; this.countDownLatch = countDownLatch; } void run() { this.thread.join(); this.countDownLatch.countDown(); } }


Vea esta opción:

public class WaitForAnyRedux { private static final int POOL_SIZE = 10; public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException { List<Callable<T>> callables = new ArrayList<Callable<T>>(); for (final T t : collection) { Callable<T> callable = Executors.callable(new Thread() { @Override public void run() { synchronized (t) { try { t.wait(); } catch (InterruptedException e) { } } } }, t); callables.add(callable); } BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE); ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue); return executorService.invokeAny(callables); } static public void main(String[] args) throws InterruptedException, ExecutionException { final List<Integer> integers = new ArrayList<Integer>(); for (int i = 0; i < POOL_SIZE; i++) { integers.add(i); } (new Thread() { public void run() { Integer notified = null; try { notified = waitForAny(integers); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("notified=" + notified); } }).start(); synchronized (integers) { integers.wait(3000); } Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE)); System.out.println("Waking up " + randomInt); synchronized (randomInt) { randomInt.notify(); } } }