inner pandas apache-spark pyspark apache-spark-sql melt

pandas - inner - ¿Cómo derretir Spark DataFrame?



pandas join (4)

¿Hay un equivalente de la función Pandas Melt en Apache Spark en PySpark o al menos en Scala?

Estaba ejecutando un conjunto de datos de muestra hasta ahora en python y ahora quiero usar Spark para todo el conjunto de datos.

Gracias por adelantado.


UPD

Finalmente encontré la implementación más efectiva para mí. Utiliza todos los recursos para el clúster en mi configuración de hilo.

import pandas as pd pdf = pd.DataFrame({''A'': {0: ''a'', 1: ''b'', 2: ''c''}, ''B'': {0: 1, 1: 3, 2: 5}, ''C'': {0: 2, 1: 4, 2: 6}, ''D'': {1: 7, 2: 9}}) pd.melt(pdf, id_vars=[''A''], value_vars=[''B'', ''C'', ''D'']) A variable value 0 a B 1.0 1 b B 3.0 2 c B 5.0 3 a C 2.0 4 b C 4.0 5 c C 6.0 6 a D NaN 7 b D 7.0 8 c D 9.0

Para un marco de datos muy amplio, el rendimiento disminuye con la generación de _vars_and_vals a partir de la respuesta del usuario 6910411.

Fue útil implementar la fusión a través de selectExpr

sdf = spark.createDataFrame(pdf) melt(sdf, id_vars=[''A''], value_vars=[''B'', ''C'', ''D'']).show() +---+--------+-----+ | A|variable|value| +---+--------+-----+ | a| B| 1.0| | a| C| 2.0| | a| D| NaN| | b| B| 3.0| | b| C| 4.0| | b| D| 7.0| | c| B| 5.0| | c| C| 6.0| | c| D| 9.0| +---+--------+-----+


Encontré esta pregunta en mi búsqueda de una implementación de melt en Spark para Scala.

Publicar mi puerto Scala en caso de que alguien también se encuentre con esto.

import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame} /** Extends the [[org.apache.spark.sql.DataFrame]] class * * @param df the data frame to melt */ implicit class DataFrameFunctions(df: DataFrame) { /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format. * * melt is (kind of) the inverse of pivot * melt is currently (02/2017) not implemented in spark * * @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html) * @see this is a scala adaptation of http://.com/questions/41670103/pandas-melt-function-in-apache-spark * * @todo method overloading for simple calling * * @param id_vars the columns to preserve * @param value_vars the columns to melt * @param var_name the name for the column holding the melted columns names * @param value_name the name for the column holding the values of the melted columns * */ def melt( id_vars: Seq[String], value_vars: Seq[String], var_name: String = "variable", value_name: String = "value") : DataFrame = { // Create array<struct<variable: str, value: ...>> val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*) // Add to the DataFrame and explode val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }} return _tmp.select(cols: _*) } }

Como no estoy tan avanzado teniendo en cuenta Scala , estoy seguro de que hay margen de mejora.

Cualquier comentario es bienvenido.


No hay una función integrada (si trabaja con el soporte de SQL y Hive habilitado, puede usar la función de stack , pero no está expuesta en Spark y no tiene una implementación nativa), pero es trivial rodar la suya. Importaciones requeridas:

from pyspark.sql.functions import array, col, explode, lit, struct from pyspark.sql import DataFrame from typing import Iterable

Implementación de ejemplo:

def melt( df: DataFrame, id_vars: Iterable[str], value_vars: Iterable[str], var_name: str="variable", value_name: str="value") -> DataFrame: """Convert :class:`DataFrame` from wide to long format.""" # Create array<struct<variable: str, value: ...>> _vars_and_vals = array(*( struct(lit(c).alias(var_name), col(c).alias(value_name)) for c in value_vars)) # Add to the DataFrame and explode _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) cols = id_vars + [ col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]] return _tmp.select(*cols)

Y algunas pruebas (basadas en los doctest de Pandas ):

import pandas as pd pdf = pd.DataFrame({''A'': {0: ''a'', 1: ''b'', 2: ''c''}, ''B'': {0: 1, 1: 3, 2: 5}, ''C'': {0: 2, 1: 4, 2: 6}}) pd.melt(pdf, id_vars=[''A''], value_vars=[''B'', ''C''])

A variable value 0 a B 1 1 b B 3 2 c B 5 3 a C 2 4 b C 4 5 c C 6

sdf = spark.createDataFrame(pdf) melt(sdf, id_vars=[''A''], value_vars=[''B'', ''C'']).show()

+---+--------+-----+ | A|variable|value| +---+--------+-----+ | a| B| 1| | a| C| 2| | b| B| 3| | b| C| 4| | c| B| 5| | c| C| 6| +---+--------+-----+

Nota: Para usar con versiones heredadas de Python, elimine las anotaciones de tipo.

Relacionado:

  • r sparkR - equivalente a la función de fusión
  • Reúnanse en sparklyr

Votado por la respuesta del usuario 6910411. Funciona como se esperaba, sin embargo, no puede manejar bien ninguno de los valores. así refactoré su función de fusión a lo siguiente:

from pyspark.sql.functions import explode def melt(df): sp = df.columns[1:] return (df .rdd .map(lambda x: [str(x[0]), [(str(i[0]), float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]], preservesPartitioning = True) .toDF() .withColumn(''_2'', explode(''_2'')) .rdd.map(lambda x: [str(x[0]), str(x[1][0]), float(x[1][1] if x[1][1] else 0)], preservesPartitioning = True) .toDF() )

La prueba es con el siguiente marco de datos:

columns=[''a'', ''b'', ''c'', ''d'', ''e'', ''f''] pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns) df = spark.createDataFrame(pd_df) +---+---+---+---+---+---+ | a| b| c| d| e| f| +---+---+---+---+---+---+ | 1| 2| 3| 4| 5| 6| | 4| 5| 6| 7| 9| 8| | 7| 8| 9| 1| 2| 4| | 8| 3| 9| 8| 7| 4| +---+---+---+---+---+---+ cols = df.columns[1:] df.selectExpr(''a'', "stack({}, {})".format(len(cols), '', ''.join(("''{}'', {}".format(i, i) for i in cols)))) +---+----+----+ | a|col0|col1| +---+----+----+ | 1| b| 2| | 1| c| 3| | 1| d| 4| | 1| e| 5| | 1| f| 6| | 4| b| 5| | 4| c| 6| | 4| d| 7| | 4| e| 9| | 4| f| 8| | 7| b| 8| | 7| c| 9| ...

from pyspark.sql.functions import array, col, explode, lit from pyspark.sql.functions import create_map from pyspark.sql import DataFrame from typing import Iterable from itertools import chain def melt( df: DataFrame, id_vars: Iterable[str], value_vars: Iterable[str], var_name: str="variable", value_name: str="value") -> DataFrame: """Convert :class:`DataFrame` from wide to long format.""" # Create map<key: value> _vars_and_vals = create_map( list(chain.from_iterable([ [lit(c), col(c)] for c in value_vars] )) ) _tmp = df.select(*id_vars, explode(_vars_and_vals)) / .withColumnRenamed(''key'', var_name) / .withColumnRenamed(''value'', value_name) return _tmp