scala - Cómo convertir objetos rdd a dataframe en spark
apache-spark spark-dataframe (10)
¿Cómo puedo convertir un RDD (
org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) en un Dataframe
org.apache.spark.sql.DataFrame
?
Convertí un marco de datos a rdd usando
.rdd
.
Después de procesarlo, lo quiero de vuelta en el marco de datos.
Cómo puedo hacer esto ?
Aquí hay un ejemplo simple de convertir su Lista en Spark RDD y luego convertir ese Spark RDD en Dataframe.
Tenga en cuenta que he usado la REPL scala de Spark-shell para ejecutar el siguiente código, Aquí sc es una instancia de SparkContext que está implícitamente disponible en Spark-shell. Espero que responda tu pregunta.
scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)
scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28
scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]
scala> numDF.show
+---+
| _1|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
+---+
En versiones más nuevas de spark (2.0+)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val spark = SparkSession
.builder()
.getOrCreate()
import spark.implicits._
val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)
Este código funciona perfectamente desde Spark 2.x con Scala 2.11
Importar clases necesarias
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
Crear objeto
SparkSession
, aquí está la
spark
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs
Hagamos un
RDD
para que sea
DataFrame
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
Método 1
Usando
SparkSession.createDataFrame(RDD obj)
.
val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()
+------+--------------------+
| _1| _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
| test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
Método 2
Usando
SparkSession.createDataFrame(RDD obj)
y especificando nombres de columna.
val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")
dfWithSchema.show()
+------+--------------------+
| id| vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
| test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
Método 3 (respuesta real a la pregunta)
De esta manera, la entrada
rdd
debe ser del tipo
RDD[Row]
.
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
crear el esquema
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val1", DoubleType, true))
.add(StructField("val2", DoubleType, true))
Ahora aplique ambas
rowsRdd
y
schema
para
createDataFrame()
val df = spark.createDataFrame(rowsRdd, schema)
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
Método 1: (Scala)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")
Método 2: (Scala)
case class temp(val1: String,val3 : Double)
val rdd = sc.parallelize(Seq(
Row("foo", 0.5), Row("bar", 0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()
Método 1: (Python)
from pyspark.sql import Row
l = [(''Alice'',2)]
Person = Row(''name'',''age'')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()
Método 2: (Python)
from pyspark.sql.types import *
l = [(''Alice'',2)]
rdd = sc.parallelize(l)
schema = StructType([StructField ("name" , StringType(), True) ,
StructField("age" , IntegerType(), True)])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.show()
Extrajo el valor del objeto de fila y luego aplicó la clase de caso para convertir rdd a DF
val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }
case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._
val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF
Para convertir una Array [Row] a DataFrame o Dataset, lo siguiente funciona de manera elegante:
Digamos que el esquema es el StructType para la fila, luego
val rows: Array[Row]=...
implicit val encoder = RowEncoder.apply(schema)
import spark.implicits._
rows.toDS
Suponga que tiene un
DataFrame
y desea realizar alguna modificación en los datos de los campos convirtiéndolo en
RDD[Row]
.
val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))
Para volver a convertir a
DataFrame
desde
RDD
, necesitamos definir el
tipo
de
estructura
del
RDD
.
Si el tipo de datos era
Long
, se convertirá en
LongType
en estructura.
Si
String
entonces
StringType
en estructura.
val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))
Ahora puede convertir el RDD a DataFrame utilizando el método createDataFrame .
val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)
Suponiendo que su RDD [fila] se llama rdd, puede usar:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
SqlContext
tiene varios métodos
createDataFrame
que crean un
DataFrame
dado un
RDD
.
Me imagino que uno de estos funcionará para su contexto.
Por ejemplo:
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Crea un DataFrame a partir de un RDD que contiene filas usando el esquema dado.
Nota: esta respuesta se publicó originalmente here
Estoy publicando esta respuesta porque me gustaría compartir detalles adicionales sobre las opciones disponibles que no encontré en las otras respuestas
Para crear un DataFrame a partir de un RDD de filas, hay dos opciones principales:
1)
Como ya se señaló, puede usar
toDF()
que puede importarse
import sqlContext.implicits._
.
Sin embargo, este enfoque solo funciona para los siguientes tipos de RDD:
-
RDD[Int]
-
RDD[Long]
-
RDD[String]
-
RDD[T <: scala.Product]
(fuente:
Scaladoc
del objeto
SQLContext.implicits
)
La última firma en realidad significa que puede funcionar para un RDD de tuplas o un RDD de clases de casos (porque las tuplas y las clases de casos son subclases de
scala.Product
).
Entonces, para usar este enfoque para un
RDD[Row]
, debe
RDD[T <: scala.Product]
a un
RDD[T <: scala.Product]
.
Esto se puede hacer asignando cada fila a una clase de caso personalizada o a una tupla, como en los siguientes fragmentos de código:
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
o
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
El principal inconveniente de este enfoque (en mi opinión) es que debe establecer explícitamente el esquema del DataFrame resultante en la función de mapa, columna por columna. Tal vez esto se pueda hacer programáticamente si no conoce el esquema de antemano, pero las cosas pueden ponerse un poco confusas allí. Entonces, alternativamente, hay otra opción:
2)
Puede usar
createDataFrame(rowRDD: RDD[Row], schema: StructType)
como en la respuesta aceptada, que está disponible en el objeto
SQLContext
.
Ejemplo para convertir un RDD de un antiguo DataFrame:
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
Tenga en cuenta que no es necesario establecer explícitamente ninguna columna de esquema.
Reutilizamos el antiguo esquema del DF, que es de la clase
StructType
y puede ampliarse fácilmente.
Sin embargo, este enfoque a veces no es posible, y en algunos casos puede ser menos eficiente que el primero.
One needs to create a schema, and attach it to the Rdd.
Suponiendo que val spark es un producto de un SparkSession.builder ...
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/* Lets gin up some sample data:
* As RDD''s and dataframes can have columns of differing types, lets make our
* sample data a three wide, two tall, rectangle of mixed types.
* A column of Strings, a column of Longs, and a column of Doubules
*/
val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
arrayOfArrayOfAnys(0)(0)="aString"
arrayOfArrayOfAnys(0)(1)=0L
arrayOfArrayOfAnys(0)(2)=3.14159
arrayOfArrayOfAnys(1)(0)="bString"
arrayOfArrayOfAnys(1)(1)=9876543210L
arrayOfArrayOfAnys(1)(2)=2.71828
/* The way to convert an anything which looks rectangular,
* (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to
* throw it into sparkContext.parallelize.
* http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
* the parallelize definition as
* def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
* so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
* Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it.
*/
val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)
/* We''ll be using the sqlContext.createDataFrame to add a schema our RDD.
* The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
* To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
* As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type.
*/
val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
Row.fromSeq(f.toSeq)
)
/* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
* https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
* case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
* Will leave the two default values in place for each of the columns:
* nullability as true,
* metadata as an empty Map[String,Any]
*
*/
val schema = StructType(
StructField("colOfStrings", StringType) ::
StructField("colOfLongs" , LongType ) ::
StructField("colOfDoubles", DoubleType) ::
Nil
)
val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
/*
* +------------+----------+------------+
* |colOfStrings|colOfLongs|colOfDoubles|
* +------------+----------+------------+
* | aString| 0| 3.14159|
* | bString|9876543210| 2.71828|
* +------------+----------+------------+
*/
df.show
Los mismos pasos, pero con menos declaraciones de valores:
val arrayOfArrayOfAnys=Array(
Array("aString",0L ,3.14159),
Array("bString",9876543210L,2.71828)
)
val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))
/* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
* Consider constructing the schema from an Array[StructField]. This would allow looping over
* the columns, with a match statement applying the appropriate sql datatypes as the second
* StructField arguments.
*/
val sf=new Array[StructField](3)
sf(0)=StructField("colOfStrings",StringType)
sf(1)=StructField("colOfLongs" ,LongType )
sf(2)=StructField("colOfDoubles",DoubleType)
val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
df.show