java8 completionstage completablefuture await async java concurrency java-8 java-stream completable-future

completionstage - java promise



Java 8 CompletableFuture, Stream y Timeouts (4)

Puede envolver el trabajo en otro CompletableFuture y emitiría una excepción TimeoutException si se supera el tiempo dado. Puede separar el bloque catch de TimeoutException si quiere manejarlo especialmente.

List<String> collect = null; try { collect = CompletableFuture.supplyAsync(() -> Stream.of("1", "2", "3", "4", "5", "6", "7") .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ).get(5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); //separate out the TimeoutException if you want to handle it differently } System.out.println(collect); //would be null in case of any exception

Estoy tratando de procesar cierta cantidad de datos al mismo tiempo usando CompletableFuture y Stream Hasta ahora tengo:

public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("start"); List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println("stop out!"); } public static Supplier<String> getStringSupplier(String text) { return () -> { System.out.println("start " + text); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("stop " + text); return "asd" + text; }; }

Y la salida está bien:

start start 1 start 4 start 3 start 2 start 5 start 6 start 7 stop 4 stop 1 stop 5 stop 2 stop 6 stop 3 stop 7 stop out!

Sin embargo, en este momento quiero agregar tiempo de espera para ese trabajo. Digamos que debería ser cancelado después de 1 SEGUNDO. Y devuelve nulo o algún otro valor para collect lista. (Prefiero algún valor que indique la causa).

¿Cómo puedo lograr eso?

Gracias por la ayuda de antemano.


He encontrado la manera de hacerlo:

private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("failAfter-%d") .build()); public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("start"); final CompletableFuture<Object> oneSecondTimeout = failAfter(Duration.ofSeconds(1)) .exceptionally(xxx -> "timeout exception"); List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") .map(x -> CompletableFuture.anyOf(createTaskSupplier(x) , oneSecondTimeout)) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println("stop out!"); System.out.println(collect); } public static CompletableFuture<String> createTaskSupplier(String x) { return CompletableFuture.supplyAsync(getStringSupplier(x)) .exceptionally(xx -> "PROCESSING ERROR : " + xx.getMessage()); } public static Supplier<String> getStringSupplier(String text) { return () -> { System.out.println("start " + text); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } if (text.equals("1")) { throw new RuntimeException("LOGIC ERROR"); } try { if (text.equals("7")) TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("stop " + text); return "result " + text; }; } public static <T> CompletableFuture<T> failAfter(Duration duration) { final CompletableFuture<T> promise = new CompletableFuture<>(); scheduler.schedule(() -> { final TimeoutException ex = new TimeoutException("Timeout after " + duration); return promise.completeExceptionally(ex); }, duration.toMillis(), MILLISECONDS); return promise; }

Vuelve :

start start 1 start 3 start 4 start 2 start 5 start 6 start 7 stop 6 stop 4 stop 3 stop 5 stop 2 stop out! [PROCESSING ERROR : java.lang.RuntimeException: LOGIC ERROR, result 2, result 3, result 4, result 5, result 6, timeout exception]`

¿Qué piensas sobre eso, puedes detectar los defectos de esa solución?


puede probar el método sobrecargadoAsync sobrecargado de CompletableFuture con el parámetro ejecutor (CompletableFuture.supplyAsync (getStringSupplier (x), timeoutExecutorService)) y puede referir el enlace para timeoutExecutorService.


Para otros, que no están limitados con Java 8, puede usar el método completeOnTimeout , que se introdujo en Java 9.

List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)) .completeOnTimeout(null , 1, SECONDS)) .filter(Objects::nonNull) .collect(toList()) .stream() .map(CompletableFuture::join) .collect(toList());