the source parallel over iterations into instream ejemplo divided are java java-8 java-stream

source - stream parallel java 8



Cálculo de estado de la corriente: sumas acumuladas (2)

Suponiendo que tengo un IntStream de Java, ¿es posible convertirlo en un IntStream con sumas acumuladas? Por ejemplo, una secuencia que comienza con [4, 2, 6, ...] se debe convertir a [4, 6, 12, ...].

De manera más general, ¿cómo debería uno ir implementando operaciones de transmisión de estado? Se siente que esto debería ser posible:

myIntStream.map(new Function<Integer, Integer> { int sum = 0; Integer apply(Integer value){ return sum += value; } );

Con la obvia restricción de que esto funciona solo en flujos secuenciales. Sin embargo, Stream.map requiere explícitamente una función de mapa sin estado. ¿Tengo razón al faltar una operación Stream.statefulMap o Stream.cumulative o es que falta el punto de los flujos de Java?

Compare, por ejemplo, con Haskell, donde la función scanl1 resuelve exactamente este ejemplo:

scanl1 (+) [1 2 3 4] = [1 3 6 10]


Es posible hacerlo con un colector que luego crea una nueva secuencia:

class Accumulator { public static void accept(List<Integer> list, Integer value) { list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1))); } public static List<Integer> combine(List<Integer> list1, List<Integer> list2) { int total = list1.get(list1.size() - 1); list2.stream().map(n -> n + total).forEach(list1::add); return list1; } }

Esto se utiliza como:

myIntStream.parallel() .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine) .stream();

Esperemos que pueda ver que el atributo importante de este recopilador es que incluso si la transmisión es paralela, ya que las instancias del Accumulator se combinan, ajusta los totales.

Obviamente, esto no es tan eficiente como una operación de mapa porque recopila la secuencia completa y luego produce una nueva secuencia. Pero eso no es solo un detalle de la implementación: es una función necesaria del hecho de que las secuencias están destinadas a ser potencialmente procesadas simultáneamente.

Lo he probado con IntStream.range(0, 10000).parallel() y funciona correctamente.


Puedes hacer esto con un número atómico. Por ejemplo:

import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import java.util.stream.LongStream; public class Accumulator { public static LongStream toCumulativeSumStream(IntStream ints){ AtomicLong sum = new AtomicLong(0); return ints.sequential().mapToLong(sum::addAndGet); } public static void main(String[] args){ LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5)); sums.forEachOrdered(System.out::println); } }

Esto produce:

1 3 6 10

He usado un Largo para almacenar las sumas, porque es totalmente posible que dos ints se sumen bien a lo largo de Integer.MAX_VALUE , y un largo tiene menos posibilidades de desbordamiento.