txt read how files content java parallel-processing java-8 java-stream

read - txt to string java 8



¿Es esto un error en Files.lines(), o estoy malinterpretando algo sobre flujos paralelos? (5)

Entorno: Ubuntu x86_64 (14.10), Oracle JDK 1.8u25

Intento usar una secuencia paralela de Files.lines() pero quiero .skip() la primera línea (es un archivo CSV con un encabezado). Por lo tanto, trato de hacer esto:

try ( final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8) .skip(1L).parallel(); ) { // etc }

Pero entonces una columna no pudo analizar a un int ...

Así que probé un código simple. El archivo es pregunta es muy simple:

$ cat info.csv startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes 1422758875023;34;54;151;4375;4375;27486 $

Y el código es igualmente simple:

public static void main(final String... args) { final Path path = Paths.get("/home/fge/tmp/dd/info.csv"); Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel() .forEach(System.out::println); }

Y sistemáticamente obtengo el siguiente resultado (OK, solo lo ejecuté alrededor de 20 veces):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

¿Que me estoy perdiendo aqui?

EDITAR Parece que el problema, o malentendido, está mucho más arraigado que eso (los dos ejemplos a continuación fueron preparados por un compañero en ## java de FreeNode):

public static void main(final String... args) { new BufferedReader(new StringReader("Hello/nWorld")).lines() .skip(1L).parallel() .forEach(System.out::println); final Iterator<String> iter = Arrays.asList("Hello", "World").iterator(); final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED); final Stream<String> s = StreamSupport.stream(spliterator, true); s.skip(1L).forEach(System.out::println); }

Esto imprime:

Hello Hello

Uh

@Holger sugirió que esto suceda para cualquier transmisión que se ORDERED y no SIZED con esta otra muestra:

Stream.of("Hello", "World") .filter(x -> true) .parallel() .skip(1L) .forEach(System.out::println);

Además, se deriva de toda la discusión que ya tuvo lugar que el problema (si es que es uno) es con .forEach() (como señaló por primera vez @SotiriosDelimanolis ).


Dado que el estado actual del problema es bastante opuesto a las declaraciones anteriores hechas aquí, debe tenerse en cuenta que ahora hay una declaración explícita de Brian Goetz sobre la propagación hacia atrás de la característica desordenada después de una operación de skip que se considera un error . También se afirma que ahora se considera que no tiene propagación hacia atrás del orden de una operación de terminal.

También hay un informe de error relacionado, JDK-8129120, cuyo estado es "arreglado en Java 9" y está respaldado a Java 8, actualización 60

Hice algunas pruebas con jdk1.8.0_60 y parece que la implementación ahora muestra un comportamiento más intuitivo.


El problema es que está utilizando una transmisión paralela junto con forEach y espera que la acción de omisión se base en el orden correcto de los elementos, que no es el caso aquí. Extracto de la documentación de java.util.stream.Stream.forEach :

Para las tuberías de flujo paralelo, esta operación no garantiza respetar el orden de encuentro del flujo, ya que al hacerlo se sacrificaría el beneficio del paralelismo.

Supongo que básicamente lo que sucede es que la operación de omisión se realiza primero en la segunda línea, no en la primera. Si hace secuencia secuencial o utiliza java.util.stream.Stream.forEachOrdered , puede ver que produce el resultado esperado. Otro enfoque sería utilizar Collectors .


Permítanme citar algo relevante: el Javadoc de skip :

Si bien skip () es generalmente una operación barata en tuberías de flujo secuencial, puede ser bastante costoso en tuberías paralelas ordenadas, especialmente para valores grandes de n, ya que skip (n) está obligado a omitir no solo n elementos, sino el primer n elementos en el orden del encuentro.

Ahora, es bastante seguro que Files.lines() tiene un orden de encuentro bien definido y es una secuencia ORDERED (si no fuera así, no habría ninguna garantía, incluso en la operación secuencial, de que el orden de encuentro coincida con el orden del archivo), por lo tanto está garantizado que la secuencia resultante constará determinísticamente de solo la segunda línea en su ejemplo.

Ya sea que haya algo más en esto, la garantía definitivamente está ahí.


Tengo una idea de cómo solucionar este problema, que no puedo ver en las discusiones anteriores. Puede recrear la secuencia dividiendo la tubería en dos tuberías mientras mantiene todo perezoso.

public static <T> Stream<T> recreate(Stream<T> stream) { return StreamSupport.stream(stream.spliterator(), stream.isParallel()) .onClose(stream::close); } public static void main(String[] args) { recreate(new BufferedReader(new StringReader("JUNK/n1/n2/n3/n4/n5")).lines() .skip(1).parallel()).forEach(System.out::println); }

Cuando recrea la secuencia desde el spliterator de secuencia inicial, crea efectivamente una nueva tubería. En la mayoría de los casos, recreate funcionará como no-op , pero la cuestión es que las tuberías primera y segunda no comparten los estados parallel y unordered . Entonces, incluso si está utilizando forEach (o cualquier otra operación de terminal desordenada), solo la segunda secuencia queda desordenada.

Internamente, una cosa bastante similar es concatenar su flujo con un flujo vacío:

Stream.concat(Stream.empty(), new BufferedReader(new StringReader("JUNK/n1/n2/n3/n4/n5")) .lines().skip(1).parallel()).forEach(System.out::println);

Aunque tiene un poco más de gastos generales.


ESTA RESPUESTA ESTÁ ACTUALIZADA: ¡LEA ESTA EN SU LUGAR!

Para responder rápidamente a la pregunta: ¡ Se pretende el comportamiento observado! No hay ningún error y todo está sucediendo de acuerdo con la documentación. Pero digamos que este comportamiento debe documentarse y comunicarse mejor. Debería hacerse más obvio cómo forEach ignora el pedido.

Primero cubriré los conceptos que permiten el comportamiento observado. Esto proporciona los antecedentes para diseccionar uno de los ejemplos dados en la pregunta. Haré esto en un nivel alto y luego nuevamente en un nivel muy bajo .

[TL; DR: Leer por sí solo, la explicación de alto nivel dará una respuesta aproximada.]

Concepto

En lugar de hablar sobre Stream s, que es el tipo operado o devuelto por métodos relacionados con stream, hablemos de las operaciones de stream y las canalizaciones de stream . El método llama lines , skip y parallel son operaciones de flujo que construyen una tubería de flujo [1] y, como otros han señalado, esa tubería se procesa como un todo cuando la operación de terminal para forEach se llama [2].

Una tubería podría considerarse como una serie de operaciones que, una tras otra, se ejecutan en toda la secuencia (por ejemplo, filtrar todos los elementos, asignar elementos restantes a números, sumar todos los números). ¡Pero esto es engañoso! Una mejor metáfora es que la operación de terminal extrae elementos individuales a través de cada operación [3] (por ejemplo, obtener el siguiente elemento sin filtrar, asignarlo, agregarlo a la suma, solicitar el siguiente elemento). Es posible que algunas operaciones intermedias necesiten atravesar varios elementos (por ejemplo, skip ) o incluso todos (por ejemplo, sort ) antes de que puedan devolver el siguiente elemento solicitado y esta es una de las fuentes de estado en una operación.

Cada operación señala sus características con estos StreamOpFlag s:

  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT

Se combinan a través de la fuente de flujo, las operaciones intermedias y la operación del terminal y conforman las características de la tubería (como un todo), que luego se utilizan para optimizaciones [4]. Del mismo modo, si una tubería se ejecuta en paralelo o no es una propiedad de toda la tubería [5].

Por lo tanto, cada vez que realice suposiciones con respecto a estas características, debe observar cuidadosamente todas las operaciones que construyen la tubería, independientemente del orden en que se apliquen y qué garantías ofrecen. Al hacerlo, tenga en cuenta cómo la operación del terminal atrae a cada elemento individual a través de la tubería.

Ejemplo

Veamos este caso especial:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo/nBar")); fooBarReader.lines() .skip(1L) .parallel() .forEach(System.out::println);

Nivel alto

Independientemente de si su fuente de transmisión está ordenada o no (lo está), al llamar a forEach (en lugar de forEachOrdered ) declara que el orden no le importa [6], lo que efectivamente reduce la skip de "omitir los primeros n elementos" a "omitir cualquier elemento n " [7] (porque sin orden el primero deja de tener sentido).

Entonces le das a la tubería el derecho de ignorar el orden si eso promete una aceleración. Para la ejecución paralela, aparentemente lo cree así, por lo que se obtiene la salida observada. Por lo tanto, lo que observa es el comportamiento previsto y no hay error.

¡Tenga en cuenta que esto no entra en conflicto con la skip estado! Como se describió anteriormente, ser con estado no implica que de alguna manera almacena en caché toda la secuencia (menos los elementos omitidos) y todo lo que sigue se ejecuta en estos elementos. Simplemente significa que la operación tiene algún estado, es decir, la cantidad de elementos omitidos (bueno, en realidad no es tan fácil, pero con mi comprensión limitada de lo que está sucediendo, diría que es una simplificación justa).

Nivel bajo

Miremos con más detalle:

  1. BufferedReader.lines crea el Stream , llamémoslo _lines :
  2. .skip crea un nuevo Stream , llamémoslo _skip :
    • llama a ReferencePipeline.skip
    • que construye una operación de "corte" (generalización de salto y límite) con SliceOps.makeRef
    • esto crea una instancia anónima de ReferencePipeline.StatefulOp , que hace referencia a _lines como su fuente
  3. .parallel establece la bandera paralela para toda la tubería como se describió anteriormente
  4. .forEach realmente comienza la ejecución

Entonces, veamos cómo se ejecuta la tubería:

  1. Llamar a _skip.forEach crea un ForEachOp (llamémoslo _forEach ) y lo entrega a _skip.evaluate , que hace dos cosas:
    1. llama a sourceSpliterator para crear un spliterator alrededor de la fuente para esta etapa de canalización:
    2. llama a _forEach.evaluateParallel que crea una ForEachTask (porque no está ordenada; llamémosla _forEachTask ) y la invoca
  2. En _forEachTask.compute la tarea se divide de las primeras 1024 líneas, crea una nueva tarea para ella (llamémosla _forEachTask2 ), se da cuenta de que no quedan líneas y finaliza.
  3. Dentro del grupo de unión de la bifurcación, se llama _forEachTask.compute , intenta en vano dividirse nuevamente y finalmente comienza a copiar sus elementos en el sumidero (un contenedor compatible con la transmisión alrededor del System.out.println ) llamando a _skip.copyInto .
  4. Esto esencialmente delega la tarea al spliterator especificado. ¡Esto es _sliceSpliterator que se creó anteriormente! Entonces _sliceSpliterator.forEachRemaining es responsable de entregar los elementos no omitidos al println-sink:
    1. obtiene un fragmento (en este caso todo) de las líneas en un búfer y las cuenta
    2. intenta solicitar tantos permisos (supongo que debido a la paralelización) a través de acquirePermits
    3. con dos elementos en la fuente y uno para omitir, solo hay un permiso que adquiere (en general digamos n )
    4. permite que el búfer ponga los primeros n elementos (en este caso, solo el primero) en el sumidero

Entonces UnorderedSliceSpliterator.OfRef.forEachRemaining es donde el pedido se ignora finalmente y de verdad. No comparé esto con la variante ordenada, pero esta es mi suposición por qué se hace de esta manera:

  • bajo la paralelización, los elementos del spliterator en el búfer pueden intercalar con otras tareas que hacen lo mismo
  • esto hará que el seguimiento de su pedido sea extremadamente difícil
  • hacer eso o evitar el intercalado degrada el rendimiento y no tiene sentido si el orden es irrelevante
  • si se pierde el pedido, hay poco más que hacer que procesar los primeros n elementos permitidos

¿Alguna pregunta? ;) Perdón por seguir por tanto tiempo. Tal vez debería dejar de lado los detalles y hacer una publicación en el blog ...

Fuentes

[1] java.util.stream - Operaciones de transmisión y canalizaciones :

Las operaciones de flujo se dividen en operaciones intermedias y terminales , y se combinan para formar tuberías de flujo .

[2] java.util.stream - Operaciones de transmisión y canalizaciones :

El recorrido de la fuente de la tubería no comienza hasta que se ejecuta la operación terminal de la tubería.

[3] Esta metáfora representa mi comprensión de las corrientes. La fuente principal, además del código, es esta cita de java.util.stream - Operaciones de java.util.stream y canalizaciones (destacando la mía):

El procesamiento de flujos de manera perezosa permite eficiencias significativas; en una canalización como el ejemplo de filtro-mapeo-suma anterior, el filtrado, el mapeo y la suma pueden fusionarse en una sola pasada en los datos, con un estado intermedio mínimo. La pereza también permite evitar examinar todos los datos cuando no es necesario; para operaciones como "buscar la primera cadena de más de 1000 caracteres", solo es necesario examinar las cadenas suficientes para encontrar una que tenga las características deseadas sin examinar todas las cadenas disponibles en la fuente.

[4] StreamOpFlag :

En cada etapa de la tubería, se pueden calcular un flujo combinado y banderas de operación [... ], jadda, jadda sobre cómo se combinan las banderas en las operaciones de origen, intermedias y terminales ... para producir la salida de banderas de la tubería. Esas banderas se pueden usar para aplicar optimizaciones.

En el código puede ver esto en AbstractPipeline.combinedFlags , que se establece durante la construcción (y en algunos otros casos) combinando el indicador de la operación anterior y la nueva.

[5] java.util.stream - Paralelismo (al que no puedo vincular directamente - desplazarse un poco hacia abajo):

Cuando se inicia la operación del terminal, la tubería de flujo se ejecuta secuencialmente o en paralelo, dependiendo de la orientación del flujo en el que se invoca.

En el código puede ver que esto está en AbstractPipeline.sequential , parallel e isParallel , que establece / verifica un indicador booleano en la fuente de la secuencia, lo que lo hace irrelevante cuando se llama a los establecedores mientras se construye una secuencia.

[6] java.util.stream.Stream.forEach :

Realiza una acción para cada elemento de esta secuencia. [...] El comportamiento de esta operación es explícitamente no determinista.

Contrasta esto con java.util.stream.Stream.forEachOrdered :

Realiza una acción para cada elemento de esta secuencia, en el orden de encuentro de la secuencia si la secuencia tiene un orden de encuentro definido.

[7] Esto tampoco está claramente documentado, pero mi interpretación de este comentario en .skip() (muy acortada por mí):

[...] skip () [...] puede ser bastante costoso en tuberías paralelas ordenadas [...] ya que skip (n) está obligado a omitir no solo n elementos, sino los primeros n elementos en el orden de encuentro . [...] [R] eliminar la restricción de pedido [...] puede resultar en aceleraciones significativas de skip () en tuberías paralelas