scala - create - pyspark rdd to dataframe
Convierta RDD a Dataframe en Spark/Scala (3)
Primero debe convertir el Array
en una Row
y luego definir el esquema. Asumí que la mayoría de tus campos son Long
val rdd: RDD[Array[String]] = ???
val rows: RDD[Row] = rdd map {
case Array(callId, oCallId, callTime, duration, swId) =>
Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, swId.toLong)
}
object schema {
val callId = StructField("callId", LongType)
val oCallId = StructField("oCallId", StringType)
val callTime = StructField("callTime", StringType)
val duration = StructField("duration", LongType)
val swId = StructField("swId", LongType)
val struct = StructType(Array(callId, oCallId, callTime, duration, swId))
}
sqlContext.createDataFrame(rows, schema.struct)
El RDD se ha creado con el formato Array[Array[String]]
y tiene los siguientes valores:
Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1))
Quiero crear un DataFrame con el esquema:
val schemaString = "callId oCallId callTime duration calltype swId"
Próximos pasos:
scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)
Da el siguiente error:
consola: 45: error: valor del método sobrecargado createDataFrame con alternativas: (rdd: org.apache.spark.api.java.JavaRDD [ ], beanClass: Class [ ]) org.apache.spark.sql.DataFrame (rdd: org. apache.spark.rdd.RDD [ ], beanClass: Class [ ]) org.apache.spark.sql.DataFrame (rowRDD: org.apache.spark.api.java.JavaRDD [org.apache.spark.sql.Row] , esquema: org.apache.spark.sql.types.StructType) org.apache.spark.sql.DataFrame (rowRDD: org.apache.spark.rdd.RDD [org.apache.spark.sql.Row], esquema: org.apache.spark.sql.types.StructType) org.apache.spark.sql.DataFrame no se puede aplicar a (org.apache.spark.rdd.RDD [Array [String]],
org.apache.spark.sql.types.StructType) val calDF = sqlContext.createDataFrame (rowRDD, schema)
Solo pegue en una spark-shell
:
val a =
Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))
val rdd = sc.makeRDD(a)
case class X(callId: String, oCallId: String,
callTime: String, duration: String, calltype: String, swId: String)
Luego map()
sobre el RDD para crear instancias de la clase de caso, y luego cree el DataFrame usando toDF()
:
scala> val df = rdd.map {
case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame =
[callId: string, oCallId: string, callTime: string,
duration: string, calltype: string, swId: string]
Esto infiere el esquema de la clase de caso.
Entonces puedes continuar con:
scala> df.printSchema()
root
|-- callId: string (nullable = true)
|-- oCallId: string (nullable = true)
|-- callTime: string (nullable = true)
|-- duration: string (nullable = true)
|-- calltype: string (nullable = true)
|-- swId: string (nullable = true)
scala> df.show()
+----------+-------+-------------------+--------+--------+----+
| callId|oCallId| callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797| 0|2015-07-29 10:38:42| 0| 1| 1|
|4580056797| 0|2015-07-29 10:38:42| 0| 1| 1|
+----------+-------+-------------------+--------+--------+----+
Si desea usar toDF()
en un programa normal (no en el spark-shell
), asegúrese (citado desde aquí ):
- Para
import sqlContext.implicits._
justo después de crear elSQLContext
- Defina la clase de caso fuera del método usando
toDF()
Supongo que su schema
es, como en la Guía de chispa , de la siguiente manera:
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
Si observa la firma del createDataFrame , aquí está el que acepta un StructType como segundo argumento (para Scala)
def createDataFrame (rowRDD: RDD [Row], esquema: StructType): DataFrame
Crea un DataFrame desde un RDD que contiene Filas usando el esquema dado.
Entonces acepta como primer argumento un RDD[Row]
. Lo que tienes en rowRDD
es un RDD[Array[String]]
por lo que hay una discrepancia.
¿Necesita un RDD[Array[String]]
?
De lo contrario, puede usar lo siguiente para crear su marco de datos:
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim))