java - motor - colectores electricos
¿Se puede usar alguna vez la función de combinador de un colector en secuencias secuenciales? (2)
Programa de muestra:
public final class CollectorTest
{
private CollectorTest()
{
}
private static <T> BinaryOperator<T> nope()
{
return (t, u) -> { throw new UnsupportedOperationException("nope"); };
}
public static void main(final String... args)
{
final Collector<Integer, ?, List<Integer>> c
= Collector.of(ArrayList::new, List::add, nope());
IntStream.range(0, 10_000_000).boxed().collect(c);
}
}
Entonces, para simplificar las cosas aquí, no hay una transformación final, por lo que el código resultante es bastante simple.
Ahora,
IntStream.range()
produce una secuencia secuencial.
Simplemente encajono los resultados en
Integer
sy luego mi artificial
Collector
recopila en una
List<Integer>
.
Bastante simple.
Y no importa cuántas veces ejecute este programa de ejemplo,
UnsupportedOperationException
nunca llega, lo que significa que nunca se llama a mi combinador ficticio.
Esperaba esto, pero ya he entendido mal las transmisiones lo suficiente como para tener que hacer la pregunta ...
¿Se puede llamar al combinador de un
Collector
cuando se
garantiza
que la secuencia sea secuencial?
Como se observó en los comentarios anteriores de @MarkoTopolnik y @Duncan, no hay garantía de que se llame a
Collector.combiner()
en modo secuencial para producir un resultado reducido.
De hecho, el documento de Java es un poco subjetivo en este punto, lo que puede conducir a una interpretación no apropiada.
(...) Una implementación paralela dividiría la entrada, crearía un contenedor de resultados para cada partición, acumularía el contenido de cada partición en un resultado secundario para esa partición y luego usaría la función de combinación para fusionar los resultados secundarios en un resultado combinado .
De acuerdo con NoBlogDefFound combinador se usa solo en modo paralelo. Vea la cita parcial a continuación:
combiner () se usa para unir dos acumuladores en uno solo. Se utiliza cuando el recopilador se ejecuta en paralelo, dividiendo el flujo de entrada y recolectando las piezas de forma independiente primero.
Para mostrar más claramente este problema, reescribo el primer código y pongo dos enfoques (serial y paralelo).
public final class CollectorTest
{
private CollectorTest()
{
}
private static <T> BinaryOperator<T> nope()
{
return (t, u) -> { throw new UnsupportedOperationException("nope"); };
}
public static void main(final String... args)
{
final Collector<Integer, ?, List<Integer>> c =
Collector
.of(ArrayList::new, List::add, nope());
// approach sequential
Stream<Integer> sequential = IntStream
.range(0, 10_000_000)
.boxed();
System.out.println("isParallel:" + sequential.isParallel());
sequential
.collect(c);
// approach parallel
Stream<Integer> parallel = IntStream
.range(0, 10_000_000)
.parallel()
.boxed();
System.out.println("isParallel:" + parallel.isParallel());
parallel
.collect(c);
}
}
Después de ejecutar este código podemos obtener el resultado:
isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
at com..lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18)
at com..lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174)
at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160)
Entonces, de acuerdo con este resultado, podemos inferir que
Collector''s combiner
solo puede ser llamado por la ejecución paralela.
Una lectura cuidadosa del código de implementación de secuencias en
ReduceOps.java
revela que la función de combinación solo se llama cuando se completa un
ReduceTask
, y
ReduceTask
instancias de
ReduceTask
se usan solo cuando se evalúa una tubería en paralelo.
Por lo tanto,
en la implementación actual,
nunca se llama al combinador al evaluar una tubería secuencial.
Sin embargo, no hay nada en la especificación que garantice esto.
Un
Collector
es una interfaz que exige requisitos para sus implementaciones, y no se otorgan exenciones para las secuencias secuenciales.
Personalmente, me resulta difícil imaginar por qué la evaluación secuencial de la tubería podría necesitar llamar al combinador, pero alguien con más imaginación que yo podría encontrar un uso inteligente e implementarlo.
La especificación lo permite, y aunque la implementación de hoy no lo hace, aún tiene que pensarlo.
Esto no debería sorprender. El centro de diseño de la API de flujos es admitir la ejecución paralela en pie de igualdad con la ejecución secuencial. Por supuesto, es posible que un programa observe si se está ejecutando secuencialmente o en paralelo. Pero el diseño de la API es para soportar un estilo de programación que lo permita.
Si está escribiendo un recopilador y encuentra que es imposible (o inconveniente, o difícil) escribir una función de combinación asociativa, lo que lo lleva a querer restringir su flujo a la ejecución secuencial, tal vez esto significa que está yendo en la dirección incorrecta . Es hora de retroceder un poco y pensar en abordar el problema de una manera diferente.
Una operación de estilo de reducción común que no requiere una función de combinación asociativa se llama pliegue a la izquierda . La característica principal es que la función de plegado se aplica estrictamente de izquierda a derecha, procediendo de una en una. No conozco una forma de paralelizar el pliegue a la izquierda.
Cuando las personas intentan contorsionar a los coleccionistas de la forma en que hemos estado hablando, generalmente buscan algo como doblar a la izquierda. La API de Streams no tiene soporte directo de API para esta operación, pero es bastante fácil de escribir. Por ejemplo, suponga que desea reducir una lista de cadenas usando esta operación: repita la primera cadena y luego agregue la segunda. Es bastante fácil demostrar que esta operación no es asociativa:
List<String> list = Arrays.asList("a", "b", "c", "d", "e");
System.out.println(list.stream()
.collect(StringBuilder::new,
(a, b) -> a.append(a.toString()).append(b),
(a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE
Ejecutar secuencialmente, esto produce la salida deseada:
aabaabcaabaabcdaabaabcaabaabcde
Pero cuando se ejecuta en paralelo, puede producir algo como esto:
aabaabccdde
Como "funciona" secuencialmente, podríamos aplicar esto llamando a
sequential()
y respaldar esto haciendo que el combinador arroje una excepción.
Además, se debe llamar al proveedor exactamente una vez.
No hay forma de combinar los resultados intermedios, por lo que si se llama al proveedor dos veces, ya estamos en problemas.
Pero como "sabemos" que se llama al proveedor solo una vez en modo secuencial, la mayoría de las personas no se preocupan por esto.
De hecho, he visto a personas escribir "proveedores" que devuelven algún objeto existente en lugar de crear uno nuevo, en violación del contrato del proveedor.
En este uso de la forma de 3 argumentos de
collect()
, tenemos dos de las tres funciones que rompen sus contratos.
¿No debería esto decirnos que hagamos las cosas de una manera diferente?
El trabajo principal aquí lo realiza la función de acumulador.
Para lograr una reducción de estilo de plegado, podemos aplicar esta función en un estricto orden de izquierda a derecha usando
forEachOrdered()
.
Tenemos que hacer un poco de configuración y código de acabado antes y después, pero eso no es problema:
StringBuilder a = new StringBuilder();
list.parallelStream()
.forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());
Naturalmente, esto funciona bien en paralelo, aunque los beneficios de rendimiento de la ejecución en paralelo pueden ser algo negados por los requisitos de pedido de
forEachOrdered()
.
En resumen, si desea hacer una reducción mutable pero carece de una función de combinación asociativa, lo que le lleva a restringir su flujo a una ejecución secuencial,
forEachRemaining()
el problema como una operación de
plegado a la izquierda
y use
forEachRemaining()
en su acumulador función.