machine - spark python example
La mejor forma de obtener el valor máximo en una columna de marco de datos Spark (5)
Estoy tratando de descubrir la mejor manera de obtener el mayor valor en una columna de marco de datos Spark.
Considere el siguiente ejemplo:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Lo que crea:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Mi objetivo es encontrar el valor más grande en la columna A (por inspección, esto es 3.0). Usando PySpark, aquí hay cuatro enfoques en los que puedo pensar:
# Method 1: Use describe()
float(df.describe("A").filter("summary = ''max''").select("A").collect()[0].asDict()[''A''])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()[''maxval'']
# Method 3: Use groupby()
df.groupby().max(''A'').collect()[0].asDict()[''max(A)'']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Cada uno de los anteriores proporciona la respuesta correcta, pero a falta de una herramienta de perfil Spark, no puedo decir cuál es la mejor.
¿Alguna idea de intuición o empirismo sobre cuál de los métodos anteriores es más eficiente en términos de tiempo de ejecución de Spark o uso de recursos, o si existe un método más directo que los anteriores?
Creo que la mejor solución será usar head()
Considerando tu ejemplo:
+ --- + --- +
| A | B |
+ --- + --- +
| 1.0 | 4.0 |
| 2.0 | 5.0 |
| 3.0 | 6.0 |
+ --- + --- +
Usando el método agg y max de python podemos obtener el valor de la siguiente manera:
from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]
Esto devolverá: 3.0
Asegúrate de tener la importación correcta:
from pyspark.sql.functions import max
La función máxima que usamos aquí es la función pySPark sql library, no la función max predeterminada de python.
El valor máximo para una columna particular de un marco de datos se puede lograr mediante el uso de:
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
En caso de que algunos se pregunten cómo hacerlo usando Scala (usando Spark 2.0. +), Aquí van:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
Observación: Spark está diseñado para funcionar en Big Data: informática distribuida. El tamaño del DataFrame de ejemplo es muy pequeño, por lo que el orden de los ejemplos de la vida real puede modificarse con respecto al pequeño ejemplo.
Más lento: Método_1, porque .describe ("A") calcula min, max, mean, stddev y count (5 cálculos en toda la columna)
Medio: Método_4, porque, .rdd (transformación de DF a RDD) ralentiza el proceso.
Más rápido: Método_3 ~ Método_2 ~ método_5, porque la lógica es muy similar, por lo que el optimizador catalítico de Spark sigue una lógica muy similar con un número mínimo de operaciones (obtener el máximo de una columna en particular, recolectar un marco de datos de un solo valor); (.asDict () agrega un poco de tiempo extra comparando 3,2 a 5)
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({''A'': lst, ''B'': lst, ''C'': lst, ''D'': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = ''max''").select("A").collect()[0].asDict()[''A''])
tac1 = int(round(time.time() * 1000))
time_dict[''m1'']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()[''maxval'']
tac2 = int(round(time.time() * 1000))
time_dict[''m2'']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max(''A'').collect()[0].asDict()[''max(A)'']
tac3 = int(round(time.time() * 1000))
time_dict[''m3'']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict[''m4'']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict[''m5'']= tac5 - tic5
print (max_val)
print time_dict
Resultado en un nodo de borde de un clúster en milisegundos (ms):
DF pequeño (ms): {''m1'': 7096, ''m2'': 205, ''m3'': 165, ''m4'': 211, ''m5'': 180}
DF mayor (ms): {''m1'': 10260, ''m2'': 452, ''m3'': 465, ''m4'': 916, ''m5'': 373}
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
La respuesta es casi la misma que método3. pero parece que se puede eliminar "asDict ()" en method3