way read how files example big best scala csv streaming large-files

read - ¿Cómo leo un gran archivo CSV con la clase Scala Stream?



java stream file lines (3)

Espero que no te refieras a collection.immutable.Stream de Scala con Stream. Esto no es lo que quieres Stream es flojo, pero realiza memoizaciones.

No sé lo que planeas hacer, pero solo leer el archivo línea por línea debería funcionar muy bien sin usar grandes cantidades de memoria.

getLines debe evaluar de forma perezosa y no debe bloquearse (siempre que su archivo no tenga más de 2³² líneas, afaik). Si lo hace, pregunte en #scala o presente un ticket de error (o haga ambas).

¿Cómo leo un archivo CSV grande (> 1 Gb) con un Scala Stream? ¿Tienes un ejemplo de código? ¿O utilizarías una forma diferente de leer un gran archivo CSV sin cargarlo primero en la memoria?


Si está buscando procesar el archivo grande línea por línea y evita que el contenido del archivo completo se cargue en la memoria de una vez, puede usar el Iterator devuelto por scala.io.Source .

Tengo una pequeña función, tryProcessSource , (que contiene dos subfunciones) que uso exactamente para estos tipos de casos de uso. La función toma hasta cuatro parámetros, de los cuales solo el primero es requerido. Los otros parámetros tienen valores predeterminados correctos proporcionados.

Aquí está el perfil de la función (la implementación de la función completa se encuentra en la parte inferior):

def tryProcessSource( file: File, parseLine: (Int, String) => Option[List[String]] = (index, unparsedLine) => Some(List(unparsedLine)), filterLine: (Int, List[String]) => Option[Boolean] = (index, parsedValues) => Some(true), retainValues: (Int, List[String]) => Option[List[String]] = (index, parsedValues) => Some(parsedValues), ): Try[List[List[String]]] = { ??? }

El primer parámetro, file: File , es obligatorio. Y es solo cualquier instancia válida de java.io.File que apunta a un archivo de texto orientado a línea, como un archivo CSV.

El segundo parámetro, parseLine: (Int, String) => Option[List[String]] , es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int , unparsedLine: String . Y luego devuelve una Option[List[String]] . La función puede devolver una List[String] Some envuelta List[String] consta de los valores de columna válidos. O puede devolver un None que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, line) => Some(List(line)) . Este valor predeterminado hace que la línea completa se devuelva como un único valor de String .

El tercer parámetro, filterLine: (Int, List[String]) => Option[Boolean] , es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int , parsedValues: List[String] . Y luego devuelve una Option[Boolean] . La función puede devolver un Boolean envuelto que indica si esta línea particular debe incluirse en la salida. O puede devolver un None que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(true) . Este predeterminado da como resultado que todas las líneas estén incluidas.

El cuarto y último parámetro, retainValues: (Int, List[String]) => Option[List[String]] , es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int , parsedValues: List[String] . Y luego devuelve una Option[List[String]] . La función puede devolver una List[String] Some envuelta List[String] consiste en algún subconjunto y / o alteración de los valores de columna existentes. O puede devolver un None que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(values) . Este valor predeterminado da como resultado los valores analizados por el segundo parámetro, parseLine .

Considere un archivo con los siguientes contenidos (4 líneas):

street,street2,city,state,zip 100 Main Str,,Irving,TX,75039 231 Park Ave,,Irving,TX,75039 1400 Beltline Rd,Apt 312,Dallas,Tx,75240

El siguiente perfil de llamada ...

val tryLinesDefaults = tryProcessSource(new File("path/to/file.csv"))

... da como resultado esta salida para tryLinesDefaults (el contenido inalterado del archivo):

Success( List( List("street,street2,city,state,zip"), List("100 Main Str,,Irving,TX,75039"), List("231 Park Ave,,Irving,TX,75039"), List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240") ) )

El siguiente perfil de llamada ...

val tryLinesParseOnly = tryProcessSource( new File("path/to/file.csv") , parseLine = (index, unparsedLine) => Some(unparsedLine.split(",").toList) )

... da como resultado esta salida para tryLinesParseOnly (cada línea analizada en los valores de columna individuales):

Success( List( List("street","street2","city","state","zip"), List("100 Main Str","","Irving,TX","75039"), List("231 Park Ave","","Irving","TX","75039"), List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240") ) )

El siguiente perfil de llamada ...

val tryLinesIrvingTxNoHeader = tryProcessSource( new File("C:/Users/Jim/Desktop/test.csv") , parseLine = (index, unparsedLine) => Some(unparsedLine.split(",").toList) , filterLine = (index, parsedValues) => Some( (index != 0) && //skip header line (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving (parsedValues(3).toLowerCase == "Tx".toLowerCase) ) )

... da como resultado esta salida para tryLinesIrvingTxNoHeader (cada línea analizada en los valores de columna individuales, sin encabezado y solo las dos filas en Irving, Tx):

Success( List( List("100 Main Str","","Irving,TX","75039"), List("231 Park Ave","","Irving","TX","75039"), ) )

Aquí está toda la implementación de la función tryProcessSource :

import scala.io.Source import scala.util.Try import java.io.File def tryProcessSource( file: File, parseLine: (Int, String) => Option[List[String]] = (index, unparsedLine) => Some(List(unparsedLine)), filterLine: (Int, List[String]) => Option[Boolean] = (index, parsedValues) => Some(true), retainValues: (Int, List[String]) => Option[List[String]] = (index, parsedValues) => Some(parsedValues) ): Try[List[List[String]]] = { def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] = try {Try(transfer(source))} finally {source.close()} def recursive( remaining: Iterator[(String, Int)], accumulator: List[List[String]], isEarlyAbort: Boolean = false ): List[List[String]] = { if (isEarlyAbort || !remaining.hasNext) accumulator else { val (line, index) = remaining.next parseLine(index, line) match { case Some(values) => filterLine(index, values) match { case Some(keep) => if (keep) retainValues(index, values) match { case Some(valuesNew) => recursive(remaining, valuesNew :: accumulator) //capture values case None => recursive(remaining, accumulator, isEarlyAbort = true) //early abort } else recursive(remaining, accumulator) //discard row case None => recursive(remaining, accumulator, isEarlyAbort = true) //early abort } case None => recursive(remaining, accumulator, isEarlyAbort = true) //early abort } } } Try(Source.fromFile(file)).flatMap( bufferedSource => usingSource(bufferedSource) { source => recursive(source.getLines().buffered.zipWithIndex, Nil).reverse } ) }

Si bien esta solución es relativamente breve, me tomó un tiempo considerable y muchos pases de refactorización antes de que finalmente pudiera llegar hasta aquí. Por favor, avíseme si ve alguna forma en que podría mejorarse.

ACTUALIZACIÓN: Acabo de preguntar el problema a continuación, ya que es una pregunta propia de . Y ahora tiene una respuesta que soluciona el error mencionado a continuación.

Tuve la idea de tratar de hacer que esto sea aún más genérico cambiando el parámetro retainValues a transformLine con la nueva definición de funciones genérico-ified a continuación. Sin embargo, sigo obteniendo el error de resaltado en IntelliJ "Expresión de tipo Algunos [Lista [Cadena]] no se ajusta a la opción de tipo esperado [A]" y no fue capaz de descubrir cómo cambiar el valor predeterminado para que el error se va.

def tryProcessSource2[A <: AnyRef]( file: File, parseLine: (Int, String) => Option[List[String]] = (index, unparsedLine) => Some(List(unparsedLine)), filterLine: (Int, List[String]) => Option[Boolean] = (index, parsedValues) => Some(true), transformLine: (Int, List[String]) => Option[A] = (index, parsedValues) => Some(parsedValues) ): Try[List[A]] = { ??? }

Cualquier ayuda sobre cómo hacer este trabajo sería muy apreciada.


Simplemente use Source.fromFile(...).getLines como ya indicó.

Eso devuelve un iterador, que ya es flojo (utilizaría la transmisión como una colección diferida en la que deseaba que los valores recuperados anteriormente se memorizaran, para que pueda leerlos de nuevo)

Si tiene problemas de memoria, entonces el problema radicará en lo que está haciendo después de getLines. Cualquier operación como toList , que obliga a una recopilación estricta, causará el problema.