scala apache-spark spark-dataframe apache-spark-sql rdd

scala - Cómo convertir objetos rdd a dataframe en spark



apache-spark spark-dataframe (10)

¿Cómo puedo convertir un RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] ) en un Dataframe org.apache.spark.sql.DataFrame ? Convertí un marco de datos a rdd usando .rdd . Después de procesarlo, lo quiero de vuelta en el marco de datos. Cómo puedo hacer esto ?


Aquí hay un ejemplo simple de convertir su Lista en Spark RDD y luego convertir ese Spark RDD en Dataframe.

Tenga en cuenta que he usado la REPL scala de Spark-shell para ejecutar el siguiente código, Aquí sc es una instancia de SparkContext que está implícitamente disponible en Spark-shell. Espero que responda tu pregunta.

scala> val numList = List(1,2,3,4,5) numList: List[Int] = List(1, 2, 3, 4, 5) scala> val numRDD = sc.parallelize(numList) numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28 scala> val numDF = numRDD.toDF numDF: org.apache.spark.sql.DataFrame = [_1: int] scala> numDF.show +---+ | _1| +---+ | 1| | 2| | 3| | 4| | 5| +---+


En versiones más nuevas de spark (2.0+)

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ val spark = SparkSession .builder() .getOrCreate() import spark.implicits._ val dfSchema = Seq("col1", "col2", "col3") rdd.toDF(dfSchema: _*)


Este código funciona perfectamente desde Spark 2.x con Scala 2.11

Importar clases necesarias

import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

Crear objeto SparkSession , aquí está la spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate val sc = spark.sparkContext // Just used to create test RDDs

Hagamos un RDD para que sea DataFrame

val rdd = sc.parallelize( Seq( ("first", Array(2.0, 1.0, 2.1, 5.4)), ("test", Array(1.5, 0.5, 0.9, 3.7)), ("choose", Array(8.0, 2.9, 9.1, 2.5)) ) )

Método 1

Usando SparkSession.createDataFrame(RDD obj) .

val dfWithoutSchema = spark.createDataFrame(rdd) dfWithoutSchema.show() +------+--------------------+ | _1| _2| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+

Método 2

Usando SparkSession.createDataFrame(RDD obj) y especificando nombres de columna.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals") dfWithSchema.show() +------+--------------------+ | id| vals| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+

Método 3 (respuesta real a la pregunta)

De esta manera, la entrada rdd debe ser del tipo RDD[Row] .

val rowsRdd: RDD[Row] = sc.parallelize( Seq( Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9) ) )

crear el esquema

val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val1", DoubleType, true)) .add(StructField("val2", DoubleType, true))

Ahora aplique ambas rowsRdd y schema para createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema) df.show() +------+----+----+ | id|val1|val2| +------+----+----+ | first| 2.0| 7.0| |second| 3.5| 2.5| | third| 7.0| 5.9| +------+----+----+


Método 1: (Scala)

val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")

Método 2: (Scala)

case class temp(val1: String,val3 : Double) val rdd = sc.parallelize(Seq( Row("foo", 0.5), Row("bar", 0.0) )) val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF() rows.show()

Método 1: (Python)

from pyspark.sql import Row l = [(''Alice'',2)] Person = Row(''name'',''age'') rdd = sc.parallelize(l) person = rdd.map(lambda r:Person(*r)) df2 = sqlContext.createDataFrame(person) df2.show()

Método 2: (Python)

from pyspark.sql.types import * l = [(''Alice'',2)] rdd = sc.parallelize(l) schema = StructType([StructField ("name" , StringType(), True) , StructField("age" , IntegerType(), True)]) df3 = sqlContext.createDataFrame(rdd, schema) df3.show()

Extrajo el valor del objeto de fila y luego aplicó la clase de caso para convertir rdd a DF

val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" } val temp2 = attrib2.map{case Row ( key: Int) => s"$key" } case class RLT (id: String, attrib_1 : String, attrib_2 : String) import hiveContext.implicits._ val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF


Para convertir una Array [Row] a DataFrame o Dataset, lo siguiente funciona de manera elegante:

Digamos que el esquema es el StructType para la fila, luego

val rows: Array[Row]=... implicit val encoder = RowEncoder.apply(schema) import spark.implicits._ rows.toDS


Suponga que tiene un DataFrame y desea realizar alguna modificación en los datos de los campos convirtiéndolo en RDD[Row] .

val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))

Para volver a convertir a DataFrame desde RDD , necesitamos definir el tipo de estructura del RDD .

Si el tipo de datos era Long , se convertirá en LongType en estructura.

Si String entonces StringType en estructura.

val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))

Ahora puede convertir el RDD a DataFrame utilizando el método createDataFrame .

val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)


Suponiendo que su RDD [fila] se llama rdd, puede usar:

val sqlContext = new SQLContext(sc) import sqlContext.implicits._ rdd.toDF()


SqlContext tiene varios métodos createDataFrame que crean un DataFrame dado un RDD . Me imagino que uno de estos funcionará para su contexto.

Por ejemplo:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Crea un DataFrame a partir de un RDD que contiene filas usando el esquema dado.


Nota: esta respuesta se publicó originalmente here

Estoy publicando esta respuesta porque me gustaría compartir detalles adicionales sobre las opciones disponibles que no encontré en las otras respuestas

Para crear un DataFrame a partir de un RDD de filas, hay dos opciones principales:

1) Como ya se señaló, puede usar toDF() que puede importarse import sqlContext.implicits._ . Sin embargo, este enfoque solo funciona para los siguientes tipos de RDD:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(fuente: Scaladoc del objeto SQLContext.implicits )

La última firma en realidad significa que puede funcionar para un RDD de tuplas o un RDD de clases de casos (porque las tuplas y las clases de casos son subclases de scala.Product ).

Entonces, para usar este enfoque para un RDD[Row] , debe RDD[T <: scala.Product] a un RDD[T <: scala.Product] . Esto se puede hacer asignando cada fila a una clase de caso personalizada o a una tupla, como en los siguientes fragmentos de código:

val df = rdd.map({ case Row(val1: String, ..., valN: Long) => (val1, ..., valN) }).toDF("col1_name", ..., "colN_name")

o

case class MyClass(val1: String, ..., valN: Long = 0L) val df = rdd.map({ case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN) }).toDF("col1_name", ..., "colN_name")

El principal inconveniente de este enfoque (en mi opinión) es que debe establecer explícitamente el esquema del DataFrame resultante en la función de mapa, columna por columna. Tal vez esto se pueda hacer programáticamente si no conoce el esquema de antemano, pero las cosas pueden ponerse un poco confusas allí. Entonces, alternativamente, hay otra opción:

2) Puede usar createDataFrame(rowRDD: RDD[Row], schema: StructType) como en la respuesta aceptada, que está disponible en el objeto SQLContext . Ejemplo para convertir un RDD de un antiguo DataFrame:

val rdd = oldDF.rdd val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Tenga en cuenta que no es necesario establecer explícitamente ninguna columna de esquema. Reutilizamos el antiguo esquema del DF, que es de la clase StructType y puede ampliarse fácilmente. Sin embargo, este enfoque a veces no es posible, y en algunos casos puede ser menos eficiente que el primero.


One needs to create a schema, and attach it to the Rdd.

Suponiendo que val spark es un producto de un SparkSession.builder ...

import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ /* Lets gin up some sample data: * As RDD''s and dataframes can have columns of differing types, lets make our * sample data a three wide, two tall, rectangle of mixed types. * A column of Strings, a column of Longs, and a column of Doubules */ val arrayOfArrayOfAnys = Array.ofDim[Any](2,3) arrayOfArrayOfAnys(0)(0)="aString" arrayOfArrayOfAnys(0)(1)=0L arrayOfArrayOfAnys(0)(2)=3.14159 arrayOfArrayOfAnys(1)(0)="bString" arrayOfArrayOfAnys(1)(1)=9876543210L arrayOfArrayOfAnys(1)(2)=2.71828 /* The way to convert an anything which looks rectangular, * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to * throw it into sparkContext.parallelize. * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows * the parallelize definition as * def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys. * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. */ val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys) /* We''ll be using the sqlContext.createDataFrame to add a schema our RDD. * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have. * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq) * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. */ val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=> Row.fromSeq(f.toSeq) ) /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe. * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as * case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty) * Will leave the two default values in place for each of the columns: * nullability as true, * metadata as an empty Map[String,Any] * */ val schema = StructType( StructField("colOfStrings", StringType) :: StructField("colOfLongs" , LongType ) :: StructField("colOfDoubles", DoubleType) :: Nil ) val df=spark.sqlContext.createDataFrame(rddOfRows,schema) /* * +------------+----------+------------+ * |colOfStrings|colOfLongs|colOfDoubles| * +------------+----------+------------+ * | aString| 0| 3.14159| * | bString|9876543210| 2.71828| * +------------+----------+------------+ */ df.show

Los mismos pasos, pero con menos declaraciones de valores:

val arrayOfArrayOfAnys=Array( Array("aString",0L ,3.14159), Array("bString",9876543210L,2.71828) ) val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq)) /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata: * Consider constructing the schema from an Array[StructField]. This would allow looping over * the columns, with a match statement applying the appropriate sql datatypes as the second * StructField arguments. */ val sf=new Array[StructField](3) sf(0)=StructField("colOfStrings",StringType) sf(1)=StructField("colOfLongs" ,LongType ) sf(2)=StructField("colOfDoubles",DoubleType) val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList)) df.show