procesamiento - streams java ejemplo
¿Por qué la secuencia paralela Files.list() se está volviendo mucho más lenta que al usar Collection.parallelStream()? (3)
El siguiente fragmento de código es parte de un método que obtiene una lista de directorios, llama a un método de extracción en cada archivo y serializa el objeto de droga resultante a xml.
try(Stream<Path> paths = Files.list(infoDir)) {
paths
.parallel()
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
}
Aquí está el mismo código que hace exactamente lo mismo pero que usa una .list()
simple a .list()
para obtener la lista del directorio y llamar a .parallelStream()
en la lista resultante.
Arrays.asList(infoDir.toFile().list())
.parallelStream()
.map(f -> infoDir.resolve(f))
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
Mi máquina es una MacBook Pro de cuatro núcleos, Java v 1.8.0_60 (compilación 1.8.0_60-b27).
Estoy procesando ~ 7000 archivos. Los promedios de 3 carreras:
Primera versión: Con .parallel()
: 20 segundos. Sin .parallel()
: 41 segundos
Segunda versión: Con .parallelStream()
: 12 segundos. Con .stream()
: 41 segundos.
Esos 8 segundos en modo paralelo parecen una enorme diferencia, dado que el método de extract
que se lee de la secuencia y hace todo el trabajo pesado y la llamada de write
realiza las escrituras finales no se modifica.
Como alternativa, puede usar este separador personalizado especialmente diseñado para DirectoryStream
:
public class DirectorySpliterator implements Spliterator<Path> {
Iterator<Path> iterator;
long est;
private DirectorySpliterator(Iterator<Path> iterator, long est) {
this.iterator = iterator;
this.est = est;
}
@Override
public boolean tryAdvance(Consumer<? super Path> action) {
if (iterator == null) {
return false;
}
Path path;
try {
synchronized (iterator) {
if (!iterator.hasNext()) {
iterator = null;
return false;
}
path = iterator.next();
}
} catch (DirectoryIteratorException e) {
throw new UncheckedIOException(e.getCause());
}
action.accept(path);
return true;
}
@Override
public Spliterator<Path> trySplit() {
if (iterator == null || est == 1)
return null;
long e = this.est >>> 1;
this.est -= e;
return new DirectorySpliterator(iterator, e);
}
@Override
public long estimateSize() {
return est;
}
@Override
public int characteristics() {
return DISTINCT | NONNULL;
}
public static Stream<Path> list(Path parent) throws IOException {
DirectoryStream<Path> ds = Files.newDirectoryStream(parent);
int splitSize = Runtime.getRuntime().availableProcessors() * 8;
DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize);
return StreamSupport.stream(spltr, false).onClose(() -> {
try {
ds.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}
Simplemente reemplace Files.list
con DirectorySpliterator.list
y se Files.list
uniformemente sin ningún búfer intermedio. Aquí utilizamos el hecho de que DirectoryStream
produce una lista de directorios sin ningún orden específico, por lo que cada hilo paralelo solo tomará una entrada subsiguiente (de manera sincronizada, ya que ya tenemos operaciones de E / S síncronas, la sincronización adicional tiene una sobrecarga de casi nada). ). El orden paralelo será diferente cada vez (incluso si se usa forEachOrdered
), pero Files.list()
tampoco garantiza el pedido.
La única parte no trivial aquí es cuántas tareas paralelas se deben crear. Como no sabemos cuántos archivos hay en la carpeta hasta que la atravesamos, es bueno usar availableProcessors()
como base. Creo alrededor de 8 x availableProcessors()
tareas individuales 8 x availableProcessors()
, lo que parece ser un buen compromiso de grano fino / grano grueso: si el procesamiento por elemento es desigual, tener más tareas que los procesadores ayudaría a equilibrar la carga.
El problema es que la implementación actual de Stream API junto con la implementación actual de IteratorSpliterator
para una fuente de tamaño desconocido divide estas fuentes en tareas paralelas. Tuvo suerte de tener más de 1024 archivos, de lo contrario no tendría ningún beneficio de paralelización. La implementación actual de la API de Stream tiene en cuenta el Spliterator
estimateSize()
devuelto por Spliterator
. El IteratorSpliterator
de tamaño desconocido devuelve Long.MAX_VALUE
antes de la división y su sufijo siempre devuelve Long.MAX_VALUE
también. Su estrategia de división es la siguiente:
- Definir el tamaño de lote actual. La fórmula actual es comenzar con 1024 elementos y aumentar aritméticamente (2048, 3072, 4096, 5120, etc.) hasta que se
MAX_BATCH
tamaño deMAX_BATCH
(que es 33554432 elementos). - Consumir elementos de entrada (en su caso, Rutas) en la matriz hasta que se alcance el tamaño del lote o se agote la entrada.
- Devuelva un
ArraySpliterator
iterando sobre la matriz creada como prefijo, dejándose como sufijo.
Supongamos que tienes 7000 archivos. La API de Stream solicita el tamaño estimado, Long.MAX_VALUE
devuelve Long.MAX_VALUE
. Bien, la API de Stream le pide al IteratorSpliterator
que se divida, recopila 1024 elementos del DirectoryStream
subyacente a la matriz y se divide a ArraySpliterator
(con un tamaño estimado de 1024) y a sí mismo (con un tamaño estimado que aún es Long.MAX_VALUE
). Como Long.MAX_VALUE
es mucho más que 1024, Stream API decide continuar dividiendo la parte más grande sin siquiera intentar dividir la parte más pequeña. Así que el árbol que se divide en general es así:
IteratorSpliterator (est. MAX_VALUE elements)
| |
ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements)
|
(split returns null: refuses to split anymore)
Entonces, después de eso, tiene cinco tareas paralelas para ser ejecutadas: actualmente contienen 1024, 2048, 3072, 856 y 0 elementos. Tenga en cuenta que aunque el último fragmento tiene 0 elementos, todavía informa que tiene elementos Long.MAX_VALUE
, por lo que Stream API también lo enviará a ForkJoinPool
. Lo malo es que Stream API cree que una mayor división de las primeras cuatro tareas es inútil ya que su tamaño estimado es mucho menor. Entonces, lo que obtienes es una división muy desigual de la entrada que utiliza cuatro núcleos de CPU como máximo (incluso si tienes mucho más). Si su procesamiento por elemento lleva aproximadamente el mismo tiempo para cualquier elemento, entonces todo el proceso esperará a que se complete la mayor parte (3072 elementos). Por lo tanto, la velocidad máxima que puede tener es 7000/3072 = 2.28x. Por lo tanto, si el procesamiento secuencial toma 41 segundos, entonces la transmisión paralela tomará alrededor de 41 / 2.28 = 18 segundos (lo cual está cerca de sus números reales).
Su solución alternativa está completamente bien. Tenga en cuenta que al usar Files.list().parallel()
también tiene todos los elementos de Path
entrada almacenados en la memoria (en los objetos ArraySpliterator
). Por lo tanto, no perderá más memoria si los descarga manualmente en la List
. Las implementaciones de listas respaldadas por arreglos como ArrayList
(que actualmente es creada por Collectors.toList()
) se pueden dividir de manera uniforme sin ningún problema, lo que resulta en una aceleración adicional.
¿Por qué tal caso no está optimizado? Por supuesto, no es un problema imposible (aunque la implementación podría ser bastante complicada). Parece que no es un problema de alta prioridad para los desarrolladores de JDK. Hubo varias discusiones sobre este tema en las listas de correo. Puede leer el mensaje de Paul Sandoz here donde comenta sobre mi esfuerzo de optimización.
Otra alternativa a su solución es usar .collect(Collectors.toList()).parallelStream()
en su flujo como
try(Stream<Path> paths = Files.list(infoDir)) {
paths
.collect(Collectors.toList())
.parallelStream()
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
}
Con esto no necesita llamar a .map(f -> infoDir.resolve(f))
y el rendimiento debe ser similar a su segunda solución.