java - procesamiento - ¿Puedes dividir una secuencia en dos flujos?
stream java 8 ejemplo (9)
Tengo un conjunto de datos representado por una secuencia de Java 8:
Stream<T> stream = ...;
Puedo ver cómo filtrarlo para obtener un subconjunto aleatorio, por ejemplo
Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));
También puedo ver cómo podría reducir esta secuencia para obtener, por ejemplo, dos listas que representan dos mitades aleatorias del conjunto de datos, y luego convertirlas en flujos. Pero, ¿hay una forma directa de generar dos secuencias desde la inicial? Algo como
(heads, tails) = stream.[some kind of split based on filter]
Gracias por cualquier idea.
Desafortunadamente, lo que pides está directamente mal visto en el JavaDoc de Stream :
Una secuencia debe ser operada (invocando una operación de flujo intermedio o terminal) solo una vez. Esto excluye, por ejemplo, las transmisiones "bifurcadas", donde la misma fuente alimenta dos o más canalizaciones, o múltiples cruces de la misma secuencia.
Puede solucionar esto utilizando el método peek
u otros métodos si realmente desea ese tipo de comportamiento. En este caso, lo que debe hacer es tratar de respaldar dos flujos de la misma fuente de Stream original con un filtro de bifurcación, duplicar su flujo y filtrar cada uno de los duplicados de manera apropiada.
Sin embargo, es posible que desee reconsiderar si un Stream
es la estructura adecuada para su caso de uso.
Esta fue la menos mala respuesta que pude encontrar.
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
public class Test {
public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {
Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());
return new ImmutablePair<L, R>(trueResult, falseResult);
}
public static void main(String[] args) {
Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);
Pair<List<Integer>, String> results = splitStream(stream,
n -> n > 5,
s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));
System.out.println(results);
}
}
Esto toma un flujo de enteros y los divide en 5. Para aquellos mayores de 5, filtra solo números pares y los pone en una lista. Por lo demás, los une con |.
productos:
([6, 8],0|1|2|3|4|5)
No es ideal ya que recoge todo en colecciones intermedias rompiendo la corriente (¡y tiene demasiados argumentos!)
Esto está en contra del mecanismo general de Stream. Digamos que puedes dividir Stream S0 entre Sa y Sb como quisieras. Realizar cualquier operación de terminal, digamos count()
, en Sa necesariamente "consumirá" todos los elementos en S0. Por lo tanto, Sb perdió su fuente de datos.
Anteriormente, Stream tenía un método tee()
, creo, que duplicaba una secuencia a dos. Se elimina ahora.
Sin embargo, Stream tiene un método peek (), es posible que pueda usarlo para cumplir sus requisitos.
Me encontré con esta pregunta mientras buscaba una manera de filtrar ciertos elementos fuera de una transmisión y registrarlos como errores. Así que realmente no necesitaba dividir la transmisión tanto como adjuntar una acción de terminación prematura a un predicado con sintaxis discreta. Esto es lo que se me ocurrió:
public class MyProcess {
/* Return a Predicate that performs a bail-out action on non-matching items. */
private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
return x -> {
if (pred.test(x)) {
return true;
}
altAction.accept(x);
return false;
};
/* Example usage in non-trivial pipeline */
public void processItems(Stream<Item> stream) {
stream.filter(Objects::nonNull)
.peek(this::logItem)
.map(Item::getSubItems)
.filter(withAltAction(SubItem::isValid,
i -> logError(i, "Invalid")))
.peek(this::logSubItem)
.filter(withAltAction(i -> i.size() > 10,
i -> logError(i, "Too large")))
.map(SubItem::toDisplayItem)
.forEach(this::display);
}
}
Me encontré con esta pregunta y siento que un flujo de fork tiene algunos casos de uso que podrían ser válidos. Escribí el código siguiente como consumidor para que no haga nada, pero podría aplicarlo a las funciones y a cualquier otra cosa que pueda encontrar.
class PredicateSplitterConsumer<T> implements Consumer<T>
{
private Predicate<T> predicate;
private Consumer<T> positiveConsumer;
private Consumer<T> negativeConsumer;
public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
{
this.predicate = predicate;
this.positiveConsumer = positive;
this.negativeConsumer = negative;
}
@Override
public void accept(T t)
{
if (predicate.test(t))
{
positiveConsumer.accept(t);
}
else
{
negativeConsumer.accept(t);
}
}
}
Ahora la implementación de su código podría ser algo como esto:
personsArray.forEach(
new PredicateSplitterConsumer<>(
person -> person.getDateOfBirth().isPresent(),
person -> System.out.println(person.getName()),
person -> System.out.println(person.getName() + " does not have Date of birth")));
No exactamente. No puedes obtener dos Stream
s de uno; esto no tiene sentido. ¿Cómo iterarías sobre uno sin necesidad de generar el otro al mismo tiempo? Una secuencia solo puede ser operada una vez.
Sin embargo, si desea volcarlos en una lista o algo así, podría hacer
stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
Qué tal si:
Supplier<Stream<Integer>> randomIntsStreamSupplier =
() -> (new Random()).ints(0, 2).boxed();
Stream<Integer> tails =
randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
randomIntsStreamSupplier.get().filter(x->x.equals(1));
Un coleccionista puede ser usado para esto.
- Para dos categorías, use
Collectors.partitioningBy()
factory.
Esto creará un Map
de Boolean
a List
y colocará los elementos en una u otra lista basada en un Predicate
.
Nota: dado que la secuencia debe consumirse en su totalidad, esto no puede funcionar en transmisiones infinitas. Debido a que la secuencia se consume de todos modos, este método simplemente los coloca en Listas en lugar de crear una nueva secuencia con memoria.
Además, no es necesario el iterador, ni siquiera en el ejemplo de solo cabeza que proporcionó.
Random r = new Random();
Map<Boolean, List<String>> groups = stream
.collect(Collectors.partitioningBy(x -> r.nextBoolean()));
System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
- Para más categorías, use una fábrica de
Collectors.groupingBy()
.
Map<Object, List<String>> groups = stream
.collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());
En caso de que las transmisiones no sean Stream
, sino una de las primitivas como IntStream
, este .collect(Collectors)
no está disponible. Tendrás que hacerlo de forma manual sin una fábrica de colectores. Su implementación se ve así:
IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000);
Predicate<Integer> p = x -> r.nextBoolean();
Map<Boolean, List<Integer>> groups = intStream.collect(() -> {
Map<Boolean, List<Integer>> map = new HashMap<>();
map.put(false, new ArrayList<>());
map.put(true, new ArrayList<>());
return map;
}, (map, x) -> {
boolean partition = p.test(x);
List<Integer> list = map.get(partition);
list.add(x);
}, (map1, map2) -> {
map1.get(false).addAll(map2.get(false));
map1.get(true).addAll(map2.get(true));
});
System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
Editar
Como se señaló, la ''solución alternativa'' anterior no es segura para subprocesos. La conversión a una Stream
normal antes de la recolección es el camino a seguir:
Stream<Integer> stream = intStream.boxed();
no exactamente, pero puede lograr lo que necesita al invocar Collectors.groupingBy()
. usted crea una nueva Colección, y luego puede crear instancias de las secuencias en esa nueva colección.