streams procesamiento print parte ejemplo datos con collection java java-8 java-stream

print - procesamiento de datos con streams de java se 8-parte 2



Recoge pares sucesivos de una secuencia (18)

Dado un flujo como { 0, 1, 2, 3, 4 } ,

¿Cómo puedo transformarlo de la manera más elegante a una forma dada?

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

(suponiendo, por supuesto, he definido el par de clases)?

Editar: Esto no se trata estrictamente de ints o streams primitivos. La respuesta debe ser general para una transmisión de cualquier tipo.


Encontrar pares sucesivos

Si está dispuesto a utilizar una biblioteca de terceros y no necesita paralelismo, entonces jOOλ ofrece funciones de ventana de estilo SQL de la siguiente manera

System.out.println( Seq.of(0, 1, 2, 3, 4) .window() .filter(w -> w.lead().isPresent()) .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class .toList() );

Flexible

[(0, 1), (1, 2), (2, 3), (3, 4)]

La función lead() accede al siguiente valor en orden transversal desde la ventana.

Encontrar sucesivos triples / cuádruples / n-tuplas

Una pregunta en los comentarios fue pedir una solución más general, donde no se deben recopilar pares sino n-tuplas (o posiblemente listas). Aquí hay un enfoque alternativo:

int n = 3; System.out.println( Seq.of(0, 1, 2, 3, 4) .window(0, n - 1) .filter(w -> w.count() == n) .map(w -> w.window().toList()) .toList() );

Rendimiento de una lista de listas

[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

Sin el filter(w -> w.count() == n) , el resultado sería

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

Descargo de responsabilidad: trabajo para la compañía detrás de jOOλ


Como otros han observado, debido a la naturaleza del problema, se requiere cierta condición de estado.

Me enfrenté a un problema similar, en el que quería lo que era esencialmente la función SQL de Oracle LEAD. Mi intento de implementar eso está abajo.

/** * Stream that pairs each element in the stream with the next subsequent element. * The final pair will have only the first item, the second will be null. */ <T> Spliterator<Pair<T>> lead(final Stream<T> stream) { final Iterator<T> input = stream.sequential().iterator(); final Iterable<Pair<T>> iterable = () -> { return new Iterator<Pair<T>>() { Optional<T> current = getOptionalNext(input); @Override public boolean hasNext() { return current.isPresent(); } @Override public Pair<T> next() { Optional<T> next = getOptionalNext(input); final Pair<T> pair = next.isPresent() ? new Pair(current.get(), next.get()) : new Pair(current.get(), null); current = next; return pair; } }; }; return iterable.spliterator(); } private <T> Optional<T> getOptionalNext(final Iterator<T> iterator) { return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(); }


Ejecute un bucle for que se ejecute desde 0 hasta length-1 de su secuencia

for(int i = 0 ; i < stream.length-1 ; i++) { Pair pair = new Pair(stream[i], stream[i+1]); // then add your pair to an array }


En su caso, escribiría mi IntFunction personalizada que realiza un seguimiento del último int pasado y lo utiliza para mapear el IntStream original.

import java.util.function.IntFunction; import java.util.stream.IntStream; public class PairFunction implements IntFunction<PairFunction.Pair> { public static class Pair { private final int first; private final int second; public Pair(int first, int second) { this.first = first; this.second = second; } @Override public String toString() { return "[" + first + "|" + second + "]"; } } private int last; private boolean first = true; @Override public Pair apply(int value) { Pair pair = !first ? new Pair(last, value) : null; last = value; first = false; return pair; } public static void main(String[] args) { IntStream intStream = IntStream.of(0, 1, 2, 3, 4); final PairFunction pairFunction = new PairFunction(); intStream.mapToObj(pairFunction) .filter(p -> p != null) // filter out the null .forEach(System.out::println); // display each Pair } }


Este es un problema interesante. ¿Mi intento híbrido está por debajo de cualquier bien?

public static void main(String[] args) { List<Integer> list = Arrays.asList(1, 2, 3); Iterator<Integer> first = list.iterator(); first.next(); if (first.hasNext()) list.stream() .skip(1) .map(v -> new Pair(first.next(), v)) .forEach(System.out::println); }

Creo que no se presta para el procesamiento paralelo, y por lo tanto puede ser descalificado.


Estoy de acuerdo con @aepurniet, pero en lugar de mapa tienes que usar mapToObj

range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println);


Implementé un contenedor spliterator que toma cada n elementos T del spliterator original y produce List<T> :

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> { private final Spliterator<T> wrappedSpliterator; private final int n; private final Deque<T> deque; private final Consumer<T> dequeConsumer; public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) { this.wrappedSpliterator = wrappedSpliterator; this.n = n; this.deque = new ArrayDeque<>(); this.dequeConsumer = deque::addLast; } @Override public boolean tryAdvance(Consumer<? super List<T>> action) { deque.pollFirst(); fillDeque(); if (deque.size() == n) { List<T> list = new ArrayList<>(deque); action.accept(list); return true; } else { return false; } } private void fillDeque() { while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer)) ; } @Override public Spliterator<List<T>> trySplit() { return null; } @Override public long estimateSize() { return wrappedSpliterator.estimateSize(); } @Override public int characteristics() { return wrappedSpliterator.characteristics(); } }

El siguiente método se puede usar para crear una transmisión consecutiva:

public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) { Spliterator<E> spliterator = stream.spliterator(); Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n); return StreamSupport.stream(wrapper, false); }

Uso de muestra:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2) .map(list -> new Pair(list.get(0), list.get(1))) .forEach(System.out::println);


La biblioteca de paquetes de protones proporciona la funcionalidad con ventana. Dada una clase Pair y un Stream, puedes hacerlo así:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1); Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1) .map(l -> new Pair<>(l.get(0), l.get(1))) .moreStreamOps(...);

Ahora la secuencia de pairs contiene:

(0, 1) (1, 2) (2, 3) (3, 4) (4, ...) and so on


La biblioteca de flujos Java 8 está orientada principalmente a dividir flujos en fragmentos más pequeños para procesamiento en paralelo, por lo que las etapas de canalización son bastante limitadas y no se admite hacer cosas como obtener el índice del elemento de flujo actual y acceder a elementos de flujo adyacentes.

Una forma típica de resolver estos problemas, con algunas limitaciones, por supuesto, es impulsar la secuencia por índices y confiar en que los valores se procesen en alguna estructura de datos de acceso aleatorio como una ArrayList a partir de la cual se pueden recuperar los elementos. Si los valores estaban en arrayList , uno podría generar los pares según lo solicitado haciendo algo como esto:

IntStream.range(1, arrayList.size()) .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i))) .forEach(System.out::println);

Por supuesto, la limitación es que la entrada no puede ser una secuencia infinita. Sin embargo, esta tubería se puede ejecutar en paralelo.


La operación es esencialmente con estado, por lo que en realidad no es lo que las transmisiones deben resolver: consulte la sección "Conductas sin estado" en el javadoc :

El mejor enfoque es evitar los parámetros de comportamiento con estado para transmitir las operaciones por completo

Una solución aquí es introducir estado en su flujo a través de un contador externo, aunque solo funcionará con un flujo secuencial.

public static void main(String[] args) { Stream<String> strings = Stream.of("a", "b", "c", "c"); AtomicReference<String> previous = new AtomicReference<>(); List<Pair> collect = strings.map(n -> { String p = previous.getAndSet(n); return p == null ? null : new Pair(p, n); }) .filter(p -> p != null) .collect(toList()); System.out.println(collect); } static class Pair<T> { private T left, right; Pair(T left, T right) { this.left = left; this.right = right; } @Override public String toString() { return "{" + left + "," + right + ''}''; } }


Mi biblioteca StreamEx , que amplía las transmisiones estándar, proporciona un método pairMap para todos los tipos de flujo. Para las transmisiones primitivas, no cambia el tipo de flujo, pero se puede usar para hacer algunos cálculos. El uso más común es calcular las diferencias:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

Para el flujo de objetos puede crear cualquier otro tipo de objeto. Mi biblioteca no proporciona ninguna nueva estructura de datos visible para el usuario como Pair (esa es la parte del concepto de la biblioteca). Sin embargo, si tiene su propia clase Pair y desea usarla, puede hacer lo siguiente:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

O si ya tienes algo de Stream :

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

Esta funcionalidad se implementa mediante spliterator personalizado . Tiene una sobrecarga bastante baja y se puede paralelizar muy bien. Por supuesto, funciona con cualquier fuente de transmisión, no solo como lista de acceso aleatorio como muchas otras soluciones. En muchas pruebas, funciona muy bien. Here''s un punto de referencia de JMH donde encontramos todos los valores de entrada que preceden a un valor mayor usando diferentes enfoques (vea this pregunta).


No es elegante, es una solución hackish, pero funciona para flujos infinitos

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers new Function<Integer, Pair>() { Integer previous; @Override public Pair apply(Integer integer) { Pair pair = null; if (previous != null) pair = new Pair(previous, integer); previous = integer; return pair; } }).skip(1); // drop first null

Ahora puedes limitar tu transmisión todo el tiempo que quieras

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

PD Espero que haya una mejor solución, algo así como clojure (partition 2 1 stream)


Para calcular las diferencias sucesivas en el tiempo (valores x) de una serie temporal, utilizo el método de collect(...) la stream :

final List< Long > intervals = timeSeries.data().stream() .map( TimeSeries.Datum::x ) .collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine ) .intervals();

Donde el DifferenceCollector es algo como esto:

public class DifferenceCollector implements LongConsumer { private final List< Long > intervals = new ArrayList<>(); private Long lastTime; @Override public void accept( final long time ) { if( Objects.isNull( lastTime ) ) { lastTime = time; } else { intervals.add( time - lastTime ); lastTime = time; } } public void combine( final DifferenceCollector other ) { intervals.addAll( other.intervals ); lastTime = other.lastTime; } public List< Long > intervals() { return intervals; } }

Probablemente pueda modificar esto para satisfacer sus necesidades.


Podemos usar RxJava (biblioteca de extensión reactiva muy poderosa)

IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1); pairObservable.take(10).forEach(b -> { b.forEach(n -> System.out.println(n)); System.out.println(); });

El operator búfer transforma un Observable que emite elementos en un Observable que emite colecciones almacenadas en búfer de esos elementos.


Puede lograrlo utilizando una cola limitada para almacenar elementos que fluyen a través de la secuencia (que se basa en la idea que describí en detalle aquí: ¿es posible obtener el siguiente elemento en la secuencia? )

El ejemplo de Belows define primero la instancia de la clase BoundedQueue que almacenará los elementos que pasan por la secuencia (si no le gusta la idea de extender la LinkedList, consulte el enlace mencionado anteriormente para obtener un enfoque más genérico y alternativo). Más tarde, simplemente combina dos elementos posteriores en la instancia de Pair:

public class TwoSubsequentElems { public static void main(String[] args) { List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4)); class BoundedQueue<T> extends LinkedList<T> { public BoundedQueue<T> save(T curElem) { if (size() == 2) { // we need to know only two subsequent elements pollLast(); // remove last to keep only requested number of elements } offerFirst(curElem); return this; } public T getPrevious() { return (size() < 2) ? null : getLast(); } public T getCurrent() { return (size() == 0) ? null : getFirst(); } } BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>(); final List<Pair<Integer>> answer = input.stream() .map(i -> streamHistory.save(i)) .filter(e -> e.getPrevious() != null) .map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent())) .collect(Collectors.toList()); answer.forEach(System.out::println); } }


Puedes hacer esto con el método Stream.reduce () (no he visto ninguna otra respuesta usando esta técnica).

public static <T> List<Pair<T, T>> consecutive(List<T> list) { List<Pair<T, T>> pairs = new LinkedList<>(); list.stream().reduce((a, b) -> { pairs.add(new Pair<>(a, b)); return b; }); return pairs; }


Puedes hacer esto en cyclops-react (yo contribuyo a esta biblioteca), usando el operador deslizante.

LazyFutureStream.of( 0, 1, 2, 3, 4 ) .sliding(2) .map(Pair::new);

O

ReactiveSeq.of( 0, 1, 2, 3, 4 ) .sliding(2) .map(Pair::new);

Suponiendo que el constructor Pair pueda aceptar una Colección con 2 elementos.

Si desea agrupar por 4, e incremente por 2 que también sea compatible.

ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE) .sliding(4,2) .forEach(System.out::println);

Los métodos estáticos de Equivalant para crear una vista deslizante sobre java.util.stream.Stream también se proporcionan en cyclops-streams de la clase StreamUtils .

StreamUtils.sliding(Stream.of(1,2,3,4),2) .map(Pair::new);

Nota: - para el funcionamiento de un solo hilo ReactiveSeq sería más apropiado. LazyFutureStream amplía ReactiveSeq pero está orientado principalmente para uso concurrente / paralelo (es un flujo de futuros).

LazyFutureStream amplía ReactiveSeq, que amplía Seq desde el impresionante jOOλ (que extiende java.util.stream.Stream), por lo que las soluciones que presenta Lukas también funcionarían con cualquier tipo de Stream. Para cualquier persona interesada, las principales diferencias entre los operadores de ventana / deslizamiento son la obvia relación de potencia / complejidad relativa e idoneidad para el uso con flujos infinitos (el deslizamiento no consume la secuencia, sino que se almacena temporalmente a medida que fluye).


Una solución elegante sería usar zip . Algo como:

List<Integer> input = Arrays.asList(0, 1, 2, 3, 4); Stream<Pair> pairStream = Streams.zip(input.stream(), input.stream().substream(1), (a, b) -> new Pair(a, b) );

Esto es bastante conciso y elegante, sin embargo, utiliza una lista como entrada. Una fuente de transmisión infinita no se puede procesar de esta manera.

Otro problema (mucho más problemático) es que zip junto con toda la clase Streams se eliminó recientemente de la API. El código anterior solo funciona con b95 o anteriores. Entonces, con el último JDK, diría que no hay una solución elegante de estilo FP, y ahora mismo podemos esperar que de alguna manera se reintroduzca el zip en la API.