completionstage completablefuture await async allof java concurrency completable-future

java - await - Cómo interrumpir la ejecución subyacente de CompletableFuture



java 8 promise (4)

¿Qué pasa?

/** @return {@link CompletableFuture} which when cancelled will interrupt the supplier */ public static <T> CompletableFuture<T> supplyAsyncInterruptibly(Supplier<T> supplier, Executor executor) { return produceInterruptibleCompletableFuture((s) -> CompletableFuture.supplyAsync(s, executor), supplier); } // in case we want to do the same for similar methods later private static <T> CompletableFuture<T> produceInterruptibleCompletableFuture( Function<Supplier<T>,CompletableFuture<T>> completableFutureAsyncSupplier, Supplier<T> action) { FutureTask<T> task = new FutureTask<>(action::get); return addCancellationAction(completableFutureAsyncSupplier.apply(asSupplier(task)), () -> task.cancel(true)); } /** Ensures the specified action is executed if the given {@link CompletableFuture} is cancelled. */ public static <T> CompletableFuture<T> addCancellationAction(CompletableFuture<T> completableFuture, @NonNull Runnable onCancellationAction) { completableFuture.whenComplete((result, throwable) -> { if (completableFuture.isCancelled()) { onCancellationAction.run(); } }); return completableFuture; // return original CompletableFuture } /** @return {@link Supplier} wrapper for the given {@link RunnableFuture} which calls {@link RunnableFuture#run()} * followed by {@link RunnableFuture#get()}. */ public static <T> Supplier<T> asSupplier(RunnableFuture<T> futureTask) throws CompletionException { return () -> { try { futureTask.run(); try { return futureTask.get(); } catch (ExecutionException e) { // unwrap ExecutionExceptions final Throwable cause = e.getCause(); throw (cause != null) ? cause : e; } } catch (CompletionException e) { throw e; } catch (Throwable t) { throw new CompletionException(t); } }; }

Sé que el diseño de CompletableFuture no controla su ejecución con interrupciones, pero supongo que algunos de ustedes pueden tener este problema. CompletableFuture s es una muy buena manera de componer una ejecución asíncrona, pero dado el caso en el que desea que la ejecución subyacente se interrumpa o se detenga cuando se cancele el futuro, ¿cómo lo hacemos? ¿O simplemente debemos aceptar que cualquier CompletableFuture cancelado o completado manualmente no afectará el trabajo que se realiza allí para completarlo?

Eso es, en mi opinión, obviamente un trabajo inútil que toma tiempo del ejecutor trabajador. Me pregunto qué enfoque o diseño podría ayudar en este caso?

ACTUALIZAR

Aquí hay una prueba simple para esto.

public class SimpleTest { @Test public void testCompletableFuture() throws Exception { CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation()); bearSleep(1); //cf.cancel(true); cf.complete(null); System.out.println("it should die now already"); bearSleep(7); } public static void longOperation(){ System.out.println("started"); bearSleep(5); System.out.println("completed"); } private static void bearSleep(long seconds){ try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { System.out.println("OMG!!! Interrupt!!!"); } } }


Por favor, vea mi respuesta a la pregunta relacionada: Transformar el futuro de Java en un CompletableFuture

En el código mencionado allí, el comportamiento de CompletionStage se agrega a la subclase RunnableFuture (utilizada por las implementaciones de ExecutorService), por lo que puede interrumpirlo de la manera correcta.


Que hay de esto

public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) { final ExecutorService executorService = Executors.newFixedThreadPool(1); final CompletableFuture<T> cf = new CompletableFuture<T>() { @Override public boolean complete(T value) { if (isDone()) { return false; } executorService.shutdownNow(); return super.complete(value); } @Override public boolean completeExceptionally(Throwable ex) { if (isDone()) { return false; } executorService.shutdownNow(); return super.completeExceptionally(ex); } }; // submit task executorService.submit(() -> { try { cf.complete(supplier.get()); } catch (Throwable ex) { cf.completeExceptionally(ex); } }); return cf; }

Prueba simple:

CompletableFuture<String> cf = supplyAsync(() -> { try { Thread.sleep(1000L); } catch (Exception e) { System.out.println("got interrupted"); return "got interrupted"; } System.out.println("normal complete"); return "normal complete"; }); cf.complete("manual complete"); System.out.println(cf.get());

No me gusta la idea de tener que crear un servicio Executor cada vez, pero tal vez puedas encontrar una manera de reutilizar el ForkJoinPool.


Un CompletableFuture no está relacionado con la acción asíncrona que eventualmente puede completarla.

Dado que (a diferencia de FutureTask ) esta clase no tiene control directo sobre el cálculo que hace que se complete, la cancelación se trata simplemente como otra forma de finalización excepcional. El método de cancel tiene el mismo efecto que completeExceptionally(new CancellationException()) .

Puede que ni siquiera haya un hilo separado trabajando para completarlo (incluso puede haber muchos subprocesos trabajando en él). Incluso si existe, no hay un enlace desde CompletableFuture a ningún hilo que tenga una referencia a él.

Como tal, no hay nada que puedas hacer a través de CompletableFuture para interrumpir cualquier hilo que pueda estar ejecutando alguna tarea que lo complete. Tendrá que escribir su propia lógica que haga un seguimiento de cualquier instancia de Thread que adquiera una referencia a CompletableFuture con la intención de completarla.

Aquí hay un ejemplo del tipo de ejecución que creo que podrías evitar.

public static void main(String[] args) throws Exception { ExecutorService service = Executors.newFixedThreadPool(1); CompletableFuture<String> completable = new CompletableFuture<>(); Future<?> future = service.submit(new Runnable() { @Override public void run() { for (int i = 0; i < 10; i++) { if (Thread.interrupted()) { return; // remains uncompleted } try { Thread.sleep(1000); } catch (InterruptedException e) { return; // remains uncompleted } } completable.complete("done"); } }); Thread.sleep(2000); // not atomic across the two boolean cancelled = future.cancel(true); if (cancelled) completable.cancel(true); // may not have been cancelled if execution has already completed if (completable.isCancelled()) { System.out.println("cancelled"); } else if (completable.isCompletedExceptionally()) { System.out.println("exception"); } else { System.out.println("success"); } service.shutdown(); }

Esto supone que la tarea que se está ejecutando está configurada para manejar las interrupciones correctamente.