method - Java 8: detener la operación de reducción de examinar todos los elementos de Stream
reduce method java stream (6)
Estoy tratando de entender si hay una manera de terminar la operación de reducción sin examinar todo el flujo y no puedo encontrar una manera.
El caso de uso es aproximadamente el siguiente: permita que haya una larga lista de Integer
que se deben plegar en un Accumulator
. Cada examen de elemento es potencialmente costoso, por lo que dentro del Accumulator
, realizo una verificación del Accumulator
entrante para ver si es necesario realizar una operación costosa. Si no lo hacemos, simplemente devuelvo el acumulador.
Obviamente, esta es una buena solución para las listas pequeñas (er), pero las listas enormes incurren en costos innecesarios de visitas por elemento de la corriente que quisiera evitar.
Aquí hay un bosquejo de código - asume solo reducciones en serie.
class Accumulator {
private final Set<A> setA = new HashSet<>;
private final Set<B> setB = new HashSet<>;
}
class ResultSupplier implements Supplier<Result> {
private final List<Integer> ids;
@Override
public Result get() {
Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB);
}
private static BiFunction<Accumulator, Integer, Accumulator> f() {
return (acc, element) -> {
if (acc.setA.size() <= 1) {
// perform expensive ops and accumulate results
}
return acc;
};
}
}
Además de tener que atravesar todo el Stream
, hay otro hecho que no me gusta: tengo que verificar la misma condición dos veces (es decir, setA
tamaño).
He considerado las operaciones map()
y collect()
, pero parecían más de lo mismo y no encontraron que cambien materialmente el hecho de que simplemente no puedo terminar la operación de plegado sin examinar todo el flujo.
Además, mi pensamiento es que el corresponsal de Stream API imaginario takeWhile(p : (A) => boolean)
Stream también nos compraría nada, ya que la condición de terminación depende del acumulador, no de los elementos de stream per se.
Tenga en cuenta que soy relativamente nuevo en FP, así que, ¿hay alguna manera de hacer que esto funcione como lo espero? ¿He configurado incorrectamente todo el problema o es esta limitación por diseño?
Como se mencionó en los comentarios: El escenario de uso suena un poco dudoso. Por un lado, debido al uso de reduce
lugar de collect
, por otro lado, debido al hecho de que la condición que debe usarse para detener la reducción también aparece en el acumulador. Parece que simplemente limitar el flujo a un cierto número de elementos, o basarse en una condición, como se muestra en otra pregunta , puede ser más apropiado aquí.
Por supuesto, en la aplicación real, podría ser que la condición, de hecho, no esté relacionada con la cantidad de elementos que se han procesado. Para este caso, esbozo aquí una solución que básicamente corresponde a la respuesta de the8472 , y es muy similar a la solución de la pregunta mencionada anteriormente: utiliza un Stream
que se crea a partir de un Spliterator
que simplemente delega al Spliterator
original, a menos que Se cumple la condición de parada.
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StopStreamReduction
{
public static void main(String[] args)
{
ResultSupplier r = new ResultSupplier();
System.out.println(r.get());
}
}
class Accumulator
{
final Set<Integer> set = new HashSet<Integer>();
}
class ResultSupplier implements Supplier<String>
{
private final List<Integer> ids;
ResultSupplier()
{
ids = new ArrayList<Integer>(Collections.nCopies(20, 1));
}
public String get()
{
//return getOriginal();
return getStopping();
}
private String getOriginal()
{
Accumulator acc =
ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
}
private String getStopping()
{
Spliterator<Integer> originalSpliterator = ids.spliterator();
Accumulator accumulator = new Accumulator();
Spliterator<Integer> stoppingSpliterator =
new Spliterators.AbstractSpliterator<Integer>(
originalSpliterator.estimateSize(), 0)
{
@Override
public boolean tryAdvance(Consumer<? super Integer> action)
{
return accumulator.set.size() > 10 ? false :
originalSpliterator.tryAdvance(action);
}
};
Stream<Integer> stream =
StreamSupport.stream(stoppingSpliterator, false);
Accumulator acc =
stream.reduce(accumulator, f(), (x, y) -> null);
return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
}
private static int counter = 0;
private static BiFunction<Accumulator, Integer, Accumulator> f()
{
return (acc, element) -> {
System.out.print("Step " + counter);
if (acc.set.size() <= 10)
{
System.out.print(" expensive");
acc.set.add(counter);
}
System.out.println();
counter++;
return acc;
};
}
}
Editar en respuesta a los comentarios:
Por supuesto, es posible escribirlo "más funcional". Sin embargo, debido a las vagas descripciones en las preguntas y al ejemplo de código bastante "incompleto", es difícil encontrar "LA" solución más apropiada aquí. (Y "apropiado" se refiere a las advertencias específicas de la tarea real, y a la pregunta de qué tan funcional debería ser sin sacrificar la legibilidad).
Los posibles pasos de funcionalización podrían incluir la creación de una clase de StoppingSpliterator
Spliterator
genérica que opera en un Spliterator
delegado y tiene un Supplier<Boolean>
como condición de parada, y alimenta esto con un Predicate
en el Accumulator
real, junto con el uso de algunos métodos de utilidad y referencias de métodos. aquí y allá.
Pero nuevamente: es discutible si esta es realmente una solución apropiada, o si uno no debería usar más bien la solución simple y pragmática de la respuesta de Lukas Eder ...
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
public class StopStreamReduction
{
public static void main(String[] args)
{
List<Integer> collection =
new ArrayList<Integer>(Collections.nCopies(20, 1));
System.out.println(compute(collection));
}
private static String compute(List<Integer> collection)
{
Predicate<Accumulator> stopCondition = (a) -> a.set.size() > 10;
Accumulator result = reduceStopping(collection,
new Accumulator(), StopStreamReduction::accumulate, stopCondition);
return (result.set.size() > 11) ? "invalid" : String.valueOf(result.set);
}
private static int counter;
private static Accumulator accumulate(Accumulator a, Integer element)
{
System.out.print("Step " + counter);
if (a.set.size() <= 10)
{
System.out.print(" expensive");
a.set.add(counter);
}
System.out.println();
counter++;
return a;
}
static <U, T> U reduceStopping(
Collection<T> collection, U identity,
BiFunction<U, ? super T, U> accumulator,
Predicate<U> stopCondition)
{
// This assumes that the accumulator always returns
// the identity instance (with the accumulated values).
// This may not always be true!
return StreamSupport.stream(
new StoppingSpliterator<T>(
collection.spliterator(),
() -> stopCondition.test(identity)), false).
reduce(identity, accumulator, (x, y) -> null);
}
}
class Accumulator
{
final Set<Integer> set = new HashSet<Integer>();
}
class StoppingSpliterator<T> extends Spliterators.AbstractSpliterator<T>
{
private final Spliterator<T> delegate;
private final Supplier<Boolean> stopCondition;
StoppingSpliterator(Spliterator<T> delegate, Supplier<Boolean> stopCondition)
{
super(delegate.estimateSize(), 0);
this.delegate = delegate;
this.stopCondition = stopCondition;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (stopCondition.get())
{
return false;
}
return delegate.tryAdvance(action);
}
}
Creo que es posible lanzar una RuntimeException
de un tipo especial desde su colector personalizado (o reducir la operación) que incorpora el resultado dentro del objeto de excepción y lo atrapa fuera de la operación de collect
que desenvuelve el resultado. Sé que usar la excepción para un flujo de control no excepcional no es idiomático, pero debería funcionar en su caso incluso para flujos paralelos.
En realidad, hay muchos casos en que la reducción de cortocircuito podría ser útil. Por ejemplo, recopile los valores de enumeración en EnumSet
(puede detenerse tan pronto como descubra que ya se han recopilado todos los valores de enumeración posibles). O intersecte todos los elementos de Stream<Set>
(puede detenerse si su conjunto resultante se vacía después de algún paso: continuar con la reducción es inútil). Internamente, hay un indicador SHORT_CIRCUIT
que se usa en operaciones de flujo como findFirst
, pero no está expuesto a la API pública.
En lugar de comenzar con ids.stream()
puedes
- utilizar
ids.spliterator()
- envuelva el separador resultante en un separador personalizado que tenga una bandera booleana volátil
- hacer que tryAdvance del
tryAdvance
devuelva falso si se cambia el indicador - convierte tu spliterator personalizado en una secuencia con
StreamSupport.stream(Spliterator<T>, boolean)
- continúa tu flujo de tuberías como antes
- apague el flujo alternando el booleano cuando su acumulador esté lleno
Agrega algunos métodos de ayuda estática para mantenerlo funcional.
La API resultante podría ver esto
Accumulator acc = terminateableStream(ids, (stream, terminator) ->
stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));
Además, mi opinión es que el corresponsal de la API Stream de imaginario takeWhile (p: (A) => boolean) tampoco nos compraría nada
Funciona si la condición depende del estado del acumulador y no de los miembros del flujo. Ese es esencialmente el enfoque que he descrito anteriormente.
Probablemente se prohibiría en un takeWhile
proporcionado por el JDK, pero una implementación personalizada utilizando spliterators es libre de adoptar un enfoque de estado.
Estoy de acuerdo con todas las respuestas anteriores. Lo estás haciendo mal al forzar una reducción en un acumulador mutable. Además, el proceso que está describiendo no se puede expresar como un conducto de transformaciones y reducciones.
Si realmente necesitas hacerlo al estilo de FP, lo haría como @ @8472 señala.
De todos modos, te ofrezco una nueva alternativa más compacta, similar a la solución de @lukas-eder, utilizando un iterador:
Function<Integer, Integer> costlyComputation = Function.identity();
Accumulator acc = new Accumulator();
Iterator<Integer> ids = Arrays.asList(1, 2, 3).iterator();
while (!acc.hasEnough() && ids.hasNext())
costlyComputation.andThen(acc::add).apply(ids.next());
Usted tiene dos preocupaciones diferentes con respecto a FP aquí:
Cómo dejar de iterar
Como dependes del estado mutable, la FP solo hará que tu vida sea más difícil. Puede iterar externamente la colección o usar un iterador como propongo.
Luego, use un if () para detener la iteración.
Puedes pensar en diferentes estrategias, pero al final del día, esto es lo que estás usando.
Prefiero el iterador porque es más idiomático (expresa mejor tu intención en este caso).
Cómo diseñar el Acumulador y la costosa operación.
Esto es lo más interesante para mí.
Una función pura no puede tener estado, debe recibir algo y debe devolver algo, y siempre es lo mismo para la misma entrada (como una función matemática). ¿Puedes expresar tu operación costosa como esta?
¿Necesita algún estado compartido con el Acumulador? Tal vez lo compartido no pertenezca a ninguno de ellos.
¿Transformará su entrada y luego la agregará al Acumulador o es responsabilidad del Acumulador? ¿Tiene sentido inyectar la función en el acumulador?
No existe una solución FP real, simplemente porque su acumulador completo no es FP. No podemos ayudarlo en este sentido, ya que no sabemos qué está haciendo realmente. Todo lo que vemos es que se basa en dos colecciones mutables y, por lo tanto, no puede ser parte de una solución de FP pura.
Si acepta las limitaciones y no existe una forma clara de utilizar la API de Stream
, puede esforzarse por lograrlo de manera sencilla . La forma simple incorpora un Predicate
estado que no es lo mejor, pero a veces inevitable:
public Result get() {
int limit = 1;
Set<A> setA=new HashSet<>();
Set<B> setB=new HashSet<>();
return ids.stream().anyMatch(i -> {
// perform expensive ops and accumulate results
return setA.size() > limit;
})? Result.invalid(): Result.valid(setB);
}
Pero quiero señalar que dada su lógica específica, es decir, que su resultado se considera inválido cuando el conjunto crece demasiado, su intento de procesar no demasiados elementos es una optimización del caso erróneo . No debes perder el esfuerzo en optimizar eso. Si un resultado válido es el resultado de procesar todos los elementos, entonces procese todos los elementos ...
Por supuesto, habrá una respuesta puramente interesante que podría ayudar a resolver este problema de la manera que usted pretende.
Mientras tanto, ¿por qué usar FP cuando la solución simple es pragmáticamente imperativa y su fuente de datos original es una List
todos modos, que ya está completamente materializada, y usará la reducción en serie, no la reducción paralela? Escribe esto en su lugar:
@Override
public Result get() {
Accumulator acc = new Accumulator();
for (Integer id : ids) {
if (acc.setA.size() <= 1) {
// perform expensive ops and accumulate results
}
// Easy:
if (enough)
break;
}
return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB);
}