scala - example - spark sql tutorial
Agregar una columna al marco de datos en Apache Spark 1.3 (4)
¿Es posible y cuál sería el método ordenado más eficiente para agregar una columna al Marco de datos?
Más específicamente, la columna puede servir como ID de fila para el marco de datos existente.
En un caso simplificado, leyendo el archivo y no tokenizando, puedo pensar en algo como a continuación (en Scala), pero se completa con errores (en la línea 3), y de todos modos no parece la mejor ruta posible:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
Ha pasado un tiempo desde que publiqué la pregunta y parece que a otras personas también les gustaría obtener una respuesta. Debajo está lo que encontré.
Por lo tanto, la tarea original era agregar una columna con identificadores de fila (básicamente, una secuencia
1 to numRows
) a cualquier marco de datos dado, para que se pueda rastrear el orden / presencia de las filas (por ejemplo, cuando muestrea).
Esto se puede lograr mediante algo en este sentido:
sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))
Con respecto al caso general de agregar cualquier columna a cualquier marco de datos:
Los "más cercanos" a esta funcionalidad en Spark API son
withColumn
y
withColumnRenamed
.
Según los
documentos de Scala
, el primero
devuelve un nuevo DataFrame agregando una columna
.
En mi opinión, esta es una definición un poco confusa e incompleta.
Ambas funciones pueden operar solo en
this
marco de datos, es decir, dados dos marcos de datos
df1
y
df2
con columna
col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
Por lo tanto, a menos que pueda transformar una columna en un marco de datos existente a la forma que necesita, no puede usar
withColumn
o
withColumnRenamed
para
withColumnRenamed
columnas arbitrarias (marcos de datos independientes u otros).
Como se comentó anteriormente, la solución alternativa puede ser usar una
join
, lo que sería bastante complicado, aunque posible, adjuntar las claves únicas como arriba con
zipWithIndex
a ambos marcos de datos o columnas podría funcionar.
Aunque la eficiencia es ...
Está claro que agregar una columna al marco de datos no es una funcionalidad fácil para el entorno distribuido y puede que no haya un método muy eficiente y ordenado para eso. Pero creo que sigue siendo muy importante tener esta funcionalidad central disponible, incluso con advertencias de rendimiento.
Puede usar row_number con la función Window como se muestra a continuación para obtener la identificación distinta para cada fila en un marco de datos.
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
También puede usar
monotonically_increasing_id
para lo mismo que
df.withColumn("ID", monotonically_increasing_id())
Y también hay otras formas .
Tomé ayuda de la respuesta anterior.
Sin embargo, me parece incompleto si queremos cambiar un
DataFrame
y las API actuales son un poco diferentes en
Spark 1.6
.
zipWithIndex()
devuelve una
Tuple
de
(Row, Long)
que contiene cada fila y el índice correspondiente.
Podemos usarlo para crear una nueva
Row
acuerdo a nuestras necesidades.
val rdd = df.rdd.zipWithIndex()
.map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show
Espero que esto sea útil.
no estoy seguro si funciona en spark 1.3 pero en spark 1.5 lo uso con Columna:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
Lo uso cuando necesito usar un valor que no está relacionado con las columnas existentes del marco de datos
Esto es similar a la respuesta de @ NehaM pero más simple