invokeall - Manejo de excepciones de las tareas de Java ExecutorService
java executorservice thread pool (11)
ThreadPoolExecutor
usar la clase ThreadPoolExecutor
de Java para ejecutar una gran cantidad de tareas pesadas con un número fijo de hilos. Cada una de las tareas tiene muchos lugares durante los cuales puede fallar debido a excepciones.
He subclasificado ThreadPoolExecutor
y he anulado el método afterExecute
, que se supone que proporciona las excepciones no detectadas encontradas al ejecutar una tarea. Sin embargo, parece que no puedo hacer que funcione.
Por ejemplo:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything''s fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
La salida de este programa es "Todo está bien - ¡situación normal!" aunque el único Runnable enviado al grupo de subprocesos arroja una excepción. ¿Alguna pista de lo que está pasando aquí?
¡Gracias!
De los docs :
Nota: Cuando las acciones se envuelven en tareas (como FutureTask) de forma explícita o mediante métodos como enviar, estos objetos de tareas capturan y mantienen excepciones computacionales, por lo que no causan una terminación abrupta, y las excepciones internas no se pasan a este método .
Cuando envía un Runnable, queda envuelto en un futuro.
Su AfterExecute debería ser algo como esto:
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
System.out.println(t);
}
}
En lugar de subclasificar ThreadPoolExecutor, le proporcionaría una instancia ThreadFactory que crea nuevos ThreadFactory y les proporciona un UncaughtExceptionHandler
Esto funciona
- Se deriva de SingleThreadExecutor, pero puedes adaptarlo fácilmente
- Código de Java 8 lamdas, pero fácil de arreglar
Creará un Ejecutor con un solo hilo, que puede obtener muchas tareas; y esperará a que la ejecución actual finalice para comenzar con la siguiente
En caso de error o excepción de uncaugth, el uncaughtExceptionHandler lo atrapará
public final class SingleThreadExecutorWithExceptions { public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory factory = (Runnable runnable) -> { final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); }); return newThread; }; return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), factory){ protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) { try { Future future = (Future) runnable; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (throwable != null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); } } }); } private static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } /** * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ private static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List> invokeAll(Collection> tasks) throws InterruptedException { return e.invokeAll(tasks); } public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } private SingleThreadExecutorWithExceptions() {} }
Esto se debe a AbstractExecutorService :: submit
está envolviendo su runnable
en RunnableFuture
(nada más que FutureTask
) como a continuación
AbstractExecutorService.java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
execute(ftask);
return ftask;
}
Luego, execute
lo pasará a Worker
y Worker.run()
llamará al siguiente.
ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /////////HERE////////
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Finalmente
task.run();
en la llamada de código anterior llamaremos aFutureTask.run()
. Aquí está el código del controlador de excepción, debido a esto NO está obteniendo la excepción esperada.
class FutureTask<V> implements RunnableFuture<V>
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) { /////////HERE////////
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
Estoy usando la clase VerboseRunnable
de jcabi-log , que se traga todas las excepciones y las registra. Muy conveniente, por ejemplo:
import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
new VerboseRunnable(
Runnable() {
public void run() {
// the code, which may throw
}
},
true // it means that all exceptions will be swallowed and logged
),
1, 1, TimeUnit.MILLISECONDS
);
La explicación de este comportamiento está en el javadoc para afterExecute :
Nota: Cuando las acciones se envuelven en tareas (como FutureTask) de forma explícita o mediante métodos como enviar, estos objetos de tareas capturan y mantienen excepciones computacionales, por lo que no causan una terminación abrupta, y las excepciones internas no se pasan a este método .
Lo solucioné envolviendo el ejecutable suministrado enviado al ejecutor.
CompletableFuture.runAsync(
() -> {
try {
runnable.run();
} catch (Throwable e) {
Log.info(Concurrency.class, "runAsync", e);
}
},
executorService
);
Otra solución sería usar ManagedTask y ManagedTaskListener .
Necesita un Callable o Runnable que implemente la interfaz ManagedTask .
El método getManagedTaskListener
devuelve la instancia que desea.
public ManagedTaskListener getManagedTaskListener() {
Y implementa en ManagedTaskListener el método taskDone
:
@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
if (exception != null) {
LOGGER.log(Level.SEVERE, exception.getMessage());
}
}
Más detalles sobre el ciclo de vida de la tarea administrada y el oyente .
Si desea supervisar la ejecución de la tarea, puede girar 1 o 2 hilos (quizás más dependiendo de la carga) y usarlos para tomar tareas de un contenedor ExecutionCompletionService.
Si su ExecutorService
proviene de una fuente externa (es decir, no es posible subclase ThreadPoolExecutor
y anular afterExecute()
), puede usar un proxy dinámico para lograr el comportamiento deseado:
public static ExecutorService errorAware(final ExecutorService executor) {
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] {ExecutorService.class},
(proxy, method, args) -> {
if (method.getName().equals("submit")) {
final Object arg0 = args[0];
if (arg0 instanceof Runnable) {
args[0] = new Runnable() {
@Override
public void run() {
final Runnable task = (Runnable) arg0;
try {
task.run();
if (task instanceof Future<?>) {
final Future<?> future = (Future<?>) task;
if (future.isDone()) {
try {
future.get();
} catch (final CancellationException ce) {
// Your error-handling code here
ce.printStackTrace();
} catch (final ExecutionException ee) {
// Your error-handling code here
ee.getCause().printStackTrace();
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
} catch (final RuntimeException re) {
// Your error-handling code here
re.printStackTrace();
throw re;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
} else if (arg0 instanceof Callable<?>) {
args[0] = new Callable<Object>() {
@Override
public Object call() throws Exception {
final Callable<?> task = (Callable<?>) arg0;
try {
return task.call();
} catch (final Exception e) {
// Your error-handling code here
e.printStackTrace();
throw e;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
}
}
return method.invoke(executor, args);
});
}
ADVERTENCIA : se debe tener en cuenta que esta solución bloqueará el hilo de llamada.
Si desea procesar las excepciones lanzadas por la tarea, generalmente es mejor usar Callable
lugar de Runnable
.
Se permite que Callable.call()
arroje excepciones comprobadas, y éstas se propagan nuevamente a la Callable.call()
llamada:
Callable task = ...
Future future = executor.submit(task);
try {
future.get();
} catch (ExecutionException ex) {
ex.getCause().printStackTrace();
}
Si Callable.call()
arroja una excepción, esto será envuelto en una ExecutionException
y lanzado por Future.get()
.
Es probable que esto sea mucho más preferible a la subclasificación de ThreadPoolExecutor
. También le da la oportunidad de volver a enviar la tarea si la excepción es recuperable.