java java-8 batch-processing java-stream

Java 8 Stream con procesamiento por lotes



java-8 batch-processing (13)

Tengo un archivo grande que contiene una lista de elementos.

Me gustaría crear un lote de elementos, realizar una solicitud HTTP con este lote (todos los elementos son necesarios como parámetros en la solicitud HTTP). Puedo hacerlo muy fácilmente con un bucle for , pero como amante de Java 8, quiero intentar escribir esto con el framework Stream de Java 8 (y cosechar los beneficios del procesamiento diferido).

Ejemplo:

List<String> batch = new ArrayList<>(BATCH_SIZE); for (int i = 0; i < data.size(); i++) { batch.add(data.get(i)); if (batch.size() == BATCH_SIZE) process(batch); } if (batch.size() > 0) process(batch);

Quiero hacer algo largo la línea de lazyFileStream.group(500).map(processBatch).collect(toList())

Cuál sería la mejor forma de hacer esto?


¡Nota! Esta solución lee todo el archivo antes de ejecutar forEach.

Puede hacerlo con jOOλ , una biblioteca que extiende secuencias Java 8 para casos de uso secuenciales de subproceso único:

Seq.seq(lazyFileStream) // Seq<String> .zipWithIndex() // Seq<Tuple2<String, Long>> .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>> .forEach((index, batch) -> { process(batch); });

Detrás de escena, zipWithIndex() es solo:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { final Iterator<T> it = stream.iterator(); class ZipWithIndex implements Iterator<Tuple2<T, Long>> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2<T, Long> next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); }

... mientras que groupBy() es la conveniencia API para:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { return collect(Collectors.groupingBy(classifier)); }

(Descargo de responsabilidad: trabajo para la empresa detrás de jOOλ)


Con Java 8 y com.google.common.collect.Lists , puede hacer algo como:

public class BatchProcessingUtil { public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) { List<List<T>> batches = Lists.partition(data, batchSize); return batches.stream() .map(processFunction) // Send each batch to the process function .flatMap(Collection::stream) // flat results to gather them in 1 stream .collect(Collectors.toList()); } }

Aquí T es el tipo de elementos en la lista de entrada y U el tipo de elementos en la lista de salida

Y puedes usarlo así:

List<String> userKeys = [... list of user keys] List<Users> users = BatchProcessingUtil.process( userKeys, 10, // Batch Size partialKeys -> service.getUsers(partialKeys) );


Ejemplo simple usando Spliterator

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){ List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable currentBatch.add(new ArrayList<T>(batchSize)); return Stream.concat(stream .sequential() .map(new Function<T, List<T>>(){ public List<T> apply(T t){ currentBatch.get(0).add(t); return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null; } }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0)) .limit(1) ).filter(Objects::nonNull); }

La respuesta de Bruce es más completa, pero estaba buscando algo rápido y sucio para procesar un montón de archivos.


Escribí un Spliterator personalizado para escenarios como este. Llenará listas de un tamaño determinado de la secuencia de entrada. La ventaja de este enfoque es que realizará un procesamiento diferido y funcionará con otras funciones de flujo.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator<E> implements Spliterator<List<E>> { private final Spliterator<E> base; private final int batchSize; public BatchSpliterator(Spliterator<E> base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer<? super List<E>> action) { final List<E> batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator<List<E>> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator<E> splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } }


Esta es una solución pura de Java que se evalúa perezosamente.

// read file into stream, try-with-resources try (Stream<String> stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator<String> split = stream.skip(1).spliterator(); Chunker<String> chunker = new Chunker<String>(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker<T> { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toString()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } }


La implementación pura de Java-8 también es posible:

int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch));

Tenga en cuenta que, a diferencia de JOOl, puede funcionar bien en paralelo (siempre que sus data sean una lista de acceso aleatorio).


Para completar, aquí hay una solución de Guava .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

En la pregunta, la colección está disponible, por lo que no se necesita una secuencia y se puede escribir como,

Iterables.partition(data, batchSize).forEach(this::process);


Puedes usar apache.commons:

ListUtils.partition(ListOfLines, 500).stream() .map(partition -> processBatch(partition) .collect(Collectors.toList());


También podría echar un vistazo a cyclops-react , soy el autor de esta biblioteca. Implementa la interfaz jOOλ (y, por extensión, JDK 8 Streams), pero a diferencia de JDK 8 Parallel Streams, se centra en las operaciones asincrónicas (como el bloqueo potencial de llamadas de E / S asíncronas). JDK Parallel Streams, por el contrario, se centra en el paralelismo de datos para operaciones vinculadas a la CPU. Funciona mediante la gestión de agregados de tareas basadas en el futuro bajo el capó, pero presenta una API Stream estándar extendida para los usuarios finales.

Este código de muestra puede ayudarlo a comenzar

LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run();

Hay un tutorial sobre el procesamiento por lotes aquí

Y un tutorial más general aquí

Para usar su propio grupo de subprocesos (que probablemente sea más apropiado para bloquear E / S), puede comenzar a procesar con

LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run();


También puedes usar RxJava :

[A, B, C] [D, E, F]

o

@Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); }

o

A C B E D F


Tuvimos un problema similar que resolver. Queríamos tomar una secuencia que fuera más grande que la memoria del sistema (iterando a través de todos los objetos en una base de datos) y aleatorizar el orden lo mejor posible; pensamos que estaría bien almacenar 10,000 elementos y aleatorizarlos.

El objetivo era una función que tomaba una corriente.

De las soluciones propuestas aquí, parece haber una gama de opciones:

  • Utilice varias bibliotecas adicionales que no sean Java 8
  • Comience con algo que no sea una transmisión, por ejemplo, una lista de acceso aleatorio
  • Tener una secuencia que se pueda dividir fácilmente en un spliterator

Originalmente, nuestro instinto era usar un recopilador personalizado, pero esto significaba abandonar la transmisión. La solución de colector personalizada anterior es muy buena y casi la usamos.

Aquí hay una solución que engaña al usar el hecho de que Stream s puede darte un Iterator que puedes usar como una compuerta de escape para permitirte hacer algo extra que los streams no admiten. El Iterator se convierte de nuevo en una secuencia utilizando otro poco de brujería Java 8 StreamSupport .

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

Un ejemplo simple de usar esto se vería así:

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

Las impresiones anteriores

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();

Para nuestro caso de uso, queríamos mezclar los lotes y luego mantenerlos como una secuencia, se veía así:

/** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator<T> implements Iterator<List<T>> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param <T> type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static <T> Stream<T> asStream(Iterator<T> iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List<T> currentBatch; private Iterator<T> sourceIterator; public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List<T> next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } }

Esto genera algo como (es aleatorio, muy diferente cada vez)

@Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); }

La salsa secreta aquí es que siempre hay una secuencia, por lo que puede operar en una secuencia de lotes o hacer algo en cada lote y luego volver a flatMap a una secuencia. Aún mejor, todo lo anterior solo se ejecuta como el final para cada una de las forEach collect u otras. TIRE los datos a través de la secuencia.

¡Resulta que el iterator es un tipo especial de operación de terminación en un flujo y no hace que todo el flujo se ejecute y llegue a la memoria! ¡Gracias a los chicos de Java 8 por un diseño brillante!


Ejemplo puro de Java 8 que también funciona con flujos paralelos.

Cómo utilizar:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed(); CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

La declaración e implementación del método:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor) { List<ElementType> newBatch = new ArrayList<>(batchSize); stream.forEach(element -> { List<ElementType> fullBatch; synchronized (newBatch) { if (newBatch.size() < batchSize) { newBatch.add(element); return; } else { fullBatch = new ArrayList<>(newBatch); newBatch.clear(); newBatch.add(element); } } batchProcessor.accept(fullBatch); }); if (newBatch.size() > 0) batchProcessor.accept(new ArrayList<>(newBatch)); }


Solución pura de Java 8 :

Podemos crear un recopilador personalizado para hacer esto con elegancia, que toma un batch size y un Consumer para procesar cada lote:

import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param <T> Type of the elements being collected */ class BatchCollector<T> implements Collector<T, List<T>, List<T>> { private final int batchSize; private final Consumer<List<T>> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier<List<T>> supplier() { return ArrayList::new; } public BiConsumer<List<T>, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator<List<T>> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a strict batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function<List<T>, List<T>> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set<Characteristics> characteristics() { return Collections.emptySet(); } }

Opcionalmente, cree una clase de utilidad auxiliar:

import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param <T> the type of elements being processed * @return a batch collector instance */ public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { return new BatchCollector<T>(batchSize, batchProcessor); } }

Ejemplo de uso:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> output = new ArrayList<>(); int batchSize = 3; Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

También publiqué mi código en GitHub, si alguien quiere echar un vistazo:

Enlace a Github