streams procesamiento parte ejemplo datos con collection java parallel-processing java-8 java-stream reduce

procesamiento - stream java methods



¿Qué implica exactamente el requisito de reducir Stream()? (2)

¿Por qué el acumulador no es asociativo?

No es asociativo ya que el orden de las operaciones de resta determina el resultado final.

Si ejecuta un Stream serie, obtendrá el resultado esperado de:

0 - 1 - 2 - 3 - 4 - 5 - 6 = -21

Por otro lado, para Stream s paralelos, el trabajo se divide en varios hilos. Por ejemplo, si reduce se ejecuta en paralelo en 6 hilos, y luego se combinan los resultados intermedios, puede obtener un resultado diferente:

0 - 1 0 - 2 0 - 3 0 - 4 0 - 5 0 - 6 -1 -2 -3 -4 -5 -6 -1 - (-2) -3 - (-4) -5 - (-6) 1 1 1 1 - 1 0 - 1 -1

O, para hacer un largo ejemplo corto:

(1 - 2) - 3 = -4 1 - (2 - 3) = 2

Por lo tanto, la resta no es asociativa.

Por otro lado, a+b no causa el mismo problema, ya que la suma es un operador asociativo (es decir, (a+b)+c == a+(b+c) ).

El problema con el ejemplo de identidad es que cuando la reducción se ejecuta en paralelo en múltiples hilos, se agrega "X" al comienzo de cada resultado intermedio.

¿Qué exactamente sería una identidad adecuada para usar entonces?

Si cambia el valor de identidad a "" :

System.out.println(Arrays.asList("w","o","l","f")) .parallelStream() .reduce("", String::concat));

obtendrá "lobo" en lugar de "XwXoXlXf".

Al utilizar la operación de reduce() en una secuencia paralela, el libro de examen de OCP establece que hay ciertos principios que los argumentos de reduce() deben cumplir. Esos argumentos son los siguientes:

  1. La identidad debe definirse de manera que para todos los elementos en la secuencia u, combiner.apply (identidad, u) sea igual a u.
  2. El operador de acumulador op debe ser asociativo y sin estado de modo que (a op b) op c sea ​​igual a a op (b op c) .
  3. El operador combinador también debe ser asociativo y apátrida y compatible con la identidad, de modo que para todos los combiner.apply(u, accumulator.apply(identity, t)) es igual a accumulator.apply(u,t) .

El libro de examen brinda dos ejemplos para ilustrar estos principios. Consulte el código a continuación:

ejemplo para asociativo:

System.out.println(Arrays,asList(1,2,3,4,5,6)) .parallelStream() .reduce(0,(a,b) -> (a-b))); //NOT AN ASSOCIATIVE ACCUMULATOR

Lo que dice el libro de OCP sobre esto:

Puede generar -21, 3 o algún otro valor ya que la función del acumulador viola la propiedad de asociatividad.

ejemplo para el requisito de identidad:

System.out.println(Arrays.asList("w","o","l","f")) .parallelStream() .reduce("X", String::concat));

Lo que dice el libro de OCP sobre esto:

Puede ver otros problemas si usamos un parámetro de identidad que no es realmente un valor de identidad. Puede dar salida a XwXoXlXf. Como parte del proceso paralelo, la identidad se aplica a múltiples elementos en la secuencia, lo que genera datos muy inesperados.

No entiendo esos ejemplos. Con el ejemplo del acumulador, el acumulador comienza con 0 -1 = -1, luego -1 -2, que es = -3, luego -6, etc., todo el camino hasta -21. Entiendo que, dado que la matriz de matrices generada no está sincronizada, los resultados pueden ser impredecibles debido a la posibilidad de condiciones de carrera, etc., pero ¿por qué el acumulador no es asociativo? Woulden''t (a+b) causa resultados impredecibles también? Realmente no veo qué está mal con el acumulador que se usa en el ejemplo y por qué no es asociativo, pero de nuevo aún no entiendo exactamente qué es el ser con el principio asociativo.

No entiendo el ejemplo de identidad tampoco. Entiendo que el resultado podría ser XwXoXlXf si 4 hilos separados comenzaran a acumularse con la identidad al mismo tiempo, pero ¿qué tiene eso que ver con el parámetro de identidad? ¿Qué exactamente sería una identidad adecuada para usar entonces?

Me preguntaba si alguien podría aclararme un poco más sobre estos principios.

Gracias


Déjame darte dos ejemplos. Primero donde se rompe la identidad:

int result = Stream.of(1, 2, 3, 4, 5, 6) .parallel() .reduce(10, (a, b) -> a + b); System.out.println(result); // 81 on my run

Básicamente se ha roto esta regla: The identity value must be an identity for the accumulator function. This means that for all u, accumulator(identity, u) is equal to u The identity value must be an identity for the accumulator function. This means that for all u, accumulator(identity, u) is equal to u .

O para hacer es más simple, veamos si esa regla se cumple para algunos datos aleatorios de nuestro flujo:

Integer identity = 10; BinaryOperator<Integer> combiner = (x, y) -> x + y; boolean identityRespected = combiner.apply(identity, 1) == 1; System.out.println(identityRespected); // prints false

Y un segundo ejemplo:

/** * count letters, adding a bit more all the time */ private static int howMany(List<String> tokens) { return tokens.stream() .parallel() .reduce(0, // identity (i, s) -> { // accumulator return s.length() + i; }, (left, right) -> { // combiner return left + right + left; // notice the extra left here }); }

E invocas esto con:

List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee"); List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", ""); System.out.println(howMany(left)); // 38 on my run System.out.println(howMany(right)); // 50 on my run

Básicamente se ha roto esta regla: Additionally, the combiner function must be compatible with the accumulator function o en el código:

// this must hold! // combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t) Integer identity = 0; String t = "aa"; Integer u = 3; // "bbb" BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length(); BinaryOperator<Integer> combiner = (left, right) -> left + right + left; int first = accumulator.apply(identity, t); // 2 int second = combiner.apply(u, first); // 3 + 2 + 3 = 8 Integer shouldBe8 = accumulator.apply(u, t); System.out.println(shouldBe8 == second); // false