exception - ¿Cómo se comportan los flujos paralelos de Java 8 en una excepción lanzada?
exception-handling parallel-processing (1)
Cuando se lanza una excepción en una de las etapas, no se espera a que finalicen otras operaciones, la excepción se vuelve a lanzar a la persona que llama. Así es como ForkJoinPool maneja eso.
Por el contrario, findFirst, por ejemplo, cuando se ejecuta en paralelo, presentará el resultado al llamante solo después de que TODAS las operaciones hayan finalizado el procesamiento (incluso si el resultado se conoce antes de la necesidad de finalizar todas las operaciones).
Dicho de otra manera: volverá pronto, pero dejará todas las tareas en ejecución para finalizar.
EDITAR para responder al último comentario.
Esto está muy explicado por la respuesta de Holger (enlace en los comentarios), pero aquí hay algunos detalles.
1) Al matar a todos, PERO al hilo principal, también está eliminando todas las tareas que se suponía que debían ser manejadas por estos hilos. Entonces, ese número debería ser más de 250, ya que hay 1000 tareas y 4 subprocesos. ¿Supongo que esto devuelve 3 ?:
int result = ForkJoinPool.getCommonPoolParallelism();
En teoría, hay 1000 tareas, hay 4 subprocesos, cada uno se supone que maneja 250 tareas, luego se eliminan 3 de ellas, lo que significa que se pierden 750 tareas. Hay 250 tareas pendientes de ejecutar y ForkJoinPool abarcará 3 subprocesos nuevos para ejecutar estas 250 tareas restantes.
Algunas cosas que puede probar, cambie su flujo de esta manera (haciendo que el flujo no tenga el tamaño adecuado):
IntStream.generate(random::nextInt).limit(1000).parallel().forEach
Esta vez, habría muchas más operaciones que finalizarán, porque el índice de división inicial es desconocido y elegido por alguna otra estrategia. Lo que también podrías intentar es cambiar esto:
if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
a esto:
if (!Thread.currentThread().getName().equals("main")) {
Esta vez, siempre mataría todos los subprocesos además del principal, hasta cierto punto, donde ForkJoinPool no creará nuevos subprocesos, ya que la tarea es demasiado pequeña para dividir, por lo que no es necesario que haya otros subprocesos. En este caso, incluso menos tareas terminarían.
2) Su segundo ejemplo, cuando realmente mata el hilo principal, como está el código, no verá la ejecución real de otros hilos. Cambialo :
} catch (Exception e) {
System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
afterExceptionCount.set(0);
}
// give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results.
TimeUnit.SECONDS.sleep(60);
System.out.println("Overall count: " + overallCount.get());
System.out.println("After exception count: " + afterExceptionCount.get());
¿Cómo se comportan los flujos paralelos de Java 8 en una excepción lanzada en la cláusula de consumo, por ejemplo, en el manejo de forEach
? Por ejemplo, el siguiente código:
final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
// Throw only on one of the threads.
if (throwException.compareAndSet(true, false)) {
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
});
¿Detiene los elementos manipulados inmediatamente? ¿Espera a que finalicen los elementos ya iniciados? ¿Espera a que termine toda la transmisión? ¿Comienza a manejar los elementos de la secuencia después de que se lanza la excepción?
¿Cuándo vuelve? Inmediatamente después de la excepción? Después de todo / parte de los elementos fueron manejados por el consumidor?
¿Los elementos continúan siendo manejados después de que la secuencia paralela lanzó la excepción? (Encontré un caso donde sucedió esto).
¿Hay una regla general aquí?
EDITAR (15-11-2016)
Tratando de determinar si el flujo paralelo regresa antes, encontré que no es determinado:
@Test
public void testParallelStreamWithException() {
AtomicInteger overallCount = new AtomicInteger(0);
AtomicInteger afterExceptionCount = new AtomicInteger(0);
AtomicBoolean throwException = new AtomicBoolean(true);
try {
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
overallCount.incrementAndGet();
afterExceptionCount.incrementAndGet();
try {
System.out.println(i + " Sleeping...");
Thread.sleep(1000);
System.out.println(i + " After Sleeping.");
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Throw only on one of the threads and not on main thread.
if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
System.out.println("Throwing exception - " + i);
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
}
});
Assert.fail("Should not get here.");
}
catch (Exception e) {
System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
afterExceptionCount.set(0);
}
System.out.println("Overall count: " + overallCount.get());
System.out.println("After exception count: " + afterExceptionCount.get());
}
Retorno tardío al tirar no desde el hilo principal. Esto causó que muchos elementos nuevos se manejaran mucho después de que se lanzara la excepción. En mi máquina, se manejaron unos 200 elementos después de que se lanzó la excepción. PERO, no todos los 1000 elementos fueron manejados. Entonces, ¿cuál es la regla aquí? ¿Por qué se manejaron más elementos a pesar de que se lanzó la excepción?
La devolución anticipada al eliminar el signo de no ( !
) Hace que se genere la excepción en el hilo principal. Solo se procesaron los elementos ya iniciados y no se manejaron nuevos. El regreso temprano fue el caso aquí. No concuerda con el comportamiento anterior.
¿Que me estoy perdiendo aqui?