w3schools sparksql multiple how example columns python apache-spark pivot transpose

python - multiple - sparksql pivot



Transponer columna a fila con Spark (6)

Estoy tratando de transponer algunas columnas de mi tabla a la fila. Estoy usando Python y Spark 1.5.0. Aquí está mi tabla inicial:

+-----+-----+-----+-------+ | A |col_1|col_2|col_...| +-----+-------------------+ | 1 | 0.0| 0.6| ... | | 2 | 0.6| 0.7| ... | | 3 | 0.5| 0.9| ... | | ...| ...| ...| ... |

Me gustaría tener algo como esto:

+-----+--------+-----------+ | A | col_id | col_value | +-----+--------+-----------+ | 1 | col_1| 0.0| | 1 | col_2| 0.6| | ...| ...| ...| | 2 | col_1| 0.6| | 2 | col_2| 0.7| | ...| ...| ...| | 3 | col_1| 0.5| | 3 | col_2| 0.9| | ...| ...| ...|

¿Alguien sabe cómo puedo hacerlo? Gracias por tu ayuda.


Es relativamente simple de hacer con las funciones básicas de Spark SQL.

Pitón

from pyspark.sql.functions import array, col, explode, struct, lit df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"]) def to_long(df, by): # Filter dtypes and split into column names and type description cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by)) # Spark SQL supports only homogeneous columns assert len(set(dtypes)) == 1, "All columns have to be of the same type" # Create and explode an array of (column_name, column_value) structs kvs = explode(array([ struct(lit(c).alias("key"), col(c).alias("val")) for c in cols ])).alias("kvs") return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"]) to_long(df, ["A"])

Scala :

import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.{array, col, explode, lit, struct} val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2") def toLong(df: DataFrame, by: Seq[String]): DataFrame = { val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1") val kvs = explode(array( cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _* )) val byExprs = by.map(col(_)) df .select(byExprs :+ kvs.alias("_kvs"): _*) .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*) } toLong(df, Seq("A"))


Las bibliotecas locales de álgebra lineal de Spark son actualmente muy débiles: y no incluyen operaciones básicas como las anteriores.

Existe una JIRA para arreglar esto para Spark 2.1, pero eso no lo ayudará hoy .

Algo a considerar: realizar una transposición probablemente requerirá mezclar completamente los datos.

Por ahora necesitará escribir el código RDD directamente. He escrito transpose en scala, pero no en python. Aquí está la versión scala :

def transpose(mat: DMatrix) = { val nCols = mat(0).length val matT = mat .flatten .zipWithIndex .groupBy { _._2 % nCols } .toSeq.sortBy { _._1 } .map(_._2) .map(_.map(_._1)) .toArray matT }

Entonces puede convertir eso a Python para su uso. No tengo ancho de banda para escribir / probar eso en este momento en particular: avíseme si no pudo hacer esa conversión.

Como mínimo, los siguientes se convierten fácilmente a python .

  • zipWithIndex -> enumerate() (equivalente a python - crédito a @ zero323)
  • map -> [someOperation(x) for x in ..]
  • groupBy -> itertools.groupBy()

Aquí está la implementación de flatten que no tiene un equivalente en python:

def flatten(L): for item in L: try: for i in flatten(item): yield i except TypeError: yield item

Por lo tanto, debería poder juntarlos para encontrar una solución.


Tomé la respuesta de Scala que escribió @javadba y creé una versión de Python para transponer todas las columnas en un DataFrame . Esto podría ser un poco diferente de lo que OP estaba preguntando ...

from itertools import chain from pyspark.sql import DataFrame def _sort_transpose_tuple(tup): x, y = tup return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0] def transpose(X): """Transpose a PySpark DataFrame. Parameters ---------- X : PySpark ``DataFrame`` The ``DataFrame`` that should be tranposed. """ # validate if not isinstance(X, DataFrame): raise TypeError(''X should be a DataFrame, not a %s'' % type(X)) cols = X.columns n_features = len(cols) # Sorry for this unreadability... return X.rdd.flatMap( # make into an RDD lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key lambda grp_res: grp_res[0]).map( # sort by index % n_features key lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order lambda key_col: key_col[1]).toDF() # return to DF

Por ejemplo:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF() >>> X.show() +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 2| 3| | 4| 5| 6| | 7| 8| 9| +---+---+---+ >>> transpose(X).show() +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 4| 7| | 2| 5| 8| | 3| 6| 9| +---+---+---+


Una forma de resolver con pyspark sql usando las funciones create_map y explode .

from pyspark.sql import functions as func #Use `create_map` to create the map of columns with constant df = df.withColumn(''mapCol'', / func.create_map(func.lit(''col_1''),df.col_1, func.lit(''col_2''),df.col_2, func.lit(''col_3''),df.col_3 ) ) #Use explode function to explode the map res = df.select(''*'',func.explode(df.mapCol).alias(''col_id'',''col_value'')) res.show()


Una forma muy práctica de implementar:

from pyspark.sql import Row def rowExpander(row): rowDict = row.asDict() valA = rowDict.pop(''A'') for k in rowDict: yield Row(**{''A'': valA , ''colID'' : k, ''colValue'' : row[k]}) newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)


Utiliza el mapa plano. Algo como a continuación debería funcionar

from pyspark.sql import Row def rowExpander(row): rowDict = row.asDict() valA = rowDict.pop(''A'') for k in rowDict: yield Row(**{''A'': valA , ''colID'': k, ''colValue'': row[k]}) newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))