hilos - thread pool java
¿Hay un ExecutorService que usa el hilo actual? (6)
Aquí hay una implementación de Executor
realmente simple (no ExecutorService
, mind you) que solo usa el hilo actual. Robando esto de "Concurrencia de Java en la práctica" (lectura esencial).
public class CurrentThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
ExecutorService
es una interfaz más elaborada, pero podría manejarse con el mismo enfoque.
Lo que busco es una forma compatible de configurar el uso de un grupo de subprocesos o no. Lo ideal es que el resto del código no se vea afectado en absoluto. Podría usar un grupo de hilos con 1 hilo pero eso no es exactamente lo que quiero. ¿Algunas ideas?
ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);
// es.execute / es.submit / new ExecutorCompletionService(es) etc
Escribí un ExecutorService basado en el AbstractExecutorService.
/**
* Executes all submitted tasks directly in the same thread as the caller.
*/
public class SameThreadExecutorService extends AbstractExecutorService {
//volatile because can be viewed by other threads
private volatile boolean terminated;
@Override
public void shutdown() {
terminated = true;
}
@Override
public boolean isShutdown() {
return terminated;
}
@Override
public boolean isTerminated() {
return terminated;
}
@Override
public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
return terminated;
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public void execute(Runnable theCommand) {
theCommand.run();
}
}
Estilo Java 8:
Executor e = Runnable::run;
Puede usar RejectedExecutionHandler para ejecutar la tarea en el hilo actual.
public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
}
});
Solo necesitas uno de estos.
Puede utilizar MoreExecutors.newDirectExecutorService()
, o MoreExecutors.directExecutor()
si no necesita un ExecutorService
.
Si incluir guayabas es demasiado pesado, puedes implementar algo casi tan bueno:
public final class SameThreadExecutorService extends ThreadPoolExecutor {
private final CountDownLatch signal = new CountDownLatch(1);
private SameThreadExecutorService() {
super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override public void shutdown() {
super.shutdown();
signal.countDown();
}
public static ExecutorService getInstance() {
return SingletonHolder.instance;
}
private static class SingletonHolder {
static ExecutorService instance = createInstance();
}
private static ExecutorService createInstance() {
final SameThreadExecutorService instance
= new SameThreadExecutorService();
// The executor has one worker thread. Give it a Runnable that waits
// until the executor service is shut down.
// All other submitted tasks will use the RejectedExecutionHandler
// which runs tasks using the caller''s thread.
instance.submit(new Runnable() {
@Override public void run() {
boolean interrupted = false;
try {
while (true) {
try {
instance.signal.await();
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}});
return Executors.unconfigurableScheduledExecutorService(instance);
}
}
Tuve que usar el mismo "CurrentThreadExecutorService" para fines de prueba y, aunque todas las soluciones sugeridas fueron agradables (particularmente la que menciona el modo Guava ), se me ocurrió algo similar a lo que Peter Lawrey sugirió here .
Como mencionó Axelle Ziegler here , desafortunadamente la solución de Peter no funcionará realmente debido a la comprobación introducida en ThreadPoolExecutor
en el parámetro del constructor maximumPoolSize
(es decir, maximumPoolSize
no puede ser <=0
).
Para eludir eso, hice lo siguiente:
private static ExecutorService currentThreadExecutorService() {
CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
@Override
public void execute(Runnable command) {
callerRunsPolicy.rejectedExecution(command, this);
}
};
}