python - multiple - sparksql pivot
Transponer columna a fila con Spark (6)
Estoy tratando de transponer algunas columnas de mi tabla a la fila. Estoy usando Python y Spark 1.5.0. Aquí está mi tabla inicial:
+-----+-----+-----+-------+
| A |col_1|col_2|col_...|
+-----+-------------------+
| 1 | 0.0| 0.6| ... |
| 2 | 0.6| 0.7| ... |
| 3 | 0.5| 0.9| ... |
| ...| ...| ...| ... |
Me gustaría tener algo como esto:
+-----+--------+-----------+
| A | col_id | col_value |
+-----+--------+-----------+
| 1 | col_1| 0.0|
| 1 | col_2| 0.6|
| ...| ...| ...|
| 2 | col_1| 0.6|
| 2 | col_2| 0.7|
| ...| ...| ...|
| 3 | col_1| 0.5|
| 3 | col_2| 0.9|
| ...| ...| ...|
¿Alguien sabe cómo puedo hacerlo? Gracias por tu ayuda.
Es relativamente simple de hacer con las funciones básicas de Spark SQL.
Pitón
from pyspark.sql.functions import array, col, explode, struct, lit
df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])
def to_long(df, by):
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
# Create and explode an array of (column_name, column_value) structs
kvs = explode(array([
struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
])).alias("kvs")
return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
to_long(df, ["A"])
Scala :
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")
def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")
val kvs = explode(array(
cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
))
val byExprs = by.map(col(_))
df
.select(byExprs :+ kvs.alias("_kvs"): _*)
.select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}
toLong(df, Seq("A"))
Las bibliotecas locales de álgebra lineal de Spark son actualmente muy débiles: y no incluyen operaciones básicas como las anteriores.
Existe una JIRA para arreglar esto para Spark 2.1, pero eso no lo ayudará hoy .
Algo a considerar: realizar una transposición probablemente requerirá mezclar completamente los datos.
Por ahora necesitará escribir el código RDD directamente.
He escrito
transpose
en scala, pero no en python.
Aquí está la versión
scala
:
def transpose(mat: DMatrix) = {
val nCols = mat(0).length
val matT = mat
.flatten
.zipWithIndex
.groupBy {
_._2 % nCols
}
.toSeq.sortBy {
_._1
}
.map(_._2)
.map(_.map(_._1))
.toArray
matT
}
Entonces puede convertir eso a Python para su uso. No tengo ancho de banda para escribir / probar eso en este momento en particular: avíseme si no pudo hacer esa conversión.
Como mínimo, los siguientes se convierten fácilmente a
python
.
-
zipWithIndex
->enumerate()
(equivalente a python - crédito a @ zero323) -
map
->[someOperation(x) for x in ..]
-
groupBy
->itertools.groupBy()
Aquí está la implementación de
flatten
que no tiene un equivalente en python:
def flatten(L):
for item in L:
try:
for i in flatten(item):
yield i
except TypeError:
yield item
Por lo tanto, debería poder juntarlos para encontrar una solución.
Tomé la respuesta de Scala que escribió @javadba y creé una versión de Python para transponer todas las columnas en un
DataFrame
.
Esto podría ser un poco diferente de lo que OP estaba preguntando ...
from itertools import chain
from pyspark.sql import DataFrame
def _sort_transpose_tuple(tup):
x, y = tup
return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]
def transpose(X):
"""Transpose a PySpark DataFrame.
Parameters
----------
X : PySpark ``DataFrame``
The ``DataFrame`` that should be tranposed.
"""
# validate
if not isinstance(X, DataFrame):
raise TypeError(''X should be a DataFrame, not a %s''
% type(X))
cols = X.columns
n_features = len(cols)
# Sorry for this unreadability...
return X.rdd.flatMap( # make into an RDD
lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
lambda grp_res: grp_res[0]).map( # sort by index % n_features key
lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
lambda key_col: key_col[1]).toDF() # return to DF
Por ejemplo:
>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
+---+---+---+
>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 4| 7|
| 2| 5| 8|
| 3| 6| 9|
+---+---+---+
Una forma de resolver con
pyspark sql
usando las funciones
create_map
y
explode
.
from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant
df = df.withColumn(''mapCol'', /
func.create_map(func.lit(''col_1''),df.col_1,
func.lit(''col_2''),df.col_2,
func.lit(''col_3''),df.col_3
)
)
#Use explode function to explode the map
res = df.select(''*'',func.explode(df.mapCol).alias(''col_id'',''col_value''))
res.show()
Una forma muy práctica de implementar:
from pyspark.sql import Row
def rowExpander(row):
rowDict = row.asDict()
valA = rowDict.pop(''A'')
for k in rowDict:
yield Row(**{''A'': valA , ''colID'' : k, ''colValue'' : row[k]})
newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)
Utiliza el mapa plano. Algo como a continuación debería funcionar
from pyspark.sql import Row
def rowExpander(row):
rowDict = row.asDict()
valA = rowDict.pop(''A'')
for k in rowDict:
yield Row(**{''A'': valA , ''colID'': k, ''colValue'': row[k]})
newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))