parallel-processing - parallel - stream java 8 ejemplo
¿Hay una buena manera de extraer trozos de datos de un flujo de Java 8? (5)
Aquí está la solución de My Library: AbacusUtil :
stream.split(batchSize).parallel(threadNum).map(yourBatchProcessFunction);
En un proceso ETL, estoy recuperando muchas entidades de un repositorio de datos de Spring. Luego estoy usando un flujo paralelo para asignar las entidades a diferentes. Puedo usar un consumidor para almacenar esas nuevas entidades en otro repositorio, una por una, o recopilarlas en una Lista y almacenarlas en una sola operación masiva. El primero es costoso, mientras que el último puede exceder la memoria disponible.
¿Hay una buena manera de recopilar una cierta cantidad de elementos en la secuencia (como lo hace el límite), consumir esa porción y continuar en paralelo hasta que se procesen todos los elementos?
Es posible que pueda escribir su propio Collector
que acumule entidades y luego realice actualizaciones masivas.
El método Collector.accumulator()
puede agregar las entidades a un caché temporal interno hasta que el caché crezca demasiado. Cuando la memoria caché es lo suficientemente grande, puede hacer un almacenamiento masivo en su otro repositorio.
Collector.merge()
necesita combinar los cachés del recopilador de 2 subprocesos en un solo caché (y posiblemente fusionar)
Finalmente, se llama al método Collector.finisher()
cuando se realiza la transmisión, por lo que aquí también se almacena todo lo que queda en el caché.
Dado que ya está utilizando una transmisión paralela y parece que está bien hacer varias cargas al mismo tiempo, supongo que ya se ha manejado la seguridad de subprocesos.
ACTUALIZAR
Mi comentario con respecto a la seguridad de los subprocesos y las secuencias paralelas se refería al almacenamiento / almacenamiento real en el repositorio, no a la concurrencia en su colección temporal.
Cada colector debería (creo) ejecutarse en su propio hilo. Una secuencia paralela debe crear varias instancias de recopilador llamando al supplier()
varias veces. Por lo tanto, puede tratar una instancia de colector como un solo hilo y debería funcionar bien.
Por ejemplo, en el Javadoc para java.util.IntSummaryStatistics
dice:
Esta implementación no es segura para subprocesos. Sin embargo, es seguro usar Collectors.toIntStatistics () en un flujo paralelo, porque la implementación paralela de Stream.collect () proporciona la partición, el aislamiento y la fusión de resultados necesarios para una ejecución paralela segura y eficiente.
Mi enfoque para las operaciones masivas con fragmentación es usar un envoltorio divisorador de partición, y otra envoltura que anula la política de división predeterminada (progresión aritmética de tamaños de lote en incrementos de 1024) a la división simple de lote fijo. Úsalo así:
Stream<OriginalType> existingStream = ...;
Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1);
partitioned.forEach(chunk -> ... process the chunk ...);
Aquí está el código completo:
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>>
{
private final Spliterator<E> spliterator;
private final int partitionSize;
public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
super(toWrap.estimateSize(), toWrap.characteristics() | Spliterator.NONNULL);
if (partitionSize <= 0) throw new IllegalArgumentException(
"Partition size must be positive, but was " + partitionSize);
this.spliterator = toWrap;
this.partitionSize = partitionSize;
}
public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
return StreamSupport.stream(new PartitioningSpliterator(in.spliterator(), size), false);
}
public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
return StreamSupport.stream(
new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false);
}
@Override public boolean tryAdvance(Consumer<? super List<E>> action) {
final ArrayList<E> partition = new ArrayList<>(partitionSize);
while (spliterator.tryAdvance(partition::add)
&& partition.size() < partitionSize);
if (partition.isEmpty()) return false;
action.accept(partition);
return true;
}
@Override public long estimateSize() {
final long est = spliterator.estimateSize();
return est == Long.MAX_VALUE? est
: est / partitionSize + (est % partitionSize > 0? 1 : 0);
}
}
import static java.util.Spliterators.spliterator;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
private final int batchSize;
private final int characteristics;
private long est;
public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
characteristics |= ORDERED;
if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
this.characteristics = characteristics;
this.batchSize = batchSize;
this.est = est;
}
public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
this(characteristics, batchSize, Long.MAX_VALUE);
}
public FixedBatchSpliteratorBase(int characteristics) {
this(characteristics, 64, Long.MAX_VALUE);
}
@Override public Spliterator<T> trySplit() {
final HoldingConsumer<T> holder = new HoldingConsumer<>();
if (!tryAdvance(holder)) return null;
final Object[] a = new Object[batchSize];
int j = 0;
do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
if (est != Long.MAX_VALUE) est -= j;
return spliterator(a, 0, j, characteristics());
}
@Override public Comparator<? super T> getComparator() {
if (hasCharacteristics(SORTED)) return null;
throw new IllegalStateException();
}
@Override public long estimateSize() { return est; }
@Override public int characteristics() { return characteristics; }
static final class HoldingConsumer<T> implements Consumer<T> {
Object value;
@Override public void accept(T value) { this.value = value; }
}
}
import static java.util.stream.StreamSupport.stream;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
private final Spliterator<T> spliterator;
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {
super(toWrap.characteristics(), batchSize, est);
this.spliterator = toWrap;
}
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
this(toWrap, batchSize, toWrap.estimateSize());
}
public FixedBatchSpliterator(Spliterator<T> toWrap) {
this(toWrap, 64, toWrap.estimateSize());
}
public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
}
public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
return new FixedBatchSpliterator<>(toWrap, batchSize);
}
@Override public boolean tryAdvance(Consumer<? super T> action) {
return spliterator.tryAdvance(action);
}
@Override public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(action);
}
}
Puedes usar un colector personalizado para hacer esto con elegancia.
Por favor vea mi respuesta a una pregunta similar aquí:
Colector de procesamiento por lotes personalizado
Luego, puede simplemente procesar por lotes el flujo en paralelo utilizando el recopilador anterior para almacenar los registros en su repositorio, ejemplo de uso:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> repository.save(xs);
input.parallelStream()
.map(i -> i + 1)
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
@Test
public void streamTest(){
Stream<Integer> data = Stream.generate(() -> {
//Block on IO
return blockOnIO();
});
AtomicInteger countDown = new AtomicInteger(1000);
final ArrayList[] buffer = new ArrayList[]{new ArrayList<Integer>()};
Object syncO = new Object();
data.parallel().unordered().map(i -> i * 1000).forEach(i->{
System.out.println(String.format("FE %s %d",Thread.currentThread().getName(), buffer[0].size()));
int c;
ArrayList<Integer> export=null;
synchronized (syncO) {
c = countDown.addAndGet(-1);
buffer[0].add(i);
if (c == 0) {
export=buffer[0];
buffer[0] = new ArrayList<Integer>();
countDown.set(1000);
}
}
if(export !=null){
sendBatch(export);
}
});
//export any remaining
sendBatch(buffer[0]);
}
Integer blockOnIO(){
try {
Thread.sleep(50);
return Integer.valueOf((int)Math.random()*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
void sendBatch(ArrayList al){
assert al.size() == 1000;
System.out.println(String.format("LOAD %s %d",Thread.currentThread().getName(), al.size()));
}
Es posible que esto sea algo anticuado, pero debería lograr el procesamiento por lotes con un mínimo de bloqueo.
Producirá salida como
FE ForkJoinPool.commonPool-worker-2 996
FE ForkJoinPool.commonPool-worker-5 996
FE ForkJoinPool.commonPool-worker-4 998
FE ForkJoinPool.commonPool-worker-3 999
LOAD ForkJoinPool.commonPool-worker-3 1000
FE ForkJoinPool.commonPool-worker-6 0
FE ForkJoinPool.commonPool-worker-1 2
FE ForkJoinPool.commonPool-worker-7 2
FE ForkJoinPool.commonPool-worker-2 4