scala apache-spark dataframe apache-spark-sql

scala - Apache Spark cómo agregar una nueva columna de la lista/matriz al marco de datos de Spark



apache-spark dataframe (2)

Agregar para completar: el hecho de que la list entrada (que existe en la memoria del controlador) tiene el mismo tamaño que el DataFrame sugiere que, para empezar, este es un pequeño DataFrame, por lo que puede considerar collect() , comprimirlo con la list , y convertir de nuevo en un DataFrame si es necesario:

df.collect() .map(_.getAs[String]("row1")) .zip(list).toList .toDF("row1", "row2")

Eso no será más rápido, pero si los datos son realmente pequeños, podría ser insignificante y el código es (posiblemente) más claro.

Estoy usando Apache Spark 2.0 Dataframe / Dataset API. Deseo agregar una nueva columna a mi marco de datos desde la Lista de valores. Mi lista tiene el mismo número de valores como el marco de datos dado.

val list = List(4,5,10,7,2) val df = List("a","b","c","d","e").toDF("row1")

Me gustaría hacer algo como:

val appendedDF = df.withColumn("row2",somefunc(list)) df.show() // +----+------+ // |row1 |row2 | // +----+------+ // |a |4 | // |b |5 | // |c |10 | // |d |7 | // |e |2 | // +----+------+

Para cualquier idea que agradecería, mi marco de datos en realidad contiene más columnas.


Podrías hacerlo así:

import org.apache.spark.sql.Row import org.apache.spark.sql.types._ // create rdd from the list val rdd = sc.parallelize(List(4,5,10,7,2)) // rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:28 // zip the data frame with rdd val rdd_new = df.rdd.zip(rdd).map(r => Row.fromSeq(r._1.toSeq ++ Seq(r._2))) // rdd_new: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[33] at map at <console>:32 // create a new data frame from the rdd_new with modified schema spark.createDataFrame(rdd_new, df.schema.add("new_col", IntegerType)).show +----+-------+ |row1|new_col| +----+-------+ | a| 4| | b| 5| | c| 10| | d| 7| | e| 2| +----+-------+