from create scala hadoop apache-spark

scala - create - pyspark rdd to dataframe



Convierta RDD a Dataframe en Spark/Scala (3)

Primero debe convertir el Array en una Row y luego definir el esquema. Asumí que la mayoría de tus campos son Long

val rdd: RDD[Array[String]] = ??? val rows: RDD[Row] = rdd map { case Array(callId, oCallId, callTime, duration, swId) => Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, swId.toLong) } object schema { val callId = StructField("callId", LongType) val oCallId = StructField("oCallId", StringType) val callTime = StructField("callTime", StringType) val duration = StructField("duration", LongType) val swId = StructField("swId", LongType) val struct = StructType(Array(callId, oCallId, callTime, duration, swId)) } sqlContext.createDataFrame(rows, schema.struct)

El RDD se ha creado con el formato Array[Array[String]] y tiene los siguientes valores:

Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1))

Quiero crear un DataFrame con el esquema:

val schemaString = "callId oCallId callTime duration calltype swId"

Próximos pasos:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim)) rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39 scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

Da el siguiente error:

consola: 45: error: valor del método sobrecargado createDataFrame con alternativas: (rdd: org.apache.spark.api.java.JavaRDD [ ], beanClass: Class [ ]) org.apache.spark.sql.DataFrame (rdd: org. apache.spark.rdd.RDD [ ], beanClass: Class [ ]) org.apache.spark.sql.DataFrame (rowRDD: org.apache.spark.api.java.JavaRDD [org.apache.spark.sql.Row] , esquema: org.apache.spark.sql.types.StructType) org.apache.spark.sql.DataFrame (rowRDD: org.apache.spark.rdd.RDD [org.apache.spark.sql.Row], esquema: org.apache.spark.sql.types.StructType) org.apache.spark.sql.DataFrame no se puede aplicar a (org.apache.spark.rdd.RDD [Array [String]],
org.apache.spark.sql.types.StructType) val calDF = sqlContext.createDataFrame (rowRDD, schema)


Solo pegue en una spark-shell :

val a = Array( Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")) val rdd = sc.makeRDD(a) case class X(callId: String, oCallId: String, callTime: String, duration: String, calltype: String, swId: String)

Luego map() sobre el RDD para crear instancias de la clase de caso, y luego cree el DataFrame usando toDF() :

scala> val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF() df: org.apache.spark.sql.DataFrame = [callId: string, oCallId: string, callTime: string, duration: string, calltype: string, swId: string]

Esto infiere el esquema de la clase de caso.

Entonces puedes continuar con:

scala> df.printSchema() root |-- callId: string (nullable = true) |-- oCallId: string (nullable = true) |-- callTime: string (nullable = true) |-- duration: string (nullable = true) |-- calltype: string (nullable = true) |-- swId: string (nullable = true) scala> df.show() +----------+-------+-------------------+--------+--------+----+ | callId|oCallId| callTime|duration|calltype|swId| +----------+-------+-------------------+--------+--------+----+ |4580056797| 0|2015-07-29 10:38:42| 0| 1| 1| |4580056797| 0|2015-07-29 10:38:42| 0| 1| 1| +----------+-------+-------------------+--------+--------+----+

Si desea usar toDF() en un programa normal (no en el spark-shell ), asegúrese (citado desde aquí ):

  • Para import sqlContext.implicits._ justo después de crear el SQLContext
  • Defina la clase de caso fuera del método usando toDF()

Supongo que su schema es, como en la Guía de chispa , de la siguiente manera:

val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

Si observa la firma del createDataFrame , aquí está el que acepta un StructType como segundo argumento (para Scala)

def createDataFrame (rowRDD: RDD [Row], esquema: StructType): DataFrame

Crea un DataFrame desde un RDD que contiene Filas usando el esquema dado.

Entonces acepta como primer argumento un RDD[Row] . Lo que tienes en rowRDD es un RDD[Array[String]] por lo que hay una discrepancia.

¿Necesita un RDD[Array[String]] ?

De lo contrario, puede usar lo siguiente para crear su marco de datos:

val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim))