thread termino saber hilos ejemplo create como java multithreading concurrency executorservice

termino - executorservice java ejemplo



ExecutorService que interrumpe las tareas después de un tiempo de espera (8)

¿Qué le parece usar el método ExecutorService.shutDownNow() como se describe en http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Parece ser la solución más simple.

Estoy buscando una implementación ExecutorService que se puede proporcionar con un tiempo de espera. Las tareas que se envían al ExecutorService se interrumpen si tardan más tiempo que el tiempo de espera en ejecutarse. Implementar tal bestia no es una tarea tan difícil, pero me pregunto si alguien sabe de una implementación existente.

Esto es lo que surgió en función de algunos de los debates a continuación. ¿Algún comentario?

import java.util.List; import java.util.concurrent.*; public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor { private final long timeout; private final TimeUnit timeoutUnit; private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>(); public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } @Override public void shutdown() { timeoutExecutor.shutdown(); super.shutdown(); } @Override public List<Runnable> shutdownNow() { timeoutExecutor.shutdownNow(); return super.shutdownNow(); } @Override protected void beforeExecute(Thread t, Runnable r) { if(timeout > 0) { final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit); runningTasks.put(r, scheduled); } } @Override protected void afterExecute(Runnable r, Throwable t) { ScheduledFuture timeoutTask = runningTasks.remove(r); if(timeoutTask != null) { timeoutTask.cancel(false); } } class TimeoutTask implements Runnable { private final Thread thread; public TimeoutTask(Thread thread) { this.thread = thread; } @Override public void run() { thread.interrupt(); } } }


¿Qué pasa con esta idea alternativa?

  • dos tienen dos ejecutores:
    • uno para :
      • enviar la tarea, sin preocuparse por el tiempo de espera de la tarea
      • agregando el futuro resultado y el momento en que debería terminar en una estructura interna
    • uno para ejecutar un trabajo interno que verifica la estructura interna si algunas tareas tienen un tiempo de espera y si deben cancelarse.

Pequeña muestra está aquí:

public class AlternativeExecutorService { private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue = new CopyOnWriteArrayList(); private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for private ScheduledFuture scheduledFuture; private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; public AlternativeExecutorService() { scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); } public void pushTask(OwnTask task) { ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end } public void shutdownInternalScheduledExecutor() { scheduledFuture.cancel(true); scheduledExecutor.shutdownNow(); } long getCurrentMillisecondsTime() { return Calendar.getInstance().get(Calendar.MILLISECOND); } class ListenableFutureTask { private final ListenableFuture<Void> future; private final OwnTask task; private final long milliSecEndTime; private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) { this.future = future; this.task = task; this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); } ListenableFuture<Void> getFuture() { return future; } OwnTask getTask() { return task; } long getMilliSecEndTime() { return milliSecEndTime; } } class TimeoutManagerJob implements Runnable { CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() { return futureQueue; } @Override public void run() { long currentMileSecValue = getCurrentMillisecondsTime(); for (ListenableFutureTask futureTask : futureQueue) { consumeFuture(futureTask, currentMileSecValue); } } private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) { ListenableFuture<Void> future = futureTask.getFuture(); boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; if (isTimeout) { if (!future.isDone()) { future.cancel(true); } futureQueue.remove(futureTask); } } } class OwnTask implements Callable<Void> { private long timeoutDuration; private TimeUnit timeUnit; OwnTask(long timeoutDuration, TimeUnit timeUnit) { this.timeoutDuration = timeoutDuration; this.timeUnit = timeUnit; } @Override public Void call() throws Exception { // do logic return null; } public long getTimeoutDuration() { return timeoutDuration; } public TimeUnit getTimeUnit() { return timeUnit; } } }


Después de una tonelada de tiempo para inspeccionar,
Finalmente, invokeAll método invokeAll de ExecutorService para resolver este problema.
Eso interrumpirá estrictamente la tarea mientras se ejecuta la tarea.
Aquí hay un ejemplo

ExecutorService executorService = Executors.newCachedThreadPool(); try { List<Callable<Object>> callables = new ArrayList<>(); // Add your long time task (callable) callables.add(new VaryLongTimeTask()); // Assign tasks for specific execution timeout (e.g. 2 sec) List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); for (Future<Object> future : futures) { // Getting result } } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown();

El profesional es que también puede enviar ListenableFuture al mismo ExecutorService .
Simplemente cambie ligeramente la primera línea de código.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorService es la función de Listening de ExecutorService en google guava project ( com.google.guava ))


Envuelva la tarea en FutureTask y puede especificar el tiempo de espera para FutureTask. Mira el ejemplo en mi respuesta a esta pregunta,

java native Process timeout


Lamentablemente, la solución es defectuosa. Hay un tipo de error con ScheduledThreadPoolExecutor , también se informa en esta pregunta : la cancelación de una tarea enviada no libera por completo los recursos de memoria asociados con la tarea; los recursos se lanzan solo cuando la tarea expira.

Si, por lo tanto, crea un TimeoutThreadPoolExecutor con un tiempo de caducidad bastante largo (un uso típico) y envía tareas lo suficientemente rápido, termina llenando la memoria, incluso si las tareas realmente se completaron con éxito.

Puede ver el problema con el siguiente programa de prueba (muy crudo):

public static void main(String[] args) throws InterruptedException { ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); //ExecutorService service = Executors.newFixedThreadPool(1); try { final AtomicInteger counter = new AtomicInteger(); for (long i = 0; i < 10000000; i++) { service.submit(new Runnable() { @Override public void run() { counter.incrementAndGet(); } }); if (i % 10000 == 0) { System.out.println(i + "/" + counter.get()); while (i > counter.get()) { Thread.sleep(10); } } } } finally { service.shutdown(); } }

El programa agota la memoria disponible, aunque espera a que se complete el Runnable s generado.

Pensé en esto por un tiempo, pero lamentablemente no pude encontrar una buena solución.

EDITAR: descubrí que este problema se informó como error JDK 6602600 , y parece que se ha solucionado recientemente.


Parece que el problema no está en el error JDK 6602600 (se resolvió en 2010-05-22), sino en una llamada incorrecta de suspensión (10) en círculo. Además, note que el hilo principal debe dar CHANCE directamente a otros hilos para realizar sus tareas por invocación SLEEP (0) en CADA rama del círculo exterior. Creo que es mejor utilizar Thread.yield () en lugar de Thread.sleep (0)

El resultado corregido parte del código de problema anterior es tal como este:

....................... ........................ Thread.yield(); if (i % 1000== 0) { System.out.println(i + "/" + counter.get()+ "/"+service.toString()); } // // while (i > counter.get()) { // Thread.sleep(10); // }

Funciona correctamente con la cantidad de contador externo de hasta 150 000 000 círculos probados.


Puede usar un ScheduledExecutorService para esto. Primero lo enviaría solo una vez para comenzar inmediatamente y retener el futuro que se crea. Después de eso, puede enviar una nueva tarea que cancelaría el futuro retenido después de un período de tiempo.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); final Future handler = executor.submit(new Callable(){ ... }); executor.schedule(new Runnable(){ public void run(){ handler.cancel(); } }, 10000, TimeUnit.MILLISECONDS);

Esto ejecutará su controlador (la funcionalidad principal será interrumpida) por 10 segundos, luego cancelará (es decir, interrumpirá) esa tarea específica.


Usando la respuesta de John W, creé una implementación que iniciaba correctamente el tiempo de espera cuando la tarea inicia su ejecución. Incluso escribo una prueba unitaria para ello :)

Sin embargo, no satisface mis necesidades, ya que algunas operaciones IO no interrumpen cuando se llama a Future.cancel() es decir, cuando se Thread.interrupted() ).

De todos modos, si alguien está interesado, creé una esencia: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf