scala - spark - ¿Cómo convierto un archivo csv a rdd?
spark scala tutorial (12)
¿Qué tal esto?
val Delimeter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))
Soy nuevo en chispa. Quiero realizar algunas operaciones sobre datos particulares en un registro CSV.
Intento leer un archivo CSV y convertirlo a RDD. Mis operaciones adicionales se basan en el encabezado provisto en el archivo CSV.
(De los comentarios) Este es mi código hasta ahora:
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();
Puedo obtener los valores de encabezado como este. Quiero asignar esto a cada registro en archivo CSV.
final String[] header=heading.split(" ");
Puedo obtener los valores de encabezado como este. Quiero asignar esto a cada registro en archivo CSV.
En java estoy usando CSVReader record.getColumnValue(Column header)
para obtener el valor particular. Necesito hacer algo similar a eso aquí.
A partir de Spark 2.0, CSV se puede leer directamente en un DataFrame
.
Si el archivo de datos no tiene una fila de encabezado, entonces sería:
val df = spark.read.csv("file://path/to/data.csv")
Eso cargará los datos, pero dará a cada columna nombres genéricos como _c0
, _c1
, etc.
Si hay encabezados, al agregar .option("header", "true")
se usará la primera fila para definir las columnas en el DataFrame
:
val df = spark.read
.option("header", "true")
.csv("file://path/to/data.csv")
Para un ejemplo concreto, digamos que tiene un archivo con los contenidos:
user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1
A continuación, obtendrá el total de visitas agrupadas por tema:
import org.apache.spark.sql.functions._
import spark.implicits._
val rawData = spark.read
.option("header", "true")
.csv("file://path/to/data.csv")
// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits))
// runs the query, pulling the data to the master node
// can fail if the amount of data is too much to fit
// into the master node''s memory!
val collected = grouped.collect
// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
// be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")
// runs the query, writing the result back out
// in this case, in CSV format with a header and
// coalesced to a single file. This is easier for human
// consumption but usually much slower.
grouped.coalesce(1)
.write
.option("header", "true")
.csv("hdfs://some/output/directory/")
Aquí hay otro ejemplo que usa Spark / Scala para convertir un CSV a RDD . Para una descripción más detallada, vea esta publicación .
def main(args: Array[String]): Unit = {
val csv = sc.textFile("/path/to/your/file.csv")
// split / clean data
val headerAndRows = csv.map(line => line.split(",").map(_.trim))
// get header
val header = headerAndRows.first
// filter out header (eh. just check if the first val matches the first header name)
val data = headerAndRows.filter(_(0) != header(0))
// splits to map (header/value pairs)
val maps = data.map(splits => header.zip(splits).toMap)
// filter out the user "me"
val result = maps.filter(map => map("user") != "me")
// print result
result.foreach(println)
}
Creo que puedes intentar cargar ese csv en un RDD y luego crear un dataframe a partir de ese RDD, aquí está el documento de creación de dataframe desde rdd: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds
En primer lugar, debo decir que es mucho más simple si coloca sus encabezados en archivos separados: esta es la convención en Big Data.
De todos modos, la respuesta de Daniel es bastante buena, pero tiene una ineficacia y un error, así que voy a publicar la mía. La ineficiencia es que no necesita verificar cada registro para ver si es el encabezado, solo necesita verificar el primer registro para cada partición. El error es que al usar .split(",")
puede obtener una excepción lanzada u obtener la columna incorrecta cuando las entradas son la cadena vacía y ocurren al inicio o al final del registro, para corregir que necesita usar .split(",", -1)
. Así que aquí está el código completo:
val header =
scala.io.Source.fromInputStream(
hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
.open(new hadoop.fs.Path(path)))
.getLines.head
val columnIndex = header.split(",").indexOf(columnName)
sc.textFile(path).mapPartitions(iterator => {
val head = iterator.next()
if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))
Puntos finales, considere Parquet si solo quiere pescar algunas columnas. O al menos considere la implementación de una función de división perezosamente evaluada si tiene filas anchas.
Otra alternativa es utilizar el método mapPartitionsWithIndex
ya que obtendrá el número de índice de la partición y una lista de todas las líneas dentro de esa partición. Partición 0 y línea 0 será el encabezado
val rows = sc.textFile(path)
.mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
val results = new ArrayBuffer[(String, Int)]
var first = true
while (rows.hasNext) {
// check for first line
if (index == 0 && first) {
first = false
rows.next // skip the first row
} else {
results += rows.next
}
}
results.toIterator
}, true)
rows.flatMap { row => row.split(",") }
Para spark scala, normalmente uso cuando no puedo usar los paquetes de chispa csv ...
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
val tbldata = rawdata.filter(_(0) != header(0))
Podemos usar el nuevo DataFrameRDD para leer y escribir los datos CSV. Hay algunas ventajas de DataFrameRDD sobre NormalRDD:
- DataFrameRDD es un poco más rápido que NormalRDD ya que determinamos el esquema y ayuda a optimizar mucho en el tiempo de ejecución y nos proporciona una ganancia de rendimiento significativa.
- Incluso si la columna se desplaza en CSV automáticamente tomará la columna correcta ya que no estamos codificando el número de columna que estaba presente al leer los datos como archivo de texto y luego dividirlos y luego usar el número de columna para obtener los datos.
- En pocas líneas de código, puede leer el archivo CSV directamente.
Se le solicitará que tenga esta biblioteca: agréguelo en build.sbt
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Código de Spark Scala para ello:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
Para convertir a RDD normal tomando algunas de sus columnas y
val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it
Guardar el RDD al formato CSV:
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Como el encabezado se establece en verdadero, obtendremos el nombre del encabezado en todos los archivos de salida.
Puede usar la biblioteca spark-csv: https://github.com/databricks/spark-csv
Esto es directamente de la documentación:
import org.apache.spark.sql.SQLContext
SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");
DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
Recomiendo leer el encabezado directamente desde el controlador, no a través de Spark. Dos razones para esto: 1) Es una sola línea. No hay ventaja para un enfoque distribuido. 2) Necesitamos esta línea en el controlador, no en los nodos de trabajador.
Es algo parecido a esto:
// Ridiculous amount of code to read one line.
val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = hadoop.fs.FileSystem.get(uri, conf)
val path = new hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head
Ahora cuando haces el RDD puedes descartar el encabezado.
val csvRDD = sc.textFile(filename).filter(_ != header)
Entonces podemos hacer un RDD desde una columna, por ejemplo:
val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",")(idx))
Sugeriría que lo intentaras
https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
}
});
Tienes que tener una clase en esta persona de ejemplo con la especificación de tu encabezado de archivo y asociar tus datos al esquema y aplicar criterios como en mysql ... para obtener el resultado deseado
Un enfoque simplista sería tener una forma de preservar el encabezado.
Digamos que tienes un file.csv como:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
Podemos definir una clase de encabezado que usa una versión analizada de la primera fila:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
Que podemos usar ese encabezado para abordar los datos más adelante:
val csv = sc.textFile("file.csv") // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
Tenga en cuenta que el header
no es mucho más que un simple mapa de una mnemotécnica para el índice de matriz. Casi todo esto podría hacerse en el lugar ordinal del elemento en la matriz, como user = row(0)
PD: Bienvenido a Scala :-)