java - await - Cómo interrumpir la ejecución subyacente de CompletableFuture
java 8 promise (4)
¿Qué pasa?
/** @return {@link CompletableFuture} which when cancelled will interrupt the supplier
*/
public static <T> CompletableFuture<T> supplyAsyncInterruptibly(Supplier<T> supplier, Executor executor) {
return produceInterruptibleCompletableFuture((s) -> CompletableFuture.supplyAsync(s, executor), supplier);
}
// in case we want to do the same for similar methods later
private static <T> CompletableFuture<T> produceInterruptibleCompletableFuture(
Function<Supplier<T>,CompletableFuture<T>> completableFutureAsyncSupplier, Supplier<T> action) {
FutureTask<T> task = new FutureTask<>(action::get);
return addCancellationAction(completableFutureAsyncSupplier.apply(asSupplier(task)), () ->
task.cancel(true));
}
/** Ensures the specified action is executed if the given {@link CompletableFuture} is cancelled.
*/
public static <T> CompletableFuture<T> addCancellationAction(CompletableFuture<T> completableFuture,
@NonNull Runnable onCancellationAction) {
completableFuture.whenComplete((result, throwable) -> {
if (completableFuture.isCancelled()) {
onCancellationAction.run();
}
});
return completableFuture; // return original CompletableFuture
}
/** @return {@link Supplier} wrapper for the given {@link RunnableFuture} which calls {@link RunnableFuture#run()}
* followed by {@link RunnableFuture#get()}.
*/
public static <T> Supplier<T> asSupplier(RunnableFuture<T> futureTask) throws CompletionException {
return () -> {
try {
futureTask.run();
try {
return futureTask.get();
} catch (ExecutionException e) { // unwrap ExecutionExceptions
final Throwable cause = e.getCause();
throw (cause != null) ? cause : e;
}
} catch (CompletionException e) {
throw e;
} catch (Throwable t) {
throw new CompletionException(t);
}
};
}
Sé que el diseño de CompletableFuture
no controla su ejecución con interrupciones, pero supongo que algunos de ustedes pueden tener este problema. CompletableFuture
s es una muy buena manera de componer una ejecución asíncrona, pero dado el caso en el que desea que la ejecución subyacente se interrumpa o se detenga cuando se cancele el futuro, ¿cómo lo hacemos? ¿O simplemente debemos aceptar que cualquier CompletableFuture
cancelado o completado manualmente no afectará el trabajo que se realiza allí para completarlo?
Eso es, en mi opinión, obviamente un trabajo inútil que toma tiempo del ejecutor trabajador. Me pregunto qué enfoque o diseño podría ayudar en este caso?
ACTUALIZAR
Aquí hay una prueba simple para esto.
public class SimpleTest {
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());
bearSleep(1);
//cf.cancel(true);
cf.complete(null);
System.out.println("it should die now already");
bearSleep(7);
}
public static void longOperation(){
System.out.println("started");
bearSleep(5);
System.out.println("completed");
}
private static void bearSleep(long seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
System.out.println("OMG!!! Interrupt!!!");
}
}
}
Por favor, vea mi respuesta a la pregunta relacionada: Transformar el futuro de Java en un CompletableFuture
En el código mencionado allí, el comportamiento de CompletionStage se agrega a la subclase RunnableFuture (utilizada por las implementaciones de ExecutorService), por lo que puede interrumpirlo de la manera correcta.
Que hay de esto
public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {
final ExecutorService executorService = Executors.newFixedThreadPool(1);
final CompletableFuture<T> cf = new CompletableFuture<T>() {
@Override
public boolean complete(T value) {
if (isDone()) {
return false;
}
executorService.shutdownNow();
return super.complete(value);
}
@Override
public boolean completeExceptionally(Throwable ex) {
if (isDone()) {
return false;
}
executorService.shutdownNow();
return super.completeExceptionally(ex);
}
};
// submit task
executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});
return cf;
}
Prueba simple:
CompletableFuture<String> cf = supplyAsync(() -> {
try {
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("got interrupted");
return "got interrupted";
}
System.out.println("normal complete");
return "normal complete";
});
cf.complete("manual complete");
System.out.println(cf.get());
No me gusta la idea de tener que crear un servicio Executor cada vez, pero tal vez puedas encontrar una manera de reutilizar el ForkJoinPool.
Un CompletableFuture
no está relacionado con la acción asíncrona que eventualmente puede completarla.
Dado que (a diferencia de
FutureTask
) esta clase no tiene control directo sobre el cálculo que hace que se complete, la cancelación se trata simplemente como otra forma de finalización excepcional. El método decancel
tiene el mismo efecto quecompleteExceptionally(new CancellationException())
.
Puede que ni siquiera haya un hilo separado trabajando para completarlo (incluso puede haber muchos subprocesos trabajando en él). Incluso si existe, no hay un enlace desde CompletableFuture
a ningún hilo que tenga una referencia a él.
Como tal, no hay nada que puedas hacer a través de CompletableFuture
para interrumpir cualquier hilo que pueda estar ejecutando alguna tarea que lo complete. Tendrá que escribir su propia lógica que haga un seguimiento de cualquier instancia de Thread
que adquiera una referencia a CompletableFuture
con la intención de completarla.
Aquí hay un ejemplo del tipo de ejecución que creo que podrías evitar.
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(1);
CompletableFuture<String> completable = new CompletableFuture<>();
Future<?> future = service.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if (Thread.interrupted()) {
return; // remains uncompleted
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return; // remains uncompleted
}
}
completable.complete("done");
}
});
Thread.sleep(2000);
// not atomic across the two
boolean cancelled = future.cancel(true);
if (cancelled)
completable.cancel(true); // may not have been cancelled if execution has already completed
if (completable.isCancelled()) {
System.out.println("cancelled");
} else if (completable.isCompletedExceptionally()) {
System.out.println("exception");
} else {
System.out.println("success");
}
service.shutdown();
}
Esto supone que la tarea que se está ejecutando está configurada para manejar las interrupciones correctamente.