tutorial spark machine learning for example español ejemplo data curso big and python apache-spark apache-spark-sql pyspark spark-dataframe

machine - spark python example



La mejor forma de obtener el valor máximo en una columna de marco de datos Spark (5)

Estoy tratando de descubrir la mejor manera de obtener el mayor valor en una columna de marco de datos Spark.

Considere el siguiente ejemplo:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) df.show()

Lo que crea:

+---+---+ | A| B| +---+---+ |1.0|4.0| |2.0|5.0| |3.0|6.0| +---+---+

Mi objetivo es encontrar el valor más grande en la columna A (por inspección, esto es 3.0). Usando PySpark, aquí hay cuatro enfoques en los que puedo pensar:

# Method 1: Use describe() float(df.describe("A").filter("summary = ''max''").select("A").collect()[0].asDict()[''A'']) # Method 2: Use SQL df.registerTempTable("df_table") spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()[''maxval''] # Method 3: Use groupby() df.groupby().max(''A'').collect()[0].asDict()[''max(A)''] # Method 4: Convert to RDD df.select("A").rdd.max()[0]

Cada uno de los anteriores proporciona la respuesta correcta, pero a falta de una herramienta de perfil Spark, no puedo decir cuál es la mejor.

¿Alguna idea de intuición o empirismo sobre cuál de los métodos anteriores es más eficiente en términos de tiempo de ejecución de Spark o uso de recursos, o si existe un método más directo que los anteriores?


Creo que la mejor solución será usar head()
Considerando tu ejemplo:
+ --- + --- +
| A | B |
+ --- + --- +
| 1.0 | 4.0 |
| 2.0 | 5.0 |
| 3.0 | 6.0 |
+ --- + --- +
Usando el método agg y max de python podemos obtener el valor de la siguiente manera:

from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]

Esto devolverá: 3.0

Asegúrate de tener la importación correcta:
from pyspark.sql.functions import max La función máxima que usamos aquí es la función pySPark sql library, no la función max predeterminada de python.


El valor máximo para una columna particular de un marco de datos se puede lograr mediante el uso de:

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]


En caso de que algunos se pregunten cómo hacerlo usando Scala (usando Spark 2.0. +), Aquí van:

scala> df.createOrReplaceTempView("TEMP_DF") scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF"). collect()(0).getInt(0) scala> print(myMax) 117


Observación: Spark está diseñado para funcionar en Big Data: informática distribuida. El tamaño del DataFrame de ejemplo es muy pequeño, por lo que el orden de los ejemplos de la vida real puede modificarse con respecto al pequeño ejemplo.

Más lento: Método_1, porque .describe ("A") calcula min, max, mean, stddev y count (5 cálculos en toda la columna)

Medio: Método_4, porque, .rdd (transformación de DF a RDD) ralentiza el proceso.

Más rápido: Método_3 ~ Método_2 ~ método_5, porque la lógica es muy similar, por lo que el optimizador catalítico de Spark sigue una lógica muy similar con un número mínimo de operaciones (obtener el máximo de una columna en particular, recolectar un marco de datos de un solo valor); (.asDict () agrega un poco de tiempo extra comparando 3,2 a 5)

import pandas as pd import time time_dict = {} dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) #-- For bigger/realistic dataframe just uncomment the following 3 lines #lst = list(np.random.normal(0.0, 100.0, 100000)) #pdf = pd.DataFrame({''A'': lst, ''B'': lst, ''C'': lst, ''D'': lst}) #dfff = self.sqlContext.createDataFrame(pdf) tic1 = int(round(time.time() * 1000)) # Method 1: Use describe() max_val = float(dfff.describe("A").filter("summary = ''max''").select("A").collect()[0].asDict()[''A'']) tac1 = int(round(time.time() * 1000)) time_dict[''m1'']= tac1 - tic1 print (max_val) tic2 = int(round(time.time() * 1000)) # Method 2: Use SQL dfff.registerTempTable("df_table") max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()[''maxval''] tac2 = int(round(time.time() * 1000)) time_dict[''m2'']= tac2 - tic2 print (max_val) tic3 = int(round(time.time() * 1000)) # Method 3: Use groupby() max_val = dfff.groupby().max(''A'').collect()[0].asDict()[''max(A)''] tac3 = int(round(time.time() * 1000)) time_dict[''m3'']= tac3 - tic3 print (max_val) tic4 = int(round(time.time() * 1000)) # Method 4: Convert to RDD max_val = dfff.select("A").rdd.max()[0] tac4 = int(round(time.time() * 1000)) time_dict[''m4'']= tac4 - tic4 print (max_val) tic5 = int(round(time.time() * 1000)) # Method 4: Convert to RDD max_val = dfff.agg({"A": "max"}).collect()[0][0] tac5 = int(round(time.time() * 1000)) time_dict[''m5'']= tac5 - tic5 print (max_val) print time_dict

Resultado en un nodo de borde de un clúster en milisegundos (ms):

DF pequeño (ms): {''m1'': 7096, ''m2'': 205, ''m3'': 165, ''m4'': 211, ''m5'': 180}

DF mayor (ms): {''m1'': 10260, ''m2'': 452, ''m3'': 465, ''m4'': 916, ''m5'': 373}


>df1.show() +-----+--------------------+--------+----------+-----------+ |floor| timestamp| uid| x| y| +-----+--------------------+--------+----------+-----------+ | 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418| | 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393| | 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585| | 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073| >row1 = df1.agg({"x": "max"}).collect()[0] >print row1 Row(max(x)=110.33613) >print row1["max(x)"] 110.33613

La respuesta es casi la misma que método3. pero parece que se puede eliminar "asDict ()" en method3