pandas - inner - ¿Cómo derretir Spark DataFrame?
pandas join (4)
¿Hay un equivalente de la función Pandas Melt en Apache Spark en PySpark o al menos en Scala?
Estaba ejecutando un conjunto de datos de muestra hasta ahora en python y ahora quiero usar Spark para todo el conjunto de datos.
Gracias por adelantado.
UPD
Finalmente encontré la implementación más efectiva para mí. Utiliza todos los recursos para el clúster en mi configuración de hilo.
import pandas as pd
pdf = pd.DataFrame({''A'': {0: ''a'', 1: ''b'', 2: ''c''},
''B'': {0: 1, 1: 3, 2: 5},
''C'': {0: 2, 1: 4, 2: 6},
''D'': {1: 7, 2: 9}})
pd.melt(pdf, id_vars=[''A''], value_vars=[''B'', ''C'', ''D''])
A variable value
0 a B 1.0
1 b B 3.0
2 c B 5.0
3 a C 2.0
4 b C 4.0
5 c C 6.0
6 a D NaN
7 b D 7.0
8 c D 9.0
Para un marco de datos muy amplio, el rendimiento disminuye con la generación de _vars_and_vals a partir de la respuesta del usuario 6910411.
Fue útil implementar la fusión a través de selectExpr
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=[''A''], value_vars=[''B'', ''C'', ''D'']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1.0|
| a| C| 2.0|
| a| D| NaN|
| b| B| 3.0|
| b| C| 4.0|
| b| D| 7.0|
| c| B| 5.0|
| c| C| 6.0|
| c| D| 9.0|
+---+--------+-----+
Encontré esta pregunta en mi búsqueda de una implementación de
melt
en Spark para Scala.
Publicar mi puerto Scala en caso de que alguien también se encuentre con esto.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
*
* @param df the data frame to melt
*/
implicit class DataFrameFunctions(df: DataFrame) {
/** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
*
* melt is (kind of) the inverse of pivot
* melt is currently (02/2017) not implemented in spark
*
* @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
* @see this is a scala adaptation of http://.com/questions/41670103/pandas-melt-function-in-apache-spark
*
* @todo method overloading for simple calling
*
* @param id_vars the columns to preserve
* @param value_vars the columns to melt
* @param var_name the name for the column holding the melted columns names
* @param value_name the name for the column holding the values of the melted columns
*
*/
def melt(
id_vars: Seq[String], value_vars: Seq[String],
var_name: String = "variable", value_name: String = "value") : DataFrame = {
// Create array<struct<variable: str, value: ...>>
val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)
// Add to the DataFrame and explode
val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}
return _tmp.select(cols: _*)
}
}
Como no estoy tan avanzado teniendo en cuenta
Scala
, estoy seguro de que hay margen de mejora.
Cualquier comentario es bienvenido.
No hay una función integrada (si trabaja con el soporte de SQL y Hive habilitado, puede usar la
función de
stack
, pero no está expuesta en Spark y no tiene una implementación nativa), pero es trivial rodar la suya.
Importaciones requeridas:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
Implementación de ejemplo:
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
Y algunas pruebas (basadas en los doctest de Pandas ):
import pandas as pd
pdf = pd.DataFrame({''A'': {0: ''a'', 1: ''b'', 2: ''c''},
''B'': {0: 1, 1: 3, 2: 5},
''C'': {0: 2, 1: 4, 2: 6}})
pd.melt(pdf, id_vars=[''A''], value_vars=[''B'', ''C''])
A variable value
0 a B 1
1 b B 3
2 c B 5
3 a C 2
4 b C 4
5 c C 6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=[''A''], value_vars=[''B'', ''C'']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
Nota: Para usar con versiones heredadas de Python, elimine las anotaciones de tipo.
Relacionado:
- r sparkR - equivalente a la función de fusión
- Reúnanse en sparklyr
Votado por la respuesta del usuario 6910411. Funciona como se esperaba, sin embargo, no puede manejar bien ninguno de los valores. así refactoré su función de fusión a lo siguiente:
from pyspark.sql.functions import explode
def melt(df):
sp = df.columns[1:]
return (df
.rdd
.map(lambda x: [str(x[0]), [(str(i[0]),
float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
preservesPartitioning = True)
.toDF()
.withColumn(''_2'', explode(''_2''))
.rdd.map(lambda x: [str(x[0]),
str(x[1][0]),
float(x[1][1] if x[1][1] else 0)],
preservesPartitioning = True)
.toDF()
)
La prueba es con el siguiente marco de datos:
columns=[''a'', ''b'', ''c'', ''d'', ''e'', ''f'']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
| a| b| c| d| e| f|
+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6|
| 4| 5| 6| 7| 9| 8|
| 7| 8| 9| 1| 2| 4|
| 8| 3| 9| 8| 7| 4|
+---+---+---+---+---+---+
cols = df.columns[1:]
df.selectExpr(''a'', "stack({}, {})".format(len(cols), '', ''.join(("''{}'', {}".format(i, i) for i in cols))))
+---+----+----+
| a|col0|col1|
+---+----+----+
| 1| b| 2|
| 1| c| 3|
| 1| d| 4|
| 1| e| 5|
| 1| f| 6|
| 4| b| 5|
| 4| c| 6|
| 4| d| 7|
| 4| e| 9|
| 4| f| 8|
| 7| b| 8|
| 7| c| 9|
...
from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable
from itertools import chain
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create map<key: value>
_vars_and_vals = create_map(
list(chain.from_iterable([
[lit(c), col(c)] for c in value_vars]
))
)
_tmp = df.select(*id_vars, explode(_vars_and_vals)) /
.withColumnRenamed(''key'', var_name) /
.withColumnRenamed(''value'', value_name)
return _tmp