scala - Apache Spark cómo agregar una nueva columna de la lista/matriz al marco de datos de Spark
apache-spark dataframe (2)
Agregar para completar: el hecho de que la
list
entrada (que existe en la memoria del controlador) tiene el mismo tamaño que el
DataFrame
sugiere que, para empezar, este es un pequeño DataFrame, por lo que puede considerar
collect()
, comprimirlo con la
list
, y convertir de nuevo en un
DataFrame
si es necesario:
df.collect()
.map(_.getAs[String]("row1"))
.zip(list).toList
.toDF("row1", "row2")
Eso no será más rápido, pero si los datos son realmente pequeños, podría ser insignificante y el código es (posiblemente) más claro.
Estoy usando Apache Spark 2.0 Dataframe / Dataset API. Deseo agregar una nueva columna a mi marco de datos desde la Lista de valores. Mi lista tiene el mismo número de valores como el marco de datos dado.
val list = List(4,5,10,7,2)
val df = List("a","b","c","d","e").toDF("row1")
Me gustaría hacer algo como:
val appendedDF = df.withColumn("row2",somefunc(list))
df.show()
// +----+------+
// |row1 |row2 |
// +----+------+
// |a |4 |
// |b |5 |
// |c |10 |
// |d |7 |
// |e |2 |
// +----+------+
Para cualquier idea que agradecería, mi marco de datos en realidad contiene más columnas.
Podrías hacerlo así:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// create rdd from the list
val rdd = sc.parallelize(List(4,5,10,7,2))
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:28
// zip the data frame with rdd
val rdd_new = df.rdd.zip(rdd).map(r => Row.fromSeq(r._1.toSeq ++ Seq(r._2)))
// rdd_new: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[33] at map at <console>:32
// create a new data frame from the rdd_new with modified schema
spark.createDataFrame(rdd_new, df.schema.add("new_col", IntegerType)).show
+----+-------+
|row1|new_col|
+----+-------+
| a| 4|
| b| 5|
| c| 10|
| d| 7|
| e| 2|
+----+-------+