read - txt to string java 8
¿Es esto un error en Files.lines(), o estoy malinterpretando algo sobre flujos paralelos? (5)
Entorno: Ubuntu x86_64 (14.10), Oracle JDK 1.8u25
Intento usar una secuencia paralela de
Files.lines()
pero quiero
.skip()
la primera línea (es un archivo CSV con un encabezado).
Por lo tanto, trato de hacer esto:
try (
final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
.skip(1L).parallel();
) {
// etc
}
Pero entonces una columna no pudo analizar a un int ...
Así que probé un código simple. El archivo es pregunta es muy simple:
$ cat info.csv
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$
Y el código es igualmente simple:
public static void main(final String... args)
{
final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
.forEach(System.out::println);
}
Y sistemáticamente obtengo el siguiente resultado (OK, solo lo ejecuté alrededor de 20 veces):
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
¿Que me estoy perdiendo aqui?
EDITAR Parece que el problema, o malentendido, está mucho más arraigado que eso (los dos ejemplos a continuación fueron preparados por un compañero en ## java de FreeNode):
public static void main(final String... args)
{
new BufferedReader(new StringReader("Hello/nWorld")).lines()
.skip(1L).parallel()
.forEach(System.out::println);
final Iterator<String> iter
= Arrays.asList("Hello", "World").iterator();
final Spliterator<String> spliterator
= Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
final Stream<String> s
= StreamSupport.stream(spliterator, true);
s.skip(1L).forEach(System.out::println);
}
Esto imprime:
Hello
Hello
Uh
@Holger sugirió que esto suceda para cualquier transmisión que se
ORDERED
y no
SIZED
con esta otra muestra:
Stream.of("Hello", "World")
.filter(x -> true)
.parallel()
.skip(1L)
.forEach(System.out::println);
Además, se deriva de toda la discusión que ya tuvo lugar que el problema (si es que es uno) es con
.forEach()
(como
señaló por primera vez @SotiriosDelimanolis
).
Dado que el estado actual del problema es bastante opuesto a las declaraciones anteriores hechas aquí, debe tenerse en cuenta que ahora hay una
declaración explícita de Brian Goetz
sobre la propagación hacia atrás de la característica desordenada después de una operación de
skip
que se considera un error .
También se afirma
que ahora se considera que no tiene propagación hacia atrás del orden de una operación de terminal.
También hay un informe de error relacionado, JDK-8129120, cuyo estado es "arreglado en Java 9" y está respaldado a Java 8, actualización 60
Hice algunas pruebas con
jdk1.8.0_60
y parece que la implementación ahora muestra un comportamiento más intuitivo.
El problema es que está utilizando una transmisión paralela junto con forEach y espera que la acción de omisión se base en el orden correcto de los elementos, que no es el caso aquí. Extracto de la documentación de java.util.stream.Stream.forEach :
Para las tuberías de flujo paralelo, esta operación no garantiza respetar el orden de encuentro del flujo, ya que al hacerlo se sacrificaría el beneficio del paralelismo.
Supongo que básicamente lo que sucede es que la operación de omisión se realiza primero en la segunda línea, no en la primera. Si hace secuencia secuencial o utiliza java.util.stream.Stream.forEachOrdered , puede ver que produce el resultado esperado. Otro enfoque sería utilizar Collectors .
Permítanme citar algo relevante: el Javadoc de
skip
:
Si bien skip () es generalmente una operación barata en tuberías de flujo secuencial, puede ser bastante costoso en tuberías paralelas ordenadas, especialmente para valores grandes de n, ya que skip (n) está obligado a omitir no solo n elementos, sino el primer n elementos en el orden del encuentro.
Ahora, es bastante seguro que
Files.lines()
tiene
un orden de encuentro bien definido y es una secuencia
ORDERED
(si no fuera así, no habría ninguna garantía, incluso en la operación secuencial, de que el orden de encuentro coincida con el orden del archivo), por lo tanto está garantizado que la secuencia resultante constará determinísticamente de solo la segunda línea en su ejemplo.
Ya sea que haya algo más en esto, la garantía definitivamente está ahí.
Tengo una idea de cómo solucionar este problema, que no puedo ver en las discusiones anteriores. Puede recrear la secuencia dividiendo la tubería en dos tuberías mientras mantiene todo perezoso.
public static <T> Stream<T> recreate(Stream<T> stream) {
return StreamSupport.stream(stream.spliterator(), stream.isParallel())
.onClose(stream::close);
}
public static void main(String[] args) {
recreate(new BufferedReader(new StringReader("JUNK/n1/n2/n3/n4/n5")).lines()
.skip(1).parallel()).forEach(System.out::println);
}
Cuando recrea la secuencia desde el spliterator de secuencia inicial, crea efectivamente una nueva tubería.
En la mayoría de los casos,
recreate
funcionará como
no-op
, pero la cuestión es que las tuberías primera y segunda no comparten los estados
parallel
y
unordered
.
Entonces, incluso si está utilizando
forEach
(o cualquier otra operación de terminal desordenada), solo la segunda secuencia queda desordenada.
Internamente, una cosa bastante similar es concatenar su flujo con un flujo vacío:
Stream.concat(Stream.empty(),
new BufferedReader(new StringReader("JUNK/n1/n2/n3/n4/n5"))
.lines().skip(1).parallel()).forEach(System.out::println);
Aunque tiene un poco más de gastos generales.
ESTA RESPUESTA ESTÁ ACTUALIZADA: ¡LEA ESTA EN SU LUGAR!
Para responder rápidamente a la pregunta: ¡
Se pretende el comportamiento observado!
No hay ningún error y todo está sucediendo de acuerdo con la documentación.
Pero digamos que este comportamiento debe documentarse y comunicarse mejor.
Debería hacerse más obvio cómo
forEach
ignora el pedido.
Primero cubriré los conceptos que permiten el comportamiento observado. Esto proporciona los antecedentes para diseccionar uno de los ejemplos dados en la pregunta. Haré esto en un nivel alto y luego nuevamente en un nivel muy bajo .
[TL; DR: Leer por sí solo, la explicación de alto nivel dará una respuesta aproximada.]
Concepto
En lugar de hablar sobre
Stream
s, que es el tipo operado o devuelto por métodos relacionados con stream, hablemos de las
operaciones de
stream
y las
canalizaciones de stream
.
El método llama
lines
,
skip
y
parallel
son operaciones de flujo que construyen una tubería de flujo [1] y, como otros han señalado, esa tubería se procesa como un todo cuando la operación de terminal para
forEach
se llama [2].
Una tubería podría considerarse como una serie de operaciones que, una tras otra, se ejecutan en toda la secuencia (por ejemplo, filtrar todos los elementos, asignar elementos restantes a números, sumar todos los números).
¡Pero esto es engañoso!
Una mejor metáfora es que la operación de terminal extrae elementos individuales a través de cada operación [3] (por ejemplo, obtener el siguiente elemento sin filtrar, asignarlo, agregarlo a la suma, solicitar el siguiente elemento).
Es posible que algunas operaciones intermedias necesiten atravesar varios elementos (por ejemplo,
skip
) o incluso todos (por ejemplo,
sort
) antes de que puedan devolver el siguiente elemento solicitado y esta es una de las fuentes de estado en una operación.
Cada operación señala sus características con estos
StreamOpFlag
s:
-
DISTINCT
-
SORTED
-
ORDERED
-
SIZED
-
SHORT_CIRCUIT
Se combinan a través de la fuente de flujo, las operaciones intermedias y la operación del terminal y conforman las características de la tubería (como un todo), que luego se utilizan para optimizaciones [4]. Del mismo modo, si una tubería se ejecuta en paralelo o no es una propiedad de toda la tubería [5].
Por lo tanto, cada vez que realice suposiciones con respecto a estas características, debe observar cuidadosamente todas las operaciones que construyen la tubería, independientemente del orden en que se apliquen y qué garantías ofrecen. Al hacerlo, tenga en cuenta cómo la operación del terminal atrae a cada elemento individual a través de la tubería.
Ejemplo
Veamos este caso especial:
BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo/nBar"));
fooBarReader.lines()
.skip(1L)
.parallel()
.forEach(System.out::println);
Nivel alto
Independientemente de si su fuente de transmisión está ordenada o no (lo está), al llamar a
forEach
(en lugar de
forEachOrdered
) declara que el
orden no le importa
[6], lo que efectivamente reduce la
skip
de "omitir los primeros
n
elementos" a "omitir cualquier elemento
n
" [7] (porque sin orden el primero deja de tener sentido).
Entonces le das a la tubería el derecho de ignorar el orden si eso promete una aceleración. Para la ejecución paralela, aparentemente lo cree así, por lo que se obtiene la salida observada. Por lo tanto, lo que observa es el comportamiento previsto y no hay error.
¡Tenga en cuenta que esto
no entra
en
conflicto
con la
skip
estado!
Como se describió anteriormente, ser con estado no implica que de alguna manera almacena en caché toda la secuencia (menos los elementos omitidos) y todo lo que sigue se ejecuta en estos elementos.
Simplemente significa que la operación tiene algún estado, es decir, la cantidad de elementos omitidos (bueno, en realidad no es
tan fácil,
pero con mi comprensión limitada de lo que está sucediendo, diría que es una simplificación justa).
Nivel bajo
Miremos con más detalle:
-
BufferedReader.lines
crea elStream
, llamémoslo_lines
:-
crea un
Spliterator
ordenado -
lo entrega a
StreamSupport.stream
, que crea unaReferencePipeline.Head
y transforma el indicador de spliterator en un indicador de operación de flujo
-
crea un
-
.skip
crea un nuevoStream
, llamémoslo_skip
:-
llama a
ReferencePipeline.skip
-
que construye una operación de "corte" (generalización de salto y límite) con
SliceOps.makeRef
-
esto crea una instancia anónima de
ReferencePipeline.StatefulOp
, que hace referencia a_lines
como su fuente
-
llama a
-
.parallel
establece la bandera paralela para toda la tubería como se describió anteriormente -
.forEach
realmente comienza la ejecución
Entonces, veamos cómo se ejecuta la tubería:
-
Llamar a
_skip.forEach
crea unForEachOp
(llamémoslo_forEach
) y lo entrega a_skip.evaluate
, que hace dos cosas:-
llama a
sourceSpliterator
para crear un spliterator alrededor de la fuente para esta etapa de canalización:-
llama a
opEvaluateParallelLazy
en sí mismo (como resulta) -
esto determina que la secuencia no está ordenada y
crea un
UnorderedSliceSpliterator
(llamémoslo_sliceSpliterator
) conskip = 1
y sin límite.
-
llama a
-
llama a
_forEach.evaluateParallel
que crea unaForEachTask
(porque no está ordenada; llamémosla_forEachTask
) y la invoca
-
llama a
-
En
_forEachTask.compute
la tarea se divide de las primeras 1024 líneas, crea una nueva tarea para ella (llamémosla_forEachTask2
), se da cuenta de que no quedan líneas y finaliza. -
Dentro del grupo de unión de la bifurcación, se llama
_forEachTask.compute
, intenta en vano dividirse nuevamente y
finalmente comienza a copiar sus elementos en el sumidero
(un contenedor compatible con la transmisión alrededor del
System.out.println
) llamando a_skip.copyInto
. -
Esto esencialmente delega la tarea al spliterator especificado.
¡Esto es
_sliceSpliterator
que se creó anteriormente! Entonces_sliceSpliterator.forEachRemaining
es responsable de entregar los elementos no omitidos al println-sink:- obtiene un fragmento (en este caso todo) de las líneas en un búfer y las cuenta
-
intenta solicitar tantos permisos (supongo que debido a la paralelización) a través de
acquirePermits
- con dos elementos en la fuente y uno para omitir, solo hay un permiso que adquiere (en general digamos n )
- permite que el búfer ponga los primeros n elementos (en este caso, solo el primero) en el sumidero
Entonces
UnorderedSliceSpliterator.OfRef.forEachRemaining
es donde el pedido se ignora finalmente y de verdad.
No comparé esto con la variante ordenada, pero esta es mi suposición por qué se hace de esta manera:
- bajo la paralelización, los elementos del spliterator en el búfer pueden intercalar con otras tareas que hacen lo mismo
- esto hará que el seguimiento de su pedido sea extremadamente difícil
- hacer eso o evitar el intercalado degrada el rendimiento y no tiene sentido si el orden es irrelevante
- si se pierde el pedido, hay poco más que hacer que procesar los primeros n elementos permitidos
¿Alguna pregunta? ;) Perdón por seguir por tanto tiempo. Tal vez debería dejar de lado los detalles y hacer una publicación en el blog ...
Fuentes
[1]
java.util.stream
- Operaciones de transmisión y canalizaciones
:
Las operaciones de flujo se dividen en operaciones intermedias y terminales , y se combinan para formar tuberías de flujo .
[2]
java.util.stream
- Operaciones de transmisión y canalizaciones
:
El recorrido de la fuente de la tubería no comienza hasta que se ejecuta la operación terminal de la tubería.
[3] Esta metáfora representa mi comprensión de las corrientes.
La fuente principal, además del código, es esta cita de
java.util.stream
- Operaciones
de
java.util.stream
y canalizaciones
(destacando la mía):
El procesamiento de flujos de manera perezosa permite eficiencias significativas; en una canalización como el ejemplo de filtro-mapeo-suma anterior, el filtrado, el mapeo y la suma pueden fusionarse en una sola pasada en los datos, con un estado intermedio mínimo. La pereza también permite evitar examinar todos los datos cuando no es necesario; para operaciones como "buscar la primera cadena de más de 1000 caracteres", solo es necesario examinar las cadenas suficientes para encontrar una que tenga las características deseadas sin examinar todas las cadenas disponibles en la fuente.
[4] StreamOpFlag :
En cada etapa de la tubería, se pueden calcular un flujo combinado y banderas de operación [... ], jadda, jadda sobre cómo se combinan las banderas en las operaciones de origen, intermedias y terminales ... para producir la salida de banderas de la tubería. Esas banderas se pueden usar para aplicar optimizaciones.
En el código puede ver esto en
AbstractPipeline.combinedFlags
, que se establece durante la construcción (y en algunos otros casos) combinando el indicador de la operación anterior y la nueva.
[5]
java.util.stream
- Paralelismo
(al que no puedo vincular directamente - desplazarse un poco hacia abajo):
Cuando se inicia la operación del terminal, la tubería de flujo se ejecuta secuencialmente o en paralelo, dependiendo de la orientación del flujo en el que se invoca.
En el código puede ver que esto está en
AbstractPipeline.sequential
,
parallel
e
isParallel
, que establece / verifica un indicador booleano en la fuente de la secuencia, lo que lo hace irrelevante cuando se llama a los establecedores mientras se construye una secuencia.
[6] java.util.stream.Stream.forEach :
Realiza una acción para cada elemento de esta secuencia. [...] El comportamiento de esta operación es explícitamente no determinista.
Contrasta esto con java.util.stream.Stream.forEachOrdered :
Realiza una acción para cada elemento de esta secuencia, en el orden de encuentro de la secuencia si la secuencia tiene un orden de encuentro definido.
[7] Esto tampoco está claramente documentado, pero mi interpretación de este comentario en .skip() (muy acortada por mí):
[...] skip () [...] puede ser bastante costoso en tuberías paralelas ordenadas [...] ya que skip (n) está obligado a omitir no solo n elementos, sino los primeros n elementos en el orden de encuentro . [...] [R] eliminar la restricción de pedido [...] puede resultar en aceleraciones significativas de skip () en tuberías paralelas