que - Java 8 stream combiner nunca llamado
que es adobe flash (2)
Bueno, eso es exactamente lo que solicitas al especificar Characteristics.CONCURRENT
:
Indica que este recopilador es concurrente , lo que significa que el contenedor de resultados puede admitir que la función del acumulador se llame simultáneamente con el mismo contenedor de resultados de varios subprocesos.
Si ese no es el caso, como con su Collector
, no debe especificar esa bandera.
Como nota al margen, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
Es bastante ineficiente para especificar características. Solo puede usar EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED)
. Cuando eliminas una característica concurrente incorrecta, puedes usar EnumSet.of(Characteristics.UNORDERED)
o Collections.singleton(Characteristics.UNORDERED)
, pero un HashSet
definitivamente es un exceso.
Estoy escribiendo un colector java 8 personalizado que se supone que calcula el promedio de un POJO que tiene un método getValue()
. Aquí está el código:
public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {
@Override
public Supplier<BigDecimal[]> supplier() {
return () -> {
BigDecimal[] start = new BigDecimal[2];
start[0] = BigDecimal.ZERO;
start[1] = BigDecimal.ZERO;
return start;
};
}
@Override
public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
return (a,b) -> {
a[0] = a[0].add(b.getValue());
a[1] = a[1].add(BigDecimal.ONE);
};
}
@Override
public BinaryOperator<BigDecimal[]> combiner() {
return (a,b) -> {
a[0] = a[0].add(b[0]);
a[1] = a[1].add(b[1]);
return a;
};
}
@Override
public Function<BigDecimal[], BigDecimal> finisher() {
return (a) -> {
return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
};
}
private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
@Override
public Set<Characteristics> characteristics() {
return CHARACTERISTICS;
}
};
Todo funciona bien en el caso no paralelo. Sin embargo, cuando uso un parallelStream()
, a veces no funciona. Por ejemplo, dados los valores de 1 a 10, se calcula (53/9 en lugar de 55/10). Al depurar, el depurador nunca llega al punto de interrupción en la función combiner (). ¿Hay algún tipo de bandera que necesito establecer?
Parece que el problema es la característica CONCURRENT
, que hace algo más de lo que usted podría pensar:
Indica que este recopilador es concurrente , lo que significa que el contenedor de resultados puede admitir que la función del acumulador se llame simultáneamente con el mismo contenedor de resultados de varios subprocesos.
En lugar de llamar al combinador, el acumulador se llama simultáneamente, utilizando el mismo BigDecimal[] a
para todos los subprocesos. El acceso a a
no es atómico, por lo que sale mal:
Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]
Hacer el valor de a[0]
7 cuando debería ser 10. El mismo tipo de cosa puede suceder con a[1]
, por lo que los resultados pueden ser inconsistentes.
Si elimina la característica CONCURRENT
, el combinador se utilizará en su lugar.