hilos ejemplo java concurrency executorservice

java - ejemplo - Controlando el orden de ejecución de la tarea con ExecutorService



executorservice pool java (8)

Tengo un proceso que delega tareas asíncronas a un grupo de subprocesos. Necesito asegurarme de que ciertas tareas se ejecuten en orden. Así por ejemplo

Las tareas llegan en orden

Tareas a1, b1, c1, d1, e1, a2, a3, b2, f1

Las tareas se pueden ejecutar en cualquier orden, excepto cuando existe una dependencia natural, por lo que a1, a2, a3 se deben procesar en ese orden, ya sea asignando el mismo hilo o bloqueando esto hasta que sepa que se completó la tarea # anterior.

Actualmente, no utiliza el paquete Java Concurrency, pero estoy considerando cambiar para aprovechar la administración de subprocesos.

¿Alguien tiene una solución similar o sugerencias de cómo lograr esto?


Cuando envía un Runnable o un Callable a un ExecutorService , recibe un Future a cambio. Haga que los hilos que dependen de a1 pasen a Future de a1 y llame a Future.get() . Esto bloqueará hasta que el hilo se complete.

Asi que:

ExecutorService exec = Executor.newFixedThreadPool(5); Runnable a1 = ... final Future f1 = exec.submit(a1); Runnable a2 = new Runnable() { @Override public void run() { f1.get(); ... // do stuff } } exec.submit(a2);

y así.


En la biblioteca Habanero-Java , hay un concepto de tareas controladas por datos que se pueden usar para expresar dependencias entre tareas y evitar operaciones de bloqueo de subprocesos. Bajo las portadas, la biblioteca de Java-Habanero utiliza el ForkJoinPool de JDK (es decir, un servicio de ejecución).

Por ejemplo, su caso de uso para las tareas A1, A2, A3, ... podría expresarse de la siguiente manera:

HjFuture a1 = future(() -> { doA1(); return true; }); HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; }); HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; });

Tenga en cuenta que a1, a2 y a3 son solo referencias a objetos de tipo HjFuture y pueden mantenerse en sus estructuras de datos personalizadas para especificar las dependencias a medida que las tareas A2 y A3 entran en tiempo de ejecución.

Hay algunas diapositivas tutoriales disponibles . Puede encontrar más documentación como javadoc , resumen de API y primers .


Escribo un Ejecutor propio que garantiza la ordenación de tareas para tareas con la misma clave. Utiliza un mapa de colas para tareas de pedido con la misma clave. Cada tarea con clave ejecuta la siguiente tarea con la misma clave.

¡Esta solución no maneja RejectedExecutionException u otras excepciones del ejecutor delegado! Así que el ejecutor delegado debe ser "ilimitado".

import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.concurrent.Executor; /** * This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). */ public class OrderingExecutor implements Executor{ private final Executor delegate; private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>(); public OrderingExecutor(Executor delegate){ this.delegate = delegate; } @Override public void execute(Runnable task) { // task without key can be executed immediately delegate.execute(task); } public void execute(Runnable task, Object key) { if (key == null){ // if key is null, execute without ordering execute(task); return; } boolean first; Runnable wrappedTask; synchronized (keyedTasks){ Queue<Runnable> dependencyQueue = keyedTasks.get(key); first = (dependencyQueue == null); if (dependencyQueue == null){ dependencyQueue = new LinkedList<Runnable>(); keyedTasks.put(key, dependencyQueue); } wrappedTask = wrap(task, dependencyQueue, key); if (!first) dependencyQueue.add(wrappedTask); } // execute method can block, call it outside synchronize block if (first) delegate.execute(wrappedTask); } private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) { return new OrderedTask(task, dependencyQueue, key); } class OrderedTask implements Runnable{ private final Queue<Runnable> dependencyQueue; private final Runnable task; private final Object key; public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) { this.task = task; this.dependencyQueue = dependencyQueue; this.key = key; } @Override public void run() { try{ task.run(); } finally { Runnable nextTask = null; synchronized (keyedTasks){ if (dependencyQueue.isEmpty()){ keyedTasks.remove(key); }else{ nextTask = dependencyQueue.poll(); } } if (nextTask!=null) delegate.execute(nextTask); } } } }


He creado un OrderingExecutor para este problema. Si pasa la misma clave al método execute () con diferentes runnables, la ejecución de los runnables con la misma clave será en el orden en que se llama a execute () y nunca se superpondrá.

import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; /** * Special executor which can order the tasks if a common key is given. * Runnables submitted with non-null key will guaranteed to run in order for the same key. * */ public class OrderedExecutor { private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>( new ConcurrentLinkedQueue<Runnable>()); private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>(); private Executor delegate; private volatile boolean stopped; public OrderedExecutor(Executor delegate) { this.delegate = delegate; } public void execute(Runnable runnable, Object key) { if (stopped) { return; } if (key == null) { delegate.execute(runnable); return; } Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> { v.add(runnable); return v; }); if (queueForKey == null) { // There was no running task with this key Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>()); newQ.add(runnable); // Use putIfAbsent because this execute() method can be called concurrently as well queueForKey = taskMap.putIfAbsent(key, newQ); if (queueForKey != null) queueForKey.add(runnable); delegate.execute(new InternalRunnable(key)); } } public void shutdown() { stopped = true; taskMap.clear(); } /** * Own Runnable used by OrderedExecutor. * The runnable is associated with a specific key - the Queue&lt;Runnable> for this * key is polled. * If the queue is empty, it tries to remove the queue from taskMap. * */ private class InternalRunnable implements Runnable { private Object key; public InternalRunnable(Object key) { this.key = key; } @Override public void run() { while (true) { // There must be at least one task now Runnable r = taskMap.get(key).poll(); while (r != null) { r.run(); r = taskMap.get(key).poll(); } // The queue emptied // Remove from the map if and only if the queue is really empty boolean removed = taskMap.remove(key, EMPTY_QUEUE); if (removed) { // The queue has been removed from the map, // if a new task arrives with the same key, a new InternalRunnable // will be created break; } // If the queue has not been removed from the map it means that someone put a task into it // so we can safely continue the loop } } } /** * Special Queue implementation, with equals() and hashCode() methods. * By default, Java SE queues use identity equals() and default hashCode() methods. * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). * * @param <E> The type of elements in the queue. */ private static class QueueWithHashCodeAndEquals<E> implements Queue<E> { private Queue<E> delegate; public QueueWithHashCodeAndEquals(Queue<E> delegate) { this.delegate = delegate; } public boolean add(E e) { return delegate.add(e); } public boolean offer(E e) { return delegate.offer(e); } public int size() { return delegate.size(); } public boolean isEmpty() { return delegate.isEmpty(); } public boolean contains(Object o) { return delegate.contains(o); } public E remove() { return delegate.remove(); } public E poll() { return delegate.poll(); } public E element() { return delegate.element(); } public Iterator<E> iterator() { return delegate.iterator(); } public E peek() { return delegate.peek(); } public Object[] toArray() { return delegate.toArray(); } public <T> T[] toArray(T[] a) { return delegate.toArray(a); } public boolean remove(Object o) { return delegate.remove(o); } public boolean containsAll(Collection<?> c) { return delegate.containsAll(c); } public boolean addAll(Collection<? extends E> c) { return delegate.addAll(c); } public boolean removeAll(Collection<?> c) { return delegate.removeAll(c); } public boolean retainAll(Collection<?> c) { return delegate.retainAll(c); } public void clear() { delegate.clear(); } @Override public boolean equals(Object obj) { if (!(obj instanceof QueueWithHashCodeAndEquals)) { return false; } QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj; return Arrays.equals(toArray(), other.toArray()); } @Override public int hashCode() { return Arrays.hashCode(toArray()); } } }



Otra opción es crear su propio ejecutor, llamarlo OrderedExecutor, y crear una matriz de objetos encapsulados ThreadPoolExecutor, con 1 hilo por ejecutor interno. A continuación, proporciona un mecanismo para elegir uno de los objetos internos, por ejemplo, puede hacerlo proporcionando una interfaz que el usuario de su clase pueda implementar:

executor = new OrderedExecutor( 10 /* pool size */, new OrderedExecutor.Chooser() { public int choose( Runnable runnable ) { MyRunnable myRunnable = (MyRunnable)runnable; return myRunnable.someId(); }); executor.execute( new MyRunnable() );

La implementación de OrderedExecutor.execute () utilizará el Selector para obtener un int, modificará esto con el tamaño del grupo y ese será su índice en la matriz interna. La idea es que "someId ()" devolverá el mismo valor para todas las "a", etc.


Puede usar Executors.newSingleThreadExecutor (), pero solo usará un hilo para ejecutar sus tareas. Otra opción es usar CountDownLatch. Aquí hay un ejemplo simple:

public class Main2 { public static void main(String[] args) throws InterruptedException { final CountDownLatch cdl1 = new CountDownLatch(1); final CountDownLatch cdl2 = new CountDownLatch(1); final CountDownLatch cdl3 = new CountDownLatch(1); List<Runnable> list = new ArrayList<Runnable>(); list.add(new Runnable() { public void run() { System.out.println("Task 1"); // inform that task 1 is finished cdl1.countDown(); } }); list.add(new Runnable() { public void run() { // wait until task 1 is finished try { cdl1.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 2"); // inform that task 2 is finished cdl2.countDown(); } }); list.add(new Runnable() { public void run() { // wait until task 2 is finished try { cdl2.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 3"); // inform that task 3 is finished cdl3.countDown(); } }); ExecutorService es = Executors.newFixedThreadPool(200); for (int i = 0; i < 3; i++) { es.submit(list.get(i)); } es.shutdown(); es.awaitTermination(1, TimeUnit.MINUTES); } }


Cuando he hecho esto en el pasado, por lo general, la ordenación se maneja mediante un componente que luego envía callables / runnables a un Ejecutor.

Algo como.

  • Tengo una lista de tareas para ejecutar, algunas con dependencias.
  • Cree un Ejecutor y envuélvalo con un ExecutorCompletionService
  • Busque todas las tareas, cualquiera que no tenga dependencias, programe a través del servicio de finalización
  • Encuesta el servicio de finalización
  • A medida que cada tarea se completa
    • Agregarlo a una lista "completada"
    • Vuelva a evaluar cualquier tarea en espera que se envía a la "lista completa" para ver si están "en dependencia completa". Si es así programarlos
    • Enjuague repita hasta que todas las tareas estén enviadas / completadas

El servicio de finalización es una buena forma de poder realizar las tareas a medida que se completan, en lugar de intentar sondear un montón de futuros. Sin embargo, es probable que desee mantener un Map<Future, TaskIdentifier> que se rellena cuando se programa una tarea a través del servicio de finalización, de modo que cuando el servicio de finalización le brinde un Future completo, puede averiguar qué TaskIdentifier es.

Si alguna vez se encuentra en un estado en el que las tareas aún están pendientes de ejecución, pero no se ejecuta nada y no se puede programar nada, entonces tiene un problema de dependencia circular.