streams procesamiento print parte examples ejemplo datos con collection java java-8 java-stream

java - procesamiento - Limitar un flujo por un predicado



stream java 8 ejemplo (16)

¿Existe una operación de flujo Java 8 que limite una Stream (potencialmente infinita) hasta que el primer elemento no coincida con un predicado? Algo que se parece a la operación takeWhile (inexistente) en el ejemplo a continuación e imprimiría todos los números menores que 10?

IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);

Si no existe tal operación, ¿cuál es la mejor forma de implementarla de una manera general?


Aquí está mi intento de usar solo la biblioteca de Java Stream.

IntStream.iterate(0, i -> i + 1) .filter(n -> { if (n < 10) { System.out.println(n); return false; } else { return true; } }) .findAny();


Aquí hay una versión hecha en ints - como se preguntó en la pregunta.

Uso:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Aquí está el código de StreamUtil:

import java.util.PrimitiveIterator; import java.util.Spliterators; import java.util.function.IntConsumer; import java.util.function.IntPredicate; import java.util.stream.IntStream; import java.util.stream.StreamSupport; public class StreamUtil { public static IntStream takeWhile(IntStream stream, IntPredicate predicate) { return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false); } private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator { private final PrimitiveIterator.OfInt iterator; private final IntPredicate predicate; public PredicateIntSpliterator(IntStream stream, IntPredicate predicate) { super(Long.MAX_VALUE, IMMUTABLE); this.iterator = stream.iterator(); this.predicate = predicate; } @Override public boolean tryAdvance(IntConsumer action) { if (iterator.hasNext()) { int value = iterator.nextInt(); if (predicate.test(value)) { action.accept(value); return true; } } return false; } } }


Como seguimiento de la respuesta de @StuartMarks . Mi biblioteca StreamEx tiene la operación takeWhile que es compatible con la implementación actual de JDK-9. Cuando se ejecuta bajo JDK-9 simplemente delegará a la implementación de JDK (a través de MethodHandle.invokeExact que es realmente rápido). Cuando se ejecuta bajo JDK-8, se usará la implementación de "polyfill". Entonces, usando mi biblioteca, el problema se puede resolver así:

IntStreamEx.iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);


En realidad, hay 2 formas de hacerlo en Java 8 sin bibliotecas adicionales o usando Java 9.

Si desea imprimir números del 2 al 20 en la consola, puede hacer esto:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

o

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

El resultado es en ambos casos:

2 4 6 8 10 12 14 16 18 20

Nadie mencionó anyMatch todavía. Este es el motivo de esta publicación.


Esta es la fuente copiada de JDK 9 java.util.stream.Stream.takeWhile (Predicado).

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) { class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> { private static final int CANCEL_CHECK_COUNT = 63; private final Spliterator<T> s; private int count; private T t; private final AtomicBoolean cancel = new AtomicBoolean(); private boolean takeOrDrop = true; Taking(Spliterator<T> s) { super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)); this.s = s; } @Override public boolean tryAdvance(Consumer<? super T> action) { boolean test = true; if (takeOrDrop && // If can take (count != 0 || !cancel.get()) && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Comparator<? super T> getComparator() { return s.getComparator(); } @Override public void accept(T t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } @Override public Spliterator<T> trySplit() { return null; } } return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close); }


Estoy seguro de que esto se puede mejorar mucho: (alguien podría hacerlo seguro a subprocesos tal vez)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1); TakeWhile.stream(stream, n -> n < 10000) .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

Un truco seguro ... No elegante, pero funciona ~: D

class TakeWhile<T> implements Iterator<T> { private final Iterator<T> iterator; private final Predicate<T> predicate; private volatile T next; private volatile boolean keepGoing = true; public TakeWhile(Stream<T> s, Predicate<T> p) { this.iterator = s.iterator(); this.predicate = p; } @Override public boolean hasNext() { if (!keepGoing) { return false; } if (next != null) { return true; } if (iterator.hasNext()) { next = iterator.next(); keepGoing = predicate.test(next); if (!keepGoing) { next = null; } } return next != null; } @Override public T next() { if (next == null) { if (!hasNext()) { throw new NoSuchElementException("Sorry. Nothing for you."); } } T temp = next; next = null; return temp; } public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) { TakeWhile tw = new TakeWhile(s, p); Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED); return StreamSupport.stream(split, false); } }


Incluso yo tenía un requisito similar: invocar el servicio web, si falla, vuelva a intentarlo 3 veces. Si falla incluso después de estas muchas pruebas, envíe una notificación por correo electrónico. Después de buscar en Google mucho, "anyMatch" llegó como un salvador. Mi código de muestra de la siguiente manera. En el siguiente ejemplo, si el método "webServiceCall" devuelve verdadero en la primera iteración en sí, la transmisión no itera más ya que hemos llamado "anyMatch". Creo que esto es lo que estás buscando.

importar java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

clase TrialStreamMatch {

public static void main(String[] args) { if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){ //Code for sending email notifications } } public static boolean webServiceCall(int i){ //For time being, I have written a code for generating boolean randomly //This whole piece needs to be replaced by actual web-service client code boolean bool = ThreadLocalRandom.current().nextBoolean(); System.out.println("Iteration index :: "+i+" bool :: "+bool); //Return success status -- true or false return bool; }

}


Las operaciones takeWhile y dropWhile se han agregado a JDK 9. Su código de ejemplo

IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);

se comportará exactamente como lo espera al compilarse y ejecutarse bajo JDK 9.

JDK 9 ha sido lanzado. Está disponible para descargar aquí: http://jdk.java.net/9/


No se puede abortar una transmisión, excepto mediante una operación de cortocircuito en la terminal, lo que dejaría sin procesar algunos valores de flujo independientemente de su valor. Pero si solo quiere evitar operaciones en una secuencia, puede agregar una transformación y filtrar a la transmisión:

import java.util.Objects; class ThingProcessor { static Thing returnNullOnCondition(Thing thing) { return( (*** is condition met ***)? null : thing); } void processThings(Collection<Thing> thingsCollection) { thingsCollection.stream() *** regular stream processing *** .map(ThingProcessor::returnNullOnCondition) .filter(Objects::nonNull) *** continue stream processing *** } } // class ThingProcessor

Eso transforma el flujo de cosas en nulos cuando las cosas cumplen alguna condición, luego filtra nulos. Si está dispuesto a disfrutar de los efectos secundarios, puede establecer el valor de la condición en verdadero una vez que se encuentre algo, por lo que todas las cosas posteriores se filtran, independientemente de su valor. Pero incluso si no, puede guardar una gran cantidad de procesamiento (si no del todo) al filtrar los valores de la transmisión que no desea procesar.


Puedes usar java8 + rxjava .

import java.util.stream.IntStream; import rx.Observable; // Example 1) IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> { System.out.println(n); return n < 10; } ).subscribe() ; // Example 2 IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> n < 10) .forEach( n -> System.out.println(n));


Si tiene un problema diferente, puede que necesite una solución diferente, pero para su problema actual, simplemente iré con:

IntStream .iterate(1, n -> n + 1) .limit(10) .forEach(System.out::println);


Tal operación debería ser posible con un Java 8 Stream , pero no necesariamente se puede hacer de manera eficiente; por ejemplo, no se puede paralelizar necesariamente una operación de este tipo, ya que hay que mirar los elementos en orden.

La API no proporciona una manera fácil de hacerlo, pero lo más probable es tomar Stream.iterator() , ajustar el Iterator para tener una implementación "take-while", y luego volver a un Spliterator y luego una Stream . O, quizás, envuelva el Spliterator , aunque ya no se puede dividir en esta implementación.

Aquí hay una implementación no probada de takeWhile en un Spliterator :

static <T> Spliterator<T> takeWhile( Spliterator<T> splitr, Predicate<? super T> predicate) { return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) { boolean stillGoing = true; @Override public boolean tryAdvance(Consumer<? super T> consumer) { if (stillGoing) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem)) { consumer.accept(elem); } else { stillGoing = false; } }); return hadNext && stillGoing; } return false; } }; } static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); }


Tengo otra solución rápida implementando esto (que de hecho es muy sucio, pero entiendes la idea):

public static void main(String[] args) { System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15) .map(o -> o.toString()).collect(Collectors.joining(", "))); } static interface TerminatedStream<T> { Stream<T> terminateOn(T e); } static class StreamUtil { static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) { return new TerminatedStream<T>() { public Stream<T> terminateOn(T e) { Builder<T> builder = Stream.<T> builder().add(seed); T current = seed; while (!current.equals(e)) { current = op.apply(current); builder.add(current); } return builder.build(); } }; } }


Ve a buscar la biblioteca AbacusUtil . Proporciona la API exacta que desea y más:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Declaración: soy el desarrollador de AbacusUtil.


allMatch() es una función de cortocircuito, por lo que puede usarlo para detener el procesamiento. La principal desventaja es que debe realizar su prueba dos veces: una vez para ver si debe procesarla y otra vez para ver si continúa.

IntStream .iterate(1, n -> n + 1) .peek(n->{if (n<10) System.out.println(n);}) .allMatch(n->n < 10);


takeWhile es una de las funciones proporcionadas por la biblioteca protonpack .

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1); Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10); assertThat(finiteInts.collect(Collectors.toList()), hasSize(10));